Monthly Archives: September 2019

Custom Akka Stream Processing

The Akka Stream API comes with a suite of versatile tools for stream processing. Besides the Graph DSL, a set of built-in stream operators is also readily available. Yet, if more custom streams are needed, GraphStage allows one to create streaming operators with specific stream processing logic between the input and output ports.

As illustrated in the Akka Stream doc re: custom stream processing, one can come up with a transformation function like map or filter with a custom GraphStage in just a few lines of code. For example, method map can be implemented as a Flow using GraphStage:
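A minimal sketch of such a Flow, modeled on the map example in the Akka documentation. The class name MapFlow, the port names and the MapFlowDemo harness are illustrative, and the harness assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// A map-like operator: applies f to every element passing through
class MapFlow[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet[A]("MapFlow.in")
  val out: Outlet[B] = Outlet[B]("MapFlow.out")
  override val shape: FlowShape[A, B] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        // Upstream pushed an element: transform it and emit it downstream
        override def onPush(): Unit = push(out, f(grab(in)))
      })
      setHandler(out, new OutHandler {
        // Downstream signalled demand: request one element from upstream
        override def onPull(): Unit = pull(in)
      })
    }
}

// Illustrative harness (assumes Akka 2.6+)
object MapFlowDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-flow-demo")
    try Await.result(
      Source(1 to 3).via(new MapFlow[Int, Int](_ * 2)).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```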

In analyzing a given stream operation, it’s easier to reason about the flow logic starting from the downstream and tracing upward. With that in mind, let’s look at the above snippet. When the downstream signals demand for an element from the output port, callback method onPull is called, which initiates a pull of a new element into the input port. Upon a push from the upstream, the onPush callback is triggered to grab the element on the input port, apply function f and push the result to the output port.

What is a GraphStage?

A GraphStage represents a stream processing operator. Below is the source code of abstract classes GraphStage and GraphStageLogic:

To use (i.e. extend) a GraphStage, one needs to implement method createLogic, which returns a GraphStageLogic. A GraphStageLogic is constructed with a shape and provides method setHandler which, in turn, takes two arguments: either an Inlet with an InHandler, or an Outlet with an OutHandler. These InHandler and OutHandler routines are where the custom processing logic for every stream element resides.

As illustrated in the map or filter implementation in the aforementioned Akka doc, to define a GraphStage one would need to minimally define in, out and shape (FlowShape in those examples) of the graph, as well as the stream processing logic in the InHandler and OutHandler.
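To make those minimal pieces concrete, here is a sketch of a filter-like stage along the lines of the Akka documentation example. The names FilterFlow and FilterFlowDemo are illustrative, and the harness assumes Akka 2.6 or later:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

class FilterFlow[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  // 1. The ports
  val in: Inlet[A] = Inlet[A]("FilterFlow.in")
  val out: Outlet[A] = Outlet[A]("FilterFlow.out")
  // 2. The shape built from the ports
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  // 3. The per-element logic, registered through setHandler
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          // Emit matching elements; otherwise ask upstream for the next one
          if (p(elem)) push(out, elem) else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Illustrative harness (assumes Akka 2.6+)
object FilterFlowDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filter-flow-demo")
    try Await.result(
      Source(1 to 6).via(new FilterFlow[Int](_ % 2 == 0)).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```

Note that a rejected element triggers another pull(in), so that the pending downstream demand is eventually satisfied.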

Handling external asynchronous events

Among various customizing features, one can extend a GraphStage to handle asynchronous events (e.g. Scala Futures) that aren’t part of the stream. To do that, define a callback using getAsyncCallback, which creates an AsyncCallback; the external event then triggers it by calling the callback’s invoke method.
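As a sketch of the mechanism, the stage below passes elements through unchanged and completes itself once an external Future completes. It is patterned after the kill-switch example in the Akka documentation; the names CompleteOnSignal and CompleteOnSignalDemo are illustrative, and the harness assumes Akka 2.6 or later:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

class CompleteOnSignal[A](signal: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("CompleteOnSignal.in")
  val out: Outlet[A] = Outlet[A]("CompleteOnSignal.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      override def preStart(): Unit = {
        // The AsyncCallback body runs inside the stage, so it may safely touch stage state
        val callback = getAsyncCallback[Unit](_ => completeStage())
        // The external event hands control back to the stage via invoke
        signal.foreach(callback.invoke)(materializer.executionContext)
      }
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

// Illustrative harness (assumes Akka 2.6+): an endless source is stopped by the signal
object CompleteOnSignalDemo {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("complete-on-signal-demo")
    val signal = Promise[Unit]()
    val done = Source.repeat(1).via(new CompleteOnSignal[Int](signal.future)).runWith(Sink.ignore)
    signal.success(())
    try Await.result(done, 3.seconds)
    finally system.terminate()
  }
}
```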

As an exercise for building custom stream processing operators with GraphStage, we’re going to modify the above map Flow to one that dynamically changes the transformation function upon triggering by an asynchronous event. Let’s name the class DynamicMap which takes a switch event of type Future[Unit] and two ‘A => B’ transformation functions (f being the original function and g the switched one).
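A sketch of what such a class could look like, assuming the constructor signature described above. The port names are illustrative:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{ExecutionContext, Future}

class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {

  val in: Inlet[A] = Inlet[A]("DynamicMap.in")
  val out: Outlet[B] = Outlet[B]("DynamicMap.out")
  override val shape: FlowShape[A, B] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // false: apply f; true: apply g
      var flipped = false

      override def preStart(): Unit = {
        // Runs inside the stage once the switch Future has completed
        val callback = getAsyncCallback[Unit](_ => flipped = true)
        // foreach needs the implicit ExecutionContext
        switch.foreach(callback.invoke)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          push(out, if (flipped) g(elem) else f(elem))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```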

In this case, callback simply modifies variable flipped from the initial false value to true, so that when an element is pushed to the input port the InHandler will push g(elem) rather than f(elem) to the output port. In addition, an ExecutionContext, required for scheduling the callback invocation when the switch Future completes, is passed in as an implicit parameter.

Note that to avoid race conditions, the callback is defined and invoked using the preStart lifecycle hook, rather than in the constructor of GraphStageLogic.

Testing DynamicMap with a dummy asynchronous event switch that simply returns in a millisecond, along with a couple of trivial ‘DataIn => DataOut’ transformation functions:
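One possible test harness is sketched below. The DataIn and DataOut case classes, the two functions, the element count and the DynamicMapDemo object are all hypothetical, and a compact DynamicMap is included so that the snippet compiles on its own. It assumes Akka 2.6 or later:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical element types
final case class DataIn(id: Int)
final case class DataOut(content: String)

// Compact DynamicMap, included to keep the snippet self-contained
class DynamicMap[A, B](switch: Future[Unit], f: A => B, g: A => B)(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet[A]("DynamicMap.in")
  val out: Outlet[B] = Outlet[B]("DynamicMap.out")
  override val shape: FlowShape[A, B] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var flipped = false
      override def preStart(): Unit =
        switch.foreach(getAsyncCallback[Unit](_ => flipped = true).invoke)
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          push(out, if (flipped) g(elem) else f(elem))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object DynamicMapDemo {
  def run(): Seq[DataOut] = {
    implicit val system: ActorSystem = ActorSystem("dynamic-map-demo")
    import system.dispatcher

    val f: DataIn => DataOut = d => DataOut(s"ID-${d.id}-init")
    val g: DataIn => DataOut = d => DataOut(s"ID-${d.id}-switched")
    // Dummy asynchronous event: completes after roughly a millisecond
    val switch: Future[Unit] = Future(Thread.sleep(1))

    val result = Source(1 to 500)
      .map(DataIn(_))
      .via(new DynamicMap[DataIn, DataOut](switch, f, g))
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

How many elements are transformed by f before the switch takes effect depends on timing and varies between runs. What should hold is that the output starts with "-init" elements and, once a "-switched" element appears, every later element is also "-switched".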