
Merging Akka Streams With MergeLatest

Akka Stream comes with a comprehensive set of fan-in/fan-out features for stream processing. It's worth noting that fan-in/fan-out operations take regular streams as input and generate regular streams as output. That differs from substreaming, in which operators like groupBy produce nested SubSource or SubFlow instances that can, in turn, be merged back into a regular stream via functions like mergeSubstreams.

Fan-in: Zip versus Merge

Fan-in functionality falls primarily into two types of operations: Zip and Merge. One of the main differences between them is that Zip may combine streams of different element types to generate a stream of tuple-typed elements, whereas Merge takes streams of the same type and generates a stream of elements (or a stream of collections of elements). Another difference is that, for Zip, the resulting stream emits only when each of the input streams has an element, as opposed to Merge, which emits as soon as any one of the input streams has an element.

Starting with v2.6, Akka Stream provides a few additional flavors of Merge functions such as mergeLatest, mergePreferred and mergePrioritized. In this blog post, we're going to focus on Merge, in particular mergeLatest which, unlike most other Merge functions, emits a list of elements for each element emitted from any of the input streams.

MergeLatest

Function mergeLatest takes a couple of parameters: inputPorts, which is the number of input streams, and eagerClose, which specifies whether the stream completes when all upstreams complete (false) or when any one upstream completes (true).

Let's try it out using Source.combine, which takes two or more Sources and applies the provided uniform fan-in operator (in this case, MergeLatest):

import akka.stream.scaladsl._
import akka.actor.ActorSystem
import scala.concurrent.duration._

implicit val system = ActorSystem("system")

val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)

// Source.combine(s1, s2, s3)(Merge[Int](_)).runForeach(println)  // Ordinary Merge
Source.combine(s1, s2, s3)(MergeLatest[Int](_)).runForeach(println)

// Output: 
//
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)

For comparison, had MergeLatest been replaced with the ordinary Merge, the output would look like this:

// Output:
//
// 1
// 11
// 101
// 2
// 12
// 3
// 13
// 102
// 103

As can be seen from Akka Stream’s Flow source code, mergeLatest uses the stream processing operator MergeLatest for the special case of 2 input streams:

def mergeLatest[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[immutable.Seq[U]] =
  via(mergeLatestGraph(that, eagerComplete))

protected def mergeLatestGraph[U >: Out, M](
    that: Graph[SourceShape[U], M],
    eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] =
  GraphDSL.create(that) { implicit b => r =>
    val merge = b.add(MergeLatest[U](2, eagerComplete))
    r ~> merge.in(1)
    FlowShape(merge.in(0), merge.out)
  }

And below is how the MergeLatest operator is implemented:

object MergeLatest {
  def apply[T](inputPorts: Int, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
    new MergeLatest[T, List[T]](inputPorts, eagerComplete)(_.toList)
}

final class MergeLatest[T, M](val inputPorts: Int, val eagerClose: Boolean)(buildElem: Array[T] => M)
    extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatest.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatest.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
      private var runningUpstreams: Int = inputPorts
      private def upstreamsClosed: Boolean = runningUpstreams == 0
      private def allMessagesReady: Boolean = activeStreams.size == inputPorts
      private val messages: Array[Any] = new Array[Any](inputPorts)

      override def preStart(): Unit = in.foreach(tryPull)

      in.zipWithIndex.foreach {
        case (input, index) =>
          setHandler(
            input,
            new InHandler {
              override def onPush(): Unit = {
                messages.update(index, grab(input))
                activeStreams.add(index)
                if (allMessagesReady) emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                tryPull(input)
              }

              override def onUpstreamFinish(): Unit = {
                if (!eagerClose) {
                  runningUpstreams -= 1
                  if (upstreamsClosed) completeStage()
                } else completeStage()
              }
            })
      }

      override def onPull(): Unit = {
        var i = 0
        while (i < inputPorts) {
          if (!hasBeenPulled(in(i))) tryPull(in(i))
          i += 1
        }
      }

      setHandler(out, this)
    }

  override def toString = "MergeLatest"
}

As shown in the source code, it's implemented as a standard GraphStage of UniformFanInShape. The implementation is modular enough that repurposing it to behave a little differently can be rather easy.

Repurposing MergeLatest

There was a relevant use-case inquiry on Stack Overflow to which I offered a solution that changes the initial stream emission behavior. By design, MergeLatest starts emitting the output stream only after each input stream has emitted an initial element, which is somewhat of an exception to typical Merge behavior as mentioned earlier. The solution I suggested is to revise the operator so that its emission behavior resembles the other Merge operators, i.e. it starts emitting as soon as one of the input streams has an element, filling in the rest with a user-provided default element.

Below is the repurposed code:

import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable

object MergeLatestWithDefault {
  def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
    new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}

final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
    extends GraphStage[UniformFanInShape[T, M]] {
  require(inputPorts >= 1, "input ports must be >= 1")

  val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
  val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
  override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
      private var runningUpstreams: Int = inputPorts
      private def upstreamsClosed: Boolean = runningUpstreams == 0
      private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)

      override def preStart(): Unit = in.foreach(tryPull)

      in.zipWithIndex.foreach {
        case (input, index) =>
          setHandler(
            input,
            new InHandler {
              override def onPush(): Unit = {
                messages.update(index, grab(input))
                activeStreams.add(index)
                emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                tryPull(input)
              }

              override def onUpstreamFinish(): Unit = {
                if (!eagerClose) {
                  runningUpstreams -= 1
                  if (upstreamsClosed) completeStage()
                } else completeStage()
              }
            })
      }

      override def onPull(): Unit = {
        var i = 0
        while (i < inputPorts) {
          if (!hasBeenPulled(in(i))) tryPull(in(i))
          i += 1
        }
      }

      setHandler(out, this)
    }

  override def toString = "MergeLatestWithDefault"
}

Little code change is necessary in this case. Besides the additional parameter for the default element value that pre-fills the internal array, the only change is that the emit call within the InHandler's onPush is no longer conditional.

Testing it out:

import akka.stream.scaladsl._
import akka.actor.ActorSystem
import scala.concurrent.duration._

implicit val system = ActorSystem("system")

val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)

Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)

// Output: 
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)

Akka Stream Stateful MapConcat

If you've been building applications with Akka Stream in Scala, you have probably used mapConcat (and perhaps flatMapConcat as well). It's a handy method for expanding and flattening the content of a Stream, much like how flatMap operates on an ordinary Scala collection. The method has the following signature:
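(Paraphrased from FlowOps in Akka Stream 2.6, where Out is the element type of the stream and Repr its representation; exact type details may vary slightly across versions.)

def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]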

Here’s a trivial example using mapConcat:
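(A minimal illustration with arbitrary sample data:)

import akka.actor.ActorSystem
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")

// Expand each element into itself and its 10x multiple, then flatten
Source(1 to 3).mapConcat(i => List(i, i * 10)).runForeach(println)

// Output:
// 1
// 10
// 2
// 20
// 3
// 30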

A mapConcat with an internal state

A less commonly used method that allows one to expand and flatten Stream elements while iteratively processing some internal state is statefulMapConcat, with the following method signature:
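(Again paraphrased from FlowOps in Akka Stream 2.6:)

def statefulMapConcat[T](f: () => Out => immutable.Iterable[T]): Repr[T]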

Interestingly, mapConcat is essentially just a stateless special case of statefulMapConcat. Here's how mapConcat[T] is implemented in Akka Stream's Flow:
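(Essentially the following, modulo minor differences across versions:)

def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)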

Example 1: Extracting sections of elements

Let's look at a simple example that illustrates how statefulMapConcat can be used to extract sections of a given Source delimited by special elements designated as section-start / section-stop markers.
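Below is a sketch along those lines; the marker values ("start" / "stop") and the sample elements are illustrative assumptions:

import akka.actor.ActorSystem
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")

val source = Source(List("a", "start", "b", "c", "stop", "d", "start", "e", "stop", "f"))

source
  .statefulMapConcat { () =>
    var discard = true  // internal state: drop elements until a section starts
    (elem: String) =>
      elem match {
        case "start" => discard = false; Nil  // section begins; drop the marker itself
        case "stop"  => discard = true; Nil   // section ends; drop the marker itself
        case other   => if (discard) Nil else List(other)
      }
  }
  .runForeach(println)

// Output:
// b
// c
// e
// f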

The internal state in the above example is the mutable Boolean variable discard, toggled by the designated start/stop elements to either return an empty Iterable (in this case, Nil) or an Iterable consisting of the element in a given iteration.

Example 2: Conditional element-wise pairing of streams

Next, we look at a slightly more complex example. Say we have two Sources of integer elements and we would like to pair up the elements from the two Sources based on some condition provided as an (Int, Int) => Boolean function.
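Here's a minimal sketch of what such a conditionalZip method, along with its popFirstMatch helper, could look like; the element type is fixed to Int and the buffering logic below is one possible interpretation of the approach described next:

import akka.NotUsed
import akka.stream.scaladsl._

// Extracts the first element in `ls` satisfying `p` (if any), together with the
// List of the remaining elements
def popFirstMatch(ls: List[Int], p: Int => Boolean): (Option[Int], List[Int]) =
  ls.span(e => !p(e)) match {
    case (prefix, found :: rest) => (Some(found), prefix ++ rest)
    case (prefix, Nil)           => (None, prefix)
  }

// Pairs up elements of the two Sources whenever the condition function holds,
// buffering unmatched elements until a counterpart arrives
def conditionalZip(first: Source[Int, NotUsed],
                   second: Source[Int, NotUsed],
                   filler: Int,
                   condFcn: (Int, Int) => Boolean): Source[(Int, Int), NotUsed] =
  first.zipAll(second, filler, filler).statefulMapConcat { () =>
    // Elements seen so far but not yet paired, one buffer per input stream
    var pending1 = List.empty[Int]
    var pending2 = List.empty[Int]

    (pair: (Int, Int)) => {
      val (e1, e2) = pair
      var out = List.empty[(Int, Int)]
      if (e1 != filler)
        popFirstMatch(pending2, condFcn(e1, _)) match {
          case (Some(m), rest) => pending2 = rest; out :+= ((e1, m))
          case (None, _)       => pending1 :+= e1
        }
      if (e2 != filler)
        popFirstMatch(pending1, condFcn(_, e2)) match {
          case (Some(m), rest) => pending1 = rest; out :+= ((m, e2))
          case (None, _)       => pending2 :+= e2
        }
      out
    }
  }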

In the main method conditionalZip, a couple of Lists are maintained, one per Stream Source, to keep track of elements held off in previous iterations so that they can be conditionally consumed in subsequent iterations based on the provided condition function.

Utility method popFirstMatch is for extracting the first element in a List that satisfies the condition derived from the condition function. It also returns the resulting List consisting of the remaining elements.

Note that the filler elements are for method zipAll (available in Akka Stream 2.6+) to cover all elements of the “bigger” Stream Source of the two. The provided filler value should be distinguishable from the actual Stream elements (Int.MinValue in this example) so that the condition logic can be applied accordingly.

Test running conditionalZip:
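For instance, pairing up equal elements (an arbitrary choice of condition for illustration):

import akka.actor.ActorSystem

implicit val system = ActorSystem("system")

val source1 = Source(List(3, 9, 5, 30, 21, 60, 99, 53, 100, 5))
val source2 = Source(List(3, 6, 5, 7, 60, 21))

// Pair up equal elements from the two sources, using Int.MinValue as the filler
conditionalZip(source1, source2, Int.MinValue, _ == _).runForeach(println)

// Output (for this sketch):
// (3,3)
// (5,5)
// (60,60)
// (21,21)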

Custom Akka Stream Processing

The Akka Stream API comes with a suite of versatile tools for stream processing. Besides the Graph DSL, a set of built-in stream operators is also readily available. Yet, if more customized stream processing is needed, GraphStage allows one to create stream operators with specific processing logic between the input and output ports.

As illustrated in the Akka Stream doc re: custom stream processing, one can come up with a transformation function like map or filter with a custom GraphStage in just a few lines of code. For example, method map can be implemented as a Flow using GraphStage:
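The snippet below closely follows the example in the Akka documentation:

import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("Map.in")
  val out = Outlet[B]("Map.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit =
          push(out, f(grab(in)))  // transform the grabbed element and push it downstream
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          pull(in)  // propagate downstream demand to the upstream
      })
    }
}

// Usage: Source(1 to 3).via(new Map[Int, Int](_ * 2)).runForeach(println)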

In analyzing a given stream operator, it's easier to reason about the flow logic by starting from the downstream and tracing upward. With that in mind, let's look at the above snippet. When there is demand from the downstream to pull an element out of the output port, the onPull callback is called, which initiates a pull of a new element into the input port; upon a push from the upstream, the onPush callback grabs the element on the input port, applies function f and pushes the result to the output port.

What is a GraphStage?

A GraphStage represents a stream processing operator. Below is an abridged look at the abstract classes GraphStage and GraphStageLogic:
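(Paraphrased from the Akka Stream sources, with most members omitted:)

abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

abstract class GraphStageLogic(val inCount: Int, val outCount: Int) {
  def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size)

  // Registers the handler that reacts to elements arriving on the given input port
  final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = { /* ... */ }

  // Registers the handler that reacts to downstream demand on the given output port
  final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = { /* ... */ }

  // ... plus push/pull/grab/emit, completeStage, lifecycle hooks (preStart/postStop),
  // getAsyncCallback and many other members
}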

To use (i.e. extend) GraphStage, one needs to implement the method createLogic, which returns a GraphStageLogic. A GraphStageLogic takes a shape and registers its processing logic via setHandler, which takes an Inlet or Outlet along with a corresponding InHandler or OutHandler. These InHandler and OutHandler routines are where the custom processing logic for each stream element resides.

As illustrated in the map or filter implementation in the mentioned Akka doc, to define a GraphStage one minimally needs to define the in, out and shape (FlowShape in those examples) of the graph, as well as the stream processing logic in the InHandler and OutHandler.

Handling external asynchronous events

Among various customizing features, one can extend a GraphStage to handle asynchronous events (e.g. Scala Futures) that aren't part of the stream. To do that, simply define a callback using getAsyncCallback to create an AsyncCallback, which will be invoked by the external event via method invoke.

As an exercise for building custom stream processing operators with GraphStage, we’re going to modify the above map Flow to one that dynamically changes the transformation function upon triggering by an asynchronous event. Let’s name the class DynamicMap which takes a switch event of type Future[Unit] and two ‘A => B’ transformation functions (f being the original function and g the switched one).
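Here's a sketch of what such a DynamicMap stage could look like:

import scala.concurrent.{ ExecutionContext, Future }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("DynamicMap.in")
  val out = Outlet[B]("DynamicMap.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Internal state: switched to true by the async callback
      private var flipped = false

      override def preStart(): Unit = {
        // Create the callback and hook it up to the external event here,
        // not in the constructor, to avoid race conditions
        val callback = getAsyncCallback[Unit](_ => flipped = true)
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          push(out, if (flipped) g(elem) else f(elem))  // apply g after the switch fires
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}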

In this case, the callback simply flips the variable flipped from its initial false value to true so that, when an element is pushed onto the input port, the InHandler will push g(elem) rather than f(elem) to the output port. In addition, an ExecutionContext, needed for invoking the callback when the switch Future completes, is passed in as an implicit parameter.

Note that to avoid race conditions, the callback is created and hooked up to the Future inside the preStart lifecycle hook, rather than in the constructor of GraphStageLogic.

Testing DynamicMap with a dummy asynchronous event switch that simply returns within a few milliseconds and a couple of trivial ‘DataIn => DataOut’ transformation functions:
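A sketch of such a test, with DataIn and DataOut assumed here to be simple placeholder case classes:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem("system")
import system.dispatcher

// Placeholder data types and transformation functions (assumed for illustration)
case class DataIn(value: Int)
case class DataOut(value: Int, tag: String)

val f: DataIn => DataOut = d => DataOut(d.value, "f")
val g: DataIn => DataOut = d => DataOut(d.value, "g")

// Dummy asynchronous switch event that completes after roughly a millisecond
val switch: Future[Unit] = Future { Thread.sleep(1) }

Source(1 to 10)
  .map(DataIn(_))
  .throttle(1, 50.millis)
  .via(new DynamicMap[DataIn, DataOut](switch, f, g))
  .runForeach(println)

// Early elements come out via `f` (tag "f"); once the switch fires, via `g` (tag "g")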