Scala’s groupMap And groupMapReduce

For grouping elements in a Scala collection by a provided key, the de facto method of choice has been groupBy, which has the following signature for an `Iterable`:

// Method groupBy
def groupBy[K](f: (A) => K): immutable.Map[K, Iterable[A]]

It returns an immutable Map in which each entry consists of a key and a collection of the original elements that share that key. To process these collections of values in the resulting Map, Scala provides a method mapValues with the below signature:

// Method mapValues
def mapValues[W](f: (V) => W): Map[K, W]

This `groupBy/mapValues` combo proves to be handy for processing the values of the Map generated from the grouping. However, as of Scala 2.13, method `mapValues` on a Map is deprecated in favor of going through a lazy MapView via `.view.mapValues`.
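For instance, here’s a quick 2.13-style sketch (a made-up mini example, not tied to the examples below) of the replacement idiom:

// Scala 2.13: mapValues through a lazy view, made strict again with toMap
Map("a" -> List(1, 2), "b" -> List(3)).view.mapValues(_.sum).toMap
// Map("a" -> 3, "b" -> 3)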

groupMap

A new method, groupMap, has emerged for grouping a collection based on provided functions that define the keys and values of the resulting Map. Here’s the signature of method groupMap for an `Iterable`:

// Method groupMap
def groupMap[K, B](key: (A) => K)(f: (A) => B): immutable.Map[K, Iterable[B]]

Let’s start with a simple example grouping via the good old `groupBy` method:

// Example 1: groupBy
val fruits = List("apple", "apple", "orange", "pear", "pear", "pear")

fruits.groupBy(identity)
// res1: Map[String, List[String]] = Map(
//   "orange" -> List("orange"),
//   "apple" -> List("apple", "apple"),
//   "pear" -> List("pear", "pear", "pear")
// )

We can replace `groupBy` with `groupMap` like below:

// Example 1: groupMap
fruits.groupMap(identity)(identity)
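Run against the same `fruits` list, it should produce the same Map as the `groupBy(identity)` call above:

// Map(
//   "orange" -> List("orange"),
//   "apple" -> List("apple", "apple"),
//   "pear" -> List("pear", "pear", "pear")
// )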

In this particular case, the new method doesn’t offer any benefit over the old one.

Let’s look at another example that involves a collection of class objects:

// Example 2
case class Pet(species: String, name: String, age: Int)

val pets = List(
  Pet("cat", "sassy", 2), Pet("cat", "bella", 3), 
  Pet("dog", "poppy", 3), Pet("dog", "bodie", 4), Pet("dog", "poppy", 2), 
  Pet("bird", "coco", 2), Pet("bird", "kiwi", 1)
)

If we want to list all pet names per species, a `groupBy` coupled with `mapValues` will do:

// Example 2: groupBy
pets.groupBy(_.species).mapValues(_.map(_.name))
// res2: Map[String, List[String]] = Map(
//   "cat" -> List("sassy", "bella"),
//   "bird" -> List("coco", "kiwi"),
//   "dog" -> List("poppy", "bodie", "poppy")
// )

But in this case, `groupMap` does it with better readability, as the functions defining the keys and values of the resulting Map are nicely placed side by side as parameters:

// Example 2: groupMap
pets.groupMap(_.species)(_.name)
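Run against the same `pets` list, it should produce the same Map as the `groupBy/mapValues` version above:

// Map(
//   "cat" -> List("sassy", "bella"),
//   "bird" -> List("coco", "kiwi"),
//   "dog" -> List("poppy", "bodie", "poppy")
// )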

groupMapReduce

At times, we need to perform a reduction on the Map values after grouping a collection. This is when the other new method groupMapReduce comes in handy:

// Method groupMapReduce
def groupMapReduce[K, B](key: (A) => K)(f: (A) => B)(reduce: (B, B) => B): immutable.Map[K, B]

Besides the parameters for defining the keys and values of the resulting Map, as in `groupMap`, `groupMapReduce` expects an additional parameter: a binary operation for the reduction.

Using the same pets example, if we want to compute the count of pets per species, a `groupBy/mapValues` approach will look like below:

// Example 3: groupBy/mapValues
pets.groupBy(_.species).mapValues(_.size)
// res1: Map[String, Int] = Map("cat" -> 2, "bird" -> 2, "dog" -> 3)

With `groupMapReduce`, we can “compartmentalize” the functions for the keys, values and reduction operation separately as follows:

// Example 3: groupMapReduce
pets.groupMapReduce(_.species)(_ => 1)(_ + _)
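For comparison, the same count can be sketched as a `groupMap` followed by an explicit reduction of the grouped values; `groupMapReduce` simply fuses the two steps, avoiding the intermediate collections of 1s:

// Example 3 expressed as groupMap plus an explicit reduction
pets.groupMap(_.species)(_ => 1).map { case (species, ones) => species -> ones.sum }
// Map("cat" -> 2, "bird" -> 2, "dog" -> 3)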

One more example:

// Example 4
import java.time.LocalDate
case class Product(id: String, saleDate: LocalDate, listPrice: Double, discPrice: Double)

val products = List(
  Product("p001", LocalDate.of(2019, 9, 11), 10, 8.5),
  Product("p002", LocalDate.of(2019, 9, 18), 12, 10),
  Product("p003", LocalDate.of(2019, 9, 27), 10, 9),
  Product("p004", LocalDate.of(2019, 10, 6), 15, 12.5),
  Product("p005", LocalDate.of(2019, 10, 20), 12, 8),
  Product("p006", LocalDate.of(2019, 11, 8), 15, 12),
  Product("p007", LocalDate.of(2019, 11, 16), 10, 8.5),
  Product("p008", LocalDate.of(2019, 11, 25), 10, 9)
)

Let’s say we want to compute the monthly totals of list price and discounted price over the product list. In the `groupBy/mapValues` way:

// Example 4: groupBy/mapValues
products.groupBy(_.saleDate.getMonth).mapValues(
  _.map(p => (p.listPrice, p.discPrice)).reduce(
    (total, prc) => (total._1 + prc._1, total._2 + prc._2))
)
// res2: scala.collection.immutable.Map[java.time.Month,(Double, Double)] =
//   Map(OCTOBER -> (27.0,20.5), SEPTEMBER -> (32.0,27.5), NOVEMBER -> (35.0,29.5))

Using `groupMapReduce`:

// Example 4: groupMapReduce
products.groupMapReduce(_.saleDate.getMonth)(p => (p.listPrice, p.discPrice))(
  (total, prc) => (total._1 + prc._1, total._2 + prc._2)
)
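Assuming the same `products` list, this should yield the same result as the `groupBy/mapValues` version above:

// Map(OCTOBER -> (27.0,20.5), SEPTEMBER -> (32.0,27.5), NOVEMBER -> (35.0,29.5))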

Fibonacci In Scala: Tailrec, Memoized

One of the most popular number series being used as a programming exercise is undoubtedly the Fibonacci numbers:

F(0) = 0
F(1) = 1
F(n) = F(n-1) + F(n-2)

Perhaps a prominent reason why the Fibonacci sequence is of such interest in Math is its association with the Golden Ratio, but I think what makes it a great programming exercise is that, despite its simple definition, the sequence’s exponential growth presents real challenges for implementations with space/time efficiency in mind.

Having seen various ways of implementing methods for the Fibonacci numbers, I thought it might be worth putting them together, from a naive implementation to something more space/time efficient. But first, let’s take a quick look at the computational complexity of Fibonacci.

Fibonacci complexity

If we denote T(n) as the time required to compute F(n), by definition:

T(n) = T(n-1) + T(n-2) + K

where K is the time taken by some simple arithmetic to arrive at F(n) from F(n-1) and F(n-2).

With some approximation analysis (see this post), it can be shown that T(n) is bounded below by Ω(2^(n/2)) and above by O(2^n). For better precision, one can derive a more exact time complexity by assuming T(n) grows like x^n for some constant x; substituting into the recurrence and dividing through by x^(n-2) gives the characteristic equation `x^2 = x + 1`, whose positive root x = (1 + √5)/2 ≈ 1.618 leads to:

Time complexity for computing F(n) = O(R^n)

where R ≈ 1.618 is the Golden Ratio.

As for space complexity, if one looks at the recursive tree for computing F(n), it’s pretty clear that its depth is F(n-1)’s tree depth plus one. Thus, the required space for F(n) is proportional to n. In other words:

Space complexity for computing F(n) = O(n)

The relatively small space complexity compared with the exponential time complexity explains why computing a Fibonacci number too large for a computer would generally lead to a seemingly endless run rather than an out-of-memory/stack-overflow problem.

It’s worth noting, though, that if F(n) is computed via conventional iterations (e.g. a while-loop, or tail recursion which Scala translates into iterations under the hood), the time complexity would be reduced to O(n), proportional to the number of loop cycles. And the space complexity would be O(1), since no `n`-dependent extra space is needed beyond a couple of accumulators carrying the latest two numbers.
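To make the contrast concrete, below is a minimal iterative sketch (my own illustration, not one of the implementations discussed below) that runs in O(n) time with O(1) space:

def fibIter(n: Int): BigInt = {
  var (prev, curr) = (BigInt(0), BigInt(1))  // F(0) and F(1)
  var i = 0
  while (i < n) {        // n loop cycles => O(n) time
    val next = prev + curr
    prev = curr          // only two accumulators are carried => O(1) space
    curr = next
    i += 1
  }
  prev
}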

Naive Fibonacci

To generate Fibonacci numbers, the most straightforward approach is via a basic recursive function like below:

def fib(n: Int): BigInt = n match {
  case 0 => 0
  case 1 => 1
  case _ => fib(n-2) + fib(n-1)
}

(0 to 10).foreach(n => print(fib(n) + " "))
// 0 1 1 2 3 5 8 13 21 34 55

fib(50)
// res1: BigInt = 12586269025

With such a `naive` recursive function, computing the 50th number, i.e. fib(50), would take minutes on a typical laptop, and attempts to compute any number higher up like fib(90) would almost certainly never finish.

Tail recursive Fibonacci

So, let’s come up with a tail recursive method:

def fibTR(num: Int): BigInt = {
  @scala.annotation.tailrec
  def fibFcn(n: Int, acc1: BigInt, acc2: BigInt): BigInt = n match {
    case 0 => acc1
    case 1 => acc2
    case _ => fibFcn(n - 1, acc2, acc1 + acc2)
  }

  fibFcn(num, 0, 1)
}

As shown above, tail recursion is accomplished by means of a couple of accumulators as parameters for the inner method to recursively carry over the two numbers that precede the current number.

With the Fibonacci `TailRec` version, computing, say, the 90th number would finish instantaneously.

fibTR(90)
// res2: BigInt = 2880067194370816120

Fibonacci in a Scala Stream

Another way of implementing Fibonacci is to define the sequence to be stored in a “lazy” collection, such as a Scala Stream:

val fibS: Stream[BigInt] = 0 #:: fibS.scan(BigInt(1))(_ + _)

fibS(90)
// res3: BigInt = 2880067194370816120

Using method scan, `fibS.scan(BigInt(1))(_ + _)` generates a Stream with each of its elements being successively assigned the sum of the previous two elements. Since Streams are “lazy”, none of the element values in the defined `fibS` will be evaluated until the element is requested.

While at it, there are a couple of other commonly seen Fibonacci implementation variants using Scala Stream:

val fibS: Stream[BigInt] = 0 #:: 1 #:: (fibS zip fibS.tail).map(n => n._1 + n._2)

val fibS: Stream[BigInt] = {
  def fs(prev: BigInt, curr: BigInt): Stream[BigInt] = prev #:: fs(curr, prev + curr)
  fs(0, 1)
}
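A quick sanity check (applicable to any of the variants) confirms the sequence:

fibS.take(10).toList
// res: List[BigInt] = List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34)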

Scala Stream memoizes by design

These Stream-based Fibonacci implementations perform reasonably well, somewhat comparable to the tail recursive Fibonacci. But while these Stream implementations all involve recursion, none is tail recursive. So why don’t they suffer the same performance issue as the `naive` Fibonacci implementation? The short answer is memoization.

Digging into the source code of Scala Stream, one would see that method `#::` (housed in class ConsWrapper) is defined as:

def #::[B >: A](hd: B): Stream[B] = cons(hd, tl) 

Tracing method `cons` further reveals that the Stream tail is a by-name parameter to class `Cons`, thus ensuring that the concatenation is performed lazily:

final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A]

But lazy evaluation via a by-name parameter does nothing for memoization. Digging deeper into the source code, one would see that Stream content is iterated through a StreamIterator class defined as follows:

final class StreamIterator[+A] private() extends AbstractIterator[A] with Iterator[A] {
  def this(self: Stream[A]) {
    this()
    these = new LazyCell(self)
  }

  class LazyCell(st: => Stream[A]) {
    lazy val v = st
  }

  private var these: LazyCell = _

  def hasNext: Boolean = these.v.nonEmpty

  def next(): A =
    if (isEmpty) Iterator.empty.next()
    else {
      val cur    = these.v
      val result = cur.head
      these = new LazyCell(cur.tail)
      result
    }

  ...
}

The inner class `LazyCell` not only takes a by-name parameter but, more importantly, holds the Stream represented by the StreamIterator instance as a `lazy val`, which by nature enables memoization by caching the value upon the first (and only the first) evaluation.
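Here’s a tiny standalone illustration (mine, not from the Stream source) of this by-name-plus-lazy-val caching pattern:

class Cell[A](a: => A) {   // by-name parameter: not evaluated at construction
  lazy val v: A = a        // evaluated at most once, then cached
}

val cell = new Cell({ println("evaluating..."); 42 })
cell.v  // prints "evaluating..." and returns 42
cell.v  // returns the cached 42; no re-evaluation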

Memoized Fibonacci using a mutable Map

While using a Scala Stream to implement Fibonacci would automatically leverage memoization, one could also explicitly employ the very feature without Streams. For instance, by leveraging method getOrElseUpdate in a mutable Map, a `memoize` function can be defined as follows:

// Memoization using mutable Map

def memoize[K, V](f: K => V): K => V = {
  val cache = scala.collection.mutable.Map.empty[K, V]
  k => cache.getOrElseUpdate(k, f(k))
}

For example, the `naive` Fibonacci equipped with memoization via this `memoize` function would instantly become a much more efficient implementation:

val fibM: Int => BigInt = memoize(n => n match {
  case 0 => 0
  case 1 => 1
  case _ => fibM(n-2) + fibM(n-1)
})

fibM(90)
// res4: BigInt = 2880067194370816120

For the tail recursive Fibonacci `fibTR`, this `memoize` function wouldn’t be applicable as its inner function `fibFcn` takes accumulators as additional parameters. As for the Stream-based `fibS` which is already equipped with Stream’s memoization, applying `memoize` wouldn’t produce any significant performance gain.

Scala Promises – Futures In Your Hands

In the previous blog post, we saw how Scala Futures serve as a handy wrapper for running asynchronous tasks and allow non-blocking functional transformations via composable functions. Despite all the goodies, a plain Future, once started, is read-only.

A “manipulable” Future

To make things a little more interesting, let’s take a glimpse into a “container” that holds an “uncertain” Future. Scala provides another abstraction called Promise that allows programmers some control over the “when” and “what” of completing a Future. A Promise is like a container holding a Future that can be completed by assigning a value (with success or failure) at any point in time. The catch is that it can be completed only once.
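A quick sketch of that complete-at-most-once contract:

import scala.concurrent.Promise

val p = Promise[Int]()
p.success(1)      // completes the underlying Future with Success(1)
p.trySuccess(2)   // returns false: the Promise keeps its first value
p.future.value    // Some(Success(1))
// p.success(2)   // would throw IllegalStateException: promise already completed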

The Promise companion object has the following `apply` method that creates a `DefaultPromise`:

object Promise {
  def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
  // ...
}

As shown below, the DefaultPromise class extends AtomicReference to ensure that a Promise instance will be completed in an atomic fashion.

class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]

A trivial producer and consumer

A common use case of Promise is like this:

  1. a Promise which holds an “open” Future is created
  2. run some business logic to come up with some value
  3. complete the Promise by assigning its Future the value via methods like `success()`, `failure()`, `tryComplete()`, etc.
  4. return the “closed” Future

Here’s a hello-world example of Scala Promise used in a trivialized producer and consumer:

import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ThreadLocalRandom

val p = Promise[Int]()
val f = p.future

val producer = Future {
  val r = ThreadLocalRandom.current.nextInt(0, 2000)
  println("Inside producer: r = " + r)
  if (r < 1000)
    p success r
  else
    p failure (new Exception("r > 999!"))
}

val consumer = Future {
  println("Inside consumer!")
  f onComplete {
    case Success(r) => println("Success: r = " + r)
    case Failure(e) => println("Error: " + e)
  }
}

The above code snippet is rather self-explanatory. The producer running in one thread completes the Promise’s future based on the result of a randomly generated integer and the consumer in another thread checks and reports the value of the completed future.

// * Test running #1:

// Inside producer: r = 1278
// producer: scala.concurrent.Future[scala.concurrent.Promise[Int]] = Future()
// consumer: scala.concurrent.Future[Unit] = Future()
// Error: java.lang.Exception: r > 999!

// * Test running #2:

// Inside producer: r = 547
// producer: scala.concurrent.Future[scala.concurrent.Promise[Int]] = Future()
// consumer: scala.concurrent.Future[Unit] = Future()
// Success: r = 547

Simulating a CPU-bound task, again

For upcoming illustrations, let’s borrow the CPU-bound task simulation method `doWork` used in the coding examples from the previous blog post:

case class Result(id: Int, millis: Long)

def doWork(id: Int): Result = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val start = System.currentTimeMillis()

  while ((System.currentTimeMillis() - start) < millis) {}

  if (millis < millisTMO) {
    println("Work Result: " + Result(id, millis))
    Result(id, millis)
  } else {
    println(s"TimeOut: work id $id would take ${millis - millisTMO}ms above limit to complete!")
    throw new Throwable("TimeOut!")
  }
}

Revisiting first completed Future

Recall that method `Future.firstCompletedOf` from the previous blog post can be used to capture the first completed Future out of a list of Futures running in parallel:

val firstCompleted = Future.firstCompletedOf( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

Now, let’s see how `firstCompletedOf` is actually implemented in Scala Future using Promise:

object Future {
  // ...

  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
      override def apply(v1: Try[T]): Unit = getAndSet(null) match {
        case null => ()
        case some => some tryComplete v1
      }
    }
    futures foreach { _ onComplete firstCompleteHandler }
    p.future
  }

  // ...
}

In the `firstCompletedOf` method implementation, the helper callback function `firstCompleteHandler`, registered with each of the Futures in the input list, ensures by means of an `AtomicReference` that only the first completed Future gets to complete the Promise’s future.

First completed Future with a condition

What if we want to get the first completed Future from a number of Futures whose values meet a certain condition? One approach would be to derive from the `firstCompletedOf` method implementation.

We pick the default ExecutionContext as we did in some coding examples from the previous blog post. Besides the list of Futures, the derived method `firstConditionallyCompletedOf[T]` also takes a `T => Boolean` filtering condition as a parameter. Piggybacking on the core logic from method `firstCompletedOf`, we simply apply the input filter to each of the Futures in the input list before the callback.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

import java.util.concurrent.atomic.AtomicReference

def firstConditionallyCompletedOf[T](futures: List[Future[T]], condition: T => Boolean)(
  implicit ec: ExecutionContext): Future[T] = {

  val p = Promise[T]()
  val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
    override def apply(v1: Try[T]): Unit = getAndSet(null) match {
      case null => ()
      case some => some tryComplete v1
    }
  }
  futures.foreach{ _.filter(condition).onComplete(firstCompleteHandler) }
  p.future
}

val firstCompleted = firstConditionallyCompletedOf(
  (1 to 4).toList.map( id =>
    Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
  ),
  (e: Either[Throwable, Result]) => e match {
    case Left(_) => false
    case Right(res) => res.millis < 1250
  }
)

Await.ready(firstCompleted, 10.seconds)

// * Test running #1:

// Work Result: Result(4, 1037)
// Work Result: Result(2, 1268)
// Work Result: Result(3, 1386)
// TimeOut: work id 1 would take 139ms above limit to complete!
// res1: firstCompleted.type = Future(Success(Right(Result(4, 1037))))

// * Test running #2:

// Work Result: Result(4, 1320)
// TimeOut: work id 1 would take 22ms above limit to complete!
// TimeOut: work id 3 would take 253ms above limit to complete!
// TimeOut: work id 2 would take 438ms above limit to complete!
// res2: firstCompleted.type = Future(Failure(
//   java.util.NoSuchElementException: Future.filter predicate is not satisfied
// ))

First N completed Futures

While at it, rather than just the first completed Future, what if we want to capture the first few completed Futures? Deriving from the `firstCompletedOf` implementation wouldn’t quite work – the way the helper callback function `firstCompleteHandler` is structured wouldn’t be useful now that we have a list of Futures to be captured.

We’ll take a straightforward approach of using a `var` list to capture the first N (or the size of the input list, whichever is smaller) Future results, updating the list inside a `synchronized` block. Since we want to capture the first few completed Futures whether they succeed or fail, we make the returned Future carry a `List[Either[Throwable, T]]` rather than just a `List[T]`.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

def firstNCompletedOf[T](n: Int, futures: List[Future[T]])(implicit ec: ExecutionContext):
  Future[List[Either[Throwable, T]]] = {

  val nLimit = n min futures.size
  val p = Promise[List[Either[Throwable, T]]]()
  var list = List.empty[Either[Throwable, T]]
  futures foreach { _.onComplete{ tryT =>
    synchronized {
      if (list.size < nLimit) {
        list ::= tryT.toEither
        if (list.size == nLimit)  // complete as soon as the first N results are in
          p trySuccess list.reverse
      }
    }
  } }
  p.future
}

val firstNCompleted = firstNCompletedOf( 3, (1 to 5).toList.map( id => Future{ doWork(id) } ) )

Await.ready(firstNCompleted, 15.seconds)

// Work Result: Result(1,1168)
// Work Result: Result(5,1180)
// TimeOut: work id 3 would take 200ms above limit to complete!
// TimeOut: work id 2 would take 399ms above limit to complete!
// TimeOut: work id 4 would take 448ms above limit to complete!

// res1: firstNCompleted.type = Future(Success(List(
//   Right(Result(1,1168)), Right(Result(5,1180)), Left(java.lang.Throwable: java.lang.Throwable: TimeOut!)
// )))

Simulating a non-CPU-bound task

Rather than keeping the CPU busy (thus CPU-bound), a non-CPU-bound asynchronous task does not demand extensive processing resources. The following snippet defines a method that mimics such a task, which could be, say, a non-blocking call to a remote database. This time, we’re going to run on an Akka Actor system, using the ExecutionContext that comes with its default dispatcher. Besides the Fork/Join Executor provided by the dispatcher, we also pick the Akka runtime library to leverage its high-throughput scheduler.

In this example, a Promise which contains a Future is created, and after a random duration the scheduler triggers the completion of the Future, with success or failure depending on the randomly drawn time.

import scala.concurrent.{ExecutionContext, Future, Promise, Await}
import scala.concurrent.duration._
import akka.actor.ActorSystem
implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher

def nonCPUbound[T](value: T): Future[T] = {  
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val promise = Promise[T]()

  system.scheduler.scheduleOnce(FiniteDuration(millis, MILLISECONDS)) {
    if (millis < millisTMO) {
      println(s"OK: Got value [${value}] in ${millis}ms")
      promise.success(value)
    } else {
      println(s"TimeOut: It would take ${millis - millisTMO}ms above limit to complete!")
      promise.failure(new Throwable("TimeOut!"))
    }
  }

  promise.future
}

Launching method `nonCPUbound()` with some value a few times would yield results similar to the following:

Await.result(nonCPUbound("something"), 2.seconds)

// * Test running #1:

// OK: Got value [something] in 1259ms
// res1: String = something

// * Test running #2:

// TimeOut: It would take 384ms above limit to complete!
// [ERROR] ... java.lang.Throwable: TimeOut!

CPU-bound versus non-CPU-bound tasks

By wrapping a CPU-bound task like `doWork()` in a Future, the task becomes non-blocking, but it still consumes processing power. The default ExecutionContext from the `scala.concurrent.ExecutionContext.Implicits.global` import sets `scala.concurrent.context.maxThreads` to the number of CPU cores of the machine the application runs on. One can raise `maxThreads`, or handcraft a custom `ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThreads))`, to allow more threads to run. To set `maxThreads` to, say, 16, simply add the following `javaOptions` to `build.sbt`:

javaOptions += "-Dscala.concurrent.context.maxThreads=16"

However, that wouldn’t necessarily let more instances of `Future{ doWork() }` than the number of CPU cores execute in parallel, since each of them keeps a core busy while executing.

On the other hand, a non-CPU-bound task like `nonCPUbound()` takes little CPU resource. In this case, configuring an ExecutionContext with more threads than the CPU cores of the local machine can increase performance, since none of the individual tasks would consume anywhere near the full capacity of a CPU core. It’s not uncommon to configure a pool of hundreds of threads to handle a large amount of such tasks on a machine with just a handful of CPU cores.
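For instance, a handcrafted ExecutionContext for such tasks might look like the following sketch (the pool size of 200 is just an illustrative figure):

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// a large fixed thread pool for many concurrent non-CPU-bound tasks
implicit val ioEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(200))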

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its internal implementation, we don’t often need to use Promises explicitly, as the Future API already delivers a suite of common concurrent features for writing asynchronous code. If the business logic doesn’t call for Promises, just stick to the Future API. But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the `producer/consumer` example above, Promises do come in handy.