Monthly Archives: September 2018

Scala Promises – Futures In Your Hands

In the previous blog post, we saw how Scala Futures serve as a handy wrapper for running asynchronous tasks and allow non-blocking functional transformations via composable functions. Despite all the goodies, a plain Future, once started, is read-only.

A “manipulable” Future

To make things a little more interesting, let’s take a glimpse into a “container” that holds an “uncertain” Future. Scala provides another abstraction called Promise that gives programmers some control over the “when” and the “what” in completing a Future. A Promise is like a container holding a Future which can be completed by assigning a value (a success or a failure) at any point in time. The catch is that it can be completed only once.
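The at-most-once semantics can be seen in a couple of lines in the REPL. Here is a minimal sketch: `success` claims the Promise, and a subsequent `trySuccess` returns `false` rather than completing it again:

```scala
import scala.concurrent.Promise
import scala.util.Success

val p = Promise[Int]()
p.success(42)               // completes the underlying Future with 42
println(p.trySuccess(7))    // false: the Promise is already completed
println(p.future.value)     // Some(Success(42)): the first value sticks
// p.success(7) would throw an IllegalStateException instead
```

The `try*` variants (`trySuccess`, `tryFailure`, `tryComplete`) report whether the call won the race, whereas `success`/`failure` throw if the Promise was already completed.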

The Promise companion object has the following `apply` method that creates a `DefaultPromise`:

object Promise {
  def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
  // ...
}

As shown below, the DefaultPromise class extends AtomicReference to ensure that a Promise instance will be completed in an atomic fashion.

class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]

A trivial producer and consumer

A common use case of Promise is like this:

  1. a Promise which holds an “open” Future is created
  2. run some business logic to come up with some value
  3. complete the Promise by assigning its Future a value via methods like `success()`, `failure()`, or `tryComplete()`
  4. return the “closed” Future

Here’s a hello-world example of Scala Promise used in a trivialized producer and consumer:

import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ThreadLocalRandom

val p = Promise[Int]()
val f = p.future

val producer = Future {
  val r = ThreadLocalRandom.current.nextInt(0, 2000)
  println("Inside producer: r = " + r)
  if (r < 1000)
    p success r
  else
    p failure (new Exception("r > 999!"))
}

val consumer = Future {
  println("Inside consumer!")
  f onComplete {
    case Success(r) => println("Success: r = " + r)
    case Failure(e) => println("Error: " + e)
  }
}

The above code snippet is rather self-explanatory. The producer running in one thread completes the Promise’s future based on the result of a randomly generated integer and the consumer in another thread checks and reports the value of the completed future.

// * Test running #1:

// Inside producer: r = 1278
// producer: scala.concurrent.Future[scala.concurrent.Promise[Int]] = Future()
// consumer: scala.concurrent.Future[Unit] = Future()
// Error: java.lang.Exception: r > 999!

// * Test running #2:

// Inside producer: r = 547
// producer: scala.concurrent.Future[scala.concurrent.Promise[Int]] = Future()
// consumer: scala.concurrent.Future[Unit] = Future()
// Success: r = 547

Simulating a CPU-bound task, again

For upcoming illustrations, let’s borrow the CPU-bound task simulation `doWork` method used in the coding examples from the previous blog post:

case class Result(val id: Int, val millis: Long)

def doWork(id: Int): Result = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val start = System.currentTimeMillis()

  while ((System.currentTimeMillis() - start) < millis) {}

  if (millis < millisTMO) {
    println("Work Result: " + Result(id, millis))
    Result(id, millis)
  } else {
    println(s"TimeOut: work id $id would take ${millis - millisTMO}ms above limit to complete!")
    throw new Throwable("TimeOut!")
  }
}

Revisiting first completed Future

Recall that method `Future.firstCompletedOf` from the previous blog post can be used to capture the first completed Future out of a list of Futures running in parallel:

val firstCompleted = Future.firstCompletedOf( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

Now, let’s see how `firstCompletedOf` is actually implemented in Scala Future using Promise:

object Future {
  // ...

  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
      override def apply(v1: Try[T]): Unit = getAndSet(null) match {
        case null => ()
        case some => some tryComplete v1
      }
    }
    futures foreach { _ onComplete firstCompleteHandler }
    p.future
  }

  // ...
}

In the `firstCompletedOf` implementation, the helper callback `firstCompleteHandler`, registered on each Future in the input list, uses an `AtomicReference` with `getAndSet(null)` to guarantee that only the first Future to complete gets to fulfill the Promise; every subsequent callback sees `null` and does nothing.
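That same at-most-once guarantee is what makes `tryComplete` safe to call from several racing Futures. A quick sketch (with an arbitrary race among four tasks) shows that exactly one attempt wins and the rest are silently ignored:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

val p = Promise[Int]()
// four Futures race to complete the same Promise; tryComplete returns
// true for exactly one of them and false for the rest
val attempts = Future.sequence(
  (1 to 4).toList.map(i => Future { p.tryComplete(Success(i)) })
)
val winner = Await.result(p.future, 5.seconds)  // whichever task got there first
val winCount = Await.result(attempts, 5.seconds).count(identity)  // always 1
```

Which of the four values ends up in the Future is nondeterministic, but the Promise is completed exactly once.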

First completed Future with a condition

What if we want to get the first completed Future from a number of Futures whose values meet a certain condition? One approach would be to derive from the `firstCompletedOf` method implementation.

We pick the default ExecutionContext, as in some coding examples from the previous blog post. Besides the list of Futures, the derived method `firstConditionallyCompletedOf[T]` also takes a `T => Boolean` filtering condition as a parameter. Piggybacking on the core logic of `firstCompletedOf`, we simply apply the input filter to each of the Futures in the input list before registering the callback.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

import java.util.concurrent.atomic.AtomicReference

def firstConditionallyCompletedOf[T](futures: List[Future[T]], condition: T => Boolean)(
  implicit ec: ExecutionContext): Future[T] = {

  val p = Promise[T]()
  val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
    override def apply(v1: Try[T]): Unit = getAndSet(null) match {
      case null => ()
      case some => some tryComplete v1
    }
  }
  futures.foreach{ _.filter(condition).onComplete(firstCompleteHandler) }
  p.future
}

val firstCompleted = firstConditionallyCompletedOf(
  (1 to 4).toList.map( id =>
    Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
  ),
  (e: Either[Throwable, Result]) => e match {
    case Left(_) => false
    case Right(res) => res.millis < 1250
  }
)

Await.ready(firstCompleted, 10.seconds)

// * Test running #1:

// Work Result: Result(4, 1037)
// Work Result: Result(2, 1268)
// Work Result: Result(3, 1386)
// TimeOut: work id 1 would take 139ms above limit to complete!
// res1: firstCompleted.type = Future(Success(Right(Result(4, 1037))))

// * Test running #2:

// Work Result: Result(4, 1320)
// TimeOut: work id 1 would take 22ms above limit to complete!
// TimeOut: work id 3 would take 253ms above limit to complete!
// TimeOut: work id 2 would take 438ms above limit to complete!
// res2: firstCompleted.type = Future(Failure(
//   java.util.NoSuchElementException: Future.filter predicate is not satisfied
// ))

First N completed Futures

While at it, rather than just the first completed Future, what if we want to capture the first few completed Futures? Deriving from the `firstCompletedOf` implementation wouldn’t quite work: the helper callback `firstCompleteHandler` is built to complete the Promise with a single value, which isn’t useful now that we have a list of Future results to capture.

We’ll take a straightforward approach of using a `var` list to capture the first N Future results (or as many as the input Futures, whichever is smaller) and update the list inside a `synchronized` block. Since we want to capture the first few completed Futures whether they succeed or fail, we make the returned Future carry a `List[Either[Throwable, T]]`, rather than just a `List[T]`.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

def firstNCompletedOf[T](n: Int, futures: List[Future[T]])(implicit ec: ExecutionContext):
  Future[List[Either[Throwable, T]]] = {

  val nLimit = n min futures.size
  val p = Promise[List[Either[Throwable, T]]]()
  var list = List.empty[Either[Throwable, T]]
  futures foreach { _.onComplete{ tryT =>
    synchronized {
      if (list.size < nLimit) {
        list ::= tryT.toEither
        if (list.size == nLimit)
          p trySuccess list.reverse
      }
    }
  } }
  p.future
}

val firstNCompleted = firstNCompletedOf( 3, (1 to 5).toList.map( id => Future{ doWork(id) } ) )

Await.ready(firstNCompleted, 15.seconds)

// Work Result: Result(1,1168)
// Work Result: Result(5,1180)
// TimeOut: work id 3 would take 200ms above limit to complete!
// TimeOut: work id 2 would take 399ms above limit to complete!
// TimeOut: work id 4 would take 448ms above limit to complete!

// res1: firstNCompleted.type = Future(Success(List(
//   Right(Result(1,1168)), Right(Result(5,1180)), Left(java.lang.Throwable: java.lang.Throwable: TimeOut!)
// )))

Simulating a non-CPU-bound task

Rather than keeping the CPU busy (thus CPU-bound), a non-CPU-bound asynchronous task demands little processing resource. The following snippet defines a method that mimics such a task, which could be, say, a non-blocking call to a remote database. This time, we’re going to run on an Akka actor system, using the ExecutionContext that comes with its default dispatcher. Besides the fork/join executor provided by the dispatcher, the Akka runtime also offers a high-throughput scheduler, which we leverage here.

In this example, a Promise which contains a Future is created and after a random duration, the scheduler triggers the completion of the Future with success or failure depending on the random time.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import akka.actor.ActorSystem
implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher

def nonCPUbound[T](value: T): Future[T] = {  
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val promise = Promise[T]()

  system.scheduler.scheduleOnce(FiniteDuration(millis, MILLISECONDS)) {
    if (millis < millisTMO) {
      println(s"OK: Got value [${value}] in ${millis}ms")
      promise.success(value)
    } else {
      println(s"TimeOut: It would take ${millis - millisTMO}ms above limit to complete!")
      promise.failure(new Throwable("TimeOut!"))
    }
  }

  promise.future
}

Launching method `nonCPUbound()` with some value a few times would yield results similar to the following:

Await.result(nonCPUbound("something"), 2.seconds)

// * Test running #1:

// OK: Got value [something] in 1259ms
// res1: String = something

// * Test running #2:

// TimeOut: It would take 384ms above limit to complete!
// [ERROR] ... java.lang.Throwable: TimeOut!

CPU-bound versus non-CPU-bound tasks

By wrapping a CPU-bound task like `doWork()` in a Future, the task becomes non-blocking, but it still consumes processing power. The default ExecutionContext from the `scala.concurrent.ExecutionContext.Implicits.global` import sets its parallelism to the number of CPU cores of the machine the application runs on. One can raise `scala.concurrent.context.maxThreads`, or handcraft a custom `ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThreads))`, to allow more threads to run. To set `maxThreads` to, say, 16, simply add the following `javaOptions` to `build.sbt` (note that `javaOptions` takes effect only when `fork` is enabled):

javaOptions += "-Dscala.concurrent.context.maxThreads=16"

However, that wouldn’t necessarily make more instances of `Future{ doWork() }` than the number of CPU cores execute in parallel, since each of them keeps a core busy for its entire duration.

On the other hand, a non-CPU-bound task like `nonCPUbound()` takes little CPU resource. In this case, configuring an ExecutionContext with more threads than the CPU cores of the local machine can increase performance, since none of the individual tasks would consume anywhere near the full capacity of a CPU core. It’s not uncommon to configure a pool of hundreds of threads to handle a large amount of such tasks on a machine with just a handful of CPU cores.
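As a sketch of that configuration (the pool size of 100 here is an arbitrary choice for illustration), one could wire up a fixed thread pool as a custom ExecutionContext for such non-CPU-bound tasks:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// a hand-crafted ExecutionContext backed by 100 threads; suited to tasks
// that spend most of their time waiting rather than computing
val nonCpuBoundEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(100))

// Futures built against this context run on the 100-thread pool
val f = Future { /* e.g. a non-blocking remote call */ "something" }(nonCpuBoundEc)
```

Passing the context explicitly, as above, keeps it from leaking into unrelated Futures; alternatively, declare it `implicit val` in the scope where those tasks are created.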

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its function implementations, we don’t need to explicitly use Promises very often as the Futures API already delivers a suite of common concurrent features for writing asynchronous code. If the business logic doesn’t need Promises, just stick to the Futures API. But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the `producer/consumer` example above, Promises do come in handy.