If you’ve been building applications with Akka Stream in Scala, you have probably used `mapConcat` (and perhaps `flatMapConcat` as well). It’s a handy method for expanding and flattening the elements of a Stream, much like `flatMap` on an ordinary Scala collection. The method has the following signature:
```scala
def mapConcat[T](f: (Out) => Iterable[T]): Repr[T]
```
Here’s a trivial example using mapConcat:
```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._

// Akka 2.6+: an implicit ActorSystem also provides the Materializer
implicit val system: ActorSystem = ActorSystem("system")

Source(List("alice", "bob", "charlie"))
  .mapConcat(name => List(s"Hi $name", s"Bye $name"))
  .runForeach(println)
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi charlie
// Bye charlie
```
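For comparison, here is the same expansion applied to an ordinary `List` with `flatMap`. It is plain Scala with no Akka involved, included only as a sketch of the analogy between the two methods.

```scala
// Plain-collection analogue of the mapConcat example: each name expands
// into two greetings, and the results are flattened into a single List.
val greetings: List[String] =
  List("alice", "bob", "charlie").flatMap(name => List(s"Hi $name", s"Bye $name"))

greetings.foreach(println)
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi charlie
// Bye charlie
```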
A mapConcat with an internal state
A less widely used method, `statefulMapConcat`, lets you expand and flatten Stream elements while maintaining some internal state across elements. Its signature is as follows:
```scala
def statefulMapConcat[T](f: () => (Out) => Iterable[T]): Repr[T]
```
Interestingly, `mapConcat` is just a special case of `statefulMapConcat` in which the factory function carries no state. Here’s how `mapConcat[T]` is implemented in Akka Stream’s Flow DSL:
```scala
def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)
```
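To make this relationship concrete without an ActorSystem, the sketch below models both operators on a plain `List`. The helpers `statefulMapConcatModel` and `mapConcatModel` are hypothetical names for illustration, not Akka API. The model assumes one run corresponds to one materialization, so the factory is called once per run (as Akka does per materialization); it ignores backpressure, completion and everything else a real stage handles.

```scala
// Hypothetical List-based model of statefulMapConcat (not Akka API).
// The factory is invoked once per run, so any state it captures starts fresh.
def statefulMapConcatModel[A, T](in: List[A])(f: () => A => Iterable[T]): List[T] = {
  val step = f()              // fresh state for this run
  in.flatMap(a => step(a))
}

// mapConcat modelled the way Akka defines it: a factory that holds no state.
def mapConcatModel[A, T](in: List[A])(f: A => Iterable[T]): List[T] =
  statefulMapConcatModel(in)(() => f)

// A stateful factory: pairs each element with a running index.
val withIndex: () => String => Iterable[(String, Int)] = { () =>
  var i = 0
  (s: String) => {
    val out = (s, i) :: Nil
    i += 1
    out
  }
}

val names = List("alice", "bob")
println(mapConcatModel(names)(n => List(n, n.toUpperCase)))
// List(alice, ALICE, bob, BOB)
println(statefulMapConcatModel(names)(withIndex))
// List((alice,0), (bob,1))
println(statefulMapConcatModel(names)(withIndex)) // a new run restarts at 0
// List((alice,0), (bob,1))
```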
Example 1: Extracting sections of elements
Let’s look at a simple example that illustrates how `statefulMapConcat` can be used to extract the sections of a Source that are delimited by designated section-start and section-stop elements.
```scala
val source = Source(List(
  "a", ">>", "b", "c", "<<", "d", "e", ">>", "f", "g", "h", "<<", "i", ">>", "j", "<<", "k"
))

val extractFlow = Flow[String].statefulMapConcat { () =>
  val start = ">>"
  val stop = "<<"
  var discard = true // true while outside a section
  elem =>
    if (discard) {
      if (elem == start) discard = false
      Nil
    } else {
      if (elem == stop) {
        discard = true
        Nil
      } else elem :: Nil
    }
}

source.via(extractFlow).runForeach(x => print(s"$x "))
// b c f g h j
```
The internal state in this example is the mutable Boolean `discard`. It is toggled by the designated start/stop elements and determines whether a given element yields an empty Iterable (here, `Nil`) or a single-element Iterable containing that element. Because `discard` is declared inside the factory function `() => ...`, each materialization of the flow gets its own fresh copy of it.
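The same toggle can be exercised without Akka by calling a hypothetical `extract` factory by hand and flatMapping a `List` through the function it returns, roughly the way `statefulMapConcat` applies it element by element. This is only a sketch for checking the logic, not a replacement for the Flow; it also shows that two runs do not share state, because each run calls the factory again.

```scala
// Hypothetical factory with the same logic as extractFlow:
// every call creates a fresh `discard` flag.
def extract(): String => List[String] = {
  val start = ">>"
  val stop = "<<"
  var discard = true
  (elem: String) =>
    if (discard) {
      if (elem == start) discard = false // entering a section
      Nil
    } else if (elem == stop) {
      discard = true                     // leaving a section
      Nil
    } else elem :: Nil                   // inside a section: keep it
}

val elems = List("a", ">>", "b", "c", "<<", "d", "e", ">>", "f", "g", "h", "<<", "i", ">>", "j", "<<", "k")

val run1 = elems.flatMap(extract())
val run2 = elems.flatMap(extract()) // new factory call, so the state starts over
println(run1.mkString(" "))
// b c f g h j
```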
Example 2: Conditional element-wise pairing of streams
Next, we look at a slightly more complex example. Say we have two Sources of integer elements and would like to pair up their elements based on a condition provided as a `(Int, Int) => Boolean` function.
```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

// Removes the first element of `ls` satisfying `condF`, returning it
// (if any) together with the remaining elements.
def popFirstMatch(ls: List[Int], condF: Int => Boolean): (Option[Int], List[Int]) = {
  ls.find(condF) match {
    case None => (None, ls)
    case Some(e) =>
      val idx = ls.indexOf(e)
      if (idx < 0) (None, ls)
      else {
        val (l, r) = ls.splitAt(idx)
        (r.headOption, l ++ r.tail)
      }
  }
}

def conditionalZip(
    first: Source[Int, NotUsed],
    second: Source[Int, NotUsed],
    filler: Int,
    condFcn: (Int, Int) => Boolean
): Source[(Int, Int), NotUsed] = {
  first.zipAll(second, filler, filler).statefulMapConcat { () =>
    // Elements held off in earlier iterations, waiting for a match
    var prevList1 = List.empty[Int]
    var prevList2 = List.empty[Int]
    tuple => tuple match {
      case (e1, e2) =>
        if (e2 != filler) {
          if (e1 != filler && condFcn(e1, e2)) (e1, e2) :: Nil // direct match
          else {
            if (e1 != filler) prevList1 :+= e1
            prevList2 :+= e2
            val (opElem1, rest1) = popFirstMatch(prevList1, condFcn(_, e2))
            opElem1 match {
              case None =>
                if (e1 != filler) {
                  val (opElem2, rest2) = popFirstMatch(prevList2, condFcn(e1, _))
                  opElem2 match {
                    case None => Nil
                    case Some(e) =>
                      prevList2 = rest2
                      prevList1 = prevList1.init // e1 is consumed: un-hold it
                      (e1, e) :: Nil
                  }
                } else Nil
              case Some(e) =>
                prevList1 = rest1
                prevList2 = prevList2.init // e2 is consumed: un-hold it
                (e, e2) :: Nil
            }
          }
        } else Nil // `second` is exhausted
    }
  }
}
```
In the main method `conditionalZip`, two Lists (`prevList1` and `prevList2`) keep track of the elements from each Source that were held off in previous iterations, so that they can be consumed in subsequent iterations once the provided condition function is satisfied.
The utility method `popFirstMatch` extracts the first element of a List that satisfies a condition derived from the condition function, and returns it together with a List of the remaining elements.
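As an aside, `find` followed by `indexOf` scans the List twice, and the `idx < 0` branch can never be taken once `find` has succeeded. A possible single-pass variant using the standard `indexWhere` is sketched below; the name `popFirstMatchAlt` is hypothetical, and for a side-effect-free condition it should return the same result as the original.

```scala
// Hypothetical single-pass variant of popFirstMatch: locate the first element
// satisfying condF, return it and the List with that one occurrence removed.
def popFirstMatchAlt(ls: List[Int], condF: Int => Boolean): (Option[Int], List[Int]) = {
  val idx = ls.indexWhere(condF)
  if (idx < 0) (None, ls)        // no match: List unchanged
  else {
    val (l, r) = ls.splitAt(idx) // r.head is the match
    (Some(r.head), l ++ r.tail)
  }
}

println(popFirstMatchAlt(List(1, 3, 5, 2), _ > 2)) // (Some(3),List(1, 5, 2))
println(popFirstMatchAlt(List(1, 2), _ > 5))       // (None,List(1, 2))
```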
Note that the `filler` value is passed to `zipAll` (available in Akka Stream 2.6+) so that every element of the longer of the two Sources is emitted, with the exhausted side padded by `filler`. The `filler` value must be distinguishable from genuine Stream elements (`Int.MinValue` in this example) so that the condition logic can tell padding apart from data.
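Akka’s `zipAll` pads the exhausted side with the given element, much as the standard `List.zipAll` does, so a plain-collection sketch with the same inputs as Case 1 of the test run shows the tuples the pairing logic receives once the shorter side runs out.

```scala
// Standard-library zipAll: the shorter List is padded with the filler value.
val filler = Int.MinValue
val zipped = List(1, 2, 4, 6).zipAll(List(1, 2, 3, 4, 5, 6, 7), filler, filler)

println(zipped.takeRight(3))
// List((-2147483648,5), (-2147483648,6), (-2147483648,7))
```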
Test-running `conditionalZip`:
```scala
// Each runForeach runs asynchronously, so if several streams are started
// together their printed lines may interleave; results are shown per run.

//// Case 1:
val first1 = Source(1 :: 2 :: 4 :: 6 :: Nil)
val second1 = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: Nil)

conditionalZip(first1, second1, Int.MinValue, _ == _).runForeach(println)
// (1,1)
// (2,2)
// (4,4)
// (6,6)

conditionalZip(first1, second1, Int.MinValue, _ > _).runForeach(println)
// (2,1)
// (4,3)
// (6,4)

conditionalZip(first1, second1, Int.MinValue, _ < _).runForeach(println)
// (1,2)
// (2,3)
// (4,5)
// (6,7)

//// Case 2:
val first2 = Source(3 :: 9 :: 5 :: 5 :: 6 :: Nil)
val second2 = Source(1 :: 3 :: 5 :: 2 :: 5 :: 6 :: Nil)

conditionalZip(first2, second2, Int.MinValue, _ == _).runForeach(println)
// (3,3)
// (5,5)
// (5,5)
// (6,6)

conditionalZip(first2, second2, Int.MinValue, _ > _).runForeach(println)
// (3,1)
// (9,3)
// (5,2)
// (6,5)

conditionalZip(first2, second2, Int.MinValue, _ < _).runForeach(println)
// (3,5)
// (5,6)
```
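To double-check results like these without starting an ActorSystem, the pairing logic can be replayed on plain Lists with the standard `zipAll` and a `flatMap` over locally held state. The sketch below is hypothetical (`popFirst` and `conditionalZipModel` are illustrative names, not part of the original code), and it assumes that once an element is paired it is removed from its hold-off list; under that assumption it reproduces all six results listed above.

```scala
// Hypothetical helper: remove and return the first element satisfying p.
def popFirst(ls: List[Int], p: Int => Boolean): (Option[Int], List[Int]) = {
  val idx = ls.indexWhere(p)
  if (idx < 0) (None, ls)
  else (Some(ls(idx)), ls.take(idx) ++ ls.drop(idx + 1))
}

// Hypothetical List-based replay of conditionalZip's per-tuple logic (no Akka).
def conditionalZipModel(first: List[Int], second: List[Int], filler: Int,
                        cond: (Int, Int) => Boolean): List[(Int, Int)] = {
  var held1 = List.empty[Int] // unmatched elements from `first`
  var held2 = List.empty[Int] // unmatched elements from `second`
  first.zipAll(second, filler, filler).flatMap { case (e1, e2) =>
    if (e2 == filler) Nil                                 // `second` exhausted
    else if (e1 != filler && cond(e1, e2)) List((e1, e2)) // direct match
    else {
      val (m1, rest1) = popFirst(held1, x => cond(x, e2))
      m1 match {
        case Some(x) =>                 // a held element of `first` pairs with e2
          held1 = rest1
          if (e1 != filler) held1 :+= e1
          List((x, e2))
        case None if e1 != filler =>
          val (m2, rest2) = popFirst(held2, y => cond(e1, y))
          m2 match {
            case Some(y) =>             // e1 pairs with a held element of `second`
              held2 = rest2 :+ e2
              List((e1, y))
            case None =>                // no pairing yet: hold both
              held1 :+= e1
              held2 :+= e2
              Nil
          }
        case None =>                    // e1 is filler: hold e2 only
          held2 :+= e2
          Nil
      }
    }
  }
}

println(conditionalZipModel(List(3, 9, 5, 5, 6), List(1, 3, 5, 2, 5, 6), Int.MinValue, _ < _))
// List((3,5), (5,6))
```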