Gabriel Volpe

  • Software Engineer at Paidy Inc
  • Co-organizer of Scala Tokyo Meetup
  • I'm on Twitter!
    • @volpegabriel87
  • Open Source Contributor
    • https://gvolpe.github.io/

Agenda

  • The FP fundamentals
    • Effects vs Side Effects
    • Referential Transparency
  • From IO to Tagless Final
    • Interacting with I/O
    • A more complex example
  • Technical challenges at Paidy

Motivations

Effects vs Side-effects

  • What are effects?

Effects vs Side-effects

Effects

  • Option[A]: May or may not produce a value A.
  • Either[A, B]: Either produces a value A or a value B.
  • List[A]: Produces zero, one or many elements of type A.
  • IO[A]: Produces a value A, fails or never terminates.
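To make the list concrete, here is a small stdlib-only sketch in which Option, Either and List are returned as plain values and composed in a for-comprehension. The helper names (parseAge, checkAdult, divisors, adultFromInput) are made up for illustration.

```scala
import scala.util.Try

// Stdlib effects used as ordinary values; all helper names are hypothetical
object EffectsAsValues {
  // Option[A]: may or may not produce an Int
  def parseAge(s: String): Option[Int] = Try(s.trim.toInt).toOption

  // Either[A, B]: produces an error message (Left) or a value (Right)
  def checkAdult(age: Int): Either[String, Int] =
    if (age >= 18) Right(age) else Left(s"$age is under 18")

  // List[A]: produces zero, one or many values
  def divisors(n: Int): List[Int] = (1 to n).filter(d => n % d == 0).toList

  // Effects compose in a for-comprehension; every outcome is visible in the return type
  def adultFromInput(s: String): Either[String, Int] =
    for {
      age   <- parseAge(s).toRight(s"'$s' is not a number")
      adult <- checkAdult(age)
    } yield adult
}
```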

Effects vs Side-effects

Side-effects

  • println("Hey!"): Writes to the console immediately.
  • scala.io.StdIn.readLine(): Reads from the console immediately.
  • System.nanoTime(): Reads the JVM's high-resolution time source immediately.
  • Future(deleteDB): Starts deleting the database immediately.
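A quick way to see why Future(...) belongs in this list: it starts running the moment it is constructed. In this stdlib sketch an AtomicInteger counter stands in for a real side effect, and it shows that naming a Future and inlining it are observably different programs.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerFuture {
  // Program 1: bind the Future to a name and use the name twice
  def sharedLaunches(): Int = {
    val counter = new AtomicInteger(0)
    val expr    = Future(counter.incrementAndGet())
    Await.result(Future.sequence(List(expr, expr)), 1.second)
    counter.get() // the body ran once, when expr was created
  }

  // Program 2: replace the name by its definition
  def inlinedLaunches(): Int = {
    val counter = new AtomicInteger(0)
    Await.result(
      Future.sequence(List(Future(counter.incrementAndGet()), Future(counter.incrementAndGet()))),
      1.second
    )
    counter.get() // each Future(...) ran its own body
  }
}
```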

"Side-effects are bugs". Rob Norris.

Referential Transparency

Are these two programs the same?

// Program 1
val expr = 123
(expr, expr)

// Program 2
(123, 123)

Referential Transparency

Are these two programs the same?

// Program 1
val expr = println("Hey!")
(expr, expr)

// Program 2
(println("Hey!"), println("Hey!"))

Referential Transparency

Are these two programs the same?

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Program 1
val expr = Future(println("Hey!"))
(expr, expr)

// Program 2
(Future(println("Hey!")), Future(println("Hey!")))

Referential Transparency

Are these two programs the same?

import cats.effect.IO

// Program 1
val expr = IO(println("Hey!"))
(expr, expr)

// Program 2
(IO(println("Hey!")), IO(println("Hey!")))

Referential Transparency

IO[A]

Represents the intention to perform a side effect
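The idea behind IO can be sketched with a deliberately tiny, hypothetical MiniIO type (not cats.effect.IO, and not stack-safe) that only stores a thunk. Because building a MiniIO runs nothing, the named and the inlined programs behave the same once they are run.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy IO: a description of a computation, executed only by unsafeRun()
final class MiniIO[A](private val thunk: () => A) {
  def map[B](f: A => B): MiniIO[B] = new MiniIO(() => f(thunk()))
  def flatMap[B](f: A => MiniIO[B]): MiniIO[B] = new MiniIO(() => f(thunk()).unsafeRun())
  def unsafeRun(): A = thunk()
}

object MiniIO {
  def apply[A](a: => A): MiniIO[A] = new MiniIO(() => a)
}

object MiniIODemo {
  def run(): (Int, Int, Int) = {
    val out  = ListBuffer.empty[String] // stands in for the console
    val expr = MiniIO { out += "Hey!"; () }

    // Program 1: (expr, expr), sequenced
    val named = for { _ <- expr; _ <- expr } yield ()
    // Program 2: the definition substituted for the name
    val inlined = for {
      _ <- MiniIO { out += "Hey!"; () }
      _ <- MiniIO { out += "Hey!"; () }
    } yield ()

    val beforeRun = out.size // 0: both programs are still just descriptions
    named.unsafeRun()
    val afterNamed = out.size // 2
    inlined.unsafeRun()
    (beforeRun, afterNamed, out.size) // (0, 2, 4)
  }
}
```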

Referential Transparency

  • Enables composition
  • Local reasoning
  • Fearless refactoring

Before we get into the examples...

Libraries

In addition to Cats Effect, you'll see the following libraries used:

  • Cats Par: https://github.com/ChristopherDavenport/cats-par
  • Better Monadic For: https://github.com/oleg-py/better-monadic-for

Interacting with I/O

Consider this program

val program: IO[Unit] =
  for {
    _ <- IO(println("Enter your name:"))
    n <- IO(scala.io.StdIn.readLine())
    _ <- IO(println(s"Hello $n!"))
  } yield ()

A possible interaction

> Enter your name:
    Gabriel
> Hello Gabriel!

Interacting with I/O

Tagless Final

trait Console[F[_]] {
  def putStrLn(str: String): F[Unit]
  def readLn: F[String]
}
def program[F[_]: Monad](implicit C: Console[F]): F[Unit] =
  for {
    _ <- C.putStrLn("Enter your name:")
    n <- C.readLn
    _ <- C.putStrLn(s"Hello $n!")
  } yield ()

Interacting with I/O

Standard I/O Console

class StdConsole[F[_]: Sync] extends Console[F] {
  def putStrLn(str: String) = Sync[F].delay(println(str))
  def readLn = Sync[F].delay(scala.io.StdIn.readLine())
}

Remote Console

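import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// HttpClient is assumed to be a Future-based HTTP client
class RemoteConsole[F[_]](implicit F: Async[F], ec: ExecutionContext) extends Console[F] {
  // Runs the suspended Future and completes F through the async callback
  private def fromFuture[A](fa: F[Future[A]]): F[A] =
    fa.flatMap { future =>
      F.async { cb =>
        future.onComplete {
          case Success(x) => cb(Right(x))
          case Failure(e) => cb(Left(e))
        }
      }
    }

  def putStrLn(str: String): F[Unit] = fromFuture(F.delay(HttpClient.post(str)))
  def readLn: F[String]              = fromFuture(F.delay(HttpClient.get))
}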
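Async[F].async hands the caller a callback of type Either[Throwable, A] => Unit. The same bridge can be sketched with only the standard library by completing a Promise from a callback-based API; legacyFetch below is a hypothetical stand-in for HttpClient.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CallbackBridge {
  // Hypothetical callback-based API
  def legacyFetch(key: String, onDone: Try[String] => Unit): Unit =
    if (key.nonEmpty) onDone(Success(s"value-for-$key"))
    else onDone(Failure(new IllegalArgumentException("empty key")))

  // Same shape as F.async { cb => ... }: the callback completes the result exactly once
  def fetch(key: String): Future[String] = {
    val promise = Promise[String]()
    legacyFetch(key, result => { promise.complete(result); () })
    promise.future
  }
}
```

One difference worth noting: the Promise version starts the request as soon as fetch is called, whereas F.async only registers the callback when the effect is run.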

Interacting with I/O

Test Console

class TestConsole[F[_]: Applicative](state: Ref[F, List[String]]) extends Console[F] {
  def putStrLn(str: String): F[Unit] = state.update(_ :+ str)
  def readLn: F[String]              = "test".pure[F]
}
test("Console") {
  val spec =
    for {
      state <- Ref.of[IO, List[String]](List.empty[String])
      implicit0(c: Console[IO]) = new TestConsole[IO](state)
      _ <- program[IO]
      st <- state.get
      as <- IO { assert(st == List("Enter your name:", "Hello test!")) }
    } yield as

  spec.unsafeToFuture()
}
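Ref is not the only way to observe a tagless final program in tests. As an alternative sketch, the same Console program can be interpreted into a pure state-passing type. Monad is hand-rolled here to keep the example dependency-free (cats.Monad plays this role on the slides), and World and Sim are hypothetical names.

```scala
object PureConsoleTest {
  // Minimal stand-in for cats.Monad
  trait Monad[F[_]] {
    def pure[A](a: A): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  trait Console[F[_]] {
    def putStrLn(str: String): F[Unit]
    def readLn: F[String]
  }

  // Queued inputs and everything printed so far
  final case class World(inputs: List[String], output: Vector[String])
  // A pure state transition over World
  final case class Sim[A](run: World => (World, A))

  implicit val simMonad: Monad[Sim] = new Monad[Sim] {
    def pure[A](a: A): Sim[A] = Sim(w => (w, a))
    def flatMap[A, B](fa: Sim[A])(f: A => Sim[B]): Sim[B] =
      Sim { w =>
        val (next, a) = fa.run(w)
        f(a).run(next)
      }
  }

  implicit val simConsole: Console[Sim] = new Console[Sim] {
    def putStrLn(str: String): Sim[Unit] = Sim(w => (w.copy(output = w.output :+ str), ()))
    def readLn: Sim[String] = Sim { w =>
      w.inputs match {
        case next :: rest => (w.copy(inputs = rest), next)
        case Nil          => (w, "")
      }
    }
  }

  // The slide's program, with explicit flatMap since the hand-rolled Monad has no for-syntax
  def program[F[_]](implicit M: Monad[F], C: Console[F]): F[Unit] =
    M.flatMap(C.putStrLn("Enter your name:")) { _ =>
      M.flatMap(C.readLn)(n => C.putStrLn(s"Hello $n!"))
    }

  def transcript(inputs: List[String]): Vector[String] =
    program[Sim].run(World(inputs, Vector.empty))._1.output
}
```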

What we've seen so far

  • From IO to Tagless Final
  • Sync[F]: StdConsole[F]
  • Async[F]: RemoteConsole[F]
  • Testing: TestConsole[F] + Ref[F, A]

A more complex example

Consider this program

val generateChart: IO[Unit] =
  for {
    b  <- getBandsFromFile // IO[List[Band]]
    _  <- IO.race(longProcess1(b), longProcess2(b))
    id <- generateId
    _  <- publishRadioChart(id, SortedSet(b: _*))
    _  <- publishTvChart(id, SortedSet(b: _*))
  } yield ()
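IO.race runs both processes concurrently, returns an Either holding whichever result arrives first, and cancels the loser. The closest stdlib analogue is Future.firstCompletedOf, sketched below with a hypothetical longProcess helper; the important difference is that a Future cannot be cancelled, so the slower computation keeps running in the background.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RaceSketch {
  // Hypothetical stand-in for longProcess1 / longProcess2
  def longProcess(label: String, millis: Long): Future[String] =
    Future {
      blocking(Thread.sleep(millis)) // `blocking` lets the global pool add a thread for the sleep
      label
    }

  def winner(): String =
    Await.result(
      Future.firstCompletedOf(List(longProcess("process 1", 500), longProcess("process 2", 50))),
      5.seconds
    )
}
```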

A more complex example

Possible implementations

val getBandsFromFile: IO[List[Band]] =
  IO {
    val file = new File(this.getClass.getClassLoader.getResource("bands.txt").getFile)
    new BufferedReader(new FileReader(file))
  }.flatMap { br =>
    import scala.collection.JavaConverters._
    val bands = br.lines.collect(Collectors.toList()).asScala.toList.map(Band)
    IO.pure(bands) <* IO(br.close())
  }
def generateId: IO[UUID] = IO(UUID.randomUUID())

def longProcess1(bands: List[Band]): IO[Unit] =
  putStrLn("Starting process 1") *> IO.sleep(3.seconds) *> putStrLn("Process 1 DONE")

def longProcess2(bands: List[Band]): IO[Unit] =
  putStrLn("Starting process 2") *> IO.sleep(2.seconds) *> putStrLn("Process 2 DONE")

A more complex example

Possible implementations

def publishRadioChart(id: UUID, bands: SortedSet[Band]): IO[Unit] =
  putStrLn(s"Radio Chart for $id: ${bands.map(_.value).mkString(", ")}")

def publishTvChart(id: UUID, bands: SortedSet[Band]): IO[Unit] =
  putStrLn(s"TV Chart for $id: ${bands.take(5).map(_.value).mkString(", ")}")

Can we do better?

A more complex example

Algebras

trait DataSource[F[_]] {
  def bands: F[List[Band]]
}

trait IdGen[F[_]] {
  def generate: F[UUID]
}

trait InternalProcess[F[_]] {
  def process(bands: SortedSet[Band]): F[Unit]
}

trait RadioChart[F[_]] {
  def publish(id: UUID, bands: SortedSet[Band]): F[Unit]
}

trait TvChart[F[_]] {
  def publish(id: UUID, bands: SortedSet[Band]): F[Unit]
}

A more complex example

Program

class Charts[F[_]: Monad](
    source: DataSource[F],
    internal: InternalProcess[F],
    idGen: IdGen[F],
    radioChart: RadioChart[F],
    tvChart: TvChart[F]
) {

  def generate: F[Unit] =
    for {
      b <- source.bands.map(xs => SortedSet(xs: _*))
      _ <- internal.process(b)
      id <- idGen.generate
      _ <- radioChart.publish(id, b)
      _ <- tvChart.publish(id, b)
    } yield ()

}

A more complex example

Interpreters

class MemRadioChart[F[_]: Sync] extends RadioChart[F] {
  override def publish(id: UUID, bands: SortedSet[Band]) =
    Sync[F].delay {
      println(s"Radio Chart for $id: ${bands.map(_.value).mkString(", ")}")
    }
}
class MemTvChart[F[_]: Sync] extends TvChart[F] {
  override def publish(id: UUID, bands: SortedSet[Band]) =
    Sync[F].delay {
      println(s"TV Chart for $id: ${bands.map(_.value).take(3).mkString(", ")}")
    }
}
class LiveIdGen[F[_]: Sync] extends IdGen[F] {
  override def generate: F[UUID] = Sync[F].delay(UUID.randomUUID())
}

A more complex example

Interpreters

class FileDataSource[F[_]](implicit F: Sync[F]) extends DataSource[F] {
  override def bands: F[List[Band]] = {
    val acquire = F.delay {
      val file = new File(getClass.getClassLoader.getResource("bands.txt").getFile)
      new BufferedReader(new FileReader(file))
    }

    acquire.bracket { br =>
      import scala.collection.JavaConverters._
      br.lines.collect(Collectors.toList()).asScala.toList.map(Band).pure[F]
    }(br => F.delay(br.close()))
  }
}
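The guarantee bracket adds over the earlier getBandsFromFile (which never closes the reader if reading fails) can be sketched synchronously with try/finally: release runs once after use, whether use succeeds or throws. This is an illustration only, not the Cats Effect implementation; the real bracket also runs release on cancellation.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object BracketSketch {
  def bracket[R, A](acquire: => R)(use: R => A)(release: R => Unit): A = {
    val resource = acquire // if acquire fails, there is nothing to release
    try use(resource)
    finally release(resource)
  }

  // A hypothetical event log makes the ordering of open/use/close observable
  def demo(failInUse: Boolean): (Try[Int], List[String]) = {
    val log = ListBuffer.empty[String]
    val result = Try {
      bracket { log += "open"; 42 } { r =>
        log += "use"
        if (failInUse) throw new RuntimeException("boom") else r
      } { _ => log += "close"; () }
    }
    (result, log.toList)
  }
}
```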

A more complex example

Interpreters

class LiveInternalProcess[F[_]](
  implicit F: Concurrent[F],
           T: Timer[F]
) extends InternalProcess[F] {

  private def putStrLn(str: String): F[Unit] = F.delay(println(str))

  private def longProcess1(bands: SortedSet[Band]): F[Unit] =
    putStrLn("Starting process 1") *> T.sleep(3.seconds) *> putStrLn("Process 1 DONE")

  private def longProcess2(bands: SortedSet[Band]): F[Unit] =
    putStrLn("Starting process 2") *> T.sleep(2.seconds) *> putStrLn("Process 2 DONE")

  // Must match the InternalProcess algebra, which takes a SortedSet[Band]
  override def process(bands: SortedSet[Band]): F[Unit] =
    F.race(longProcess1(bands), longProcess2(bands)).void

}

A more complex example

Let's run it!

val source: DataSource[IO]        = new FileDataSource[IO]
val idGen: IdGen[IO]              = new LiveIdGen[IO]
val internal: InternalProcess[IO] = new LiveInternalProcess[IO]
val radioChart: RadioChart[IO]    = new MemRadioChart[IO]
val tvChart: TvChart[IO]          = new MemTvChart[IO]

val charts = new Charts[IO](source, internal, idGen, radioChart, tvChart)

charts.generate.unsafeRunSync() // HINT: Use `IOApp` instead

Can we do even better?

A more complex example

Final Program

class Charts[F[_]: Concurrent: Par](
    source: DataSource[F],
    internal: InternalProcess[F],
    idGen: IdGen[F],
    radioChart: RadioChart[F],
    tvChart: TvChart[F]
) {

  def generate: F[Unit] =
    for {
      b <- source.bands.map(xs => SortedSet(xs: _*))
      _ <- internal.process(b).start // runs in its own fiber; we don't wait for it
      id <- idGen.generate
      _ <- (radioChart.publish(id, b), tvChart.publish(id, b)).parTupled.void // both charts in parallel
    } yield ()

}

What we've seen so far

  • From IO to Tagless Final
  • Sync[F]: StdConsole[F]
  • Async[F]: RemoteConsole[F]
  • Testing: TestConsole[F] + Ref[F, A]
  • Resource management: Bracket[F, E]
  • Concurrency: F.race, F.start
  • Parallelism: F.parTupled
  • Implicit Cancellation: F.race, F.parTupled

Re-thinking our last code

val charts = new Charts[IO](source, internal, idGen, radioChart, tvChart)

charts.generate.unsafeRunSync() // HINT: Use `IOApp` instead

What about error handling?

Error Handling

Logging errors in case of failure

charts.generate.attempt.flatMap {
  case Right(_) => IO.unit
  case Left(e)  => putStrLn(s"Failed to generate charts: ${e.getMessage}")
}
charts.generate.handleErrorWith { e =>
  putStrLn(s"Failed to generate charts: ${e.getMessage}")
}

ApplicativeError[F, E]

def attempt[A](fa: F[A]): F[Either[E, A]]
def handleErrorWith[A](fa: F[A])(f: E => F[A]): F[A]
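The two styles above can be sketched on scala.util.Try, an eager stand-in for F with E = Throwable: attempt corresponds roughly to toEither, and handleErrorWith to recoverWith. generateCharts is a hypothetical substitute for charts.generate.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object ErrorHandlingSketch {
  def generateCharts(fail: Boolean): Try[Unit] =
    if (fail) Failure(new RuntimeException("boom")) else Success(())

  // attempt style: surface the error as a value, then pattern match on it
  def viaAttempt(fail: Boolean, log: ListBuffer[String]): Try[Unit] =
    generateCharts(fail).toEither match {
      case Right(_) => Success(())
      case Left(e)  => Try { log += s"Failed to generate charts: ${e.getMessage}"; () }
    }

  // handleErrorWith style: provide a fallback computation for the failure case only
  def viaHandleErrorWith(fail: Boolean, log: ListBuffer[String]): Try[Unit] =
    generateCharts(fail).recoverWith { case e =>
      Try { log += s"Failed to generate charts: ${e.getMessage}"; () }
    }
}
```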

Error Handling

Logging errors and retry

def chartsRetry[F[_]: Logger: MonadError[?[_], Throwable]: Timer](
    charts: Charts[F]
): F[Unit] = {
  def resilient(retries: Int): F[Unit] =
    charts.generate.handleErrorWith { e =>
      Logger[F].info(s"Failed: ${e.getMessage}. Retries left: $retries") >> {
        if (retries > 0) Timer[F].sleep(5.seconds) >> resilient(retries - 1)
        else Logger[F].error("Program failed after many retries")
      }
    }

  resilient(3)
}

Possible outcome

Failed: boom. Retries left: 3
Failed: boom. Retries left: 2
Failed: boom. Retries left: 1
Failed: boom. Retries left: 0
Program failed after many retries
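The outcome above can be reproduced with a synchronous sketch of resilient, using Try in place of F and a ListBuffer in place of Logger[F]. The Timer[F].sleep between attempts is left out so the example runs instantly.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Mirrors resilient(retries): log the failure, then retry while retries remain
  def resilient(action: () => Try[Unit], retries: Int, log: ListBuffer[String]): Unit =
    action() match {
      case Success(_) => ()
      case Failure(e) =>
        log += s"Failed: ${e.getMessage}. Retries left: $retries"
        if (retries > 0) resilient(action, retries - 1, log)
        else log += "Program failed after many retries"
    }
}
```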

Retry Combinators

  • Build it yourself using Timer[F]
  • Look into cats-retry and its retry policies.
  • Use the high-level fs2 Stream combinators such as retry, attempts and awakeEvery.
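Retry policies usually decide how long to wait before each attempt. A hypothetical capped exponential backoff schedule (this is not the cats-retry API) fits in a few lines: the delay before retry n is min(base * 2^n, cap).

```scala
import scala.concurrent.duration._

object Backoff {
  // Delay before retry n (0-based) = min(base * 2^n, cap); meant for small retry counts
  def delays(base: FiniteDuration, cap: FiniteDuration, retries: Int): List[FiniteDuration] =
    List.tabulate(retries)(n => (base * (1L << n)).min(cap))
}
```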

Error Handling

Classy optics

For a more advanced technique, check out my two blog posts:

  • https://typelevel.org/blog/2018/08/25/http4s-error-handling-mtl.html
  • https://typelevel.org/blog/2018/11/28/http4s-error-handling-mtl-2.html

Tagless Final FTW!

Technical challenges at Paidy

  • Need for a lightweight and performant in-memory Cache

Ref-based Cache

trait Cache[F[_], K, V] {
  def get(key: K): F[Option[V]]
  def put(key: K, value: V): F[Unit]
}
private class RefCache[F[_]: Clock: Monad, K, V](
    state: Ref[F, Map[K, (OffsetDateTime, V)]],
    expiresIn: FiniteDuration
) extends Cache[F, K, V] {

  import Cache._

  def get(key: K): F[Option[V]] =
    state.get.map(_.get(key).map { case (_, v) => v })

  def put(key: K, value: V): F[Unit] =
    DateTime[F](CacheOffset).flatMap { now =>
      state.update(_.updated(key, now.plusNanos(expiresIn.toNanos) -> value))
    }

}

Ref-based Cache

object Cache {
  def of[F[_]: Clock: Concurrent, K, V](
      expiresIn: FiniteDuration,
      checkOnExpirationsEvery: FiniteDuration
  )(implicit T: Timer[F]): F[Cache[F, K, V]] = {
    def runExpiration(state: Ref[F, Map[K, (OffsetDateTime, V)]]): F[Unit] = {
      // Each entry already stores its expiry instant (see put), so keep it only while that
      // instant is in the future. A single update avoids losing entries put between get and set.
      val process =
        DateTime[F](CacheOffset).flatMap { now =>
          state.update(_.filter {
            case (_, (exp, _)) => exp.isAfter(now)
          })
        }
      T.sleep(checkOnExpirationsEvery) >> process >> runExpiration(state)
    }

    Ref.of[F, Map[K, (OffsetDateTime, V)]](Map.empty)
      .flatTap(runExpiration(_).start.void)
      .map(ref => new RefCache[F, K, V](ref, expiresIn))
  }
}
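The RefCache idea can be sketched with only the standard library: an immutable Map held in an AtomicReference, where each entry carries its expiry instant. The injected now clock (in nanoseconds) is a hypothetical parameter that lets expiry be tested without sleeping, and unlike RefCache, get here also hides entries that have expired but not yet been evicted.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.duration._

final class TtlCache[K, V](expiresIn: FiniteDuration, now: () => Long) {
  private val state = new AtomicReference[Map[K, (Long, V)]](Map.empty)

  // Compare-and-set loop: retry the pure update if another thread changed the map meanwhile
  @tailrec private def modify(f: Map[K, (Long, V)] => Map[K, (Long, V)]): Unit = {
    val current = state.get()
    if (!state.compareAndSet(current, f(current))) modify(f)
  }

  def get(key: K): Option[V] = {
    val t = now()
    state.get().get(key).collect { case (expiresAt, v) if expiresAt > t => v }
  }

  def put(key: K, value: V): Unit = {
    val expiresAt = now() + expiresIn.toNanos
    modify(_.updated(key, (expiresAt, value)))
  }

  // What the background fiber started in Cache.of does on every tick
  def evictExpired(): Unit = {
    val t = now()
    modify(_.filter { case (_, (expiresAt, _)) => expiresAt > t })
  }

  def size: Int = state.get().size
}
```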

Ref-based Cache

It is now part of Mules!

https://github.com/ChristopherDavenport/mules

Technical challenges at Paidy

  • Authorize payments as quick as possible:
    • parTraverse a list of computations, stop on first failure and return successes if any: "Collect Successful".
    • parTraverse a list of computations and get first successful value or timeout otherwise: "First Successful".

Par Traverse FTW!

Collect Successful

Par traverse until first failure and return successful computations

val ioa = IO.sleep(1.second) *> IO.pure("a")
val iob = IO.pure("b")
val ioc = IO.pure("c")
val iod = IO.sleep(3.seconds) *> IO.pure("d")
val ioe = IO.sleep(2.seconds) *> IO.pure("e")

val list = List(ioa, iob, ioc, iod, ioe)

TaskUtil.collectSuccessful(list).flatMap(putStrLn)
res1: List[String] = List(a, b, c, d, e)

Collect Successful

Par traverse until first failure and return successful computations

val ioa = IO.sleep(1.second) *> IO.pure("a")
val iob = IO.pure("b")
val ioc = IO.sleep(1.second) *> IO.raiseError(new Exception("boom"))
val iod = IO.sleep(3.seconds) *> IO.pure("d")
val ioe = IO.sleep(2.seconds) *> IO.pure("e")

val list = List(ioa, iob, ioc, iod, ioe)

TaskUtil.collectSuccessful(list).flatMap(putStrLn)
res2: List[String] = List(b, a)

Collect Successful

The implementation:

def abstractCollectSuccessful[F[_]: MonadError[?[_], Throwable]: Par, G[_]: Traverse, A](
    gfa: G[F[A]],
    append: G[A] => A => G[A],
    ref: Ref[F, G[A]]
): F[G[A]] =
  gfa
    .parTraverse(_.attempt.flatTap {
      case Right(x) => ref.update(append(_)(x))
      case Left(_)  => Applicative[F].unit
    }.rethrow)
    .handleErrorWith(_ => ref.get)

def collectSuccessful[F[_]: Par: Sync](list: List[F[String]]): F[List[String]] =
  Ref.of[F, List[String]](List.empty).flatMap { ref =>
    import cats.instances.list._
    abstractCollectSuccessful[F, List, String](list, g => x => g :+ x, ref)
  }
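The Collect Successful behaviour can be approximated with plain Futures, as a sketch: successes are recorded in completion order, the first failure answers with whatever has been recorded so far, and if everything succeeds the values come back in their original order. Unlike parTraverse on IO, the remaining Futures are not cancelled after the first failure, since Futures cannot be cancelled.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object CollectSuccessfulSketch {
  def collectSuccessful[A](tasks: List[Future[A]])(implicit ec: ExecutionContext): Future[List[A]] = {
    val successes = new AtomicReference(List.empty[A]) // recorded in completion order
    val result    = Promise[List[A]]()

    @tailrec def record(a: A): Unit = {
      val current = successes.get()
      if (!successes.compareAndSet(current, current :+ a)) record(a)
    }

    tasks.foreach(_.onComplete {
      case Success(a) => record(a)
      case Failure(_) => result.trySuccess(successes.get()) // first failure: keep what we have
    })
    // Every task succeeded: answer with all values in their original order
    Future.sequence(tasks).foreach(all => result.trySuccess(all))
    result.future
  }
}
```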

First successful computation

Par traverse until first successful computation and return or timeout

val io1 = IO.sleep(1.second) *> IO.raiseError[String](new Exception("error 1"))
val io2 = IO.sleep(1.1.seconds) *> IO.raiseError[String](new Exception("error 2"))
val io3 = IO.sleep(1.2.seconds) *> IO.pure("success")
val io4 = IO.sleep(1.4.seconds) *> IO.pure("slower success")

val tasks = List(io1, io2, io3, io4)

TaskUtil.firstSuccessful(tasks).flatMap(putStrLn)
res1: String = success

First successful computation

Par traverse until first successful computation and return or timeout

val io1 = IO.sleep(1.second) *> IO.raiseError[String](new Exception("error 1"))
val io2 = IO.sleep(1.1.seconds) *> IO.raiseError[String](new Exception("error 2"))

val tasks = List(io1, io2) // It will time out because there's no successful value

TaskUtil.firstSuccessful(tasks).flatMap(putStrLn)
res2 = java.util.concurrent.TimeoutException: 2 seconds

First successful computation

The implementation:

def tryComplete[F[_]: MonadError[?[_], Throwable], A: Monoid](
    d: Deferred[F, A]
)(fa: F[A]): F[A] =
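  fa.attempt.flatMap {
    // On success: complete the Deferred (a second completion fails, hence attempt), then raise
    // an error on purpose so parTraverse short-circuits and cancels the remaining computations
    case Right(x) => d.complete(x).attempt *> x.pure[F] <* new Throwable("short-circuit").raiseError[F, Unit]
    case Left(_)  => Monoid[A].empty.pure[F] // Ignore the errors
  }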

def firstSuccessful[F[_]: Concurrent: Par: Timer, A: Monoid](list: List[F[A]]): F[A] =
  Deferred[F, A].flatMap { d =>
    import cats.instances.list._
    list.parTraverse(tryComplete[F, A](d)).attempt *> d.get.timeout(2.seconds)
  }
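A Promise can play the role of the Deferred above in a stdlib-only sketch of First Successful. Instead of relying on the 2-second timeout to detect that nothing succeeded, this version counts failures and fails with NoSuchElementException once every task has failed; a timeout could still be layered on top if tasks may hang.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object FirstSuccessfulSketch {
  def firstSuccessful[A](tasks: List[Future[A]])(implicit ec: ExecutionContext): Future[A] =
    if (tasks.isEmpty) Future.failed(new NoSuchElementException("no computations"))
    else {
      val winner   = Promise[A]() // plays the role of Deferred[F, A]
      val failures = new AtomicInteger(0)
      tasks.foreach(_.onComplete {
        case Success(a) => winner.trySuccess(a) // only the first success wins
        case Failure(_) =>
          // Once every task has failed, report it instead of waiting for a timeout
          if (failures.incrementAndGet() == tasks.size)
            winner.tryFailure(new NoSuchElementException("no successful computation"))
      })
      winner.future
    }
}
```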

There's much more!

Amazing docs, have a look: https://typelevel.org/cats-effect/

Ask questions in the Gitter channel: https://gitter.im/typelevel/cats-effect

Ecosystem

  • Fs2: http://fs2.io/
  • Http4s: https://http4s.org/

And many other projects!

Slides + Code

  • https://github.com/paidy/talks

Questions?

We are hiring!

https://engineering.paidy.com/