Akka Dynamic Pub-Sub Service

As shown in a previous blog post that illustrated it with a simple text mining application, Akka Streams provides a robust `GraphDSL` domain-specific language for assembling stream graphs using a mix of fan-in/fan-out junctions. While `GraphDSL` is a great tool, its fan-in/fan-out components have a fixed number of inputs and outputs, as all connections of the graph must be known and wired upfront.

To build a streaming service that allows new producers and consumers to be added dynamically, one needs to look outside of the `GraphDSL`. In this blog post, we’re going to look at how to build a dynamic publish-subscribe service.
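
For contrast, below is a minimal `GraphDSL` sketch (not taken from the earlier post; it assumes the implicit `ActorSystem` and materializer set up in the examples that follow) in which the Broadcast junction is declared with exactly two outputs, both of which must be wired before the graph can run:

// A fixed-shape graph for contrast: the Broadcast junction is declared
// with exactly 2 outputs, and both must be connected before run().
import akka.stream.ClosedShape
import akka.stream.scaladsl._

val fixedGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val bcast = b.add(Broadcast[Int](2))
  Source(1 to 3) ~> bcast
  bcast ~> Sink.foreach[Int](i => println(s"Out-1-$i"))
  bcast ~> Sink.foreach[Int](i => println(s"Out-2-$i"))
  ClosedShape
})

fixedGraph.run()  // no further inputs or outputs can be attached afterwards

Adding a third output after the fact would require assembling and running a brand-new graph, which is precisely the limitation the hubs below remove.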

MergeHub

Akka provides MergeHub, which serves as a dynamic fan-in junction in the form of a Source to which a single consumer is attached. Once materialized, multiple producers can be attached to the hub, and elements coming from these producers are emitted on a first-come, first-served basis, with `backpressure` support (each producer gets its own buffer of size `perProducerBufferSize`).

MergeHub.source has the following method signature:

// MergeHub.source
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]]

Example:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val consumer = Sink.foreach(println)

val toConsumer: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 32).to(consumer).run()

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  runWith(toConsumer)

Source(4 to 5).map(i => s"Batch-immediate-$i").
  runWith(toConsumer)

// Batch-immediate-4
// Batch-immediate-5
// Batch-delayed-1
// Batch-delayed-2
// Batch-delayed-3

BroadcastHub

BroadcastHub, on the other hand, serves as a dynamic fan-out junction in the form of a Sink to which a single producer is attached. Similarly, once materialized, multiple consumers can be attached to it, again with `backpressure` support (the hub’s throughput is governed by its slowest attached consumer, within the limit of its `bufferSize`).

BroadcastHub.sink has the following method signature:

// BroadcastHub.sink
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]]

Example:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val producer = Source(1 to 20).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

fromProducer.runForeach(s => println("Consumer-A-" + s))
fromProducer.runForeach(s => println("Consumer-B-" + s))

// Consumer-A-Element-1
// Consumer-B-Element-1
// Consumer-A-Element-2
// Consumer-B-Element-2
// Consumer-B-Element-3
// Consumer-A-Element-3
// ...
// Consumer-B-Element-19
// Consumer-A-Element-18
// Consumer-B-Element-20
// Consumer-A-Element-19
// Consumer-A-Element-20

It should be cautioned that if the source has fewer elements than the `bufferSize` specified in BroadcastHub.sink, none of the elements will be consumed by the attached consumers. It took me a while to realize that it’s `fromProducer` that “silently” consumes the elements upon materialization, before the attached consumers have a chance to see them. That, to me, is really an undocumented “bug”. Using `alsoToMat` as shown below, one can uncover the seemingly “missing” elements in such a case:

// Testing for Source with elements fewer than `bufferSize`:
val producer = Source(1 to 5).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.
    alsoToMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).
    toMat(Sink.foreach[String](s => println("Consumer-" + s)))(Keep.left).
    run()
// Consumer-Element-1
// Consumer-Element-2
// Consumer-Element-3
// Consumer-Element-4
// Consumer-Element-5

MergeHub + BroadcastHub

By connecting a `MergeHub` with a `BroadcastHub`, one can create a dynamic publish-subscribe “channel” in the form of a Flow via Flow.fromSinkAndSource:

// Creating a pub-sub channel as a Flow
val (psSink, psSource) = MergeHub.source[String](perProducerBufferSize).
  toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).run()

val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Note that `Keep.both` in the above snippet produces a Tuple of materialized values `(Sink[T, NotUsed], Source[T, NotUsed])` from `MergeHub.source[T]` and `BroadcastHub.sink[T]`. The pub-sub channel `psChannel` can be illustrated as follows:

    p                           psChannel                          c
    r  \                                                        /  o
    o   \    -------------------        -------------------    /   n
    d    \  |  MergeHub.source  |      | BroadcastHub.sink |  /    s
    u  ---  |                   | ---> |                   |  ---  u
    c    /  |      (psSink)     |      |     (psSource)    |  \    m
    e   /    -------------------        -------------------    \   e
    r  /                                                        \  r
    s                                                              s
         Source[T,Sink[T,NotUsed]]    Sink[T,Source[T,NotUsed]]

Below is sample code for a simple pub-sub channel `psChannel`:

// A simple pub-sub service example
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (psSink, psSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).run()

// Optional: avoid building up backpressure in the absence of subscribers
psSource.runWith(Sink.ignore)

Serving as a pub-sub channel, `psChannel` publishes its input via `psSink` to all subscribers, while its output streams, through `psSource`, all the elements published. For example:

// Serving as a pub-sub service
val p1 = Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure)

val p2 = Source(4 to 5).map(i => s"Batch-immediate-$i")

p1.runWith(psSink)
p2.runWith(psSink)

val s1 = psSource
val s2 = psSource

s1.runForeach(s => println("Consumer-A-" + s))
s2.runForeach(s => println("Consumer-B-" + s))

// Consumer-A-Batch-immediate-4
// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
// Consumer-A-Batch-delayed-3

Running `psChannel` as a `Flow`:

// Using the pub-sub service as a Flow
val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-A-" + s) }).
  run()

Source(4 to 5).map(i => s"Batch-immediate-$i").
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-B-" + s) }).
  run()

// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-4
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-A-Batch-delayed-3
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3

Note that each of the input elements for `psChannel` gets consumed by every consumer.

Other relevant topics that might be of interest include KillSwitch for stream completion control and PartitionHub for routing Stream elements from a given producer to a dynamic set of consumers.
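
As a rough sketch of how a KillSwitch could be combined with the channel built above (illustrative only; `ksSink`, `killSwitch` and `ksSource` are made-up names, not part of the examples above), the switch can be placed between the two hubs so that the whole channel can be completed or aborted from the outside:

// Completing the pub-sub channel externally with a KillSwitch
import akka.stream.KillSwitches

val ((ksSink, killSwitch), ksSource) =
  MergeHub.source[String](perProducerBufferSize = 32).
    viaMat(KillSwitches.single[String])(Keep.both).
    toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).
    run()

// Producers and consumers attach to `ksSink` and `ksSource` as before.
// Shutting down (or aborting) the switch completes the channel for all of them.
killSwitch.shutdown()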