As shown in a previous blog post that illustrated a simple text-mining application, Akka Streams provides GraphDSL, a robust domain-specific language for assembling stream graphs from a mix of fan-in/fan-out junctions. While GraphDSL is a great tool, its fan-in/fan-out components are limited to a fixed number of inputs and outputs, since all connections of the graph must be known and wired up front. To build a streaming service in which new producers and consumers can be added dynamically, one needs to look outside of GraphDSL. In this blog post, we're going to look at how to build a dynamic publish-subscribe service.
MergeHub
Akka provides MergeHub, which serves as a dynamic fan-in junction in the form of a Source to which a single consumer is attached. Once materialized, any number of producers can attach to the hub, and elements coming from these producers are emitted in a first-come-first-served fashion, with backpressure support.
MergeHub.source has the following method signature:
```scala
// MergeHub.source
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]]
```
Example:
```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val consumer = Sink.foreach(println)

// Materializing the MergeHub yields a Sink that any number of producers can attach to
val toConsumer: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 32).to(consumer).run()

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  runWith(toConsumer)

Source(4 to 5).map(i => s"Batch-immediate-$i").
  runWith(toConsumer)

// Batch-immediate-4
// Batch-immediate-5
// Batch-delayed-1
// Batch-delayed-2
// Batch-delayed-3
```
BroadcastHub
BroadcastHub, on the other hand, serves as a dynamic fan-out junction in the form of a Sink to which a single producer is attached. Similarly, once materialized, any number of consumers can attach to it, again with backpressure support.
BroadcastHub.sink has the following method signature:
```scala
// BroadcastHub.sink
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]]
```
Example:
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val producer = Source(1 to 20).map(i => s"Element-$i")

// Materializing the BroadcastHub yields a Source that any number of consumers can attach to
val fromProducer: Source[String, NotUsed] =
  producer.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

fromProducer.runForeach{ case s: String => println("Consumer-A-" + s) }
fromProducer.runForeach{ case s: String => println("Consumer-B-" + s) }

// Consumer-A-Element-1
// Consumer-B-Element-1
// Consumer-A-Element-2
// Consumer-B-Element-2
// Consumer-B-Element-3
// Consumer-A-Element-3
// ...
// Consumer-B-Element-19
// Consumer-A-Element-18
// Consumer-B-Element-20
// Consumer-A-Element-19
// Consumer-A-Element-20
```
A word of caution: if the source has fewer elements than the bufferSize specified in BroadcastHub.sink, none of the elements will be seen by the attached consumers. It took me a while to realize that it's fromProducer that "silently" consumes the elements upon materialization, before the attached consumers have a chance to consume them. That, to me, is really an undocumented "bug". Using alsoToMat as shown below, one can uncover the seemingly "missing" elements in such a case:
```scala
// Testing for a Source with fewer elements than `bufferSize`:
val producer = Source(1 to 5).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.
    alsoToMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).
    toMat(Sink.foreach{ case s: String => println("Consumer-" + s) })(Keep.left).
    run()

// Consumer-Element-1
// Consumer-Element-2
// Consumer-Element-3
// Consumer-Element-4
// Consumer-Element-5
```
MergeHub + BroadcastHub
By connecting a MergeHub with a BroadcastHub, one can create a dynamic publish-subscribe "channel" in the form of a Flow via Flow.fromSinkAndSource:
```scala
// Creating a pub-sub channel as a Flow
val (psSink, psSource) =
  MergeHub.source[String](perProducerBufferSize).
    toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).run()

val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)
```
Note that Keep.both in the above snippet produces a tuple of the materialized values (Sink[T, NotUsed], Source[T, NotUsed]) from MergeHub.source[T] and BroadcastHub.sink[T]. The pub-sub channel psChannel can be illustrated as follows:
```
                          psChannel
p                                                              c
r  \                                                        /  o
o   \   -------------------       -------------------      /  n
d    \  | MergeHub.source |       | BroadcastHub.sink |   /   s
u ---   |                 |  ---> |                   |  ---  u
c    /  |    (psSink)     |       |    (psSource)     |   \   m
e   /   -------------------       -------------------      \  e
r  /                                                        \  r
s       Source[T,Sink[T,NotUsed]] Sink[T,Source[T,NotUsed]]    s
```
Below is sample code for a simple pub-sub channel psChannel:
```scala
// A simple pub-sub service example
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (psSink, psSource) =
  MergeHub.source[String](perProducerBufferSize = 32).
    toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).run()

// Optional: avoid building up backpressure in the absence of subscribers
psSource.runWith(Sink.ignore)
```
Serving as a pub-sub channel, psChannel publishes its input via psSink to all subscribers, while its output streams through psSource all the elements published. For example:
```scala
// Serving as a pub-sub service
val p1 = Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure)
val p2 = Source(4 to 5).map(i => s"Batch-immediate-$i")

p1.runWith(psSink)
p2.runWith(psSink)

val s1 = psSource
val s2 = psSource

s1.runForeach(s => println("Consumer-A-" + s))
s2.runForeach(s => println("Consumer-B-" + s))

// Consumer-A-Batch-immediate-4
// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
// Consumer-A-Batch-delayed-3
```
Running psChannel as a Flow:
```scala
// Using the pub-sub service as a Flow
val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-A-" + s) }).
  run()

Source(4 to 5).map(i => s"Batch-immediate-$i").
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-B-" + s) }).
  run()

// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-4
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-A-Batch-delayed-3
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
```
Note that each of the input elements for psChannel gets consumed by every consumer.
Other relevant topics that might be of interest include KillSwitch for stream completion control and PartitionHub for routing Stream elements from a given producer to a dynamic set of consumers.
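To give a flavor of the first of those, below is a minimal sketch of attaching a consumer to a BroadcastHub through a KillSwitch so that it can be detached on demand. The tick-based producer and the names fromProducer and switch are illustrative, not part of the examples above:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl._
import akka.NotUsed

import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

// A never-ending producer behind a BroadcastHub, as in the earlier examples
val fromProducer: Source[String, NotUsed] =
  Source.tick(0.seconds, 500.millis, "tick").
    toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

// Each consumer attaches through its own KillSwitch, so it can be
// completed individually without affecting the hub or other consumers
val switch: UniqueKillSwitch =
  fromProducer.
    viaMat(KillSwitches.single)(Keep.right).
    to(Sink.foreach(s => println("Consumer-A-" + s))).
    run()

// Later: complete just this consumer's stream
switch.shutdown()
```

KillSwitches.shared works similarly when a group of consumers should be completed together with a single shutdown() call.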