Monthly Archives: November 2020

Akka Stream Stateful MapConcat

If you’ve been building applications with Akka Stream in Scala, you have probably used mapConcat (and perhaps flatMapConcat as well). It’s a handy method for expanding and flattening the content of a Stream, much like how flatMap operates on an ordinary Scala collection. The method has the following signature:
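The original listing is not reproduced here. As a rough guide, the Akka scaladsl declares the method along the lines of `def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]`; the collection type in the parameter differs between releases (`immutable.Iterable[T]` in older ones, `IterableOnce[T]` in newer ones), so check the API docs for your version. The sketch below, whose names `expand`, `flattened` and `expandFlow` are purely illustrative, passes one function of that shape to both `flatMap` on a List and `mapConcat` on a Flow.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.collection.immutable

object MapConcatSignatureSketch {
  // Paraphrased declaration (assumption: verify against your Akka version):
  //   def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]

  // A function of the shape mapConcat expects: one element in, a collection out.
  val expand: Int => immutable.Iterable[Int] = n => List(n, n * 10)

  // On a plain collection, flatMap applies `expand` and flattens the results.
  val flattened: List[Int] = List(1, 2, 3).flatMap(expand)

  // On a stream, mapConcat does the analogous thing, element by element.
  val expandFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapConcat(expand)
}
```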

Here’s a trivial example using mapConcat:
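As a stand-in for the missing listing, here is a minimal sketch (not necessarily the example the post originally showed) in which each integer `n` is expanded into `n` copies of itself. It assumes Akka 2.6+, where an implicit ActorSystem is enough to run a stream; the object and system names are made up.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatExample {
  // Expand each n into a List of n copies; mapConcat flattens the Lists.
  def run()(implicit system: ActorSystem): Seq[Int] =
    Await.result(
      Source(1 to 3).mapConcat(n => List.fill(n)(n)).runWith(Sink.seq),
      3.seconds
    )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mapconcat-example")
    try println(run()) // Vector(1, 2, 2, 3, 3, 3)
    finally system.terminate()
  }
}
```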

A mapConcat with an internal state

A less commonly used method that allows one to expand and flatten Stream elements while iteratively updating some internal state is statefulMapConcat, whose signature is as follows:
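The listing is not reproduced here. Roughly, the scaladsl declaration reads `def statefulMapConcat[T](f: () => Out => immutable.Iterable[T]): Repr[T]`, with the same caveat that the collection type varies by Akka release. The key difference from mapConcat is the extra `() =>` layer: a factory that creates the state and returns the per-element function. The running-sum sketch below is an illustration with hypothetical names, not code from the post.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.collection.immutable

object StatefulMapConcatSignatureSketch {
  // Paraphrased declaration (assumption: verify against your Akka version):
  //   def statefulMapConcat[T](f: () => Out => immutable.Iterable[T]): Repr[T]

  // The outer function is a factory: every call creates a fresh `sum`
  // and returns the per-element function that closes over it.
  val runningSum: () => Int => immutable.Iterable[Int] = () => {
    var sum = 0
    n => { sum += n; List(sum) }
  }

  // The factory is passed as-is; the stream calls it to obtain the element function.
  val runningSumFlow: Flow[Int, Int, NotUsed] =
    Flow[Int].statefulMapConcat(runningSum)
}
```

Calling `runningSum()` twice yields two independent counters, which is why the state is declared inside the factory rather than outside it.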

Interestingly, mapConcat is just a special case of statefulMapConcat in which the per-element function carries no state. Here’s how mapConcat[T] is implemented in Akka Stream Flow:
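Paraphrasing the library source rather than quoting it, the body amounts to `statefulMapConcat(() => f)`: the stateless function is wrapped in a factory that creates no state. The sketch below checks that equivalence on a small Source, assuming Akka 2.6+; `f` and the object name are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatEquivalence {
  // A stateless expansion function: n becomes n followed by -n.
  val f: Int => immutable.Iterable[Int] = n => List(n, -n)

  // Runs the same input through both operators and returns both outputs.
  def run()(implicit system: ActorSystem): (Seq[Int], Seq[Int]) = {
    val viaMapConcat = Source(1 to 3).mapConcat(f).runWith(Sink.seq)
    // The factory ignores state and simply hands back f.
    val viaStateful = Source(1 to 3).statefulMapConcat(() => f).runWith(Sink.seq)
    (Await.result(viaMapConcat, 3.seconds), Await.result(viaStateful, 3.seconds))
  }
}
```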

Example 1: Extracting sections of elements

Let’s look at a simple example that illustrates how statefulMapConcat can be used to extract sections of a given Source in accordance with special elements designated for section-start / stop.
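The original listing is missing, so the following is a reconstruction sketch based on the description in this post: a mutable Boolean `discard` is toggled by the start/stop markers, and each element yields either `Nil` or a one-element List. The method name `extractSections`, its parameters, and the marker strings are assumptions.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ExtractSectionsSketch {
  // Emits only the elements that sit between a `start` and a `stop` marker.
  def extractSections(start: String, stop: String): Flow[String, String, NotUsed] =
    Flow[String].statefulMapConcat { () =>
      var discard = true // internal state: true while outside a section
      elem =>
        if (elem == start) { discard = false; Nil }     // marker itself is dropped
        else if (elem == stop) { discard = true; Nil }  // marker itself is dropped
        else if (discard) Nil                           // outside a section
        else elem :: Nil                                // inside a section
    }

  def run()(implicit system: ActorSystem): Seq[String] = {
    val input = List("a", "<<", "b", "c", ">>", "d", "<<", "e", ">>")
    Await.result(
      Source(input).via(extractSections("<<", ">>")).runWith(Sink.seq),
      3.seconds
    )
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("extract-sections")
    try println(run()) // Vector(b, c, e)
    finally system.terminate()
  }
}
```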

The internal state in the above example is the mutable Boolean variable discard. It is toggled by the designated start/stop elements, and for each element the function returns either an empty Iterable (in this case, Nil) or an Iterable consisting of the element in the given iteration.

Example 2: Conditional element-wise pairing of streams

Next, we look at a slightly more complex example. Say, we have two Sources of integer elements and we would like to pair up the elements from the two Sources based on some condition provided as a (Int, Int) => Boolean function.

In the main method ConditionalZip, a List is maintained for each of the two Stream Sources to keep track of elements held back in previous iterations, so that they can be consumed in subsequent iterations once the provided condition function is satisfied.

Utility method popFirstMatch extracts the first element in a List that satisfies the condition derived from the condition function. It also returns the List consisting of the remaining elements.
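One way such a helper could look, as a plain-Scala sketch: the return type `(Option[Int], List[Int])` and the parameter names are assumptions, since the original listing is not available.

```scala
object PopFirstMatchSketch {
  // Returns the first element satisfying `condF` (if any) together with
  // the list that remains once that element has been removed.
  def popFirstMatch(ls: List[Int], condF: Int => Boolean): (Option[Int], List[Int]) = {
    // Split at the first element that satisfies the condition.
    val (before, from) = ls.span(x => !condF(x))
    from match {
      case hit :: after => (Some(hit), before ::: after)
      case Nil          => (None, ls) // no match: list is returned unchanged
    }
  }
}
```

For example, `popFirstMatch(List(1, 4, 6, 8), _ % 2 == 0)` gives `(Some(4), List(1, 6, 8))`, while a list with no even element comes back untouched alongside `None`.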

Note that the filler elements are needed by method zipAll (available on Akka Stream 2.6+) so that all elements of the longer of the two Stream Sources are covered. The provided filler value should be distinguishable from the Stream elements (Int.MinValue in this example) so that the condition logic can be applied accordingly.
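To make the role of the fillers concrete, here is a small sketch of zipAll on two Sources of unequal length, assuming Akka 2.6+. The shorter side is padded with the filler once it runs out; the object name and sample values are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ZipAllFillerExample {
  // A value that cannot be mistaken for a real element in this example.
  val filler: Int = Int.MinValue

  // Zips a 2-element Source with a 4-element Source; the first is padded.
  def run()(implicit system: ActorSystem): Seq[(Int, Int)] =
    Await.result(
      Source(1 to 2)
        .zipAll(Source(List(10, 20, 30, 40)), filler, filler)
        .runWith(Sink.seq),
      3.seconds
    )
}
```

With these inputs the last two pairs carry `Int.MinValue` on the left, which is how downstream logic can tell that the first Source has already completed.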

Test running ConditionalZip:
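Neither the original ConditionalZip listing nor its test output is available, so what follows is a self-contained sketch of the approach described above, not the author's code. It assumes Akka 2.6+; the method name `conditionalZip`, its parameters, the greedy matching order, and the sample inputs are all assumptions.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConditionalZipSketch {
  // First element satisfying condF (if any), plus the remaining elements.
  def popFirstMatch(ls: List[Int], condF: Int => Boolean): (Option[Int], List[Int]) = {
    val (before, from) = ls.span(x => !condF(x))
    from match {
      case hit :: after => (Some(hit), before ::: after)
      case Nil          => (None, ls)
    }
  }

  def conditionalZip(
      first: Source[Int, NotUsed], firstFiller: Int,
      second: Source[Int, NotUsed], secondFiller: Int,
      condition: (Int, Int) => Boolean
  ): Source[(Int, Int), NotUsed] =
    first.zipAll(second, firstFiller, secondFiller).statefulMapConcat { () =>
      var held1 = List.empty[Int] // unmatched elements from `first`
      var held2 = List.empty[Int] // unmatched elements from `second`
      pair => {
        val (e1, e2) = pair
        // Fillers only signal that a Source has completed; never store them.
        if (e1 != firstFiller) held1 = held1 :+ e1
        if (e2 != secondFiller) held2 = held2 :+ e2
        // Greedily pair each held element of `first` with its first match in `second`.
        val (pairs, keep1, keep2) =
          held1.foldLeft((List.empty[(Int, Int)], List.empty[Int], held2)) {
            case ((ps, keep, pool), x) =>
              popFirstMatch(pool, y => condition(x, y)) match {
                case (Some(y), rest) => ((x, y) :: ps, keep, rest)
                case (None, _)       => (ps, x :: keep, pool)
              }
          }
        held1 = keep1.reverse
        held2 = keep2
        pairs.reverse
      }
    }

  def run()(implicit system: ActorSystem): Seq[(Int, Int)] = {
    val first = Source(List(1, 2, 3, 4))
    val second = Source(List(4, 2, 9, 6, 8, 10))
    val zipped = conditionalZip(
      first, Int.MinValue, second, Int.MinValue, (a, b) => b == a * 2)
    Await.result(zipped.runWith(Sink.seq), 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("conditional-zip")
    try println(run()) // Vector((1,2), (2,4), (3,6), (4,8))
    finally system.terminate()
  }
}
```

In this sketch unmatched elements (9 and 10 here) stay in the held Lists until the stream completes, so it is best suited to bounded inputs.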