Akka Stream comes with a comprehensive set of fan-in/fan-out features for stream processing. It’s worth noting that fan-in/fan-out operations take regular streams as input and generate regular streams as output, rather than working with substreams. This distinguishes them from substreaming, which produces nested SubSource or SubFlow instances through operators like groupBy that, in turn, can be merged back into a regular stream via functions like mergeSubstreams.
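To make the contrast concrete, here is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem is enough to materialize a stream). groupBy turns a regular Source into a SubFlow of per-key substreams, and mergeSubstreams flattens them back into a regular stream. The names SubstreamDemo and oddEvenSums are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6 on the classpath
object SubstreamDemo {
  def oddEvenSums(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("substreams")
    try {
      val done = Source(1 to 10)
        .groupBy(2, _ % 2) // SubFlow: one substream for odd, one for even numbers
        .fold(0)(_ + _)    // each substream is summed independently
        .mergeSubstreams   // back to a regular Source
        .runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  // The order of the two sums is not guaranteed, hence the sort
  def main(args: Array[String]): Unit =
    println(oddEvenSums().sorted) // the two sums: 25 (odd) and 30 (even)
}
```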
Fan-in: Zip versus Merge
Fan-in functionality falls primarily into two types of operations: Zip and Merge. One of the main differences between them is that Zip may combine streams of different element types into a stream of tuple-typed elements, whereas Merge takes streams of the same type and generates a stream of elements (or a stream of collections of elements). Another difference is that Zip emits only when each of the input streams has an element, whereas Merge emits as soon as any one of the input streams has an element.
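A minimal sketch of the two behaviors, again assuming Akka 2.6 (ZipVsMergeDemo is a hypothetical name). zip pairs elements of different types positionally and, since it needs one element from each side, stops once the shorter input is exhausted; merge interleaves same-typed elements in whatever order they arrive, so only the set of merged elements, not their order, is deterministic.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6 on the classpath
object ZipVsMergeDemo {
  def run(): (Seq[(Int, String)], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("zip-vs-merge")
    try {
      // Zip: different element types in, tuples out; emits only when both sides have an element
      val zipped = Source(1 to 5).zip(Source(List("a", "b", "c"))).runWith(Sink.seq)
      // Merge: same element type in, plain elements out; emits whenever either side has one
      val merged = Source(1 to 3).merge(Source(11 to 13)).runWith(Sink.seq)
      (Await.result(zipped, 3.seconds), Await.result(merged, 3.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (zipped, merged) = run()
    println(zipped)        // three tuples: (1,a), (2,b), (3,c)
    println(merged.sorted) // all six elements; arrival order may vary
  }
}
```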
Starting with v2.6, Akka Stream introduces a few additional flavors of Merge functions such as mergeLatest, mergePreferred and mergePrioritized. In this blog post, we’re going to focus on Merge, in particular mergeLatest, which, unlike most other Merge functions, generates a list of elements (the latest element from each input stream) for each element emitted from any of the input streams, once every input stream has emitted at least one element.
MergeLatest
The MergeLatest operator takes a couple of parameters: inputPorts, which is the number of input streams, and eagerComplete (stored as eagerClose in the stage class), which specifies whether the stream completes when all upstreams complete (false, the default) or as soon as one upstream completes (true).
Let’s try it out using Source.combine, which takes two or more Sources and applies the provided uniform fan-in operator (in this case, MergeLatest):
```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.duration._ // for 50.millis, 100.millis

implicit val system: ActorSystem = ActorSystem("system")

val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)

// Source.combine(s1, s2, s3)(Merge[Int](_)).runForeach(println)  // Ordinary Merge
// MergeLatest's second parameter is eagerComplete: Boolean (default false)
Source.combine(s1, s2, s3)(MergeLatest[Int](_)).runForeach(println)

// Output:
//
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)
```
For comparison, had MergeLatest been replaced with the ordinary Merge, the output would look like this:
```scala
// Output:
//
// 1
// 11
// 101
// 2
// 12
// 3
// 13
// 102
// 103
```
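Source.combine is essentially a shortcut for wiring a uniform fan-in stage by hand. The following sketch (assuming Akka 2.6; MergeLatestWiringDemo is a hypothetical name) builds the same kind of graph explicitly with GraphDSL, passing inputPorts and eagerComplete by name. With three single-element sources and eagerComplete = false, the stage emits exactly once, after all three inputs have contributed, and completes only after every upstream has finished.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SourceShape
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6 on the classpath
object MergeLatestWiringDemo {
  def run(): Seq[List[Int]] = {
    implicit val system: ActorSystem = ActorSystem("wiring")
    try {
      val merged: Source[List[Int], NotUsed] = Source.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        // 3 input ports; complete only after all upstreams complete
        val merge = b.add(MergeLatest[Int](inputPorts = 3, eagerComplete = false))
        Source.single(1) ~> merge.in(0)
        Source.single(2) ~> merge.in(1)
        Source.single(3) ~> merge.in(2)
        SourceShape(merge.out)
      })
      Await.result(merged.runWith(Sink.seq), 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // one element: List(1, 2, 3)
}
```

With eagerComplete = true instead, the stage completes as soon as the first single-element source finishes, so the output could end up empty.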
As can be seen in Akka Stream’s Flow source code, mergeLatest uses the stream processing operator MergeLatest for the special case of two input streams:
```scala
def mergeLatest[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[immutable.Seq[U]] =
  via(mergeLatestGraph(that, eagerComplete))

protected def mergeLatestGraph[U >: Out, M](
    that: Graph[SourceShape[U], M],
    eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] =
  GraphDSL.create(that) { implicit b => r =>
    val merge = b.add(MergeLatest[U](2, eagerComplete))
    r ~> merge.in(1)
    FlowShape(merge.in(0), merge.out)
  }
```
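Since the mergeLatest method simply wires a two-port MergeLatest (the current stream into in(0), the argument into in(1)), it can be called directly on a Source. A minimal sketch, assuming Akka 2.6 (TwoWayMergeLatestDemo is a hypothetical name): the intermediate emissions depend on timing, but the last emitted element always holds the final value of each input, because whichever input pushes last triggers an emission containing both latest values.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6 on the classpath
object TwoWayMergeLatestDemo {
  def run(): Seq[immutable.Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("two-way")
    try {
      // Source(1 to 3) feeds merge.in(0); Source.single(10) feeds merge.in(1)
      val done = Source(1 to 3).mergeLatest(Source.single(10)).runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println) // last line: List(3, 10)
}
```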
And below is how the MergeLatest operator is implemented:
```scala
object MergeLatest {
  def apply[T](inputPorts: Int, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
    new MergeLatest[T, List[T]](inputPorts, eagerComplete)(_.toList)
}

final class MergeLatest[T, M](val inputPorts: Int, val eagerClose: Boolean)(buildElem: Array[T] => M)
    extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatest.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatest.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
      private var runningUpstreams: Int = inputPorts
      private def upstreamsClosed: Boolean = runningUpstreams == 0
      private def allMessagesReady: Boolean = activeStreams.size == inputPorts
      private val messages: Array[Any] = new Array[Any](inputPorts)

      override def preStart(): Unit = in.foreach(tryPull)

      in.zipWithIndex.foreach {
        case (input, index) =>
          setHandler(
            input,
            new InHandler {
              override def onPush(): Unit = {
                messages.update(index, grab(input))
                activeStreams.add(index)
                if (allMessagesReady) emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                tryPull(input)
              }

              override def onUpstreamFinish(): Unit = {
                if (!eagerClose) {
                  runningUpstreams -= 1
                  if (upstreamsClosed) completeStage()
                } else completeStage()
              }
            })
      }

      override def onPull(): Unit = {
        var i = 0
        while (i < inputPorts) {
          if (!hasBeenPulled(in(i))) tryPull(in(i))
          i += 1
        }
      }

      setHandler(out, this)
    }

  override def toString = "MergeLatest"
}
```
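As shown in the source code, it’s implemented as a standard GraphStage of UniformFanInShape. The implementation is modular enough that repurposing it to behave a little differently is rather easy; for instance, how each output element is built from the latest input elements is factored out into the buildElem function.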
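As one illustration of that modularity, a different buildElem can be passed to the MergeLatest class constructor without touching the stage logic. A minimal sketch, assuming Akka 2.6 and that the constructor is publicly accessible as in the source shown above (LatestAsStringDemo is a hypothetical name): it emits the latest values joined into a String instead of a List. The element type is Any because the stage hands its internal Array[Any] to buildElem; a buildElem written against a primitive array type such as Array[Int] would likely fail at runtime with a ClassCastException.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6 and a public MergeLatest constructor
object LatestAsStringDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("latest-as-string")
    try {
      // Element type Any matches the stage's internal Array[Any] storage
      val done = Source
        .combine[Any, String](Source.single(1), Source.single(2), Source.single(3))(
          n => new MergeLatest[Any, String](n, eagerClose = false)(_.mkString(",")))
        .runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // one element: "1,2,3"
}
```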
Repurposing MergeLatest
There was a relevant use-case inquiry on Stack Overflow, to which I offered a solution for changing the initial stream emission behavior. By design, MergeLatest starts emitting the output stream only after each input stream has emitted an initial element, which makes it somewhat of an exception to typical Merge behavior, as mentioned earlier. The solution I suggested is to revise the operator so that its emission behavior is similar to that of other Merge operators, i.e. start emitting as soon as one of the input streams has an element, filling in the rest with a user-provided default element.
Below is the repurposed code:
```scala
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable

object MergeLatestWithDefault {
  def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
    new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}

final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(
    buildElem: Array[T] => M)
    extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] =
    Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Still updated in onPush, but no longer consulted before emitting
      private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
      private var runningUpstreams: Int = inputPorts
      private def upstreamsClosed: Boolean = runningUpstreams == 0
      // Pre-filled with the default element instead of being left empty
      private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)

      override def preStart(): Unit = in.foreach(tryPull)

      in.zipWithIndex.foreach {
        case (input, index) =>
          setHandler(
            input,
            new InHandler {
              override def onPush(): Unit = {
                messages.update(index, grab(input))
                activeStreams.add(index)
                // Emit unconditionally: inputs that haven't pushed yet contribute `default`
                emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                tryPull(input)
              }

              override def onUpstreamFinish(): Unit = {
                if (!eagerClose) {
                  runningUpstreams -= 1
                  if (upstreamsClosed) completeStage()
                } else completeStage()
              }
            })
      }

      override def onPull(): Unit = {
        var i = 0
        while (i < inputPorts) {
          if (!hasBeenPulled(in(i))) tryPull(in(i))
          i += 1
        }
      }

      setHandler(out, this)
    }

  override def toString = "MergeLatestWithDefault"
}
```
Little code change is necessary in this case. Besides an additional parameter for the default element value, which is pre-filled in the internal messages array, the only change is that emit within onPush of the InHandler is no longer conditional (the activeStreams set is still updated but no longer consulted).
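The difference between the two emission rules can be modeled without Akka at all. The sketch below is a hypothetical, simplified synchronous model (EmissionRuleModel and its methods are illustrative names, not part of Akka): each event is an (inputIndex, value) push processed in order, and backpressure, timing and completion are ignored. Fed with one push order consistent with the sample outputs in this post, it reproduces both the MergeLatest output and the MergeLatestWithDefault output.

```scala
// Hypothetical, simplified model of the two emission rules (not Akka code):
// events are (inputIndex, value) pushes processed in order; backpressure,
// timing and stream completion are ignored.
object EmissionRuleModel {
  // MergeLatest rule: emit only once every input has delivered at least one element
  def mergeLatest[T](inputPorts: Int, events: Seq[(Int, T)]): Seq[List[T]] = {
    var latest = Vector.fill[Option[T]](inputPorts)(None)
    events.flatMap { case (index, value) =>
      latest = latest.updated(index, Some(value))
      if (latest.forall(_.isDefined)) Some(latest.flatten.toList) else None
    }
  }

  // MergeLatestWithDefault rule: pre-fill with `default` and emit on every push
  def mergeLatestWithDefault[T](inputPorts: Int, default: T, events: Seq[(Int, T)]): Seq[List[T]] = {
    var latest = Vector.fill[T](inputPorts)(default)
    events.map { case (index, value) =>
      latest = latest.updated(index, value)
      latest.toList
    }
  }

  def main(args: Array[String]): Unit = {
    // One push order consistent with the sample outputs in the post
    val events = Seq(0 -> 1, 1 -> 11, 2 -> 101, 0 -> 2, 1 -> 12, 0 -> 3, 1 -> 13, 2 -> 102, 2 -> 103)
    mergeLatest(3, events).foreach(println)               // 7 lists, starting with List(1, 11, 101)
    mergeLatestWithDefault(3, 0, events).foreach(println) // 9 lists, starting with List(1, 0, 0)
  }
}
```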
Testing it out:
```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.duration._ // for 50.millis, 100.millis

implicit val system: ActorSystem = ActorSystem("system")

val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)

// Requires MergeLatestWithDefault (defined above) to be in scope
Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)

// Output:
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)
```