Monthly Archives: November 2020

Akka Stream Stateful MapConcat

If you’ve been building applications with Akka Stream in Scala, you have probably used `mapConcat` (and perhaps `flatMapConcat` as well). It’s a handy method for expanding and flattening the content of a Stream, much like how `flatMap` operates on an ordinary Scala collection. The method has the following signature:

def mapConcat[T](f: (Out) => Iterable[T]): Repr[T]

Here’s a trivial example using mapConcat:

import akka.actor.ActorSystem
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")

Source(List("alice", "bob", "charlie")).
  mapConcat(name => List(s"Hi $name", s"Bye $name")).
  runForeach(println)
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi charlie
// Bye charlie
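
For comparison, the `flatMap` analogy mentioned above can be sketched on an ordinary List. This uses only the Scala standard library; the names `names` and `greetings` are illustrative.

```scala
// Plain-collection counterpart of the mapConcat example (no Akka involved)
val names = List("alice", "bob")

// Each element expands into two elements; the nested Lists are flattened
val greetings = names.flatMap(name => List(s"Hi $name", s"Bye $name"))

greetings.foreach(println)
```

The difference is that `mapConcat` applies the function to elements as they flow through the Stream, rather than to a collection that is already fully in memory.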

A mapConcat with an internal state

A lesser-known method that expands and flattens Stream elements while maintaining some internal state from one element to the next is `statefulMapConcat`, with the following signature:

def statefulMapConcat[T](f: () => (Out) => Iterable[T]): Repr[T]

Interestingly, `mapConcat` is just a special case of `statefulMapConcat` in which the factory always returns the same stateless function. Here’s how `mapConcat[T]` is implemented in Akka Stream Flow:

def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)
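
The outer `() => ...` function acts as a factory: it is invoked once each time the Stream is materialized, so every run starts with fresh state. The following is a minimal sketch of that behavior, assuming Akka 2.6+. The names `indexed`, `run1` and `run2` are illustrative and not part of any API.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("sketch")

// Attaches a running index to each element; `index` is the internal state
val indexed = Flow[String].statefulMapConcat[(String, Long)] { () =>
  var index = 0L // re-initialized for every materialization
  elem => {
    val out = (elem, index)
    index += 1
    out :: Nil
  }
}

// Materialize the same Flow twice; blocking with Await is for demonstration only
val run1 = Await.result(Source(List("a", "b", "c")).via(indexed).runWith(Sink.seq), 5.seconds)
val run2 = Await.result(Source(List("x", "y")).via(indexed).runWith(Sink.seq), 5.seconds)

println(run1)
println(run2)

// Shut the actor system down once the sketch is done
system.terminate()
```

In both runs the index starts at 0, because the `var` lives inside the factory rather than outside the Flow definition.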

Example 1: Extracting sections of elements

Let’s look at a simple example that illustrates how `statefulMapConcat` can be used to extract sections of a given Source in accordance with special elements designated for section-start / stop.

val source = Source(List(
    "a", ">>", "b", "c", "<<", "d", "e", ">>", "f", "g", "h", "<<", "i", ">>", "j", "<<", "k"
  ))

val extractFlow = Flow[String].statefulMapConcat { () =>
  val start = ">>"
  val stop = "<<"
  var discard = true
  elem =>
    if (discard) {
      if (elem == start)
        discard = false
      Nil
    }
    else {
      if (elem == stop) {
        discard = true
        Nil
      }
      else
        elem :: Nil
    }
}

source.via(extractFlow).runForeach(x => print(s"$x "))
// b c f g h j 

The internal state in the above example is the mutable Boolean variable `discard`, which is toggled by the designated start/stop elements so that the function returns either an empty Iterable (in this case, `Nil`) or an Iterable consisting of the current element.
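
Since the stateful function is an ordinary closure, its state transitions can also be checked without running a Stream. The sketch below models the same start/stop logic with plain Scala collections. The names `sectionExtractor`, `input` and `extracted` are hypothetical, and calling the factory once stands in for a single materialization.

```scala
// Same start/stop logic as the extraction example, as a standalone factory
val sectionExtractor: () => String => List[String] = () => {
  var discard = true
  elem =>
    if (discard) {
      if (elem == ">>") discard = false
      Nil
    } else if (elem == "<<") {
      discard = true
      Nil
    } else elem :: Nil
}

val input = List("a", ">>", "b", "c", "<<", "d", ">>", "e", "<<", "f")

// One call to the factory yields one stateful function with fresh state
val extract = sectionExtractor()
val extracted = input.flatMap(extract)

println(extracted)
```

Assuming the types line up as expected, the same factory could be handed to the Stream as `Flow[String].statefulMapConcat(sectionExtractor)`.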

Example 2: Conditional element-wise pairing of streams

Next, we look at a slightly more complex example. Say we have two Sources of integer elements and would like to pair up elements from the two Sources based on a condition provided as a `(Int, Int) => Boolean` function.

def popFirstMatch(ls: List[Int], condF: Int => Boolean): (Option[Int], List[Int]) = {
  ls.find(condF) match {
    case None =>
      (None, ls)
    case Some(e) =>
      val idx = ls.indexOf(e)
      if (idx < 0)
        (None, ls)
      else {
        val (l, r) = ls.splitAt(idx)
        (r.headOption, l ++ r.tail)
      }
  }
}

import akka.NotUsed

def conditionalZip(first: Source[Int, NotUsed],
                   second: Source[Int, NotUsed],
                   filler: Int,
                   condFcn: (Int, Int) => Boolean): Source[(Int, Int), NotUsed] = {
  first.zipAll(second, filler, filler).statefulMapConcat { () =>
    // Elements held back from earlier iterations, one list per Source
    var prevList1 = List.empty[Int]
    var prevList2 = List.empty[Int]
    tuple => tuple match { case (e1, e2) =>
      if (e2 != filler) {
        if (e1 != filler && condFcn(e1, e2))
          (e1, e2) :: Nil
        else {
          if (e1 != filler)
            prevList1 :+= e1
          prevList2 :+= e2
          // First try to pair e2 with a held-back element from `first`
          val (opElem1, rest1) = popFirstMatch(prevList1, condFcn(_, e2))
          opElem1 match {
            case None =>
              if (e1 != filler) {
                // Otherwise try to pair e1 with a held-back element from `second`
                val (opElem2, rest2) = popFirstMatch(prevList2, condFcn(e1, _))
                opElem2 match {
                  case None =>
                    Nil
                  case Some(e) =>
                    prevList2 = rest2
                    (e1, e) :: Nil
                }
              }
              else
                Nil
            case Some(e) =>
              prevList1 = rest1
              (e, e2) :: Nil
          }
        }
      }
      else
        Nil
    }
  }
}

In the main method `conditionalZip`, two Lists are maintained, one per Stream Source, to keep track of elements held back in previous iterations so that they can be conditionally consumed in subsequent iterations based on the provided condition function.

Utility method `popFirstMatch` is for extracting the first element in a List that satisfies the condition derived from the condition function. It also returns the resulting List consisting of the remaining elements.
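
To make that contract concrete, here is a small sketch of the "pop first match" behavior on sample Lists. `popFirst` is a hypothetical compact variant written for this illustration (built on `indexWhere` and `patch`), intended to return the same results as `popFirstMatch`.

```scala
// Hypothetical compact variant with the same contract as popFirstMatch:
// returns the first element satisfying `cond` (if any) plus the remaining elements
def popFirst(ls: List[Int], cond: Int => Boolean): (Option[Int], List[Int]) =
  ls.indexWhere(cond) match {
    case -1  => (None, ls)
    case idx => (Some(ls(idx)), ls.patch(idx, Nil, 1))
  }

val held = List(1, 2, 3, 4)

val hit  = popFirst(held, _ > 2) // a match exists: it is removed from the List
val miss = popFirst(held, _ > 9) // no match: the List comes back unchanged
val dup  = popFirst(List(5, 7, 5), _ == 5) // only the first occurrence is removed

println(hit)
println(miss)
println(dup)
```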

Note that `filler` is the padding value that method `zipAll` (available on Akka Stream 2.6+) uses to pad the shorter of the two Stream Sources, so that all elements in the “bigger” Source are covered. The provided `filler` value should be distinguishable from the Stream elements (`Int.MinValue` in this example) so that the condition logic can be applied accordingly.
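
The padding behavior of `zipAll` can be seen in isolation with a minimal sketch, assuming Akka 2.6+. The names `shorter`, `longer` and `padded` are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("sketch")

val filler  = Int.MinValue
val shorter = Source(List(1, 2))
val longer  = Source(List(10, 20, 30, 40))

// Once `shorter` completes, its side of each tuple is padded with `filler`
// Blocking with Await is for demonstration only
val padded = Await.result(shorter.zipAll(longer, filler, filler).runWith(Sink.seq), 5.seconds)

padded.foreach(println)

// Shut the actor system down once the sketch is done
system.terminate()
```

The padded tuples are what the `e1 != filler` and `e2 != filler` checks in `conditionalZip` detect, which is why the filler must never collide with a real element.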

Test running `conditionalZip`:

//// Case 1:
val first = Source(1 :: 2 :: 4 :: 6 :: Nil)
val second = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: Nil)

conditionalZip(first, second, Int.MinValue, _ == _).runForeach(println) 
// (1,1)
// (2,2)
// (4,4)
// (6,6)

conditionalZip(first, second, Int.MinValue, _ > _).runForeach(println) 
// (2,1)
// (4,3)
// (6,4)

conditionalZip(first, second, Int.MinValue, _ < _).runForeach(println) 
// (1,2)
// (2,3)
// (4,5)
// (6,7)

//// Case 2:
val first = Source(3 :: 9 :: 5 :: 5 :: 6 :: Nil)
val second = Source(1 :: 3 :: 5 :: 2 :: 5 :: 6 :: Nil)

conditionalZip(first, second, Int.MinValue, _ == _).runForeach(println)
// (3,3)
// (5,5)
// (5,5)
// (6,6)

conditionalZip(first, second, Int.MinValue, _ > _).runForeach(println)
// (3,1)
// (9,3)
// (5,2)
// (6,5)

conditionalZip(first, second, Int.MinValue, _ < _).runForeach(println)
// (3,5)
// (5,6)