As shown in a previous blog post illustrating a simple text mining application, Akka Streams provides a robust `GraphDSL` domain-specific language for assembling stream graphs from a mix of fan-in/fan-out junctions. While `GraphDSL` is a great tool, its fan-in/fan-out components support only a fixed number of inputs and outputs, since all connections of the graph must be known and wired up front.
To build a streaming service in which new producers and consumers can be added dynamically, one needs to look outside of `GraphDSL`. In this blog post, we're going to look at how to build a dynamic publish-subscribe service.
MergeHub
Akka provides `MergeHub`, which serves as a dynamic fan-in junction in the form of a Source to be attached to by a single consumer. Once materialized, multiple producers can be attached to the hub, and elements coming from those producers are emitted in a first-come-first-served fashion, with backpressure support.
`MergeHub.source` has the following method signature:

```scala
// MergeHub.source
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]]
```
Example:
```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import scala.concurrent.duration._  // needed for `2.seconds`

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val consumer = Sink.foreach(println)

val toConsumer: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 32).to(consumer).run()

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  runWith(toConsumer)

Source(4 to 5).map(i => s"Batch-immediate-$i").
  runWith(toConsumer)

// Batch-immediate-4
// Batch-immediate-5
// Batch-delayed-1
// Batch-delayed-2
// Batch-delayed-3
```
BroadcastHub
`BroadcastHub`, on the other hand, serves as a dynamic fan-out junction in the form of a Sink to be attached to by a single producer. Similarly, once materialized, multiple consumers can be attached to it, again with backpressure support.
`BroadcastHub.sink` has the following method signature:

```scala
// BroadcastHub.sink
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]]
```
Example:
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val producer = Source(1 to 20).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

fromProducer.runForeach{ case s: String => println("Consumer-A-" + s) }
fromProducer.runForeach{ case s: String => println("Consumer-B-" + s) }

// Consumer-A-Element-1
// Consumer-B-Element-1
// Consumer-A-Element-2
// Consumer-B-Element-2
// Consumer-B-Element-3
// Consumer-A-Element-3
// ...
// Consumer-B-Element-19
// Consumer-A-Element-18
// Consumer-B-Element-20
// Consumer-A-Element-19
// Consumer-A-Element-20
```
A word of caution: if the source has fewer elements than the `bufferSize` specified in `BroadcastHub.sink`, none of the elements will be consumed by the attached consumers. It took me a while to realize that it's `fromProducer` itself that "silently" consumes the elements upon materialization, before the attached consumers get a chance to see them. That, to me, is an undocumented "bug". Using `alsoToMat` as shown below, one can uncover the seemingly "missing" elements in such a case:
```scala
// Testing for a Source with fewer elements than `bufferSize`:
val producer = Source(1 to 5).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.
    alsoToMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).
    toMat(Sink.foreach{ case s: String => println("Consumer-" + s) })(Keep.left).
    run()

// Consumer-Element-1
// Consumer-Element-2
// Consumer-Element-3
// Consumer-Element-4
// Consumer-Element-5
```
MergeHub + BroadcastHub
By connecting a `MergeHub` with a `BroadcastHub`, one can create a dynamic publish-subscribe "channel" in the form of a Flow via `Flow.fromSinkAndSource`:
```scala
// Creating a pub-sub channel as a Flow
val (psSink, psSource) =
  MergeHub.source[String](perProducerBufferSize).
    toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).run()

val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)
```
Note that `Keep.both` in the above snippet produces a tuple of materialized values `(Sink[T, NotUsed], Source[T, NotUsed])`, from `MergeHub.source[T]` and `BroadcastHub.sink[T]` respectively. The pub-sub channel `psChannel` can be illustrated as follows:
```
                              psChannel
 p                                                                  c
 r  \       -------------------       -------------------        /  o
 o   \     | MergeHub.source   |     | BroadcastHub.sink |      /    n
 d    ---> |     (psSink)      | --> |     (psSource)    | --->      s
 u   /      -------------------       -------------------        \   u
 c  /                                                             \  m
 e /                                                               \ e
 r      Source[T,Sink[T,NotUsed]]   Sink[T,Source[T,NotUsed]]        r
 s                                                                   s
```
Below is sample code for a simple pub-sub channel `psChannel`:
```scala
// A simple pub-sub service example
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import scala.concurrent.duration._  // needed for `2.seconds` below

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (psSink, psSource) =
  MergeHub.source[String](perProducerBufferSize = 32).
    toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).run()

// Optional: avoid building up backpressure in the absence of subscribers
psSource.runWith(Sink.ignore)
```
Serving as a pub-sub channel, the input of `psChannel` is published via `psSink` to all subscribers, while its output streams all the published elements through `psSource`. For example:
```scala
// Serving as a pub-sub service
val p1 = Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure)
val p2 = Source(4 to 5).map(i => s"Batch-immediate-$i")

p1.runWith(psSink)
p2.runWith(psSink)

val s1 = psSource
val s2 = psSource

s1.runForeach(s => println("Consumer-A-" + s))
s2.runForeach(s => println("Consumer-B-" + s))

// Consumer-A-Batch-immediate-4
// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
// Consumer-A-Batch-delayed-3
```
Running `psChannel` as a `Flow`:
```scala
// Using the pub-sub service as a Flow
val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-A-" + s) }).
  run()

Source(4 to 5).map(i => s"Batch-immediate-$i").
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-B-" + s) }).
  run()

// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-4
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-A-Batch-delayed-3
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
```
Note that each input element of `psChannel` gets consumed by every consumer.
Other relevant topics that might be of interest include `KillSwitch` for stream completion control and `PartitionHub` for routing stream elements from a given producer to a dynamic set of consumers.
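As a quick illustration of the former, a `KillSwitches.single` stage can be inserted between the two hubs so the whole channel can be shut down externally. Below is a minimal sketch (the buffer sizes and the `"hello"` element are arbitrary choices for illustration):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

// Materialize the channel along with a KillSwitch placed between the hubs;
// the nested Keep.both yields ((Sink, UniqueKillSwitch), Source)
val ((ksSink, killSwitch), ksSource) =
  MergeHub.source[String](perProducerBufferSize = 32).
    viaMat(KillSwitches.single)(Keep.both).
    toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).
    run()

Source.single("hello").runWith(ksSink)
ksSource.runForeach(s => println("Consumer-" + s))

// Completes the BroadcastHub side and cancels the MergeHub side,
// shutting down the entire channel
killSwitch.shutdown()
```

Calling `killSwitch.abort(new Exception(...))` instead would fail the channel with an error rather than completing it gracefully.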