Monthly Archives: September 2018

Scala Promises – Futures In Your Hands

In the previous blog post, we saw how Scala Futures serve as a handy wrapper for running asynchronous tasks and allow non-blocking functional transformations via composable functions. Despite all the goodies, a plain Future, once started, is read-only.

A “manipulable” Future

To make things a little more interesting, let’s take a glimpse into an interesting “container” that holds an “uncertain” Future. Scala provides another abstraction called Promise that allows programmers to have some control in the “when” and “what” in completing a Future. A Promise is like a container holding a Future which can be completed by assigning a value (with success or failure) at any point of time. The catch is that it can only be completed once.

The Promise companion object has the following apply method that creates a DefaultPromise:

As shown below, the DefaultPromise class extends AtomicReference to ensure that a Promise instance will be completed in an atomic fashion.

A trivial producer and consumer

A common use case of Promise is like this:

  1. a Promise which holds an “open” Future is created
  2. run some business logic to come up with some value
  3. complete the Promise by assigning its Future the value via methods like success(), failure(), tryComplete(), etc
  4. return the “closed” Future

Here’s a hello-world example of Scala Promise used in a trivialized producer and consumer:

The above code snippet is rather self-explanatory. The producer running in one thread completes the Promise’s future based on the result of a randomly generated integer and the consumer in another thread checks and reports the value of the completed future.

Simulating a CPU-bound task, again

For up-coming illustrations, let’s borrow the CPU-bound task simulation doWork method used in the coding examples from the previous blog post:

Revisiting first completed Future

Recall that method Future.firstCompletedOf from the previous blog post can be used to capture the first completed Future out of a list of Futures running in parallel:

Now, let’s see how firstCompletedOf is actually implemented in Scala Future using Promise:

In the firstCompletedOf method implementation, the helper callback function firstCompleteHandler for each of the Futures in the input list ensures by means of an AtomicReference that the first completed Future will be the Promise’s future.

First completed Future with a condition

What if we want to get the first completed Future from a number of Futures whose values meet a certain condition? One approach would be to derive from the firstCompletedOf method implementation.

We pick the default ExecutionContext like how we did in some coding examples from the previous blog. Besides the list of Futures, the derived method firstConditionallyCompletedOf[T] would also take a T => Boolean filtering condition as a parameter. Piggybacking on the core logic from method firstCompletedOf, we simply apply the input filter to each of the Futures in the input list before the callback.

First N completed Futures

While at it, rather than just the first completed Future, what if we want to capture the first few completed Futures? Deriving from the firstCompletedOf implementation wouldn’t quite work – the way the helper callback function firstCompleteHandler is structured wouldn’t be useful now that we have a list of Futures to be captured.

We’ll take a straight forward approach of using a var list for capturing the first N (or the size of input Futures, whichever smaller) Future results and update the list inside a synchronized block. Since we want to capture the first few completed Futures (success or failure), we make the return Future consisting of a List[Either[Throwable, T]], rather than just List[T].

Simulating a non-CPU-bound task

Rather than keeping the CPU busy (thus CPU-bound), a non-CPU-bound asynchronous task does not demand extensive processing resource. The following snippet defines a method that mimics a non-CPU-bound asynchronous task which could be, say, a non-blocking call to a remote database. This time, we’re going to run on an Akka Actor system, using the ExecutionContext that comes with its default dispatcher. Besides the Fork/Join Executor provided by the dispatcher, we pick the Akka runtime library also to leverage its high-throughput scheduler.

In this example, a Promise which contains a Future is created and after a random duration, the scheduler triggers the completion of the Future with success or failure depending on the random time.

Launching method nonCPUbound() with some value a few times would yield results similar to the following:

CPU-bound versus non-CPU-bound tasks

By wrapping a CPU-bound task like doWork() with a Future, the task becomes non-blocking but it still consumes processing power. The default ExecutionContext via the scala.concurrent.ExecutionContext.Implicits.global import will optimally set scala.concurrent.context.maxThreads to the number of CPU cores of the machine the application resides on. One can raise the maxThreads and handcraft a custom ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThreads)) to allow more threads to be run. To set the value of maxThreads to, say 16, simply add the following javaOptions to build.sbt.

However, that wouldn’t necessarily make more instances of Future{ doWork() } than the number of CPU cores execute in parallel since each of them consumes CPU resource while executing.

On the other hand, a non-CPU-bound task like nonCPUbound() takes little CPU resource. In this case, configuring an ExecutionContext with more threads than the CPU cores of the local machine can increase performance, since none of the individual tasks would consume anywhere near the full capacity of a CPU core. It’s not uncommon to configure a pool of hundreds of threads to handle a large amount of such tasks on a machine with just a handful of CPU cores.

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its function implementations, we don’t need to explicitly use Promises very often as the Futures API already delivers a suite of common concurrent features for writing asynchronous code. If the business logic doesn’t need Promises, just stick to the Futures API. But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the producer/consumer example above, Promises do come in handy.