Monthly Archives: August 2018

A Brief Overview Of Scala Futures

As demand for computing performance continues to grow, contemporary applications increasingly exploit the collective processing power of all available CPU cores by executing tasks in parallel. But writing asynchronous code requires careful control of execution to avoid issues such as race conditions, and can be quite challenging even for experienced programmers.

The Scala Future API provides a comprehensive set of functions for writing concurrent, asynchronous code. Scala’s functional side places an inherent emphasis on immutability and composability, which helps avoid issues like race conditions and facilitates successive transformations. In this blog post, we’re going to explore how Scala Futures benefit from those functional features.

Simulating a CPU-bound task

Let’s first prepare a couple of items for upcoming uses:

  1. a `Result` case class representing the result of a unit of work, with a work `id` and the time spent in milliseconds
  2. a `doWork` method which mimics executing some CPU-bound work for a random period of time and returns a `Result` object

// Case class parameters are public vals by default, so no `val` keyword is needed
case class Result(id: Int, millis: Long)

def doWork(id: Int): Result = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val start = System.currentTimeMillis()

  while ((System.currentTimeMillis() - start) < millis) {}

  if (millis < millisTMO) {
    println("Work Result: " + Result(id, millis))
    Result(id, millis)
  } else {
    println(s"TimeOut: work id $id would take ${millis - millisTMO}ms above limit to complete!")
    throw new Throwable("TimeOut!")
  }
}

Note that the side-effecting println within `doWork` is for illustrating when each of the asynchronously launched tasks is executed.

The conventional way of running asynchronous tasks

Using the `doWork` method, the conventional way of asynchronously running a number of tasks typically involves a configurable thread pool from Java’s Executor framework that executes the tasks as individual Runnables.

import java.util.concurrent.Executors

case class Task(id: Int) extends Runnable {
  // Scala uses the `override` modifier rather than Java's @Override annotation
  override def run(): Unit = {
    doWork(id)
  }
}

val pool = Executors.newFixedThreadPool(4)

(1 to 4).foreach( id => pool.execute(Task(id)) )

// Work Result: Result(3, 1119)
// Work Result: Result(1, 1448)
// TimeOut: work id 4 would take 260ms above limit to complete! Exception ...
// TimeOut: work id 2 would take 471ms above limit to complete! Exception ...

Despite the (1 to 4) ordering of the tasks, the chronological work result printouts with shuffled work ids show that they were processed in parallel. It’s worth noting that the `run()` method does not return a value.

Using Scala Futures

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Try, Success, Failure}

val pool = Executors.newFixedThreadPool(4)

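// An explicit type annotation on an implicit val is good practice (and required by Scala 3)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)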

(1 to 4).foreach( id => Future{ doWork(id) }.onComplete(println) )

// Work Result: Result(1, 1042)
// Work Result: Result(4, 1128)
// Work Result: Result(3, 1399)
// TimeOut: work id 2 would take 302ms above limit to complete! Exception ...

By simply wrapping `doWork` in a Future, each task is now asynchronously executed and results are captured by the `onComplete` callback method. The callback method takes a `Try[T] => U` function and can be expanded to handle the success/failure cases accordingly:

Future{ doWork(id) }.onComplete{
  case Success(res) => println("Success: " + res)
  case Failure(err) => println("Failure: " + err)
}

We’re using the same Executor thread pool, which can be configured to suit a specific computing environment (e.g. the number of CPU cores). The implicit ExecutionContext is required both by `Future{ ... }` to run the task and by callback methods such as `onComplete`. One could also fall back to Scala’s default ExecutionContext, which is backed by a Fork/Join pool, by simply importing the following:

import scala.concurrent.ExecutionContext.Implicits.global
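One practical difference between the two choices is lifecycle management: a pool created with `Executors.newFixedThreadPool` uses non-daemon threads and should be shut down explicitly, whereas the global context needs no cleanup. The following is a minimal sketch of a dedicated pool sized to the number of available cores; the `PoolSketch` object and its `describe` helper are hypothetical names introduced only for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object PoolSketch {
  // Hypothetical helper: runs `task` on a dedicated pool and describes the outcome.
  def describe(task: () => Int): String = {
    // Size the pool to the number of cores reported by the JVM
    val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try {
      val f = Future { task() }
      // Await.ready waits for completion without rethrowing a failure
      Await.ready(f, 5.seconds)
      f.value match {
        case Some(Success(v)) => s"Success: $v"
        case Some(Failure(e)) => s"Failure: ${e.getMessage}"
        case None             => "Not completed"
      }
    } finally ec.shutdown() // release the pool's non-daemon threads
  }
}
```

Calling `PoolSketch.describe(() => 21 * 2)` yields a success description, while a task that throws yields a failure description instead of propagating the exception to the caller.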

However, Scala Futures provide a lot more than just a handy wrapper for executing non-blocking tasks with callback methods.

Immutability and composability

Scala Futures involve code running in multiple threads. By sticking to Scala’s immutable collections, defining values as immutable `val` (as opposed to mutable `var`), and relying on functional transformations (as opposed to mutations), one can more easily write concurrent code that is thread-safe, avoiding problems such as race conditions.

But perhaps one of the most sought-after features of Scala Futures is the support of composable transformations in the functional programming way. For example, we can chain a bunch of Futures via methods like `map` and `filter`:

Future{ doWork(1) }.
  filter( _.millis < 1400 ).
  map( _ => doWork(2) )

// Work Result: Result(1,1133)
// TimeOut: work id 2 would take 333ms above limit to complete!

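The above snippet asynchronously runs doWork(1) and, if its reported `millis` is below 1400, continues with the next task doWork(2). If the predicate does not hold, the resulting Future fails with a `NoSuchElementException` and doWork(2) is never run.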
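To see both outcomes of `filter` deterministically, here is a small sketch that replaces the random `doWork` with a fixed `millis` value. The `FilterSketch` object and its `chained` method are hypothetical stand-ins, not part of the original example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FilterSketch {
  // Hypothetical stand-in for the doWork chain; `millis` plays the role of Result.millis
  def chained(millis: Long): Try[String] = {
    val f = Future { millis }
      .filter(_ < 1400)                              // fails the Future if the predicate is false
      .map(m => s"next task started after $m ms")    // only runs when the filter passed
    // Await.result rethrows the Future's failure, which Try captures
    Try(Await.result(f, 5.seconds))
  }
}
```

With an input of 1200 the `map` step runs; with 1450 the chain short-circuits and the captured failure is a `NoSuchElementException`.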

Another example: let’s say we have a number of predefined methods `doChainedWork(id, res)` with id = 1, 2, 3, …, each taking a work result and deriving a new work result like below:

def doChainedWork(id: Int, res: Result): Result = {
  // Take a `Result` and derive a new `Result` from it ...
}

And let’s say we want to successively apply `doChainedWork` in a non-blocking fashion. We can simply wrap each of them in a Future and chain them using flatMap:

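// doWork(0) supplies the initial Result that doChainedWork(1, ...) needs
// (an assumption for illustration; any seed Result would do)
Future{ doWork(0) }.
  flatMap( res => Future{ doChainedWork(1, res) } ).
  flatMap( res => Future{ doChainedWork(2, res) } ).
  flatMap( res => Future{ doChainedWork(3, res) } )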

Note that neither of the above simplified examples handles failures, so each chain stops at the first exception and the remaining steps are skipped. Depending on the specific business logic, that might not be desirable.
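Since a chain of `flatMap` calls is exactly what a `for` comprehension desugars to, the same pipeline can be written in either style. The sketch below assumes a hypothetical `derive` function standing in for `doChainedWork` (it simply adds 100 ms to the previous result) and redefines `Result` locally so the block is self-contained.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ChainSketch {
  final case class Result(id: Int, millis: Long)

  // Hypothetical stand-in for doChainedWork: adds 100 ms to the previous result
  def derive(id: Int, prev: Result): Result = Result(id, prev.millis + 100)

  // Explicit flatMap chaining, as in the article
  def viaFlatMap(seed: Result): Future[Result] =
    Future { derive(1, seed) }
      .flatMap(res => Future { derive(2, res) })
      .flatMap(res => Future { derive(3, res) })

  // Equivalent for comprehension; the compiler rewrites it to flatMap/map calls
  def viaFor(seed: Result): Future[Result] =
    for {
      r1 <- Future { derive(1, seed) }
      r2 <- Future { derive(2, r1) }
      r3 <- Future { derive(3, r2) }
    } yield r3

  // Blocking helper for demonstration only
  def await(f: Future[Result]): Result = Await.result(f, 5.seconds)
}
```

Both variants run the three steps strictly one after another, because each step depends on the previous step's result.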

Using map and recover on Futures

While the `onComplete` callback in the previous example can handle failures, its `Unit` return type hinders composability. This section addresses that issue.

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.{Try, Success, Failure}

import scala.concurrent.ExecutionContext.Implicits.global

val futures = (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
)

// Block until every Future has completed (for demonstration only), then inspect them
futures.foreach( Await.ready(_, 10.seconds) )
futures

// Work Result: Result(2, 1074)
// Work Result: Result(1, 1363)
// Work Result: Result(4, 1369)
// TimeOut: work id 3 would take 370ms above limit to complete! Exception ...

// res1: scala.collection.immutable.IndexedSeq[scala.concurrent.
//   Future[Product with Serializable with scala.util.Either[Throwable,Result]]] =
//   Vector(
//     Future(Success(Right(Result(1, 1363)))),
//     Future(Success(Right(Result(2, 1074)))),
//     Future(Success(Left(java.lang.Throwable: TimeOut!))),
//     Future(Success(Right(Result(4, 1369))))
//   )

In the Future trait, methods `map` and `recover` have the following signatures:

def map[S](f: (T) => S)(implicit executor: ExecutionContext): Future[S]

def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U]

When a Future results in success, method `map` applies the provided function to the result. On the other hand, if the Future results in failure with a Throwable, method `recover` applies a given partial function that matches the Throwable. Both methods return a new Future.

In the above sample code, we use `map` to wrap the result in a `Right` when doWork succeeds, and `recover` to wrap the Throwable in a `Left` when doWork fails. Using `Either[Throwable, Result]` as the return data type, we capture both successful and failed outcomes in a type-safe fashion, allowing composition of any additional transformations.

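The `Await` object is used to block the calling thread for up to a given duration until a Future completes: `Await.ready` returns the completed Future itself, while `Await.result` returns its value (or rethrows its failure).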
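The `map`/`recover` pattern can be sketched in isolation with a deterministic task. In the block below, `work`, `attempt` and `describe` are hypothetical names, and the explicit `Either[Throwable, Long]` type parameter on `map` is a design choice that, as far as I can tell, avoids the widened `Product with Serializable with Either[...]` type seen in the REPL output above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EitherSketch {
  // Hypothetical deterministic stand-in for doWork: fails when millis >= 1500
  def work(id: Int, millis: Long): Long =
    if (millis < 1500) millis
    else throw new RuntimeException(s"TimeOut for id $id")

  // Success becomes Right, failure becomes Left; the Future itself always succeeds
  def attempt(id: Int, millis: Long): Future[Either[Throwable, Long]] =
    Future { work(id, millis) }
      .map[Either[Throwable, Long]](Right(_))
      .recover { case e => Left(e) }

  // Blocking helper for demonstration only
  def describe(id: Int, millis: Long): String =
    Await.result(attempt(id, millis), 5.seconds) match {
      case Right(m) => s"ok: $m"
      case Left(e)  => s"failed: ${e.getMessage}"
    }
}
```

Because the failure is carried as a value, downstream code can keep composing with `map` and `flatMap` without the chain being cut short.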

From a sequence of Futures to a Future of sequence

Oftentimes, when faced with a set of Futures each of which holds a value to be consumed, we would prefer wrapping the set of values within a single Future. For that purpose, the Scala Future companion object provides a useful method `Future.sequence` which converts a sequence of Futures to a Future of sequence.

val combinedFuture = Future.sequence( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

// Work Result: Result(2, 1262)
// Work Result: Result(3, 1490)
// TimeOut: work id 1 would take 6ms above limit to complete!
// TimeOut: work id 4 would take 405ms above limit to complete!

// res1: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[
//   Product with Serializable with scala.util.Either[Throwable,Result]]] =
//   Future(Success(Vector(
//     Left(java.lang.Throwable: TimeOut!),
//     Right(Result(2, 1262)),
//     Right(Result(3, 1490)),
//     Left(java.lang.Throwable: TimeOut!)
//   )))

As shown in the example, a collection of `Future[Either[Throwable,Result]]` is transformed into a single Future holding a collection of `Either[Throwable,Result]` elements, in the same order as the input.

Or, we could use method `Future.traverse` which is a more generalized version of `Future.sequence`. It takes a plain input collection plus a function, `f: A => Future[B]`, which is applied to each element of that collection, and gathers the resulting Futures into a single Future. The following snippet that takes a `(1 to 4)` range and an `Int => Future[Either[Throwable,Result]]` function as input carries out the same transformation as the above `Future.sequence` snippet.

val combinedFuture = Future.traverse(1 to 4)( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
)
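The equivalence of the two methods, and the reason for the `recover` step, can be checked with a deterministic sketch. The `half` and `safe` functions below are hypothetical and exist only for illustration: `half` fails on odd input, and `safe` turns that failure into a `Left`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SequenceSketch {
  // Hypothetical task that fails on odd input
  def half(n: Int): Future[Int] =
    Future { if (n % 2 == 0) n / 2 else throw new IllegalArgumentException(s"$n is odd") }

  // Same task with the failure captured as a Left
  def safe(n: Int): Future[Either[String, Int]] =
    half(n).map[Either[String, Int]](Right(_)).recover { case e => Left(e.getMessage) }

  // Build the Futures first, then invert the nesting
  def viaSequence(ns: List[Int]): List[Either[String, Int]] =
    Await.result(Future.sequence(ns.map(safe)), 5.seconds)

  // Map and invert in one step
  def viaTraverse(ns: List[Int]): List[Either[String, Int]] =
    Await.result(Future.traverse(ns)(safe), 5.seconds)

  // Without recover, one failed element fails the whole combined Future
  def unsafeSequence(ns: List[Int]): Try[List[Int]] =
    Try(Await.result(Future.sequence(ns.map(half)), 5.seconds))
}
```

Both `viaSequence` and `viaTraverse` preserve the input order, whereas `unsafeSequence` loses every result as soon as a single element fails.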

First completed Future

If one only cares about the first completed Future out of a list of Futures launched in parallel, the Scala Future companion object provides a handy method `Future.firstCompletedOf`.

val firstCompleted = Future.firstCompletedOf( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

// Await.result returns the value itself (Await.ready would return the completed Future)
Await.result(firstCompleted, 10.seconds)

// Work Result: Result(4, 1071)
// Work Result: Result(2, 1181)
// Work Result: Result(1, 1321)
// TimeOut: work id 3 would take 152ms above limit to complete!

// res1: Product with Serializable with scala.util.Either[Throwable,Result] = Right(Result(4, 1071))
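A deterministic variant makes the behaviour easier to verify. The sketch below assumes a hypothetical `delayed` task that sleeps for a fixed time, and uses a dedicated two-thread pool so that both tasks start immediately regardless of the number of cores.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FirstSketch {
  // Hypothetical task: sleeps for `millis`, then returns its label
  def delayed(label: String, millis: Long)(implicit ec: ExecutionContext): Future[String] =
    Future { Thread.sleep(millis); label }

  def winner(): String = {
    // Two threads so that both tasks run concurrently
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try {
      val futures = Seq(delayed("slow", 800), delayed("fast", 50))
      Await.result(Future.firstCompletedOf(futures), 5.seconds)
    } finally ec.shutdown() // already-started tasks still run to completion
  }
}
```

Note that the Futures that lose the race are not cancelled; they keep running in the background, as the full set of printouts in the article's example also shows.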

Grasping the “future”

As the code examples have demonstrated so far, a Scala Future is an abstraction of a value that becomes available at some point in the future. A take-away point is that once a Future is “kicked off”, it’s in some sense “untouchable” and its value won’t be available till it’s completed with success or failure. In another blog post some other time, we’ll explore how one can seize a little more control in the “when” and “what” in completing a Future.