
Scala’s groupMap And groupMapReduce

For grouping elements in a Scala collection by a provided key, the de facto method of choice has been groupBy, which has the following signature for an Iterable:

It returns an immutable Map of elements each consisting of a key and a collection of values of the original type. To process this collection of values in the resulting Map, Scala provides a method mapValues with the below signature:
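As a rough reminder, here's a minimal sketch of the two methods on made-up data. The signatures in the comments are paraphrased from the Scala 2.13 collections API rather than copied verbatim, and the object and value names are just for illustration:

```scala
object GroupByDemo {
  // Paraphrased 2.13 signatures (A = element type, C = collection type):
  //   def groupBy[K](f: A => K): immutable.Map[K, C]
  //   def mapValues[W](f: V => W): MapView[K, W]   // reached via `.view` on a Map[K, V]

  val words = List("apple", "avocado", "banana", "blueberry", "cherry")

  // Group the words by their first letter
  val byInitial: Map[Char, List[String]] = words.groupBy(_.head)

  // Post-process each group; `.view.mapValues(...).toMap` is the 2.13 spelling
  val countByInitial: Map[Char, Int] = byInitial.view.mapValues(_.size).toMap
}
```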

This groupBy/mapValues combo proves to be handy for processing the values of the Map generated from the grouping. However, as of Scala 2.13, method mapValues is deprecated on Map in favor of .view.mapValues, which returns a lazy MapView rather than a Map.

groupMap

Scala 2.13 introduces a new method, groupMap, for grouping a collection based on provided functions that define the keys and values of the resulting Map. Here’s the signature of method groupMap for an Iterable:

Let’s start with a simple example grouping via the good old groupBy method:

We can replace groupBy with groupMap like below:
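A minimal sketch of the comparison, assuming a made-up list of fruit names keyed by their first letter (the groupMap signature in the comment is paraphrased from the 2.13 API, and the names are hypothetical):

```scala
object GroupMapSimple {
  // Paraphrased 2.13 signature (CC = the collection's type constructor):
  //   def groupMap[K, B](key: A => K)(f: A => B): immutable.Map[K, CC[B]]

  val fruits = List("apple", "apricot", "banana", "cherry", "blueberry")

  // The good old way
  val viaGroupBy: Map[Char, List[String]] = fruits.groupBy(_.head)

  // Same result via groupMap, with `identity` as the value function
  val viaGroupMap: Map[Char, List[String]] = fruits.groupMap(_.head)(identity)
}
```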

In this particular case, the new method doesn’t offer any benefit over the old one.

Let’s look at another example that involves a collection of class objects:

If we want to list all pet names per species, a groupBy coupled with mapValues will do:

But in this case, groupMap can do it with better readability due to the functions for defining the keys and values of the resulting Map being nicely placed side by side as parameters:
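Here's a sketch of both approaches side by side. The Pet case class and the sample data are assumptions standing in for the post's own class, and the groupBy variant uses the 2.13 `.view.mapValues` spelling:

```scala
object PetsGroupMap {
  // Hypothetical data model for illustration
  case class Pet(species: String, name: String, age: Int)

  val pets = List(
    Pet("cat", "sassy", 2), Pet("cat", "bella", 3),
    Pet("dog", "poppy", 3), Pet("dog", "bodie", 4), Pet("dog", "poppy", 2),
    Pet("bird", "coco", 2), Pet("bird", "kiwi", 1)
  )

  // groupBy, then map each group of Pets to their names
  val namesViaGroupBy: Map[String, List[String]] =
    pets.groupBy(_.species).view.mapValues(_.map(_.name)).toMap

  // groupMap: key function and value function sit side by side
  val namesViaGroupMap: Map[String, List[String]] =
    pets.groupMap(_.species)(_.name)
}
```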

groupMapReduce

At times, we need to perform reduction on the Map values after grouping of a collection. This is when the other new method groupMapReduce comes in handy:

Besides the parameters for defining the keys and values of the resulting Map like groupMap, groupMapReduce also expects an additional parameter in the form of a binary operation for reduction.

Using the same pets example, if we want to compute the count of pets per species, a groupBy/mapValues approach will look like below:

With groupMapReduce, we can “compartmentalize” the functions for the keys, values and reduction operation separately as follows:
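A sketch of the two counting approaches, again on a hypothetical Pet class with made-up data (the groupMapReduce signature in the comment is paraphrased from the 2.13 API):

```scala
object PetsCount {
  // Paraphrased 2.13 signature:
  //   def groupMapReduce[K, B](key: A => K)(f: A => B)(reduce: (B, B) => B): immutable.Map[K, B]

  // Hypothetical data model for illustration
  case class Pet(species: String, name: String, age: Int)

  val pets = List(
    Pet("cat", "sassy", 2), Pet("cat", "bella", 3),
    Pet("dog", "poppy", 3), Pet("dog", "bodie", 4), Pet("dog", "poppy", 2),
    Pet("bird", "coco", 2), Pet("bird", "kiwi", 1)
  )

  // groupBy, then take the size of each group
  val countViaGroupBy: Map[String, Int] =
    pets.groupBy(_.species).view.mapValues(_.size).toMap

  // key = species, value = 1 per pet, reduction = sum
  val countViaGroupMapReduce: Map[String, Int] =
    pets.groupMapReduce(_.species)(_ => 1)(_ + _)
}
```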

One more example:

Let’s say we want to compute the monthly total of list price and discounted price of the product list. In the groupBy/mapValues way:

Using groupMapReduce:
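A sketch of the monthly totals computed both ways. The Product case class, its field names and the sample rows are assumptions made for illustration:

```scala
import java.time.LocalDate

object MonthlyTotals {
  // Hypothetical product record
  case class Product(id: String, saleDate: LocalDate, listPrice: Double, discPrice: Double)

  val products = List(
    Product("p001", LocalDate.of(2019, 9, 11), 10.0, 8.5),
    Product("p002", LocalDate.of(2019, 9, 18), 12.0, 10.0),
    Product("p003", LocalDate.of(2019, 9, 27), 10.0, 9.0),
    Product("p004", LocalDate.of(2019, 10, 6), 12.0, 12.0),
    Product("p005", LocalDate.of(2019, 10, 20), 20.0, 15.0)
  )

  // groupBy/mapValues: fold each month's products into a (list, discounted) pair
  val viaGroupBy: Map[Int, (Double, Double)] =
    products.groupBy(_.saleDate.getMonthValue).view.mapValues { ps =>
      ps.foldLeft((0.0, 0.0)) { case ((l, d), p) => (l + p.listPrice, d + p.discPrice) }
    }.toMap

  // groupMapReduce: key, value and reduction each get their own parameter list
  val viaGroupMapReduce: Map[Int, (Double, Double)] =
    products.groupMapReduce(_.saleDate.getMonthValue)(p => (p.listPrice, p.discPrice)) {
      case ((l1, d1), (l2, d2)) => (l1 + l2, d1 + d2)
    }
}
```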

Fibonacci In Scala: Tailrec, Memoized

One of the most popular number series being used as a programming exercise is undoubtedly the Fibonacci numbers: F(0) = 0, F(1) = 1, and F(n) = F(n-1) + F(n-2) for n > 1, i.e. 0, 1, 1, 2, 3, 5, 8, 13, ...

Perhaps a prominent reason why the Fibonacci sequence is of vast interest in math is the associated Golden Ratio, but I think what makes it a great programming exercise is that, despite a simple definition, the sequence’s exponential growth rate presents challenges for implementations with space/time efficiency in mind.

Having seen various ways of implementing methods for the Fibonacci numbers, I thought it might be worth putting them together, from a naive implementation to something more space/time efficient. But first, let’s take a quick look at the computational complexity of Fibonacci.

Fibonacci complexity

If we denote T(n) as the time required to compute F(n), then by definition T(n) = T(n-1) + T(n-2) + K,

where K is the time taken by some simple arithmetic to arrive at F(n) from F(n-1) and F(n-2).

With some approximate mathematical analysis (see this post), it can be shown that T(n) is bounded below by a multiple of 2^(n/2) and above by a multiple of 2^n. For better precision, one can derive a tighter time complexity by solving the associated characteristic equation, x^2 = x + 1, whose positive root is x = ~1.618, to deduce that T(n) = O(R^n),

where R = ~1.618 is the Golden Ratio.

As for space complexity, if one looks at the recursive tree for computing F(n), it’s pretty clear that its depth is F(n-1)’s tree depth plus one. Thus, the required space for F(n) is proportional to n. In other words, the space complexity is O(n).

The relatively small space complexity compared with the exponential time complexity explains why computing a Fibonacci number too large for a computer would generally lead to a seemingly endless run rather than an out-of-memory/stack overflow problem.

It’s worth noting, though, that if F(n) is computed via conventional iterations (e.g. a while-loop, or tail recursion which the Scala compiler translates into iterations under the hood), the time complexity is reduced to O(n), proportional to the number of loop cycles. And the space complexity is O(1), since no n-dependent extra space is needed other than that for storing the Fibonacci numbers themselves.

Naive Fibonacci

To generate Fibonacci numbers, the most straightforward approach is via a basic recursive function like below:

With such a naive recursive function, computing the 50th number, i.e. fib(50), would take minutes on a typical laptop, and attempts to compute any number higher up like fib(90) would not finish in any practical amount of time.
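A minimal sketch of such a function, assuming the sequence starts at F(0) = 0 and that n is non-negative (the wrapper object name is just for illustration):

```scala
object NaiveFib {
  // Direct transcription of F(n) = F(n-1) + F(n-2); exponential time.
  // Assumes n >= 0 (a negative n would recurse without terminating).
  def fib(n: Int): BigInt = n match {
    case 0 => BigInt(0)
    case 1 => BigInt(1)
    case _ => fib(n - 2) + fib(n - 1)
  }
}
```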

Tail recursive Fibonacci

So, let’s come up with a tail recursive method:
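Here's one way to sketch it, using the names fibTR and fibFcn for the outer method and the inner helper (the accumulator names are assumptions):

```scala
import scala.annotation.tailrec

object TailRecFib {
  def fibTR(num: Int): BigInt = {
    // acc1 and acc2 carry two consecutive Fibonacci numbers forward
    @tailrec
    def fibFcn(n: Int, acc1: BigInt, acc2: BigInt): BigInt = n match {
      case 0 => acc1
      case 1 => acc2
      case _ => fibFcn(n - 1, acc2, acc1 + acc2)
    }
    fibFcn(num, BigInt(0), BigInt(1))
  }
}
```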

As shown above, tail recursion is accomplished by means of a couple of accumulators as parameters for the inner method to recursively carry over the two numbers that precede the current number.

With the Fibonacci TailRec version, computing, say, the 90th number would finish instantaneously.

Fibonacci in a Scala Stream

Another way of implementing Fibonacci is to define the sequence to be stored in a “lazy” collection, such as a Scala Stream:
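A sketch of a scan-based definition. Note that Stream is deprecated as of Scala 2.13 in favor of LazyList, which accepts the same definition; Stream is kept here to match the discussion:

```scala
object FibStreamScan {
  // 0 followed by a running sum seeded with 1: 0, 1, 1, 2, 3, 5, ...
  val fibStream: Stream[BigInt] = BigInt(0) #:: fibStream.scan(BigInt(1))(_ + _)
}
```

Elements are only computed on demand, e.g. `FibStreamScan.fibStream.take(10).toList` or `FibStreamScan.fibStream(90)`.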

Using method scan, scan(1)(_ + _) generates a Stream with each of its elements being successively assigned the sum of the previous two elements. Since Streams are “lazy”, none of the element values in the defined fibStream will be evaluated until that element is requested.

While at it, there are a couple of other commonly seen Fibonacci implementation variants with Scala Stream:
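Two such variants, sketched under the same Stream-versus-LazyList caveat. The names fibZip and fibFrom are hypothetical; fibS is the name used for the Stream-based version later on:

```scala
object FibStreamVariants {
  // Variant 1: zip the stream with its own tail and add each pair
  val fibZip: Stream[BigInt] =
    BigInt(0) #:: BigInt(1) #:: fibZip.zip(fibZip.tail).map { case (a, b) => a + b }

  // Variant 2: a recursive helper carrying the current and next numbers
  def fibFrom(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fibFrom(b, a + b)
  val fibS: Stream[BigInt] = fibFrom(BigInt(0), BigInt(1))
}
```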

Scala Stream memoizes by design

These Stream-based Fibonacci implementations perform reasonably well, somewhat comparable to the tail recursive Fibonacci. But while these Stream implementations all involve recursion, none is tail recursive. So why don’t they suffer the same performance issue as the naive Fibonacci implementation does? The short answer is memoization.

Digging into the source code of Scala Stream would reveal that method #:: (which is wrapped in class ConsWrapper) is defined as:

Tracing method cons further reveals that the Stream tail is a by-name parameter to class Cons, thus ensuring that the concatenation is performed lazily:

But lazy evaluation via a by-name parameter does nothing for memoization. Digging deeper into the source code, one would see that Stream content is iterated through a StreamIterator class defined as follows:

The inner class LazyCell not only has a by-name parameter but, more importantly, makes the Stream represented by the StreamIterator instance a lazy val which, by nature, enables memoization by caching the value upon the first (and only first) evaluation.
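To see the difference between a by-name parameter alone and a by-name parameter captured in a lazy val, here's a small illustrative sketch. These classes are hypothetical stand-ins that only mirror the idea; they are not the library's source:

```scala
object LazyCellSketch {
  var evaluations = 0
  def expensive(): Int = { evaluations += 1; 42 }

  // By-name only: the argument expression is re-run on every access
  class ByNameCell(st: => Int) { def v: Int = st }

  // By-name + lazy val: evaluated once on first access, then cached
  class LazyCell(st: => Int) { lazy val v: Int = st }
}
```

Accessing `v` twice on a ByNameCell bumps the counter twice, whereas a LazyCell bumps it only on the first access.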

Memoized Fibonacci using a mutable Map

While using a Scala Stream to implement Fibonacci would automatically leverage memoization, one could also explicitly employ the very feature without Streams. For instance, by leveraging method getOrElseUpdate in a mutable Map, a memoize function can be defined as follows:

For example, the naive Fibonacci equipped with memoization via this memoize function would instantly become a much more efficient implementation:
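A sketch of a memoize helper built on getOrElseUpdate, along with a memoized take on the naive Fibonacci. The name fibM is an assumption, and the cache isn't thread-safe:

```scala
import scala.collection.mutable

object MemoFib {
  // Wraps f with a cache keyed by its input (not thread-safe)
  def memoize[K, V](f: K => V): K => V = {
    val cache = mutable.Map.empty[K, V]
    k => cache.getOrElseUpdate(k, f(k))
  }

  // Recursive calls go through fibM itself, so each F(n) is computed once
  lazy val fibM: Int => BigInt = memoize[Int, BigInt] {
    case 0 => BigInt(0)
    case 1 => BigInt(1)
    case n => fibM(n - 2) + fibM(n - 1)
  }
}
```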

For the tail recursive Fibonacci fibTR, this memoize function wouldn’t be applicable as its inner function fibFcn takes accumulators as additional parameters. As for the Stream-based fibS which is already equipped with Stream’s memoization, applying memoize wouldn’t produce any significant performance gain.

Scala Promises – Futures In Your Hands

In the previous blog post, we saw how Scala Futures serve as a handy wrapper for running asynchronous tasks and allow non-blocking functional transformations via composable functions. Despite all the goodies, a plain Future, once started, is read-only.

A “manipulable” Future

To make things a little more interesting, let’s take a glimpse into an interesting “container” that holds an “uncertain” Future. Scala provides another abstraction called Promise that gives programmers some control over the “when” and “what” of completing a Future. A Promise is like a container holding a Future which can be completed by assigning a value (with success or failure) at any point in time. The catch is that it can only be completed once.

The Promise companion object has the following apply method that creates a DefaultPromise:

As shown below, the DefaultPromise class extends AtomicReference to ensure that a Promise instance will be completed in an atomic fashion.

A trivial producer and consumer

A common use case of Promise is like this:

  1. a Promise which holds an “open” Future is created
  2. run some business logic to come up with some value
  3. complete the Promise by assigning its Future the value via methods like success(), failure(), tryComplete(), etc
  4. return the “closed” Future

Here’s a hello-world example of Scala Promise used in a trivialized producer and consumer:
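A minimal sketch of such a producer and consumer. The even/odd rule for success versus failure and the method name run are assumptions made for illustration:

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Random, Success}

object PromiseHello {
  // Starts a producer and a consumer, then hands back the Promise's Future
  def run(): Future[Int] = {
    val p = Promise[Int]()

    // Producer: completes the Promise based on a random integer
    Future {
      val n = Random.nextInt(100)
      if (n % 2 == 0) p.success(n)
      else p.failure(new IllegalStateException(s"odd number: $n"))
    }

    // Consumer: reports the outcome once the Future is completed
    p.future.onComplete {
      case Success(n) => println(s"consumer got $n")
      case Failure(e) => println(s"consumer saw failure: ${e.getMessage}")
    }

    p.future
  }
}
```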

The above code snippet is rather self-explanatory. The producer running in one thread completes the Promise’s future based on the result of a randomly generated integer and the consumer in another thread checks and reports the value of the completed future.

Simulating a CPU-bound task, again

For upcoming illustrations, let’s borrow the CPU-bound task simulation doWork method used in the coding examples from the previous blog post:

Revisiting first completed Future

Recall that method Future.firstCompletedOf from the previous blog post can be used to capture the first completed Future out of a list of Futures running in parallel:
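A quick usage sketch. Since doWork isn't reproduced here, the hypothetical task method below simply sleeps (marked as blocking) and returns its id:

```scala
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

object FirstCompletedDemo {
  // Hypothetical stand-in for a unit of work: sleep, then return the id
  def task(id: Int, millis: Long): Future[Int] =
    Future { blocking(Thread.sleep(millis)); id }

  // Completes with the result of whichever task finishes first
  def first(): Future[Int] =
    Future.firstCompletedOf(List(task(1, 400), task(2, 50), task(3, 250)))
}
```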

Now, let’s see how firstCompletedOf is actually implemented in Scala Future using Promise:

In the firstCompletedOf method implementation, the helper callback function firstCompleteHandler for each of the Futures in the input list ensures by means of an AtomicReference that the first completed Future will be the Promise’s future.
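The core idea can be sketched in a few lines. This is a simplified version relying on Promise's tryComplete, not the library's actual source with its AtomicReference-based handler, and the method name firstCompleted is hypothetical:

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object FirstCompletedSketch {
  def firstCompleted[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    // tryComplete returns false and does nothing once p is already completed
    futures.foreach(_.onComplete(p.tryComplete))
    p.future
  }
}
```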

First completed Future with a condition

What if we want to get the first completed Future from a number of Futures whose values meet a certain condition? One approach would be to derive from the firstCompletedOf method implementation.

We pick the default ExecutionContext like how we did in some coding examples from the previous blog. Besides the list of Futures, the derived method firstConditionallyCompletedOf[T] would also take a T => Boolean filtering condition as a parameter. Piggybacking on the core logic from method firstCompletedOf, we simply apply the input filter to each of the Futures in the input list before the callback.
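One possible sketch of such a method. It takes the ExecutionContext as an implicit parameter, and, as an added assumption beyond the description above, it fails the result with a NoSuchElementException once every input Future has either failed or missed the condition (an empty input list would never complete):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object ConditionalFirst {
  def firstConditionallyCompletedOf[T](futures: List[Future[T]], condition: T => Boolean)(
      implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val remaining = new AtomicInteger(futures.size)
    futures.foreach { f =>
      // filter turns a non-matching value into a failed Future
      f.filter(condition).onComplete {
        case Success(v) =>
          p.trySuccess(v); ()
        case Failure(_) =>
          if (remaining.decrementAndGet() == 0)
            p.tryFailure(new NoSuchElementException("no Future satisfied the condition"))
          ()
      }
    }
    p.future
  }
}
```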

First N completed Futures

While at it, rather than just the first completed Future, what if we want to capture the first few completed Futures? Deriving from the firstCompletedOf implementation wouldn’t quite work – the way the helper callback function firstCompleteHandler is structured wouldn’t be useful now that we have a list of Futures to be captured.

We’ll take a straightforward approach of using a var list for capturing the first N (or the size of the input Futures, whichever is smaller) Future results and update the list inside a synchronized block. Since we want to capture the first few completed Futures (success or failure), we make the returned Future hold a List[Either[Throwable, T]], rather than just List[T].
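A sketch along those lines. The method name firstNCompletedOf and the parameter order are assumptions, and results are listed in the order their callbacks happened to run:

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object FirstNCompleted {
  def firstNCompletedOf[T](n: Int, futures: List[Future[T]])(
      implicit ec: ExecutionContext): Future[List[Either[Throwable, T]]] = {
    val nLimit = n min futures.size
    val p = Promise[List[Either[Throwable, T]]]()
    val lock = new Object
    var list = List.empty[Either[Throwable, T]]

    if (nLimit <= 0) p.success(Nil)

    futures.foreach { f =>
      f.onComplete { result =>
        lock.synchronized {
          // Keep collecting until the first nLimit results are in
          if (list.size < nLimit) {
            list ::= result.toEither
            if (list.size == nLimit) p.trySuccess(list.reverse)
          }
        }
      }
    }
    p.future
  }
}
```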

Simulating a non-CPU-bound task

Rather than keeping the CPU busy (thus CPU-bound), a non-CPU-bound asynchronous task does not demand extensive processing resources. The following snippet defines a method that mimics a non-CPU-bound asynchronous task which could be, say, a non-blocking call to a remote database. This time, we’re going to run on an Akka Actor system, using the ExecutionContext that comes with its default dispatcher. We pick the Akka runtime library not only for the Fork/Join Executor provided by the dispatcher, but also for its high-throughput scheduler.

In this example, a Promise which contains a Future is created and after a random duration, the scheduler triggers the completion of the Future with success or failure depending on the random time.
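For readers without Akka on the classpath, the same pattern can be sketched with a plain JDK ScheduledExecutorService swapped in for the Akka scheduler. The maxMillis parameter, the two-thirds cutoff for success versus failure and the TimeoutException are all assumptions made for illustration:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Future, Promise}
import scala.util.Random

object NonCpuBoundSketch {
  // Daemon scheduler thread so the JVM can exit without an explicit shutdown
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
  })

  def nonCPUbound[T](value: T, maxMillis: Int = 300): Future[T] = {
    val p = Promise[T]()
    val delay = Random.nextInt(maxMillis)
    // No thread is held while waiting; the scheduler completes p later
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        if (delay < maxMillis * 2 / 3) p.success(value)
        else p.failure(new TimeoutException(s"took $delay ms"))
        ()
      }
    }, delay.toLong, TimeUnit.MILLISECONDS)
    p.future
  }
}
```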

Launching method nonCPUbound() with some value a few times would yield results similar to the following:

CPU-bound versus non-CPU-bound tasks

By wrapping a CPU-bound task like doWork() with a Future, the task becomes non-blocking but it still consumes processing power. By default, the ExecutionContext obtained via the scala.concurrent.ExecutionContext.Implicits.global import sets scala.concurrent.context.maxThreads to the number of CPU cores of the machine the application runs on. One can raise maxThreads, or handcraft a custom ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThreads)), to allow more threads to be run. To set the value of maxThreads to, say, 16, simply add the following javaOptions to build.sbt.
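A sketch of what that build.sbt fragment might look like, assuming the application is launched through sbt's run task in a forked JVM:

```scala
// build.sbt (fragment)
// javaOptions only apply to forked JVMs, hence fork := true
fork := true
javaOptions += "-Dscala.concurrent.context.maxThreads=16"
// Assumption: the related scala.concurrent.context.numThreads property
// may also need raising for the global pool to actually use more threads
```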

However, that wouldn’t necessarily make more instances of Future{ doWork() } than the number of CPU cores execute in parallel since each of them consumes CPU resource while executing.

On the other hand, a non-CPU-bound task like nonCPUbound() takes little CPU resource. In this case, configuring an ExecutionContext with more threads than the CPU cores of the local machine can increase performance, since none of the individual tasks would consume anywhere near the full capacity of a CPU core. It’s not uncommon to configure a pool of hundreds of threads to handle a large amount of such tasks on a machine with just a handful of CPU cores.
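To get a feel for this, here's a small sketch that runs a batch of sleeping (hence non-CPU-bound) tasks on a fixed-size pool and reports the elapsed time. The method name and parameters are hypothetical:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WidePoolSketch {
  // Runs `tasks` sleeping tasks on a pool of `threads` threads; returns elapsed millis
  def elapsedMillis(tasks: Int, threads: Int, sleepMs: Long): Long = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val start = System.nanoTime()
      val all = Future.sequence(List.fill(tasks)(Future { Thread.sleep(sleepMs); 1 }))
      Await.result(all, 30.seconds)
      (System.nanoTime() - start) / 1000000
    } finally pool.shutdown()
  }
}
```

With 8 tasks of 100 ms each, a pool of 8 threads should finish in roughly the time of a single task, whereas a single-threaded pool has to run them back to back.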

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its function implementations, we don’t need to explicitly use Promises very often as the Futures API already delivers a suite of common concurrent features for writing asynchronous code. If the business logic doesn’t need Promises, just stick to the Futures API. But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the producer/consumer example above, Promises do come in handy.