"Side-effects are bugs". Rob Norris.
val expr = 123
(expr, expr)
(123, 123)
val expr = println("Hey!")
(expr, expr)
(println("Hey!"), println("Hey!"))
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val expr = Future(println("Hey!"))
(expr, expr)
(Future(println("Hey!")), Future(println("Hey!")))
import cats.effect.IO
val expr = IO(println("Hey!"))
(expr, expr)
(IO(println("Hey!")), IO(println("Hey!")))
IO[A]
Represents the intention to perform a side effect
In addition to Cats Effect you'll see usage of the following libraries:
val program: IO[Unit] =
for {
_ <- IO(println("Enter your name:"))
n <- IO(scala.io.StdIn.readLine)
_ <- IO(println(s"Hello $n!"))
} yield ()
> Enter your name:
Gabriel
> Hello Gabriel!
trait Console[F[_]] {
def putStrLn(str: String): F[Unit]
def readLn: F[String]
}
def program[F[_]: Monad](implicit C: Console[F]): F[Unit] =
for {
_ <- C.putStrLn("Enter your name:")
n <- C.readLn
_ <- C.putStrLn(s"Hello $n!")
} yield ()
class StdConsole[F[_]: Sync] extends Console[F] {
def putStrLn(str: String) = Sync[F].delay(println(str))
def readLn = Sync[F].delay(scala.io.StdIn.readLine)
}
class RemoteConsole[F[_]: Async] extends Console[F] {
private def fromFuture[A](fa: F[Future[A]]): F[A] =
fa.flatMap { future =>
Async[F].async { cb =>
future.onComplete {
case Success(x) => cb(Right(x))
case Failure(e) => cb(Left(e))
}
}
}
def putStrLn(str: String) = fromFuture(Sync[F].delay(HttpClient.post(str)))
def readLn = fromFuture(Sync[F].delay(HttpClient.get))
}
class TestConsole[F[_]: Applicative](state: Ref[F, List[String]]) extends Console[F] {
def putStrLn(str: String): F[Unit] = state.update(_ :+ str)
def readLn: F[String] = "test".pure[F]
}
test("Console") {
val spec =
for {
state <- Ref.of[IO, List[String]](List.empty[String])
implicit0(c: Console[IO]) = new TestConsole[IO](state)
_ <- program[IO]
st <- state.get
as <- IO { assert(st == List("Enter your name:", "Hello test!")) }
} yield as
spec.unsafeToFuture()
}
val generateChart: IO[Unit] =
for {
b <- getBandsFromFile // IO[List[Band]]
_ <- IO.race(longProcess1(b), longProcess2(b))
id <- generateId
_ <- publishRadioChart(id, SortedSet(b: _*))
_ <- publishTvChart(id, SortedSet(b: _*))
} yield ()
val getBandsFromFile: IO[List[Band]] =
IO {
val file = new File(this.getClass.getClassLoader.getResource("bands.txt").getFile)
new BufferedReader(new FileReader(file))
}.flatMap { br =>
import scala.collection.JavaConverters._
val bands = br.lines.collect(Collectors.toList()).asScala.toList.map(Band)
IO.pure(bands) <* IO(br.close())
}
def generateId: IO[UUID] = IO(UUID.randomUUID())
def longProcess1(bands: List[Band]): IO[Unit] =
putStrLn("Starting process 1") *> IO.sleep(3.seconds) *> putStrLn("Process 1 DONE")
def longProcess2(bands: List[Band]): IO[Unit] =
putStrLn("Starting process 2") *> IO.sleep(2.seconds) *> putStrLn("Process 2 DONE")
def publishRadioChart(id: UUID, bands: SortedSet[Band]): IO[Unit] =
putStrLn(s"Radio Chart for $id: ${bands.map(_.value).mkString(", ")}")
def publishTvChart(id: UUID, bands: SortedSet[Band]): IO[Unit] =
putStrLn(s"TV Chart for $id: ${bands.take(5).map(_.value).mkString(", ")}")
val generateChart: IO[Unit] =
for {
b <- getBandsFromFile // IO[List[Band]]
_ <- IO.race(longProcess1(b), longProcess2(b))
id <- generateId
_ <- publishRadioChart(id, SortedSet(b: _*))
_ <- publishTvChart(id, SortedSet(b: _*))
} yield ()
Can we do better?
trait DataSource[F[_]] {
def bands: F[List[Band]]
}
trait IdGen[F[_]] {
def generate: F[UUID]
}
trait InternalProcess[F[_]] {
def process(bands: SortedSet[Band]): F[Unit]
}
trait RadioChart[F[_]] {
def publish(id: UUID, bands: SortedSet[Band]): F[Unit]
}
trait TvChart[F[_]] {
def publish(id: UUID, bands: SortedSet[Band]): F[Unit]
}
class Charts[F[_]: Monad](
source: DataSource[F],
internal: InternalProcess[F],
idGen: IdGen[F],
radioChart: RadioChart[F],
tvChart: TvChart[F]
) {
def generate: F[Unit] =
for {
b <- source.bands.map(xs => SortedSet(xs: _*))
_ <- internal.process(b)
id <- idGen.generate
_ <- radioChart.publish(b)
_ <- tvChart.publish(b)
} yield ()
}
class MemRadioChart[F[_]: Sync] extends RadioChart[F] {
override def publish(id: UUID, bands: SortedSet[Band]) =
Sync[F].delay {
println(s"Radio Chart for $id: ${bands.map(_.value).mkString(", ")}")
}
}
class MemTvChart[F[_]: Sync] extends TvChart[F] {
override def publish(id: UUID, bands: SortedSet[Band]) =
Sync[F].delay {
println(s"TV Chart for $id: ${bands.map(_.value).take(3).mkString(", ")}")
}
}
class LiveIdGen[F[_]: Sync] extends IdGen[F] {
override def generate: F[UUID] = Sync[F].delay(UUID.randomUUID())
}
class FileDataSource[F[_]](implicit F: Sync[F]) extends DataSource[F] {
override def bands: F[List[Band]] = {
val acquire = F.delay {
val file = new File(getClass.getClassLoader.getResource("bands.txt").getFile)
new BufferedReader(new FileReader(file))
}
acquire.bracket { br =>
import scala.collection.JavaConverters._
br.lines.collect(Collectors.toList()).asScala.toList.map(Band).pure[F]
}(br => F.delay(br.close()))
}
}
class LiveInternalProcess[F[_]](
implicit F: Concurrent[F],
T: Timer[F]
) extends InternalProcess[F] {
private def putStrLn(str: String): F[Unit] = F.delay(println(str))
private def longProcess1(bands: List[Band]): F[Unit] =
putStrLn("Starting process 1") *> T.sleep(3.seconds) *> putStrLn("Process 1 DONE")
private def longProcess2(bands: List[Band]): F[Unit] =
putStrLn("Starting process 2") *> T.sleep(2.seconds) *> putStrLn("Process 2 DONE")
override def process(bands: List[Band]): F[Unit] =
F.race(longProcess1(bands), longProcess2(bands)).void
}
val source: DataSource[IO] = new FileDataSource[IO]
val idGen: IdGen[IO] = new LiveIdGen[IO]
val internal: InternalProcess[IO] = new LiveInternalProcess[IO]
val radioChart: RadioChart[IO] = new MemRadioChart[IO]
val tvChart: TvChart[IO] = new MemTvChart[IO]
val charts = new Charts[IO](source, idGen, internal, radioChart, tvChart)
charts.generate.unsafeRunSync() // HINT: Use `IOApp` instead
class Charts[F[_]: Monad](
source: DataSource[F],
internal: InternalProcess[F],
idGen: IdGen[F],
radioChart: RadioChart[F],
tvChart: TvChart[F]
) {
def generate: F[Unit] =
for {
b <- source.bands.map(xs => SortedSet(xs: _*))
_ <- internal.process(b)
id <- idGen.generate
_ <- radioChart.publish(b)
_ <- tvChart.publish(b)
} yield ()
}
Can we do even better?
class Charts[F[_]: Concurrent: Par](
source: DataSource[F],
internal: InternalProcess[F],
radioChart: RadioChart[F],
tvChart: TvChart[F]
) {
def generate: F[Unit] =
for {
b <- source.bands.map(xs => SortedSet(xs: _*))
_ <- internal.process(b).start
id <- idGen.generate
_ <- (radioChart.publish(id, b), tvChart.publish(id, b)).parTupled.void
} yield ()
}
val charts = new Charts[IO](source, idGen, internal, radioChart, tvChart)
charts.generate.unsafeRunSync() // HINT: Use `IOApp` instead
What about error handling?
charts.generate.attempt.flatMap {
case Right(_) => IO.unit
case Left(e) => putStrLn(s"Failed to generate charts: ${e.getMessage}")
}
charts.generate.handleErrorWith { e =>
putStrLn(s"Failed to generate charts: ${e.getMessage}")
}
def attempt[A](fa: F[A]): F[Either[E, A]]
def handleErrorWith[A](fa: F[A])(f: E => F[A]): F[A]
def chartsRetry[F[_]: Logger: MonadError[?[_], Throwable]: Timer](
charts: Charts[F]
): F[Unit] = {
def resilient(retries: Int): F[Unit] =
charts.generate.handleErrorWith { e =>
Logger[F].info(s"Failed: ${e.getMessage}. Retries left: $retries") >> {
if (retries > 0) Timer[F].sleep(5.seconds) >> resilient(retries - 1)
else Logger[F].error("Program failed after many retries")
}
}
resilient(3)
}
Failed: boom. Retries left: 3
Failed: boom. Retries left: 2
Failed: boom. Retries left: 1
Failed: boom. Retries left: 0
Program failed after many retries
Timer[F]
retry
, attempts
, awakeEvery
, etcFor a more advanced technique check out my two blog posts:
trait Cache[F[_], K, V] {
def get(key: K): F[Option[V]]
def put(key: K, value: V): F[Unit]
}
private class RefCache[F[_]: Clock: Monad, K, V](
state: Ref[F, Map[K, (OffsetDateTime, V)]],
expiresIn: FiniteDuration
) extends Cache[F, K, V] {
import Cache._
def get(key: K): F[Option[V]] =
state.get.map(_.get(key).map { case (_, v) => v })
def put(key: K, value: V): F[Unit] =
DateTime[F](CacheOffset).flatMap { now =>
state.update(_.updated(key, now.plusNanos(expiresIn.toNanos) -> value))
}
}
object Cache {
def of[F[_]: Clock: Concurrent, K, V](
expiresIn: FiniteDuration,
checkOnExpirationsEvery: FiniteDuration
)(implicit T: Timer[F]): F[Cache[F, K, V]] = {
def runExpiration(state: Ref[F, Map[K, (OffsetDateTime, V)]]): F[Unit] = {
val process =
DateTime[F](CacheOffset).flatMap { now =>
state.get.map(_.filter {
case (_, (exp, _)) => exp.isAfter(now.minusNanos(expiresIn.toNanos))
}).flatTap(state.set)
}
T.sleep(checkOnExpirationsEvery) >> process >> runExpiration(state)
}
Ref.of[F, Map[K, (OffsetDateTime, V)]](Map.empty)
.flatTap(runExpiration(_).start.void)
.map(ref => new RefCache[F, K, V](ref, expiresIn))
}
}
It is now part of Mules!
parTraverse
a list of computations, stop on first failure and return successes if any: "Collect Successful".parTraverse
a list of computations and get first successful value or timeout otherwise: "First Successful".Par traverse until first failure and return successful computations
val ioa = IO.sleep(1.second) *> IO.pure("a")
val iob = IO.pure("b")
val ioc = IO.pure("c")
val iod = IO.sleep(3.seconds) *> IO.pure("d")
val ioe = IO.sleep(2.seconds) *> IO.pure("e")
val list = List(ioa, iob, ioc, iod, ioe)
TaskUtil.collectSuccessful(list).flatMap(putStrLn)
res1: List[String] = List(a, b, c, d, e)
Par traverse until first failure and return successful computations
val ioa = IO.sleep(1.second) *> IO.pure("a")
val iob = IO.pure("b")
val ioc = IO.sleep(1.second) *> IO.raiseError(new Exception("boom"))
val iod = IO.sleep(3.seconds) *> IO.pure("d")
val ioe = IO.sleep(2.seconds) *> IO.pure("e")
val list = List(ioa, iob, ioc, iod, ioe)
TaskUtil.collectSuccessful(list).flatMap(putStrLn)
res2: List[String] = List(b, a)
The implementation:
def generic[F[_]: MonadError[?[_], Throwable]: Par, G[_]: Traverse, A](
gfa: G[F[A]],
append: G[A] => A => G[A],
ref: Ref[F, G[A]]
): F[G[A]] =
gfa
.parTraverse(_.attempt.flatTap {
case Right(x) => ref.update(append(_)(x))
case Left(_) => Applicative[F].unit
}.rethrow)
.handleErrorWith(_ => ref.get)
def collectSuccessful[F[_]: Par: Sync](list: List[F[String]]): F[List[String]] =
Ref.of[F, List[String]](List.empty).flatMap { ref =>
import cats.instances.list._
abstractCollectSuccessful[F, List, String](list, g => x => g :+ x, ref)
}
Par traverse until first successful computation and return or timeout
val io1 = IO.sleep(1.second) *> IO.raiseError[String](new Exception("error 1"))
val io2 = IO.sleep(1.1.seconds) *> IO.raiseError[String](new Exception("error 2"))
val io3 = IO.sleep(1.2.seconds) *> IO.pure("success")
val io4 = IO.sleep(1.4.seconds) *> IO.pure("slower success")
val tasks = List(io1, io2, io3, io4)
TaskUtil.firstSuccessful(tasks).flatMap(putStrLn)
res1: String = success
Par traverse until first successful computation and return or timeout
val io1 = IO.sleep(1.second) *> IO.raiseError[String](new Exception("error 1"))
val io2 = IO.sleep(1.1.seconds) *> IO.raiseError[String](new Exception("error 2"))
val tasks = List(io1, io2) // It will time out because there's no successful value
TaskUtil.firstSuccessful(tasks).flatMap(putStrLn)
res2 = java.util.concurrent.TimeoutException: 2 seconds
The implementation:
def tryComplete[F[_]: MonadError[?[_], Throwable], A: Monoid](
d: Deferred[F, A]
)(fa: F[A]): F[A] =
fa.attempt.flatMap {
case Right(x) => d.complete(x).attempt *> x.pure[F] <* new Throwable().raiseError
case Left(_) => Monoid[A].empty.pure[F] // Ignore the errors
}
def firstSuccessful[F[_]: Concurrent: Par: Timer, A: Monoid](list: List[F[A]]): F[A] =
Deferred[F, A].flatMap { d =>
import cats.instances.list._
list.parTraverse(tryComplete[F, A](d)).attempt *> d.get.timeout(2.seconds)
}
Amazing docs, have a look: https://typelevel.org/cats-effect/
Ask questions in the Gitter channel: https://gitter.im/typelevel/cats-effect
And many other projects!