The Akka Stream API comes with a suite of versatile tools for stream processing. Besides the Graph DSL, a set of built-in stream operators is also readily available. Yet, if more customized stream operators are needed, GraphStage allows one to create streaming operators with specific processing logic between the input and output ports.
As illustrated in the Akka Stream documentation on custom stream processing, one can come up with a transformation function like map or filter with a custom GraphStage in just a few lines of code. For example, method map can be implemented as a Flow using GraphStage:
```scala
// Implementing `map` using GraphStage
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("Map.in")
  val out = Outlet[B]("Map.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Upstream delivered an element: transform it and pass it on
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, f(grab(in)))
        }
      })
      // Downstream signalled demand: request an element from upstream
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}
```
In analyzing a given stream operation, it's easier to reason about the flow logic by starting from the downstream and tracing upward. With that in mind, let's look at the above snippet. When the downstream signals demand to pull an element out of the output port, callback method onPull is called, which initiates a pull of a new element into the input port. When the upstream then pushes an element, the onPush callback is triggered to grab the element on the input port, apply function f and push the result to the output port.
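To make the downstream-first signal order concrete, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem in scope supplies the Materializer). TracingMap and TracingMapDemo are hypothetical names introduced only for illustration: the stage has the same logic as Map above, but it records every onPull and onPush in a thread-safe log so the order of the callbacks can be inspected after the stream finishes.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: same logic as `Map`, plus a log of every callback
class TracingMap[A, B](f: A => B, log: ConcurrentLinkedQueue[String])
    extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("TracingMap.in")
  val out = Outlet[B]("TracingMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          log.add(s"push($elem)") // upstream delivered an element
          push(out, f(elem))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          log.add("pull") // downstream signalled demand
          pull(in)
        }
      })
    }
}

object TracingMapDemo {
  // Runs three elements through the stage; returns the output and the callback log
  def run(): (Seq[Int], List[String]) = {
    implicit val system: ActorSystem = ActorSystem("tracing-demo")
    try {
      val log = new ConcurrentLinkedQueue[String]()
      val result = Await.result(
        Source(1 to 3).via(new TracingMap[Int, Int](_ * 10, log)).runWith(Sink.seq),
        5.seconds)
      (result, log.toArray(Array.empty[String]).toList)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (result, events) = run()
    println(result)
    println(events)
  }
}
```

Because pull(in) is only ever called from onPull, every push in the log has to be preceded by a pull, and the very first recorded event is always a pull coming from downstream.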
What is a GraphStage?
A GraphStage represents a stream processing operator. Below is an abridged excerpt (elided parts marked with ...) of the source code of abstract classes GraphStage and GraphStageLogic:
```scala
// Class GraphStage
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

// Class GraphStageLogic
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {
  import GraphInterpreter._
  import GraphStageLogic._

  def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size)

  ...

  final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
    handlers(in.id) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
  }

  final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
    handlers(out.id + inCount) = handler
    if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
  }

  ...
}
```
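As the excerpt shows, GraphStage is simply a GraphStageWithMaterializedValue whose materialized value is fixed to NotUsed. When a stage needs to expose a value to the outside, one can extend GraphStageWithMaterializedValue directly and implement createLogicAndMaterializedValue instead. The sketch below assumes Akka 2.6 or later; CountingFlow and CountingFlowDemo are hypothetical names. It materializes a Future[Long] with the number of elements that passed through, completing the promise in postStop, which (as a simplification) does not distinguish normal completion from failure.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical pass-through stage that materializes an element count
class CountingFlow[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[Long]] {
  val in = Inlet[A]("CountingFlow.in")
  val out = Outlet[A]("CountingFlow.out")
  override val shape = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[Long]) = {
    // A fresh promise per materialization, so every run gets its own count
    val promise = Promise[Long]()
    val logic = new GraphStageLogic(shape) {
      private var count = 0L

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          count += 1
          push(out, grab(in))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      // Runs once the stage has stopped, whether it completed or failed
      override def postStop(): Unit = {
        promise.trySuccess(count)
        ()
      }
    }
    (logic, promise.future)
  }
}

object CountingFlowDemo {
  def run(): Long = {
    implicit val system: ActorSystem = ActorSystem("counting-demo")
    try {
      // Keep.right keeps CountingFlow's materialized Future[Long]
      val counted: Future[Long] =
        Source(1 to 5)
          .viaMat(new CountingFlow[Int])(Keep.right)
          .to(Sink.ignore)
          .run()
      Await.result(counted, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```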
To use (i.e. extend) a GraphStage, one needs to implement method createLogic, which returns a GraphStageLogic constructed from the stage's shape. Inside that logic, method setHandler registers the handlers; it takes two arguments, either an Inlet with an InHandler or an Outlet with an OutHandler. These InHandler and OutHandler routines are where the custom processing logic for every stream element resides.
As illustrated in the map and filter implementations in the Akka documentation mentioned above, to define a GraphStage one needs, at a minimum, to define the in, out and shape (FlowShape in those examples) of the graph, as well as the stream processing logic in the InHandler and OutHandler.
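For comparison with Map, a filter-like stage can be sketched the same way. This is a minimal version along the lines of the filter example in the Akka documentation, assuming Akka 2.6 or later; Filter and FilterDemo are illustrative names. The one non-obvious detail is in onPush: when the predicate rejects an element, the downstream demand is still unsatisfied, so the stage must pull(in) again rather than push, otherwise the stream would stall.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          // Forward matching elements; otherwise ask upstream for the next
          // one, because the pending downstream pull is still unanswered
          if (p(elem)) push(out, elem) else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object FilterDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filter-demo")
    try {
      Await.result(
        Source(1 to 6).via(new Filter[Int](_ % 2 == 0)).runWith(Sink.seq),
        5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```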
Handling external asynchronous events
Among various customization features, one can extend a GraphStage to handle asynchronous events (e.g. Scala Futures) that aren't part of the stream. To do that, simply define a callback using getAsyncCallback to create an AsyncCallback, which the external event then triggers via method invoke.
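As a minimal illustration of getAsyncCallback on its own, here is a sketch of a pass-through stage that completes the stream once an external Future succeeds, somewhat like a kill switch. It assumes Akka 2.6 or later; CompleteOn and CompleteOnDemo are hypothetical names. The external Future never touches stage state directly: it only calls invoke, and the callback body (completeStage()) then runs inside the stage, serialized with onPush and onPull.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical pass-through stage that completes once `signal` succeeds
class CompleteOn[A](signal: Future[Unit])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("CompleteOn.in")
  val out = Outlet[A]("CompleteOn.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      override def preStart(): Unit = {
        // The callback body is executed by the stage itself, not by the Future
        val stop = getAsyncCallback[Unit](_ => completeStage())
        // Future.foreach (not invoke) is what needs the ExecutionContext
        signal.foreach(stop.invoke)
      }
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object CompleteOnDemo {
  // Returns how many elements got through before the signal ended the stream
  def run(): Long = {
    implicit val system: ActorSystem = ActorSystem("complete-on-demo")
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      val signal = Future { Thread.sleep(50) }
      // An infinite source: only the external signal ends this stream
      val counted = Source.repeat(1)
        .via(new CompleteOn[Int](signal))
        .runWith(Sink.fold[Long, Int](0L)((n, _) => n + 1))
      Await.result(counted, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(s"elements before completion: ${run()}")
}
```

The element count varies from run to run since it depends on timing; the point is that an otherwise endless stream terminates as soon as the external event fires.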
As an exercise in building custom stream processing operators with GraphStage, we're going to modify the above map Flow into one that dynamically changes the transformation function when triggered by an asynchronous event. Let's name the class DynamicMap; it takes a switch event of type Future[Unit] and two ‘A => B’ transformation functions (f being the original function and g the switched one).
```scala
// Dynamically switching `map` functions upon triggering by an asynchronous event
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.{Done, NotUsed}
import scala.concurrent.{ExecutionContext, Future}

class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {

  val in = Inlet[A]("DynamicMap.in")
  val out = Outlet[B]("DynamicMap.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // Only read and written from within the stage (handlers and async callback)
      var flipped = false

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Unit] { _ =>
          flipped = true
        }
        // Future.foreach uses the implicit ExecutionContext to run callback.invoke
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, if (flipped) g(grab(in)) else f(grab(in)))
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}
```
In this case, callback simply changes variable flipped from its initial value false to true, so that whenever an element is pushed on the input port, the InHandler will now push g(elem) rather than f(elem) to the output port. In addition, an ExecutionContext is passed in as an implicit parameter; it is required by the Future's foreach method, which runs callback.invoke once switch completes (invoke itself does not take one).
Note that to avoid race conditions, the callback is created and attached to the switch Future in the preStart lifecycle hook, rather than in the constructor of GraphStageLogic.
Let's test DynamicMap with a dummy asynchronous event switch that simply completes after about a millisecond, along with a couple of trivial ‘DataIn => DataOut’ transformation functions:
```scala
// Testing `DynamicMap`
case class DataIn(id: Int)
case class DataOut(content: String)

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val source: Source[DataIn, NotUsed] = Source(1 to 2000).map(DataIn(_))
val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)

// Dummy asynchronous event: completes after roughly a millisecond
val switch = Future{ Thread.sleep(1) }

val f = (di: DataIn) => DataOut(s"ID-${di.id}-OLD")
val g = (di: DataIn) => DataOut(s"ID-${di.id}-NEW")

source.via(new DynamicMap[DataIn, DataOut](switch, f, g)).runWith(sink)

// OUTPUT (sample run; the switch point depends on timing):
//
// DataOut(ID-1-OLD)
// DataOut(ID-2-OLD)
// ...
// DataOut(ID-982-OLD)
// DataOut(ID-983-OLD)
// DataOut(ID-984-NEW)
// DataOut(ID-985-NEW)
// DataOut(ID-986-NEW)
// ...
// ...
```