Tag Archives: asynchronous programming

Scala Promises – Futures In Your Hands

In the previous blog post, we saw how Scala Futures serve as a handy wrapper for running asynchronous tasks and allow non-blocking functional transformations via composable functions. Despite all the goodies, a plain Future, once started, is read-only.

A “manipulable” Future

To make things a little more interesting, let’s take a glimpse into an interesting “container” that holds an “uncertain” Future. Scala provides another abstraction called Promise that allows programmers to have some control in the “when” and “what” in completing a Future. A Promise is like a container holding a Future which can be completed by assigning a value (with success or failure) at any point of time. The catch is that it can only be completed once.

The Promise companion object has the following apply method that creates a DefaultPromise:

As shown below, the DefaultPromise class extends AtomicReference to ensure that a Promise instance will be completed in an atomic fashion.

A trivial producer and consumer

A common use case of Promise is like this:

  1. a Promise which holds an “open” Future is created
  2. run some business logic to come up with some value
  3. complete the Promise by assigning its Future the value via methods like success(), failure(), tryComplete(), etc
  4. return the “closed” Future

Here’s a hello-world example of Scala Promise used in a trivialized producer and consumer:

The above code snippet is rather self-explanatory. The producer running in one thread completes the Promise’s future based on the result of a randomly generated integer and the consumer in another thread checks and reports the value of the completed future.

Simulating a CPU-bound task, again

For up-coming illustrations, let’s borrow the CPU-bound task simulation doWork method used in the coding examples from the previous blog post:

Revisiting first completed Future

Recall that method Future.firstCompletedOf from the previous blog post can be used to capture the first completed Future out of a list of Futures running in parallel:

Now, let’s see how firstCompletedOf is actually implemented in Scala Future using Promise:

In the firstCompletedOf method implementation, the helper callback function firstCompleteHandler for each of the Futures in the input list ensures by means of an AtomicReference that the first completed Future will be the Promise’s future.

First completed Future with a condition

What if we want to get the first completed Future from a number of Futures whose values meet a certain condition? One approach would be to derive from the firstCompletedOf method implementation.

We pick the default ExecutionContext like how we did in some coding examples from the previous blog. Besides the list of Futures, the derived method firstConditionallyCompletedOf[T] would also take a T => Boolean filtering condition as a parameter. Piggybacking on the core logic from method firstCompletedOf, we simply apply the input filter to each of the Futures in the input list before the callback.

First N completed Futures

While at it, rather than just the first completed Future, what if we want to capture the first few completed Futures? Deriving from the firstCompletedOf implementation wouldn’t quite work – the way the helper callback function firstCompleteHandler is structured wouldn’t be useful now that we have a list of Futures to be captured.

We’ll take a straight forward approach of using a var list for capturing the first N (or the size of input Futures, whichever smaller) Future results and update the list inside a synchronized block. Since we want to capture the first few completed Futures (success or failure), we make the return Future consisting of a List[Either[Throwable, T]], rather than just List[T].

Simulating a non-CPU-bound task

Rather than keeping the CPU busy (thus CPU-bound), a non-CPU-bound asynchronous task does not demand extensive processing resource. The following snippet defines a method that mimics a non-CPU-bound asynchronous task which could be, say, a non-blocking call to a remote database. This time, we’re going to run on an Akka Actor system, using the ExecutionContext that comes with its default dispatcher. Besides the Fork/Join Executor provided by the dispatcher, we pick the Akka runtime library also to leverage its high-throughput scheduler.

In this example, a Promise which contains a Future is created and after a random duration, the scheduler triggers the completion of the Future with success or failure depending on the random time.

Launching method nonCPUbound() with some value a few times would yield results similar to the following:

CPU-bound versus non-CPU-bound tasks

By wrapping a CPU-bound task like doWork() with a Future, the task becomes non-blocking but it still consumes processing power. The default ExecutionContext via the scala.concurrent.ExecutionContext.Implicits.global import will optimally set scala.concurrent.context.maxThreads to the number of CPU cores of the machine the application resides on. One can raise the maxThreads and handcraft a custom ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThreads)) to allow more threads to be run. To set the value of maxThreads to, say 16, simply add the following javaOptions to build.sbt.

However, that wouldn’t necessarily make more instances of Future{ doWork() } than the number of CPU cores execute in parallel since each of them consumes CPU resource while executing.

On the other hand, a non-CPU-bound task like nonCPUbound() takes little CPU resource. In this case, configuring an ExecutionContext with more threads than the CPU cores of the local machine can increase performance, since none of the individual tasks would consume anywhere near the full capacity of a CPU core. It’s not uncommon to configure a pool of hundreds of threads to handle a large amount of such tasks on a machine with just a handful of CPU cores.

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its function implementations, we don’t need to explicitly use Promises very often as the Futures API already delivers a suite of common concurrent features for writing asynchronous code. If the business logic doesn’t need Promises, just stick to the Futures API. But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the producer/consumer example above, Promises do come in handy.

