The Akka Stream API comes with a suite of versatile tools for stream processing. Besides the Graph DSL, a set of built-in stream operators is also readily available. Yet, if more custom streams are needed, `GraphStage` allows one to create streaming operators with specific stream processing logic between the input and output ports.
As illustrated in the Akka Stream documentation on custom stream processing, a transformation such as `map` or `filter` can be written as a custom GraphStage in just a few lines of code. For example, `map` can be implemented as a Flow using GraphStage:
    // Implementing `map` using GraphStage
    class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
      val in = Inlet[A]("Map.in")
      val out = Outlet[B]("Map.out")

      override val shape = FlowShape.of(in, out)

      override def createLogic(attr: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              push(out, f(grab(in)))
            }
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              pull(in)
            }
          })
        }
    }
When analyzing a stream operation, it’s easier to reason about the flow logic by starting downstream and tracing upward. With that in mind, let’s look at the above snippet. When downstream signals demand by pulling on the output port, the `onPull` callback is called, which in turn pulls a new element on the input port. Once upstream pushes an element, the `onPush` callback grabs it from the input port, applies function `f` and pushes the result to the output port.
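To make that downstream-first reading concrete, here is a minimal sketch of a tracing variant of the stage. `TracingMap`, its `record` parameter and `TracingMapDemo` are hypothetical names introduced only for this illustration, and the setup assumes the same Akka 2.5-style `ActorMaterializer` used elsewhere in this post.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical variant of the `map` stage that records which handler fires.
class TracingMap[A, B](f: A => B, record: String => Unit) extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("TracingMap.in")
  val out = Outlet[B]("TracingMap.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          record(s"onPush($elem)")
          push(out, f(elem))
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          record("onPull")
          pull(in)
        }
      })
    }
}

object TracingMapDemo {
  // Runs three elements through the stage and returns (outputs, handler trace).
  def run(): (Seq[Int], List[String]) = {
    implicit val system: ActorSystem = ActorSystem("tracing-map-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val trace = new ConcurrentLinkedQueue[String]()
    try {
      val result = Await.result(
        Source(1 to 3)
          .via(new TracingMap[Int, Int](_ * 2, s => trace.add(s)))
          .runWith(Sink.seq),
        5.seconds)
      (result, trace.toArray(Array.empty[String]).toList)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }
}
```

Calling `TracingMapDemo.run()` should return the doubled elements together with a trace whose first two entries are `onPull` and `onPush(1)`: nothing is pushed through the stage until downstream has asked for it.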
What is a GraphStage?
A `GraphStage` represents a stream processing operator. Below are excerpts from the source code of the abstract classes GraphStage and GraphStageLogic:
    // Class GraphStage
    abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
      final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
        (createLogic(inheritedAttributes), NotUsed)

      @throws(classOf[Exception])
      def createLogic(inheritedAttributes: Attributes): GraphStageLogic
    }

    // Class GraphStageLogic
    abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {
      import GraphInterpreter._
      import GraphStageLogic._

      def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size)
      ...
      final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
        handlers(in.id) = handler
        if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
      }

      final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
        handlers(out.id + inCount) = handler
        if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
      }
      ...
    }
To use (i.e. extend) a `GraphStage`, one needs to implement method `createLogic`, which returns a `GraphStageLogic`. The `GraphStageLogic` is constructed from a `shape` and provides method `setHandler`, which takes two arguments: either an Inlet with an InHandler, or an Outlet with an OutHandler. These InHandler and OutHandler routines are where the custom processing logic for every stream element resides.
As illustrated in the `map` or `filter` implementation in the mentioned Akka doc, to define a GraphStage one needs, at a minimum, to define the `in`, `out` and `shape` (FlowShape in those examples) of the graph, as well as the stream processing logic in the InHandler and OutHandler.
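For comparison with `map`, a `filter` stage built from the same minimal pieces might look like the sketch below. The structure follows the pattern described above; `FilterDemo` is a hypothetical helper added only to show the stage running, and the materializer setup assumes the Akka 2.5-style API used in this post. The one new idea is that when an element is rejected, nothing is pushed downstream, so the stage has to pull again itself to keep the demand alive.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch of `filter` as a GraphStage: same ports and shape as `map`,
// different logic in the InHandler.
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem) // element accepted: satisfy the downstream demand
          else pull(in)                // element dropped: ask upstream for another one
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical helper that runs the stage over the numbers 1 to 10.
object FilterDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filter-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      Await.result(
        Source(1 to 10).via(new Filter[Int](_ % 2 == 0)).runWith(Sink.seq),
        5.seconds)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }
}
```

`FilterDemo.run()` keeps only the even numbers, i.e. 2, 4, 6, 8 and 10.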
Handling external asynchronous events
Among its various customization features, a GraphStage can be extended to handle asynchronous events (e.g. the completion of a Scala Future) that aren’t part of the stream. To do that, define a callback using `getAsyncCallback`, which creates an `AsyncCallback`; the external event then triggers it via method `invoke`.
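In its smallest form, the pattern can be sketched as a pass-through stage that completes itself once an external Future completes. `CompleteOn` and `CompleteOnDemo` are hypothetical names chosen for this illustration, and the 100 ms pause and tick interval are arbitrary; only `getAsyncCallback`, `invoke` and `completeStage` are part of the GraphStageLogic API.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical pass-through stage that completes when `switch` completes.
class CompleteOn[A](switch: Future[Unit])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("CompleteOn.in")
  val out = Outlet[A]("CompleteOn.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      override def preStart(): Unit = {
        // The handler body runs inside the stage, so it may safely touch stage state.
        val callback = getAsyncCallback[Unit] { _ => completeStage() }
        // `invoke` is what the external event calls to reach the handler above.
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object CompleteOnDemo {
  // Lets an otherwise endless tick source run briefly, then stops it from outside.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("complete-on-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher
    val stop = Promise[Unit]()
    try {
      val collected = Source
        .tick(0.millis, 10.millis, "tick")
        .via(new CompleteOn[String](stop.future))
        .runWith(Sink.seq)
      Thread.sleep(100)
      stop.success(()) // the external event
      Await.result(collected, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }
}
```

Without the `stop` promise being completed, the tick source would never end; the fact that `CompleteOnDemo.run()` returns at all shows the external event reaching the stage.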
As an exercise in building custom stream processing operators with GraphStage, we’re going to modify the above `map` Flow into one that dynamically changes the transformation function when triggered by an asynchronous event. Let’s name the class `DynamicMap`, which takes a `switch` event of type Future[Unit] and two `A => B` transformation functions (`f` being the original function and `g` the switched one).
    // Dynamically switching `map` functions upon triggering by an asynchronous event
    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.stream.stage._
    import akka.{Done, NotUsed}
    import scala.concurrent.{ExecutionContext, Future}

    class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
        extends GraphStage[FlowShape[A, B]] {
      val in = Inlet[A]("DynamicMap.in")
      val out = Outlet[B]("DynamicMap.out")

      override val shape = FlowShape.of(in, out)

      override def createLogic(attr: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          var flipped = false

          override def preStart(): Unit = {
            val callback = getAsyncCallback[Unit] { _ =>
              flipped = true
            }
            switch.foreach(callback.invoke)
          }

          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              push(out, if (flipped) g(grab(in)) else f(grab(in)))
            }
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              pull(in)
            }
          })
        }
    }
In this case, `callback` simply flips variable `flipped` from its initial `false` value to `true`, so that elements subsequently pushed to the input port are transformed by `g` rather than `f` before being pushed to the output port. In addition, an `ExecutionContext` is passed in as an implicit parameter; it is required by `Future.foreach`, which runs the call to `invoke` once `switch` completes (`invoke` itself does not take one).
Note that to avoid race conditions, the callback is defined and invoked using the `preStart` lifecycle hook, rather than in the constructor of GraphStageLogic.
Testing `DynamicMap` with a dummy asynchronous event `switch` that completes after roughly a millisecond, and a couple of trivial `DataIn => DataOut` transformation functions:
    // Testing `DynamicMap`
    case class DataIn(id: Int)
    case class DataOut(content: String)

    implicit val system = ActorSystem("system")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    val source: Source[DataIn, NotUsed] = Source(1 to 2000).map(DataIn(_))
    val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)

    val switch = Future{ Thread.sleep(1) }

    val f = (di: DataIn) => DataOut(s"ID-${di.id}-OLD")
    val g = (di: DataIn) => DataOut(s"ID-${di.id}-NEW")

    source.via(new DynamicMap[DataIn, DataOut](switch, f, g)).runWith(sink)

    // OUTPUT:
    //
    // DataOut(ID-1-OLD)
    // DataOut(ID-2-OLD)
    // ...
    // DataOut(ID-982-OLD)
    // DataOut(ID-983-OLD)
    // DataOut(ID-984-NEW)
    // DataOut(ID-985-NEW)
    // DataOut(ID-986-NEW)
    // ...
    // ...
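The element at which the output flips from OLD to NEW (984 in the sample run above) depends on timing and will generally differ between runs. What can be checked reliably is the shape of the output. The sketch below is one way to do that, under the assumption that collecting into `Sink.seq` is acceptable for a test; `DynamicMapCheck` and `switchesAtMostOnce` are hypothetical helpers, and `DynamicMap` is repeated only so that the snippet compiles on its own.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Same stage as in the article, repeated here to keep the snippet self-contained.
class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("DynamicMap.in")
  val out = Outlet[B]("DynamicMap.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var flipped = false

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Unit] { _ => flipped = true }
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit =
          push(out, if (flipped) g(grab(in)) else f(grab(in)))
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Hypothetical test helper, not part of the article.
object DynamicMapCheck {
  // Runs 2000 elements through DynamicMap with the given switch and collects the output.
  def run(switch: Future[Unit]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("dynamic-map-check")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      val stage = new DynamicMap[Int, String](switch, i => s"ID-$i-OLD", i => s"ID-$i-NEW")
      Await.result(Source(1 to 2000).via(stage).runWith(Sink.seq), 10.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }

  // True if no OLD element appears after the first NEW one.
  def switchesAtMostOnce(out: Seq[String]): Boolean =
    !out.dropWhile(_.endsWith("OLD")).exists(_.endsWith("OLD"))
}
```

With a switch that never completes, such as `Promise[Unit]().future`, every element is transformed by `f`. With one that does complete, the position of the first NEW element is not fixed, but `switchesAtMostOnce` should hold because `flipped` only ever goes from `false` to `true`.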