Author Archives: Leo Cheung

Custom Akka Stream Processing

The Akka Stream API comes with a suite of versatile tools for stream processing. Besides the Graph DSL, a set of built-in stream operators is also readily available. Yet, if a more customized stream operator is needed, `GraphStage` allows one to create streaming operators with specific stream processing logic between the input and output ports.

As illustrated in the Akka Stream doc re: custom stream processing, one can come up with a transformation function like `map` or `filter` with a custom GraphStage in just a few lines of code. For example, method `map` can be implemented as a Flow using GraphStage:

// Implementing `map` using GraphStage
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("Map.in")
  val out = Outlet[B]("Map.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, f(grab(in)))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

In analyzing a given stream operation, it’s easier to reason about the flow logic by starting from the downstream and tracing upward. With that in mind, let’s look at the above snippet. When the downstream signals demand for an element on the output port, callback method `onPull` is called, which pulls a new element into the input port. Once the upstream pushes that element, the `onPush` callback is triggered to grab the element on the input port, apply function `f` and push the result to the output port.
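The same pull/push reasoning carries over to a `filter` operator, with one extra detail. When an element fails the predicate, nothing is pushed, so the downstream demand is still unmet and the stage must pull again. Below is a minimal sketch along the lines of the `filter` example in the Akka doc. The `FilterDemo` object and its sample data are illustrative assumptions, and the classic `ActorMaterializer` is used to match the rest of this post.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._

import scala.concurrent.Await
import scala.concurrent.duration._

// A `filter` operator built the same way as `Map` above
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          // Pass matching elements downstream; otherwise the downstream
          // demand is still outstanding, so request the next element
          if (p(elem)) push(out, elem) else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical demo: keep the even numbers out of 1 to 10
object FilterDemo {
  implicit val system: ActorSystem = ActorSystem("filter-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val evens: Seq[Int] =
    Await.result(Source(1 to 10).via(new Filter[Int](_ % 2 == 0)).runWith(Sink.seq), 5.seconds)

  system.terminate()
}
```

Reading `FilterDemo.evens` should give `2, 4, 6, 8, 10`.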

What is a GraphStage?

A `GraphStage` represents a stream processing operator. Below are excerpts from the source code of abstract classes GraphStage and GraphStageLogic:

// Class GraphStage
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

// Class GraphStageLogic
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {

  import GraphInterpreter._
  import GraphStageLogic._

  def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size)

  ...

  final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
    handlers(in.id) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
  }

  final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
    handlers(out.id + inCount) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
  }

  ...
}

To use (i.e. extend) a `GraphStage`, one needs to implement method `createLogic`, which returns a `GraphStageLogic`. The GraphStageLogic is constructed from the stage's `shape`. Within it, method `setHandler` is called to attach an InHandler to each Inlet and an OutHandler to each Outlet. These InHandler and OutHandler routines are where the custom processing logic for every stream element resides.

As illustrated in the `map` or `filter` implementation in the mentioned Akka doc, to define a GraphStage one would need to minimally define `in`, `out` and `shape` (FlowShape in those examples) of the graph, as well as the stream processing logic in the InHandler and OutHandler.
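The same recipe applies to shapes other than FlowShape. In the sketch below, the `CounterSource` class and demo object are hypothetical and not taken from the Akka doc. A Source-shaped stage has only an outlet, so only an OutHandler is needed, and `onPull` simply pushes the next counter value.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._

import scala.concurrent.Await
import scala.concurrent.duration._

// A Source-shaped GraphStage: one outlet, no inlet, hence a single OutHandler
class CounterSource(start: Int) extends GraphStage[SourceShape[Int]] {
  val out = Outlet[Int]("CounterSource.out")

  override val shape = SourceShape(out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Mutable state lives inside the logic, so each materialization starts fresh
      private var counter = start

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}

// Hypothetical demo: take the first five numbers starting from 1
object CounterSourceDemo {
  implicit val system: ActorSystem = ActorSystem("counter-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val firstFive: Seq[Int] =
    Await.result(Source.fromGraph(new CounterSource(1)).take(5).runWith(Sink.seq), 5.seconds)

  system.terminate()
}
```

Keeping `counter` inside `createLogic`, rather than in the GraphStage itself, means the same stage blueprint can be materialized many times without the runs sharing state.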

Handling external asynchronous events

Among various customizing features, one can extend a GraphStage to handle asynchronous events (e.g. the completion of a Scala Future) that aren’t part of the stream. To do that, simply define a callback using `getAsyncCallback` to create an `AsyncCallback`, whose method `invoke` is then called by the external event.

As an exercise for building custom stream processing operators with GraphStage, we’re going to modify the above `map` Flow to one that dynamically changes the transformation function upon triggering by an asynchronous event. Let’s name the class `DynamicMap` which takes a `switch` event of type Future[Unit] and two ‘A => B’ transformation functions (`f` being the original function and `g` the switched one).

// Dynamically switching `map` functions upon triggering by an asynchronous event
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.{Done, NotUsed}

import scala.concurrent.{ExecutionContext, Future}

class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
  extends GraphStage[FlowShape[A, B]] {
 
  val in = Inlet[A]("DynamicMap.in")
  val out = Outlet[B]("DynamicMap.out")
 
  override val shape = FlowShape.of(in, out)
 
  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var flipped = false

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Unit] { _ =>
          flipped = true
        }
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, if (flipped) g(grab(in)) else f(grab(in)))
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

In this case, `callback` simply modifies variable `flipped` from the initial `false` value to `true`. From then on, whenever an element is pushed on the input port, the InHandler pushes `g(elem)` rather than `f(elem)` to the output port. In addition, an `ExecutionContext` is passed in as an implicit parameter. It is required by `switch.foreach`, which runs `callback.invoke` once the Future completes.

Note that to avoid race conditions, the callback is defined and invoked using the `preStart` lifecycle hook, rather than in the constructor of GraphStageLogic.

Testing `DynamicMap` with a dummy asynchronous event `switch` that simply completes after about a millisecond and a couple of trivial ‘DataIn => DataOut’ transformation functions:

// Testing `DynamicMap`
case class DataIn(id: Int)
case class DataOut(content: String)

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val source: Source[DataIn, NotUsed] = Source(1 to 2000).map(DataIn(_))
val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)

val switch = Future{ Thread.sleep(1) }
val f = (di: DataIn) => DataOut(s"ID-${di.id}-OLD")
val g = (di: DataIn) => DataOut(s"ID-${di.id}-NEW")

source.via(new DynamicMap[DataIn, DataOut](switch, f, g)).runWith(sink)

// OUTPUT:
//
// DataOut(ID-1-OLD)
// DataOut(ID-2-OLD)
// ...
// DataOut(ID-982-OLD)
// DataOut(ID-983-OLD)
// DataOut(ID-984-NEW)
// DataOut(ID-985-NEW)
// DataOut(ID-986-NEW)
// ...
// ...

Akka Dynamic Pub-Sub Service

As shown in a previous blog post that illustrated it with a simple text mining application, Akka Stream provides a robust `GraphDSL` domain specific language for assembling stream graphs using a mix of fan-in/fan-out junctions. While `GraphDSL` is a great tool, the supported fan-in/fan-out components are limited to a fixed number of inputs and outputs, as all connections of the graph must be known and connected upfront.

To build a streaming service that allows new producers and consumers to be dynamically added, one would need to look outside of the `GraphDSL`. In this blog post, we’re going to look at how to build a dynamic publish-subscribe service.

MergeHub

Akka provides MergeHub, which serves as a dynamic fan-in junction in the form of a Source that is attached to a single consumer. Once materialized, multiple producers can be attached to the hub, and elements coming from these producers are emitted in a first-come-first-served fashion with `backpressure` support.

MergeHub.source has the following method signature:

// MergeHub.source
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]]

Example:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

import scala.concurrent.duration._  // needed for `2.seconds`

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val consumer = Sink.foreach(println)

val toConsumer: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 32).to(consumer).run()

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  runWith(toConsumer)

Source(4 to 5).map(i => s"Batch-immediate-$i").
  runWith(toConsumer)

// Batch-immediate-4
// Batch-immediate-5
// Batch-delayed-1
// Batch-delayed-2
// Batch-delayed-3

BroadcastHub

BroadcastHub, on the other hand, serves as a dynamic fan-out junction in the form of a Sink that is attached to a single producer. Similarly, once materialized, multiple consumers can be attached to it, again with `backpressure` support.

BroadcastHub.sink has the following method signature:

// BroadcastHub.sink
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]]

Example:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val producer = Source(1 to 20).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

fromProducer.runForeach{ case s: String => println("Consumer-A-" + s) }
fromProducer.runForeach{ case s: String => println("Consumer-B-" + s) }

// Consumer-A-Element-1
// Consumer-B-Element-1
// Consumer-A-Element-2
// Consumer-B-Element-2
// Consumer-B-Element-3
// Consumer-A-Element-3
// ...
// Consumer-B-Element-19
// Consumer-A-Element-18
// Consumer-B-Element-20
// Consumer-A-Element-19
// Consumer-A-Element-20

It should be cautioned that if the source has fewer elements than the `bufferSize` specified in BroadcastHub.sink, none of the elements will be consumed by the attached consumers. It took me a while to realize it’s `fromProducer` that “silently” consumes the elements when materialized before the attached consumers have a chance to consume them. That, to me, is really an undocumented “bug”. Using `alsoToMat` as shown below, one can uncover the seemingly “missing” elements in such case:

// Testing for Source with elements fewer than `bufferSize`:
val producer = Source(1 to 5).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.
    alsoToMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).
    toMat(Sink.foreach{ case s: String => println("Consumer-" + s) })(Keep.left).
    run()
// Consumer-Element-1
// Consumer-Element-2
// Consumer-Element-3
// Consumer-Element-4
// Consumer-Element-5

MergeHub + BroadcastHub

By connecting a `MergeHub` with a `BroadcastHub`, one can create a dynamic publish-subscribe “channel” in the form of a Flow via Flow.fromSinkAndSource:

// Creating a pub-sub channel as a Flow
val (psSink, psSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).run()

val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Note that `Keep.both` in the above snippet produces a Tuple of materialized values `(Sink[T, NotUsed], Source[T, NotUsed])` from `MergeHub.source[T]` and `BroadcastHub.sink[T]`. The pub-sub channel `psChannel` can be illustrated as follows:

    p                           psChannel                          c
    r  \                                                        /  o
    o   \    -------------------        -------------------    /   n
    d    \  |  MergeHub.source  |      | BroadcastHub.sink |  /    s
    u  ---  |                   | ---> |                   |  ---  u
    c    /  |      (psSink)     |      |     (psSource)    |  \    m
    e   /    -------------------        -------------------    \   e
    r  /                                                        \  r
    s                                                              s
         Source[T,Sink[T,NotUsed]]    Sink[T,Source[T,NotUsed]]

Below is sample code for a simple pub-sub channel `psChannel`:

// A simple pub-sub service example
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

import scala.concurrent.duration._  // needed for `2.seconds` in the examples below

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (psSink, psSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).run()

// Optional: avoid building up backpressure in the absence of subscribers
psSource.runWith(Sink.ignore)

Serving as a pub-sub channel, `psChannel` publishes its input via `psSink` to all subscribers, while its output streams through `psSource` all the elements published. For example:

// Serving as a pub-sub service
val p1 = Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure)

val p2 = Source(4 to 5).map(i => s"Batch-immediate-$i")

p1.runWith(psSink)
p2.runWith(psSink)

val s1 = psSource
val s2 = psSource

s1.runForeach(s => println("Consumer-A-" + s))
s2.runForeach(s => println("Consumer-B-" + s))

// Consumer-A-Batch-immediate-4
// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
// Consumer-A-Batch-delayed-3

Running `psChannel` as a `Flow`:

// Using the pub-sub service as a Flow
val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-A-" + s) }).
  run()

Source(4 to 5).map(i => s"Batch-immediate-$i").
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-B-" + s) }).
  run()

// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-4
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-A-Batch-delayed-3
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3

Note that each of the input elements for `psChannel` gets consumed by every consumer.

Other relevant topics that might be of interest include KillSwitch for stream completion control and PartitionHub for routing Stream elements from a given producer to a dynamic set of consumers.

Fibonacci In Scala: Tailrec, Memoized

One of the most popular number series being used as a programming exercise is undoubtedly the Fibonacci numbers:

F(0) = 0
F(1) = 1
F(n) = F(n-1) + F(n-2), for n >= 2

Perhaps a prominent reason why the Fibonacci sequence is of vast interest in Math is the associated Golden Ratio. But I think what makes it a great programming exercise is that, despite a simple definition, the sequence’s exponential growth rate presents challenges for implementations with space/time efficiency in mind.

Having seen various ways of implementing methods for the Fibonacci numbers, I thought it might be worth putting them together, from a naive implementation to something more space/time efficient. But first, let’s take a quick look at the computational complexity of Fibonacci.

Fibonacci complexity

If we denote T(n) as the time required to compute F(n), by definition:

T(n) = T(n-1) + T(n-2) + K

where K is the time taken by some simple arithmetic to arrive at F(n) from F(n-1) and F(n-2).

With some approximate mathematical analysis (see this post), it can be shown that, up to constant factors, T(n) is bounded below by 2^(n/2) and above by 2^n, i.e. T(n) = Ω(2^(n/2)) and T(n) = O(2^n). For better precision, one can derive a tighter time complexity by solving the associated characteristic equation, `x^2 = x + 1`. Its positive root is x = (1 + sqrt(5))/2 = ~1.618, from which one can deduce that:

Time complexity for computing F(n) = O(R^n)

where R = ~1.618 is the Golden Ratio.
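That step can be spelled out as a sketch. Substituting a trial solution T(n) = x^n into the homogeneous part of the recurrence gives the characteristic equation. The constant K contributes only a constant particular solution. The constants a and b below are generic and would be fixed by T(0) and T(1).

```latex
% Trial solution T(n) = x^n in T(n) = T(n-1) + T(n-2) (the K term is handled separately)
x^{n} = x^{n-1} + x^{n-2}
\;\Longrightarrow\; x^{2} = x + 1
\;\Longrightarrow\; x = \frac{1 \pm \sqrt{5}}{2} \approx 1.618,\; -0.618

% A constant particular solution T(n) = c must satisfy c = 2c + K, i.e. c = -K.
% With R = (1 + \sqrt{5})/2 and \bar{R} = (1 - \sqrt{5})/2:
T(n) = a\,R^{n} + b\,\bar{R}^{\,n} - K

% Since |\bar{R}| < 1, the R^n term dominates:
T(n) = O(R^{n})
```

As a sanity check, sqrt(2) = ~1.414 < R < 2, which is consistent with the 2^(n/2) and 2^n bounds mentioned above.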

As for space complexity, if one looks at the recursive tree for computing F(n), it’s pretty clear that its depth is F(n-1)’s tree depth plus one. Thus, the maximum call-stack depth, and hence the required space for F(n), is proportional to n. In other words:

Space complexity for computing F(n) = O(n)
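Both growth rates can be checked empirically. The sketch below uses a hypothetical `NaiveFibStats` object, an instrumented copy of the naive recursion. It counts the calls made, which equals the number of nodes in the recursion tree, and records the deepest call-stack depth reached.

```scala
object NaiveFibStats {
  private var calls = 0L    // nodes visited in the recursion tree
  private var depth = 0     // current call-stack depth
  private var maxDepth = 0  // deepest call-stack depth reached

  private def fib(n: Int): BigInt = {
    calls += 1
    depth += 1
    maxDepth = math.max(maxDepth, depth)
    val result = n match {
      case 0 => BigInt(0)
      case 1 => BigInt(1)
      case _ => fib(n - 2) + fib(n - 1)
    }
    depth -= 1
    result
  }

  /** Returns (F(n), number of calls, maximum recursion depth). */
  def stats(n: Int): (BigInt, Long, Int) = {
    calls = 0; depth = 0; maxDepth = 0
    val f = fib(n)
    (f, calls, maxDepth)
  }
}
```

For n = 20 it reports 21891 calls and a maximum depth of 20. With each increment of n, the call count grows by a factor approaching ~1.618, while the depth grows by just one.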

The relatively small space complexity, compared with the exponential time complexity, explains why computing a Fibonacci number too large for a computer would generally lead to a seemingly endless run rather than an out-of-memory/stack overflow problem.

It’s worth noting, though, that F(n) can also be computed via conventional iterations, e.g. a while-loop, or tail recursion, which Scala compiles into a loop under the hood. In that case, the time complexity would be reduced to O(n), proportional to the number of loop cycles (treating each addition as a constant-time step). And the space complexity would be O(1), since no `n`-dependent extra space is needed other than that for holding the two preceding numbers.
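For comparison with the recursive versions below, here is a minimal while-loop sketch. The names `FibIter` and `fibIter` are illustrative. It keeps only the two most recent numbers, so it runs in O(n) loop cycles with a fixed number of variables.

```scala
object FibIter {
  // Iterative Fibonacci: n loop cycles, constant number of variables
  // (each BigInt addition counted as one step)
  def fibIter(n: Int): BigInt = {
    var prev = BigInt(0) // F(i)
    var curr = BigInt(1) // F(i + 1)
    var i = 0
    while (i < n) {
      val next = prev + curr
      prev = curr
      curr = next
      i += 1
    }
    prev // after n cycles, prev holds F(n)
  }
}
```

`FibIter.fibIter(90)` returns 2880067194370816120, the same value that `fibTR(90)` produces below.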

Naive Fibonacci

To generate Fibonacci numbers, the most straightforward approach is via a basic recursive function like below:

def fib(n: Int): BigInt = n match {
  case 0 => 0
  case 1 => 1
  case _ => fib(n-2) + fib(n-1)
}

(0 to 10).foreach(n => print(fib(n) + " "))
// 0 1 1 2 3 5 8 13 21 34 55

fib(50)
// res1: BigInt = 12586269025

With such a `naive` recursive function, computing the 50th number, i.e. fib(50), would take minutes on a typical laptop. Attempting to compute any higher number, like fib(90), would practically never finish.

Tail recursive Fibonacci

So, let’s come up with a tail recursive method:

def fibTR(num: Int): BigInt = {
  @scala.annotation.tailrec
  def fibFcn(n: Int, acc1: BigInt, acc2: BigInt): BigInt = n match {
    case 0 => acc1
    case 1 => acc2
    case _ => fibFcn(n - 1, acc2, acc1 + acc2)
  }

  fibFcn(num, 0, 1)
}

As shown above, tail recursion is accomplished by means of a couple of accumulators as parameters for the inner method to recursively carry over the two numbers that precede the current number.
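To see the accumulators at work, the sketch below uses a hypothetical `trace` helper that is not part of `fibTR`. It runs the same recursion as `fibFcn` and also records each `(n, acc1, acc2)` state along the way.

```scala
object FibTrace {
  // Same recursion as fibFcn, additionally collecting every (n, acc1, acc2) state
  @scala.annotation.tailrec
  def trace(n: Int, acc1: BigInt, acc2: BigInt,
            states: List[(Int, BigInt, BigInt)] = Nil): List[(Int, BigInt, BigInt)] = {
    val visited = (n, acc1, acc2) :: states
    n match {
      case 0 | 1 => visited.reverse // fibFcn would return acc1 (n = 0) or acc2 (n = 1) here
      case _     => trace(n - 1, acc2, acc1 + acc2, visited)
    }
  }
}
```

For `FibTrace.trace(5, 0, 1)`, the states are (5,0,1), (4,1,1), (3,1,2), (2,2,3) and (1,3,5). Each step shifts `acc2` into `acc1` and replaces `acc2` with their sum, and at n = 1 the answer is acc2 = 5 = F(5).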

With the tail recursive `fibTR`, computing, say, the 90th number would finish instantaneously.

fibTR(90)
// res2: BigInt = 2880067194370816120

Fibonacci in a Scala Stream

Another way of implementing Fibonacci is to define the sequence to be stored in a “lazy” collection, such as a Scala Stream:

val fibS: Stream[BigInt] = 0 #:: fibS.scan(BigInt(1))(_ + _)

fibS(90)
// res3: BigInt = 2880067194370816120

Method `scan`, as in `fibS.scan(BigInt(1))(_ + _)`, generates a Stream in which each element is the previous scanned value plus the corresponding element of `fibS`. This amounts to the sum of the previous two Fibonacci numbers. Since Streams are “lazy”, none of the element values in the defined `fibS` will be evaluated until the element is requested.
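Why the self-referential definition works can be seen on a finite prefix. Scanning the first few Fibonacci numbers from seed 1 yields the sequence shifted by one position, so prepending 0 reproduces the prefix and extends it. This sketch uses a plain `List` (the `ScanDemo` name is illustrative) to avoid any laziness concerns.

```scala
object ScanDemo {
  // A finite prefix of the Fibonacci numbers: F(0) to F(5)
  val prefix: List[BigInt] = List(0, 1, 1, 2, 3, 5).map(BigInt(_))

  // scan emits the seed 1, then the running sums 1+F(0), 1+F(0)+F(1), ...
  val scanned: List[BigInt] = prefix.scan(BigInt(1))(_ + _)

  // Prepending 0, as `0 #:: fibS.scan(...)` does, gives back the prefix plus two new terms
  val rebuilt: List[BigInt] = BigInt(0) :: scanned
}
```

Here `scanned` is `List(1, 1, 2, 3, 5, 8, 13)` and `rebuilt` is `List(0, 1, 1, 2, 3, 5, 8, 13)`. With a lazy Stream, the same fixed point lets each new element be produced from elements that already exist.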

While at it, there are a couple of other commonly seen Fibonacci implementation variants with Scala Stream:

val fibS: Stream[BigInt] = 0 #:: 1 #:: (fibS zip fibS.tail).map(n => n._1 + n._2)

val fibS: Stream[BigInt] = {
  def fs(prev: BigInt, curr: BigInt): Stream[BigInt] = prev #:: fs(curr, prev + curr)
  fs(0, 1)
}

Scala Stream memoizes by design

These Stream-based Fibonacci implementations perform reasonably well, somewhat comparable to the tail recursive Fibonacci. But while these Stream implementations all involve recursion, none is tail recursive. So, why don’t they suffer from the same performance issue as the `naive` Fibonacci implementation? The short answer is memoization.
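The memoization can be observed directly, without relying on any particular internal mechanism. In the hypothetical demo below, every Stream tail is built from a by-name block that bumps a counter when it is evaluated. Accessing the same element a second time should not evaluate any tail again. (`Stream` is deprecated in favor of `LazyList` since Scala 2.13 but still compiles.)

```scala
object StreamMemoDemo {
  var tailEvaluations = 0 // how many times a tail expression has actually run

  // The tail block after #:: is by-name: it runs only when that tail is first needed
  def numbersFrom(n: Int): Stream[Int] =
    n #:: { tailEvaluations += 1; numbersFrom(n + 1) }

  val nums: Stream[Int] = numbersFrom(0)
}
```

After a first call to `StreamMemoDemo.nums(5)`, `tailEvaluations` is positive. Calling `nums(5)` again leaves the counter unchanged, because the already computed tails are reused rather than recomputed.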

Digging into the source code of Scala Stream would reveal that method `#::` (which is wrapped in class ConsWrapper) is defined as:

def #::[B >: A](hd: B): Stream[B] = cons(hd, tl) 

Tracing method `cons` further reveals that the Stream tail is a by-name parameter to class `Cons`, thus ensuring that the concatenation is performed lazily:

final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A]

But lazy evaluation via a by-name parameter does not, by itself, provide memoization. Digging deeper into the source code, one would see that Stream content is iterated through a StreamIterator class defined as follows:

final class StreamIterator[+A] private() extends AbstractIterator[A] with Iterator[A] {
  def this(self: Stream[A]) {
    this()
    these = new LazyCell(self)
  }

  class LazyCell(st: => Stream[A]) {
    lazy val v = st
  }

  private var these: LazyCell = _

  def hasNext: Boolean = these.v.nonEmpty

  def next(): A =
    if (isEmpty) Iterator.empty.next()
    else {
      val cur    = these.v
      val result = cur.head
      these = new LazyCell(cur.tail)
      result
    }

  ...
}

The inner class `LazyCell` not only has a by-name parameter but, more importantly, makes the Stream represented by the StreamIterator instance a `lazy val` which, by nature, enables memoization by caching the value upon the first (and only first) evaluation.
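The difference between a bare by-name parameter and one wrapped in a `lazy val`, as in `LazyCell`, can be isolated in a few lines. In this sketch, the classes `ByName` and `Cached` and the counter are illustrative names. Each class is read three times, and the counter records how often the argument expression is evaluated.

```scala
object ByNameVsLazy {
  var evaluations = 0
  def expensive(): Int = { evaluations += 1; 42 }

  // By-name only: the argument expression is re-evaluated on every access
  class ByName(x: => Int) { def get: Int = x }

  // By-name wrapped in a lazy val (the LazyCell pattern): evaluated at most once
  class Cached(x: => Int) { lazy val v: Int = x }

  def countByName(): Int = {
    evaluations = 0
    val b = new ByName(expensive())
    b.get; b.get; b.get
    evaluations
  }

  def countCached(): Int = {
    evaluations = 0
    val c = new Cached(expensive())
    c.v; c.v; c.v
    evaluations
  }
}
```

`countByName()` returns 3, while `countCached()` returns 1. Laziness defers the work, and the `lazy val` is what caches it.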

Memoized Fibonacci using a mutable Map

While using a Scala Stream to implement Fibonacci would automatically leverage memoization, one could also explicitly employ the very feature without Streams. For instance, by leveraging method getOrElseUpdate in a mutable Map, a `memoize` function can be defined as follows:

// Memoization using mutable Map

def memoize[K, V](f: K => V): K => V = {
  val cache = scala.collection.mutable.Map.empty[K, V]
  k => cache.getOrElseUpdate(k, f(k))
}

For example, the `naive` Fibonacci equipped with memoization via this `memoize` function would instantly become a much more efficient implementation:

val fibM: Int => BigInt = memoize(n => n match {
  case 0 => 0
  case 1 => 1
  case _ => fibM(n-2) + fibM(n-1)
})

fibM(90)
// res4: BigInt = 2880067194370816120
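To quantify the gain, the sketch below reuses the same `memoize` function and adds a counter, `bodyRuns`, which is a hypothetical addition that tracks how often the Fibonacci body actually executes. With the cache in place, each key is computed only once.

```scala
object MemoDemo {
  def memoize[K, V](f: K => V): K => V = {
    val cache = scala.collection.mutable.Map.empty[K, V]
    k => cache.getOrElseUpdate(k, f(k))
  }

  var bodyRuns = 0 // executions of the Fibonacci body, i.e. cache misses

  val fibM: Int => BigInt = memoize[Int, BigInt] { n =>
    bodyRuns += 1
    n match {
      case 0 => BigInt(0)
      case 1 => BigInt(1)
      case _ => fibM(n - 2) + fibM(n - 1)
    }
  }
}
```

`MemoDemo.fibM(30)` returns 832040 with `bodyRuns` equal to 31, one run per key from 0 to 30. A repeated call is a pure cache hit. By comparison, the naive `fib(30)` makes 2692537 calls.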

For the tail recursive Fibonacci `fibTR`, this `memoize` function wouldn’t be applicable as its inner function `fibFcn` takes accumulators as additional parameters. As for the Stream-based `fibS` which is already equipped with Stream’s memoization, applying `memoize` wouldn’t produce any significant performance gain.