Reactive Systems, whose core characteristics are declared in the Reactive Manifesto, have started to emerge in recent years as message-driven systems that emphasize scalability, responsiveness and resilience. It’s pretty clear from the requirements that a system can’t be simply made Reactive. Rather, it should be built from the architectural level to be Reactive.
Akka’s actor systems, which rely on asynchronous message-passing among lightweight loosely-coupled actors, serve a great run-time platform for building Reactive Systems on the JVM (Java Virtual Machine). I posted a few blogs along with sample code about Akka actors in the past. This time I’m going to talk about something different but closely related.
Reactive Streams
While bearing a similar name, Reactive Streams is a separate initiative that mandates its implementations to be capable of processing stream data asynchronously and at the same time automatically regulating the stream flows in a non-blocking fashion.
Akka Streams, built on top of Akka actor systems, is an implementation of Reactive Streams. Equipped with the back-pressure functionality, it eliminates the need of manually buffering stream flows or custom-building stream buffering mechanism to avoid buffer overflow problems.
Extracting n-grams from text
In text mining, n-grams are useful data in the area of NLP (natural language processing). In this blog post, I’ll illustrate extracting n-grams from a stream of text messages using Akka Streams with Scala as the programming language.
First thing first, let’s create an object with methods for generating random text content:
Source code: TextMessage.scala
Some minimal effort has been made to generate random clauses of likely pronounceable fake words along with punctuations. To make it a little more flexible, lengths of individual words and clauses would be supplied as parameters.
Next, create another object with text processing methods responsible for extracting n-grams from input text, with n being an input parameter. Using Scala’s sliding(size, step) iterator method with size n and step default to 1, a new iterator of sliding window view is generated to produce the wanted n-grams.
Source code: TextProcessor.scala
Now that the text processing tools are in place, we can focus on building the main streaming application in which Akka Streams plays the key role.
First, make sure we have the necessary library dependencies included in build.sbt:
Source code: build.sbt
As Akka Streams is relatively new development work, more recent Akka versions (2.4.9 or higher) should be used.
Let’s start with a simple stream for this text mining application:
Source code: NgramStream_v01.scala
As shown in the source code, constructing a simple stream like this is just defining and chaining together the text-generating source, the text-processing flow and the text-display sink as follows:
val textSource: Source[String, NotUsed] = Source((1 to numMessages).map(_ => TextMessage.genRandText(minWordsInText: Int, maxWordsInText: Int)) ) def ngramFlow(n: Int): Flow[String, String, NotUsed] = Flow[String].map(text => TextProcessor.genNgrams(text, n)) val textSink = Sink.foreach[String](println) textSource.via(ngramFlow(nVal)).runWith(textSink). onComplete { ... }
Graph DSL
Akka Streams provides a Graph DSL (domain-specific language) that helps build the topology of stream flows using predefined fan-in/fan-out functions.
What Graph DSL does is somewhat similar to how Apache Storm‘s TopologyBuilder pieces together its spouts (i.e. stream sources), bolts (i.e. stream processors) and stream grouping/partitioning functions, as illustrated in a previous blog about HBase streaming.
Back-pressure
Now, let’s branch off the stream using Graph DSL to illustrate how the integral back-pressure feature is at play.
Source code: NgramStream_v02.scala
Streaming to a file should be significantly slower than streaming to the console. To make the difference more noticeable, a delay is deliberately added to streaming each line of text in the file sink.
Running the application and you will notice that the console display is slowed down. It’s the result of the upstream data flow being regulated to accommodate the relatively slow file I/O outlet even though the other console outlet is able to consume relatively faster – all that being conducted in a non-blocking fashion.
Graph DSL create() methods
To build a streaming topology using Graph DSL, you’ll need to use one of the create() methods defined within trait GraphApply, which is extended by object GraphDSL. Here are the signatures of the create() methods:
trait GraphApply { def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = { val builder = new GraphDSL.Builder val s = buildBlock(builder) val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) } ... [2..# def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)( buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = { val builder = new GraphDSL.Builder val curried = combineMat.curried val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1)) [2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))# ] val s = buildBlock(builder)([#s1#]) val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) }# ] }
Note that the sbt-boilerplate template language is needed to interpret the create() method being used in the application that takes multiple stream components as input parameters.
Materialized values
In Akka Streams, materializing a constructed stream is the step of actually running the stream with the necessary resources. To run the stream, the implicitly passed factory method ActorMaterializer() is required to allocate the resources for stream execution. That includes starting up the underlying Akka actors to process the stream.
Every processing stage of the stream can produce a materialized value. By default, using the via(flow) and to(sink) functions, the materialized value of the left-most stage will be preserved. As in the following example, for graph1, the materialized value of the source is preserved:
val source: Source[String, Future[String]] = ... val flow: Flow[String, String, Future[Int]] = ... val sink: Sink[String, Future[IOResult]] = ... val graph1: RunnableGraph[Future[String]] = source.via(flow).to(sink) val graph2: RunnableGraph[Future[Int]] = source.viaMat(flow)(Keep.right).to(sink) val graph3: RunnableGraph[(Future[String], Future[IOResult])] = source.via(flow).toMat(sink)(Keep.both)
To allow one to selectively capture the materialized values of the specific stream components, Akka Streams provides functions viaMat(flow) and toMat(sink) along with a combiner function, Keep. As shown in the above example, for graph2, the materialized value of the flow is preserved, whereas for graph3, materialized values for both the flow and sink are preserved.
Back to our fileSink function as listed below, toMat(fileIOSink)(Keep.right) instructs Akka Streams to keep the materialized value of the fileIOSink as a Future value of type IOResult:
def fileSink(filename: String): Sink[String, Future[IOResult]] = Flow[String]. map(line => { Thread.sleep(500); ByteString(line + "\n") }). toMat(FileIO.toPath(Paths.get(filename)))(Keep.right) // sleep() delay added to illustrate back-pressure
Using Graph DSL, as seen earlier in the signature of the create() method, one can select what materialized value is to be preserved by specifying the associated stream components accordingly as the curried parameters:
([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)
In our case, we want the materialized value of fileSink, thus the curried parameters should look like this:
(consoleSink, fileSink(filename))((_, file) => file)
Defining the stream graph
Akka Streams provides a number of functions for fan-out (e.g. Broadcast, Balance) and fan-in (e.g. Merge, Concat). In our example, we want a simple topology with a single text source and the same n-gram generator flow branching off to two sinks in parallel:
val bcast = builder.add(Broadcast[String](2)) textSource ~> ngramFlow(nVal) ~> bcast.in bcast.out(0) ~> console bcast.out(1) ~> file
Adding a message counter
Let’s further expand our n-gram extraction application to include displaying a count. A simple count-flow is created to map each message string into numeric 1, and a count-sink to sum up all these 1’s streamed to the sink. Adding them as the third flow and sink to the existing stream topology yields something similar to the following:
val bcast = builder.add(Broadcast[String](3)) val ngFlow = ngramFlow(nVal) textSource ~> bcast.in bcast.out(0) ~> ngFlow ~> console bcast.out(1) ~> ngFlow ~> file bcast.out(2) ~> countFlow ~> count
Source code: NgramStream_v03.scala
Full source code of the application is at GitHub.
Final thoughts
Having used Apache Storm, I see it a rather different beast compared with Akka Streams. A full comparison between the two would obviously be an extensive exercise by itself, but it suffices to say that both are great platforms for streaming applications.
Perhaps one of the biggest differences between the two is that Storm provides granular message delivery options (at most / at least / exactly once, guaranteed message delivery) whereas Akka Streams by design questions the premise of reliable messaging on distributed systems. For instance, if guaranteed message delivery is a requirement, Akka Streams would probably not be the best choice.
Back-pressure has recently been added to Storm’s v.1.0.x built-in feature list, so there is indeed some flavor of reactiveness in it. Aside from message delivery options, choosing between the two technologies might be a decision basing more on other factors such as engineering staff’s expertise, concurrency model preference, etc.
Outside of the turf of typical streaming systems, Akka Streams also plays a key role as the underlying platform for an emerging service stack. Viewed as the next-generation of Spray.io, Akka HTTP is built on top of Akka Streams. Designed for building HTTP-based integration layers, Akka HTTP provides versatile streaming-oriented HTTP routing and request/response transformation mechanism. Under the hood, Akka Streams’ back-pressure functionality regulates data streaming between the server and the remote client, consequently conserving memory utilization on the server.
Pingback: Akka Dynamic Pub-Sub Service | Genuine Blog