A Brief Overview Of Scala Futures

As demand for computing performance continues to grow, contemporary applications have been increasingly exploiting the collective processing power of all the available CPU cores to maximize task execution in parallel. But writing asynchronous code requires methodical processing control to avoid issues such as race condition and can be quite challenging even for experienced programmers.

The Scala Future API provides a comprehensive set of functions for writing concurrent, asynchronous code. By design, Scala is a functional programming language with an inherent emphasis on immutability and composability that help avoid issues like race condition and facilitate successive transformations. In this blog post, we’re going to explore how the Scala Futures benefit from those functional features.

Simulating a CPU-bound task

Let’s first prepare a couple of items for upcoming uses:

  1. a Result case class representing result of work with a work id and time spent in milliseconds
  2. a doWork method which mimics executing some CPU-bound work for a random period of time and returns a Result object

Note that the side-effecting println within doWork is for illustrating when each of the asynchronously launched tasks is executed.

The conventional way of running asynchronous tasks

Using the doWork method, the conventional way of asynchronously running a number of tasks typically involves a configurable thread pool using Java Executor to execute the tasks as individual Runnables.

Despite the (1 to 4) ordering of the tasks, the chronological work result printouts with shuffled work ids shows that they were processed in parallel. It’s worth noting that method run() does not return a value.

Using Scala Futures

By simply wrapping doWork in a Future, each task is now asynchronously executed and results are captured by the onComplete callback method. The callback method takes a Try[T] => U function and can be expanded to handle the success/failure cases accordingly:

We’re using the same Executor thread pool which can be configured to optimize for specific computing environment (e.g. number of CPU cores). The implicit ExecutionContext is required for executing callback methods such as onComplete. One could also fall back to Scala’s default ExecutionContext, which is a Fork/Join Executor, by simply importing the following:

However, Scala Futures provide a lot more than just a handy wrapper for executing non-blocking tasks with callback methods.

Immutability and composability

Scala Futures involve code running in multiple threads. By adhering to using Scala’s immutable collections, defining values as immutable val (contrary to variables as mutable var), and relying on functional transformations (as opposed to mutations), one can easily write concurrent code that is thread-safe, avoiding problems such as race condition.

But perhaps one of the most sought-after features of Scala Futures is the support of composable transformations in the functional programming way. For example, we can chain a bunch of Futures via methods like map and filter:

The above snippet asynchronously runs doWork(1), and if finished within 1400 ms, continues to run the next task doWork(2).

Another example: let’s say we have a number of predefined methods doChainedWork(id, res) with id = 1, 2, 3, …, each taking a work result and deriving a new work result like below:

And let’s say we want to successively apply doChainedWork in a non-blocking fashion. We can simply wrap each of them in a Future and chain them using flatMap:

Note that neither of the above trivialized example code handles failures, hence will break upon the first exception. Depending on the specific business logic, that might not be desirable.

Using map and recover on Futures

While the onComplete callback in the previous example can handle failures, its Unit return type hinders composability. This section addresses the very issue.

In the Future trait, methods map and recover have the following signature:

When a Future results in success, method map applies the provided function to the result. On the other hand, if the Future results in failure with a Throwable, method recover applies a given partial function that matches the Throwable. Both methods return a new Future.

In the above sample code, we use map to create a Future of Right[Result] when doWork succeeds, and recover to create a Future of Left[Throwable] when doWork fails. Using Either[Throwable, Result[Int]] as the return data type, we capture successful and failed return in a type-safe fashion, allowing composition of any additional transformations.

Method Await is used to wait for a given duration for the combined Future to complete and return the result.

From a sequence of Futures to a Future of sequence

Oftentimes, when faced with a set of Futures each of which consists of values to be consumed, we would prefer wrapping the set of values within a single Future. For that purpose, the Scala Future companion object provides a useful method Future.sequence which converts a sequence of Futures to a Future of sequence.

As shown in the example, a collection of Future[Either[Throwable,Result]] is transformed into a single Future of Either[Throwable,Result] elements.

Or, we could use method Future.traverse which is a more generalized version of Future.sequence. It allows one to provide a function, f: A => Future[B], as an additional input to be applied to the items in the individual Futures of the input sequence. The following snippet that takes a (1 to 4) range and an Int => Future[Either[Throwable,Result]] function as input carries out the same transformation as the above Future.sequence snippet.

First completed Future

If one only cares about the first completed Future out of a list of Futures launched in parallel, the Scala Future companion object provides a handy method Future.firstCompletedOf.

Grasping the “future”

As the code examples have demonstrated so far, Scala Future is an abstraction which returns a value in the future. A take-away point is that once a Future is “kicked off”, it’s in some sense “untouchable” and its value won’t be available till it’s completed with success or failure. In another blog post some other time, we’ll explore how one can seize a little more control in the “when” and “what” in completing a Future.