Monthly Archives: January 2021

Merging Akka Streams With MergeLatest

Akka Stream comes with a comprehensive set of fan-in/fan-out features for stream processing. It’s worth noting that rather than as substreams, fan-in/fan-out operations take regular streams as input and generate regular streams as output. These operations are different from substreaming which produces nested SubSource or SubFlow instances with operators like groupBy which, in turn, can be merged back into a regular stream via functions like mergeSubstreams.

Fan-in: Zip versus Merge

For fan-in functionalities, they primarily belong to two types of operations: Zip and Merge. One of the main differences between the them is that Zip may combine streams of different element types to generate a stream of tuple-typed elements whereas Merge takes streams of same type and generates a stream of elements (or a stream of collections of elements). Another difference is that the resulting stream emits when each of the input streams has an element for Zip; as opposed to emitting as soon as any one of the input streams has an element for Merge.

Starting v2.6, Akka Stream introduces a few additional flavors of Merge functions such as mergeLatest, mergePreferred, mergePrioritized. In this blog post, we’re going to focus on Merge, in particular, mergeLatest which, unlike most other Merge functions, generates a list of elements for each element emitted from any of the input streams.

MergeLatest

Function mergeLatest takes a couple of parameters: inputPorts which is the number of input streams and eagerClose which specifics whether the stream completes when all upstreams complete (false) or one upstream completes (true).

Let’s try it out using Source.combine, which takes two or more Sources and apply the provided uniform fan-in operator (in this case, MergeLatest):

import akka.stream.scaladsl._
import akka.actor.ActorSystem

implicit val system = ActorSystem("system")

val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)

// Source.combine(s1, s2, s3)(Merge[Int](_)).runForeach(println)  // Ordinary Merge
Source.combine(s1, s2, s3)(MergeLatest[Int](_, 0)).runForeach(println)

// Output: 
//
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)

For comparison, had MergeLatest been replaced with the ordinary Merge, the output would be like this:

// Output:
//
// 1
// 11
// 101
// 2
// 12
// 3
// 13
// 102
// 103

As can be seen from Akka Stream’s Flow source code, mergeLatest uses the stream processing operator MergeLatest for the special case of 2 input streams:

def mergeLatest[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[immutable.Seq[U]] =
  via(mergeLatestGraph(that, eagerComplete))

protected def mergeLatestGraph[U >: Out, M](
    that: Graph[SourceShape[U], M],
      eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] =
  GraphDSL.create(that) { implicit b => r =>
    val merge = b.add(MergeLatest[U](2, eagerComplete))
    r ~> merge.in(1)
    FlowShape(merge.in(0), merge.out)
  }

And below is how the MergeLatest operator is implemented:

object MergeLatest {
  def apply[T](inputPorts: Int, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
    new MergeLatest[T, List[T]](inputPorts, eagerComplete)(_.toList)
}

final class MergeLatest[T, M](val inputPorts: Int, val eagerClose: Boolean)(buildElem: Array[T] => M)
    extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatest.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatest.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
      private var runningUpstreams: Int = inputPorts
      private def upstreamsClosed: Boolean = runningUpstreams == 0
      private def allMessagesReady: Boolean = activeStreams.size == inputPorts
      private val messages: Array[Any] = new Array[Any](inputPorts)

      override def preStart(): Unit = in.foreach(tryPull)

      in.zipWithIndex.foreach {
        case (input, index) =>
          setHandler(
            input,
            new InHandler {
              override def onPush(): Unit = {
                messages.update(index, grab(input))
                activeStreams.add(index)
                if (allMessagesReady) emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                tryPull(input)
              }

              override def onUpstreamFinish(): Unit = {
                if (!eagerClose) {
                  runningUpstreams -= 1
                  if (upstreamsClosed) completeStage()
                } else completeStage()
              }
            })
      }

      override def onPull(): Unit = {
        var i = 0
        while (i < inputPorts) {
          if (!hasBeenPulled(in(i))) tryPull(in(i))
          i += 1
        }
      }

      setHandler(out, this)
    }

  override def toString = "MergeLatest"
}

As shown in the source code, it’s implemented as a standard GraphStage of UniformFanInShape. The implementation is so modular that repurposing it to do something a little differently can be rather easy.

Repurposing MergeLatest

There was a relevant use case inquiry on Stack Overflow to which I offered a solution for changing the initial stream emission behavior. MergeLatest by design starts emitting the output stream only after each input stream has emitted an initial element, which is somewhat an exception to typical Merge behavior as mentioned earlier. The solution I suggested is to revise the operator to change the emission behavior similar to other Merge operators — i.e. start emitting as soon as one of the input streams has an element by filling in the rest with a user-provided default element.

Below is the repurposed code:

import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable

object MergeLatestWithDefault {
  def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
	new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}

final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
	extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
	new GraphStageLogic(shape) with OutHandler {
	  private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
	  private var runningUpstreams: Int = inputPorts
	  private def upstreamsClosed: Boolean = runningUpstreams == 0
	  private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)

	  override def preStart(): Unit = in.foreach(tryPull)

	  in.zipWithIndex.foreach {
		case (input, index) =>
		  setHandler(
			input,
			new InHandler {
			  override def onPush(): Unit = {
				messages.update(index, grab(input))
				activeStreams.add(index)
				emit(out, buildElem(messages.asInstanceOf[Array[T]]))
				tryPull(input)
			  }

			  override def onUpstreamFinish(): Unit = {
				if (!eagerClose) {
				  runningUpstreams -= 1
				  if (upstreamsClosed) completeStage()
				} else completeStage()
			  }
			})
	  }

	  override def onPull(): Unit = {
		var i = 0
		while (i < inputPorts) {
		  if (!hasBeenPulled(in(i))) tryPull(in(i))
		  i += 1
		}
	  }

	  setHandler(out, this)
	}

  override def toString = "MergeLatestWithDefault"
}

Little code change is necessary in this case. Besides an additional parameter for the default element value to be pre-filled in an internal array, the only change is that emit within onPush within the InHandler is no longer conditional.

Testing it out:

import akka.stream.scaladsl._
import akka.actor.ActorSystem

implicit val system = ActorSystem("system")

val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)

Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)

// Output: 
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)