Scala Promises – Futures In Your Hands

In the previous blog post, we saw how Scala Futures serve as a handy wrapper for running asynchronous tasks and allow non-blocking functional transformations via composable functions. Despite all the goodies, a plain Future, once started, is read-only.

A “manipulable” Future

To make things a little more interesting, let’s take a glimpse into a “container” that holds an “uncertain” Future. Scala provides another abstraction called Promise that gives programmers some control over the “when” and “what” of completing a Future. A Promise is like a container holding a Future that can be completed, with either a success value or a failure, at any point in time. The catch is that it can be completed only once.
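The “only once” rule can be observed directly. The following is a minimal sketch (the names `once`, `first`, `second`, `strict` and `settled` are made up for illustration): `trySuccess` reports whether it managed to complete the Promise, while `success` throws an `IllegalStateException` if the Promise has already been completed.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

val once = Promise[Int]()

// The first attempt completes the Promise and returns true
val first = once.trySuccess(1)

// The Promise is already completed, so this attempt is rejected (false)
val second = once.trySuccess(2)

// `success` throws IllegalStateException on an already-completed Promise
val strict = Try(once.success(3))

// The Future still holds the value from the first completion
val settled = once.future.value
```

The `try...` variants are the ones to reach for when several parties may race to complete the same Promise.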

The Promise companion object has the following `apply` method that creates a `DefaultPromise`:

object Promise {
  def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
  // ...
}

As shown below, the DefaultPromise class extends AtomicReference to ensure that a Promise instance will be completed in an atomic fashion.

class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]

A trivial producer and consumer

A common use case of Promise is like this:

  1. a Promise which holds an “open” Future is created
  2. run some business logic to come up with some value
  3. complete the Promise by assigning its Future the value via methods like `success()`, `failure()`, `tryComplete()`, etc.
  4. return the “closed” Future

Here’s a hello-world example of Scala Promise used in a trivialized producer and consumer:

import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ThreadLocalRandom

val p = Promise[Int]()
val f = p.future

val producer = Future {
  val r = ThreadLocalRandom.current.nextInt(0, 2000)
  println("Inside producer: r = " + r)
  if (r < 1000)
    p success r
  else
    p failure (new Exception("r > 999!"))
}

val consumer = Future {
  println("Inside consumer!")
  f onComplete {
    case Success(r) => println("Success: r = " + r)
    case Failure(e) => println("Error: " + e)
  }
}

The above code snippet is rather self-explanatory. The producer running in one thread completes the Promise’s future based on the result of a randomly generated integer and the consumer in another thread checks and reports the value of the completed future.

// * Test running #1:

// Inside producer: r = 1278
// producer: scala.concurrent.Future[scala.concurrent.Promise[Int]] = Future()
// consumer: scala.concurrent.Future[Unit] = Future()
// Error: java.lang.Exception: r > 999!

// * Test running #2:

// Inside producer: r = 547
// producer: scala.concurrent.Future[scala.concurrent.Promise[Int]] = Future()
// consumer: scala.concurrent.Future[Unit] = Future()
// Success: r = 547

Simulating a CPU-bound task, again

For upcoming illustrations, let’s borrow the CPU-bound task simulation `doWork` method used in the coding examples from the previous blog post:

case class Result(val id: Int, val millis: Long)

def doWork(id: Int): Result = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val start = System.currentTimeMillis()

  while ((System.currentTimeMillis() - start) < millis) {}

  if (millis < millisTMO) {
    println("Work Result: " + Result(id, millis))
    Result(id, millis)
  } else {
    println(s"TimeOut: work id $id would take ${millis - millisTMO}ms above limit to complete!")
    throw new Throwable("TimeOut!")
  }
}

Revisiting first completed Future

Recall that method `Future.firstCompletedOf` from the previous blog post can be used to capture the first completed Future out of a list of Futures running in parallel:

val firstCompleted = Future.firstCompletedOf( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

Now, let’s see how `firstCompletedOf` is actually implemented in Scala Future using Promise:

object Future {
  // ...

  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
      override def apply(v1: Try[T]): Unit = getAndSet(null) match {
        case null => ()
        case some => some tryComplete v1
      }
    }
    futures foreach { _ onComplete firstCompleteHandler }
    p.future
  }

  // ...
}

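In the `firstCompletedOf` method implementation, the same helper callback `firstCompleteHandler` is registered on every Future in the input collection. Because the handler holds the Promise in an `AtomicReference` and swaps it for `null` via `getAndSet` on its first invocation, only the first Future to complete gets to complete the Promise; later invocations see `null` and do nothing.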
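To see the “first one wins” behavior without random timing, here is a small sketch that drives `Future.firstCompletedOf` with two hand-completed Promises. The names `slow`, `fast` and `winner` are illustrative only.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val slow = Promise[String]()
val fast = Promise[String]()

val winner = Future.firstCompletedOf(List(slow.future, fast.future))

// Complete one Promise and wait until `winner` has picked up its value
fast.success("fast")
val winnerValue = Await.result(winner, 1.second)

// Completing the other Promise afterwards has no effect on `winner`
slow.success("slow")
val stillTheSame = winner.value
```

Waiting on `winner` before completing `slow` keeps the sketch deterministic, since callbacks run asynchronously on the ExecutionContext.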

First completed Future with a condition

What if we want to get the first completed Future from a number of Futures whose values meet a certain condition? One approach would be to derive from the `firstCompletedOf` method implementation.

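We pick the default ExecutionContext as we did in some coding examples from the previous blog post. Besides the list of Futures, the derived method `firstConditionallyCompletedOf[T]` also takes a `T => Boolean` filtering condition as a parameter. Piggybacking on the core logic of method `firstCompletedOf`, we simply apply the input filter to each of the Futures in the input list before registering the callback. Note that `filter` turns a value that fails the condition into a failed Future, and the handler still fires on whichever Future completes first. The returned Future therefore fails with a `NoSuchElementException` when the first Future to complete does not meet the condition, even if a later one would.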

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

import java.util.concurrent.atomic.AtomicReference

def firstConditionallyCompletedOf[T](futures: List[Future[T]], condition: T => Boolean)(
  implicit ec: ExecutionContext): Future[T] = {

  val p = Promise[T]()
  val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
    override def apply(v1: Try[T]): Unit = getAndSet(null) match {
      case null => ()
      case some => some tryComplete v1
    }
  }
  futures.foreach{ _.filter(condition).onComplete(firstCompleteHandler) }
  p.future
}

val firstCompleted = firstConditionallyCompletedOf(
  (1 to 4).toList.map( id =>  // the method expects a List, not an IndexedSeq
    Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
  ),
  (e: Either[Throwable, Result]) => e match {
    case Left(_) => false
    case Right(res) => res.millis < 1250
  }
)

Await.ready(firstCompleted, 10.seconds)

// * Test running #1:

// Work Result: Result(4, 1037)
// Work Result: Result(2, 1268)
// Work Result: Result(3, 1386)
// TimeOut: work id 1 would take 139ms above limit to complete!
// res1: firstCompleted.type = Future(Success(Right(Result(4, 1037))))

// * Test running #2:

// Work Result: Result(4, 1320)
// TimeOut: work id 1 would take 22ms above limit to complete!
// TimeOut: work id 3 would take 253ms above limit to complete!
// TimeOut: work id 2 would take 438ms above limit to complete!
// res2: firstCompleted.type = Future(Failure(
//   java.util.NoSuchElementException: Future.filter predicate is not satisfied
// ))
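As the second test run shows, the method above reports a failure as soon as the first Future to complete misses the condition. If the intent is instead to wait for the first Future that does satisfy it, one possible variant is sketched below. The name `firstSatisfying` and the counter-based bookkeeping are assumptions made for illustration, not part of the Scala library, and the sketch assumes `condition` itself does not throw.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

def firstSatisfying[T](futures: List[Future[T]], condition: T => Boolean)(
  implicit ec: ExecutionContext): Future[T] = {

  val p = Promise[T]()
  // Counts the Futures that have not yet been ruled out
  val remaining = new AtomicInteger(futures.size)
  if (futures.isEmpty) p.tryFailure(new NoSuchElementException("no Futures given"))

  futures.foreach { f =>
    f.onComplete {
      // The first value that meets the condition completes the Promise
      case Success(v) if condition(v) => p.trySuccess(v)
      // Failures and non-matching values are skipped; fail only when none is left
      case _ =>
        if (remaining.decrementAndGet() == 0)
          p.tryFailure(new NoSuchElementException("no Future satisfied the condition"))
    }
  }
  p.future
}

val a = Promise[Int]()
val b = Promise[Int]()
val picked = firstSatisfying(List(a.future, b.future), (n: Int) => n % 2 == 0)

a.success(1) // completes first but fails the condition
b.success(4) // satisfies the condition
val pickedValue = Await.result(picked, 1.second)

val noMatch = firstSatisfying(List(Future.successful(1)), (n: Int) => n > 5)
val noMatchOutcome = Await.ready(noMatch, 1.second).value
```

The trade-off is that a failure is now reported only after every input Future has completed.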

First N completed Futures

While at it, rather than just the first completed Future, what if we want to capture the first few completed Futures? Deriving from the `firstCompletedOf` implementation wouldn’t quite work – the way the helper callback function `firstCompleteHandler` is structured wouldn’t be useful now that we have a list of Futures to be captured.

We’ll take a straightforward approach of using a `var` list to capture the first N (or the number of input Futures, whichever is smaller) Future results, updating the list inside a `synchronized` block. Since we want to capture the first few completed Futures regardless of success or failure, the returned Future holds a `List[Either[Throwable, T]]` rather than just a `List[T]`.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

def firstNCompletedOf[T](n: Int, futures: List[Future[T]])(implicit ec: ExecutionContext):
  Future[List[Either[Throwable, T]]] = {

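  val nLimit = n min futures.size
  val p = Promise[List[Either[Throwable, T]]]()
  var list = List.empty[Either[Throwable, T]]
  // Nothing to wait for when no results are requested or no Futures are given
  if (nLimit <= 0) p.trySuccess(Nil)
  futures foreach { _.onComplete{ tryT =>
    synchronized {
      // Record results only until the limit is reached, and complete the
      // Promise as soon as the Nth result has been added
      if (list.size < nLimit) {
        list ::= tryT.toEither
        if (list.size == nLimit)
          p trySuccess list.reverse
      }
    }
  } }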
  p.future
}

val firstNCompleted = firstNCompletedOf( 3, (1 to 5).toList.map( id => Future{ doWork(id) } ) )

Await.ready(firstNCompleted, 15.seconds)

// Work Result: Result(1,1168)
// Work Result: Result(5,1180)
// TimeOut: work id 3 would take 200ms above limit to complete!
// TimeOut: work id 2 would take 399ms above limit to complete!
// TimeOut: work id 4 would take 448ms above limit to complete!

// res1: firstNCompleted.type = Future(Success(List(
//   Right(Result(1,1168)), Right(Result(5,1180)), Left(java.lang.Throwable: TimeOut!)
// )))

Simulating a non-CPU-bound task

Rather than keeping the CPU busy (thus CPU-bound), a non-CPU-bound asynchronous task does not demand extensive processing resources. The following snippet defines a method that mimics a non-CPU-bound asynchronous task which could be, say, a non-blocking call to a remote database. This time, we’re going to run on an Akka Actor system, using the ExecutionContext that comes with its default dispatcher. Besides the Fork/Join Executor provided by the dispatcher, we also pick the Akka runtime library to leverage its high-throughput scheduler.

In this example, a Promise which contains a Future is created and after a random duration, the scheduler triggers the completion of the Future with success or failure depending on the random time.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import akka.actor.ActorSystem
implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher

def nonCPUbound[T](value: T): Future[T] = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val promise = Promise[T]()

  system.scheduler.scheduleOnce(FiniteDuration(millis, MILLISECONDS)) {
    if (millis < millisTMO) {
      println(s"OK: Got value [${value}] in ${millis}ms")
      promise.success(value)
    } else {
      println(s"TimeOut: It would take ${millis - millisTMO}ms above limit to complete!")
      promise.failure(new Throwable("TimeOut!"))
    }
  }

  promise.future
}

Launching method `nonCPUbound()` with some value a few times would yield results similar to the following:

Await.result(nonCPUbound("something"), 2.seconds)

// * Test running #1:

// OK: Got value [something] in 1259ms
// res1: String = something

// * Test running #2:

// TimeOut: It would take 384ms above limit to complete!
// [ERROR] ... java.lang.Throwable: TimeOut!
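The same “complete a Promise from a scheduler” pattern can be sketched without Akka. The version below is only an illustration under the assumption that a plain `java.util.concurrent.ScheduledExecutorService` stands in for the Akka scheduler; the names `timer` and `delayed` are hypothetical.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// A single scheduler thread is enough: it only fires completions
val timer = Executors.newSingleThreadScheduledExecutor()

def delayed[T](value: T, delay: FiniteDuration): Future[T] = {
  val promise = Promise[T]()
  // No thread is held while waiting; the Promise is completed when the timer fires
  timer.schedule(new Runnable {
    def run(): Unit = promise.success(value)
  }, delay.toMillis, TimeUnit.MILLISECONDS)
  promise.future
}

val delayedValue = Await.result(delayed("something", 200.millis), 2.seconds)

// Release the scheduler thread so the JVM can exit
timer.shutdown()
```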

CPU-bound versus non-CPU-bound tasks

By wrapping a CPU-bound task like `doWork()` in a Future, the task becomes non-blocking for the caller, but it still consumes processing power. The default ExecutionContext brought in by the `scala.concurrent.ExecutionContext.Implicits.global` import sizes its thread pool from the `scala.concurrent.context.numThreads` and `scala.concurrent.context.maxThreads` system properties, both of which default to the number of CPU cores of the machine the application runs on. To allow more threads to run, one can either raise these properties or handcraft a custom `ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThreads))`. To set the value of `maxThreads` to, say, 16, add the following `javaOptions` to `build.sbt` (sbt applies `javaOptions` only to a forked JVM, i.e. with `fork := true`, and the desired parallelism itself is governed by `numThreads`, capped by `maxThreads`).

javaOptions += "-Dscala.concurrent.context.maxThreads=16"

However, that wouldn’t necessarily make more instances of `Future{ doWork() }` than the number of CPU cores execute in parallel since each of them consumes CPU resource while executing.

On the other hand, a non-CPU-bound task like `nonCPUbound()` takes little CPU resource. In this case, configuring an ExecutionContext with more threads than the CPU cores of the local machine can increase performance, since none of the individual tasks would consume anywhere near the full capacity of a CPU core. It’s not uncommon to configure a pool of hundreds of threads to handle a large amount of such tasks on a machine with just a handful of CPU cores.
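As a rough illustration, the sketch below runs 32 mostly-idle tasks on a 32-thread pool. It assumes `Thread.sleep` as a stand-in for waiting on I/O, and the pool size and names (`ioPool`, `ioEc`) are arbitrary choices for the example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A pool deliberately larger than the typical number of CPU cores
val ioPool = Executors.newFixedThreadPool(32)
implicit val ioEc: ExecutionContext = ExecutionContext.fromExecutorService(ioPool)

val started = System.nanoTime()

// Each task spends its time waiting rather than computing
val all = Future.sequence((1 to 32).toList.map { i =>
  Future { Thread.sleep(200); i }
})

val ids = Await.result(all, 10.seconds)
val elapsedMs = (System.nanoTime() - started) / 1000000

ioPool.shutdown()
```

With one thread per task, the batch should finish in roughly the time of a single wait rather than 32 of them back to back. The exact figure depends on the machine.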

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its function implementations, we don’t need to explicitly use Promises very often as the Futures API already delivers a suite of common concurrent features for writing asynchronous code. If the business logic doesn’t need Promises, just stick to the Futures API. But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the `producer/consumer` example above, Promises do come in handy.

A Brief Overview Of Scala Futures

As demand for computing performance continues to grow, contemporary applications have been increasingly exploiting the collective processing power of all the available CPU cores to maximize task execution in parallel. But writing asynchronous code requires methodical processing control to avoid issues such as race conditions and can be quite challenging even for experienced programmers.

The Scala Future API provides a comprehensive set of functions for writing concurrent, asynchronous code. By design, Scala is a functional programming language with an inherent emphasis on immutability and composability that help avoid issues like race conditions and facilitate successive transformations. In this blog post, we’re going to explore how Scala Futures benefit from those functional features.

Simulating a CPU-bound task

Let’s first prepare a couple of items for upcoming uses:

  1. a `Result` case class representing result of work with a work `id` and time spent in milliseconds
  2. a `doWork` method which mimics executing some CPU-bound work for a random period of time and returns a `Result` object

case class Result(val id: Int, val millis: Long)

def doWork(id: Int): Result = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val start = System.currentTimeMillis()

  while ((System.currentTimeMillis() - start) < millis) {}

  if (millis < millisTMO) {
    println("Work Result: " + Result(id, millis))
    Result(id, millis)
  } else {
    println(s"TimeOut: work id $id would take ${millis - millisTMO}ms above limit to complete!")
    throw new Throwable("TimeOut!")
  }
}

Note that the side-effecting println within `doWork` is for illustrating when each of the asynchronously launched tasks is executed.

The conventional way of running asynchronous tasks

Using the `doWork` method, the conventional way of asynchronously running a number of tasks typically involves a configurable thread pool using Java Executor to execute the tasks as individual Runnables.

import java.util.concurrent.Executors

case class Task(id: Int) extends Runnable {
  // Runnable.run() returns Unit, so the Result produced by doWork is discarded
  override def run(): Unit = {
    doWork(id)
  }
}

val pool = Executors.newFixedThreadPool(4)

(1 to 4).foreach( id => pool.execute(Task(id)) )

// Work Result: Result(3, 1119)
// Work Result: Result(1, 1448)
// TimeOut: work id 4 would take 260ms above limit to complete! Exception ...
// TimeOut: work id 2 would take 471ms above limit to complete! Exception ...

Despite the (1 to 4) ordering of the tasks, the chronological work result printouts with shuffled work ids show that they were processed in parallel. It’s worth noting that method run() does not return a value.

Using Scala Futures

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Try, Success, Failure}

val pool = Executors.newFixedThreadPool(4)

implicit val ec = ExecutionContext.fromExecutorService(pool)

(1 to 4).foreach( id => Future{ doWork(id) }.onComplete(println) )

// Work Result: Result(1, 1042)
// Work Result: Result(4, 1128)
// Work Result: Result(3, 1399)
// TimeOut: work id 2 would take 302ms above limit to complete! Exception ...

By simply wrapping `doWork` in a Future, each task is now asynchronously executed and results are captured by the `onComplete` callback method. The callback method takes a `Try[T] => U` function and can be expanded to handle the success/failure cases accordingly:

Future{ doWork(id) }.onComplete{
  case Success(res) => println("Success: " + res)
  case Failure(err) => println("Failure: " + err)
}

We’re using the same Executor thread pool, which can be configured to optimize for a specific computing environment (e.g. number of CPU cores). The implicit ExecutionContext is required for creating a Future and for executing callback methods such as `onComplete`. One could also fall back to Scala’s default ExecutionContext, which is a Fork/Join Executor, by simply importing the following:

import scala.concurrent.ExecutionContext.Implicits.global

However, Scala Futures provide a lot more than just a handy wrapper for executing non-blocking tasks with callback methods.

Immutability and composability

Scala Futures involve code running in multiple threads. By adhering to using Scala’s immutable collections, defining values as immutable `val` (contrary to variables as mutable `var`), and relying on functional transformations (as opposed to mutations), one can more easily write concurrent code that is thread-safe, avoiding problems such as race conditions.

But perhaps one of the most sought-after features of Scala Futures is the support of composable transformations in the functional programming way. For example, we can chain a bunch of Futures via methods like `map` and `filter`:

Future{ doWork(1) }.
  filter( _.millis < 1400 ).
  map( _ => doWork(2) )

// Work Result: Result(1,1133)
// TimeOut: work id 2 would take 333ms above limit to complete!

The above snippet asynchronously runs doWork(1), and if finished within 1400 ms, continues to run the next task doWork(2).
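To make the effect of `filter` visible without random timing, here is a minimal sketch using fixed values (the names `kept` and `dropped` are illustrative). When the predicate does not hold, the resulting Future fails with a `NoSuchElementException` and the subsequent `map` is not applied.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Predicate holds: the value flows on to `map`
val kept = Future { 21 }.filter(_ < 100).map(_ * 2)

// Predicate fails: the chain short-circuits into a failed Future
val dropped = Future { 210 }.filter(_ < 100).map(_ * 2)

val keptValue = Await.result(kept, 1.second)
val droppedOutcome = Await.ready(dropped, 1.second).value
```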

Another example: let’s say we have a number of predefined methods `doChainedWork(id, res)` with id = 1, 2, 3, …, each taking a work result and deriving a new work result like below:

def doChainedWork(id: Int, res: Result): Result = {
  // Take a `Result` and derive a new `Result` from it ...
  ???  // placeholder: the actual derivation is left unspecified
}

And let’s say we want to successively apply `doChainedWork` in a non-blocking fashion. We can simply wrap each of them in a Future and chain them using flatMap:

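// The first call needs an initial `Result`; here it is taken from doWork(0)
Future{ doWork(0) }.
  flatMap( res => Future{ doChainedWork(1, res) } ).
  flatMap( res => Future{ doChainedWork(2, res) } ).
  flatMap( res => Future{ doChainedWork(3, res) } )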

Note that neither of the above trivialized examples handles failures, so each chain will stop at the first exception. Depending on the specific business logic, that might not be desirable.
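The fail-fast behavior of a `flatMap` chain can be sketched with a for-comprehension, which is syntactic sugar for the same `flatMap`/`map` calls. The `stage` function and the `stagesRun` counter are hypothetical helpers introduced only for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Counts how many stages actually get executed
val stagesRun = new AtomicInteger(0)

def stage(id: Int, in: Long): Long = {
  stagesRun.incrementAndGet()
  if (id == 2) throw new IllegalStateException(s"stage $id failed")
  in + id
}

// Each stage starts only after the previous one has succeeded
val pipeline = for {
  a <- Future { stage(1, 0L) }
  b <- Future { stage(2, a) }
  c <- Future { stage(3, b) }
} yield c

val outcome = Await.ready(pipeline, 2.seconds).value
```

Stage 2 throws, so the pipeline ends up as a failed Future and stage 3 is never started.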

Using map and recover on Futures

While the `onComplete` callback in the previous example can handle failures, its `Unit` return type hinders composability. This section addresses that very issue.

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.{Try, Success, Failure}

import scala.concurrent.ExecutionContext.Implicits.global

val futures = (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
)

// Block until each Future has completed, then inspect the sequence
futures.foreach( Await.ready(_, 10.seconds) )
futures

// Work Result: Result(2, 1074)
// Work Result: Result(1, 1363)
// Work Result: Result(4, 1369)
// TimeOut: work id 3 would take 370ms above limit to complete! Exception ...

// res1: scala.collection.immutable.IndexedSeq[scala.concurrent.
//   Future[Product with Serializable with scala.util.Either[Throwable,Result]]] =
//   Vector(
//     Future(Success(Right(Result(1, 1363)))),
//     Future(Success(Right(Result(2, 1074)))),
//     Future(Success(Left(java.lang.Throwable: TimeOut!))),
//     Future(Success(Right(Result(4, 1369))))
//   )

In the Future trait, methods `map` and `recover` have the following signatures:

def map[S](f: (T) => S)(implicit executor: ExecutionContext): Future[S]

def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U]

When a Future results in success, method `map` applies the provided function to the result. On the other hand, if the Future results in failure with a Throwable, method `recover` applies a given partial function that matches the Throwable. Both methods return a new Future.

In the above sample code, we use `map` to create a Future of `Right(Result)` when `doWork` succeeds, and `recover` to create a Future of `Left(Throwable)` when `doWork` fails. Using `Either[Throwable, Result]` as the return data type, we capture successful and failed returns in a type-safe fashion, allowing composition of any additional transformations.
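The `map`/`recover` pairing can be packaged as a small helper. The sketch below uses a hypothetical name, `toEither`, and passes an explicit type argument to `map` so that the result is typed as `Either[Throwable, T]` rather than the inferred `Product with Serializable with Either[...]` seen in the output above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Turns a Future that may fail into one that always succeeds with an Either
def toEither[T](f: Future[T]): Future[Either[Throwable, T]] =
  f.map[Either[Throwable, T]](Right(_)).recover { case e => Left(e) }

val ok = Await.result(toEither(Future.successful(1)), 1.second)
val ko = Await.result(toEither(Future.failed[Int](new RuntimeException("boom"))), 1.second)
```

Here `ok` is a `Right` and `ko` is a `Left`, and neither `Await.result` call throws because the failure has been captured as a value.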

`Await` is used to block, for up to a given duration, until a Future completes: `Await.ready` returns the completed Future itself, whereas `Await.result` returns its value (or throws its failure).

From a sequence of Futures to a Future of sequence

Oftentimes, when faced with a set of Futures each of which consists of values to be consumed, we would prefer wrapping the set of values within a single Future. For that purpose, the Scala Future companion object provides a useful method `Future.sequence` which converts a sequence of Futures to a Future of sequence.

val combinedFuture = Future.sequence( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

// Work Result: Result(2, 1262)
// Work Result: Result(3, 1490)
// TimeOut: work id 1 would take 6ms above limit to complete!
// TimeOut: work id 4 would take 405ms above limit to complete!

// res1: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[
//   Product with Serializable with scala.util.Either[Throwable,Result]]] =
//   Future(Success(Vector(
//     Left(java.lang.Throwable: TimeOut!),
//     Right(Result(2, 1262)),
//     Right(Result(3, 1490)),
//     Left(java.lang.Throwable: TimeOut!)
//   )))

As shown in the example, a collection of `Future[Either[Throwable,Result]]` is transformed into a single Future holding a collection of `Either[Throwable,Result]` elements, in the same order as the input.

Or, we could use method `Future.traverse`, which is a more generalized version of `Future.sequence`. It takes a collection of plain values together with a function, `f: A => Future[B]`, applies the function to each element, and combines the resulting Futures into a single Future of a collection. The following snippet, which takes a `(1 to 4)` range and an `Int => Future[Either[Throwable,Result]]` function as input, carries out the same transformation as the above `Future.sequence` snippet.

val combinedFuture = Future.traverse(1 to 4)( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
)
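For a deterministic comparison of the two methods, here is a short sketch with fixed inputs (all names are illustrative). It also shows why the snippets above wrap each result in an `Either`: without that, a single failed element makes the whole combined Future fail.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val inputs = List(1, 2, 3)

// Build the Futures first, then flip them into a Future of a List
val viaSequence = Future.sequence(inputs.map(n => Future { n * 2 }))

// Apply the function and combine in one step
val viaTraverse = Future.traverse(inputs)(n => Future { n * 2 })

val sequenced = Await.result(viaSequence, 1.second)
val traversed = Await.result(viaTraverse, 1.second)

// One failed element fails the combined Future as a whole
val withFailure = Future.sequence(List(
  Future.successful(1),
  Future.failed[Int](new RuntimeException("boom"))
))
val combinedOutcome = Await.ready(withFailure, 1.second).value
```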

First completed Future

If one only cares about the first completed Future out of a list of Futures launched in parallel, the Scala Future companion object provides a handy method `Future.firstCompletedOf`.

val firstCompleted = Future.firstCompletedOf( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

Await.result(firstCompleted, 10.seconds)

// Work Result: Result(4, 1071)
// Work Result: Result(2, 1181)
// Work Result: Result(1, 1321)
// TimeOut: work id 3 would take 152ms above limit to complete!

// res1: Product with Serializable with scala.util.Either[Throwable,Result] = Right(Result(4, 1071))

Grasping the “future”

As the code examples have demonstrated so far, Scala Future is an abstraction which returns a value in the future. A take-away point is that once a Future is “kicked off”, it’s in some sense “untouchable” and its value won’t be available till it’s completed with success or failure. In another blog post some other time, we’ll explore how one can seize a little more control in the “when” and “what” in completing a Future.