
Text Mining With Akka Streams

Reactive Systems, whose core characteristics are declared in the Reactive Manifesto, have emerged in recent years as message-driven systems that emphasize scalability, responsiveness and resilience. It’s pretty clear from those requirements that a system can’t simply be made Reactive as an afterthought; rather, it has to be architected from the ground up to be Reactive.

Akka’s actor systems, which rely on asynchronous message-passing among lightweight, loosely-coupled actors, serve as a great run-time platform for building Reactive Systems on the JVM (Java Virtual Machine). I posted a few blogs along with sample code about Akka actors in the past. This time I’m going to talk about something different but closely related.

Reactive Streams

While bearing a similar name, Reactive Streams is a separate initiative that requires its implementations to process stream data asynchronously while automatically regulating the stream flow in a non-blocking fashion.

Akka Streams, built on top of Akka actor systems, is an implementation of Reactive Streams. Equipped with back-pressure functionality, it eliminates the need to manually buffer stream flows or custom-build a stream buffering mechanism to avoid buffer overflow problems.

Extracting n-grams from text

In text mining, n-grams are useful data in the area of NLP (natural language processing). In this blog post, I’ll illustrate extracting n-grams from a stream of text messages using Akka Streams with Scala as the programming language.

First things first, let’s create an object with methods for generating random text content:

Source code: TextMessage.scala

Some minimal effort has been made to generate random clauses of likely-pronounceable fake words along with punctuation. To make it a little more flexible, the lengths of individual words and clauses are supplied as parameters.
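
The original TextMessage.scala isn’t reproduced here; below is a minimal sketch of what such a generator might look like (the method names and the consonant-vowel word-building scheme are assumptions):

```scala
import scala.util.Random

// A minimal sketch of a random text generator (names and details are assumptions)
object TextMessage {
  private val vowels = "aeiou"
  private val consonants = "bcdfghjklmnpqrstvwxyz"
  private val punctuations = Vector(".", "!", "?", ",", ";")

  // Build a likely-pronounceable fake word by alternating consonants and vowels
  def genWord(length: Int): String =
    (0 until length).map { i =>
      if (i % 2 == 0) consonants(Random.nextInt(consonants.length))
      else vowels(Random.nextInt(vowels.length))
    }.mkString

  // Build a clause of `numWords` fake words (each up to `maxWordLength` letters),
  // terminated by a random punctuation mark
  def genClause(numWords: Int, maxWordLength: Int): String =
    (1 to numWords)
      .map(_ => genWord(1 + Random.nextInt(maxWordLength)))
      .mkString(" ") + punctuations(Random.nextInt(punctuations.size))
}
```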

Next, create another object with text-processing methods responsible for extracting n-grams from input text, with n being an input parameter. Using Scala’s sliding(size, step) iterator method with size n and the step defaulting to 1, a new iterator of sliding-window views is generated to produce the wanted n-grams.

Source code: TextProcessor.scala
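
Again, just a sketch under the assumption of a simple whitespace tokenizer; the key point is the use of sliding(n) with the step defaulting to 1:

```scala
// A minimal sketch of the n-gram extractor (method name and tokenization are assumptions)
object TextProcessor {
  def ngrams(text: String, n: Int): Iterator[String] =
    text.toLowerCase
      .replaceAll("""\p{Punct}""", "")   // strip punctuation
      .split("\\s+")
      .filter(_.nonEmpty)
      .iterator
      .sliding(n)                        // sliding window of size n, step 1
      .map(_.mkString(" "))
}
```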

Now that the text processing tools are in place, we can focus on building the main streaming application in which Akka Streams plays the key role.

First, make sure we have the necessary library dependencies included in build.sbt:

Source code: build.sbt

As Akka Streams is relatively new development work, more recent Akka versions (2.4.9 or higher) should be used.
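
A minimal build.sbt along those lines might look like this (versions are illustrative):

```scala
name := "akka-streams-text-mining"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % "2.4.9",
  "com.typesafe.akka" %% "akka-stream" % "2.4.9"
)
```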

Let’s start with a simple stream for this text mining application:

Source code: NgramStream_v01.scala

As shown in the source code, constructing a simple stream like this is just defining and chaining together the text-generating source, the text-processing flow and the text-display sink as follows:
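
Here’s a minimal sketch of that wiring (component and method names such as genClause and ngrams carry over from the assumed sketches above):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object NgramStream extends App {
  implicit val system = ActorSystem("ngram-stream")
  implicit val materializer = ActorMaterializer()

  val n = 3

  // Source: a stream of randomly generated text messages
  val textSource: Source[String, NotUsed] =
    Source(1 to 100).map(_ => TextMessage.genClause(numWords = 10, maxWordLength = 8))

  // Flow: extract n-grams from each message
  val ngramFlow: Flow[String, String, NotUsed] =
    Flow[String].mapConcat(text => TextProcessor.ngrams(text, n).toList)

  // Sink: print each n-gram to the console
  val consoleSink = Sink.foreach[String](println)

  // Chain source, flow and sink into a runnable graph and run it
  textSource.via(ngramFlow).to(consoleSink).run()
}
```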

Graph DSL

Akka Streams provides a Graph DSL (domain-specific language) that helps build the topology of stream flows using predefined fan-in/fan-out functions.

What Graph DSL does is somewhat similar to how Apache Storm‘s TopologyBuilder pieces together its spouts (i.e. stream sources), bolts (i.e. stream processors) and stream grouping/partitioning functions, as illustrated in a previous blog about HBase streaming.

Back-pressure

Now, let’s branch off the stream using the Graph DSL to illustrate how the integral back-pressure feature is at play.

Source code: NgramStream_v02.scala

Streaming to a file should be significantly slower than streaming to the console. To make the difference more noticeable, a delay is deliberately added to streaming each line of text in the file sink.

Run the application and you will notice that the console display is slowed down. This is the result of the upstream data flow being regulated to accommodate the relatively slow file I/O outlet, even though the console outlet could consume faster, all conducted in a non-blocking fashion.

Graph DSL create() methods

To build a streaming topology using Graph DSL, you’ll need to use one of the create() methods defined within trait GraphApply, which is extended by object GraphDSL. Here are the signatures of the create() methods:

Note that Akka uses the sbt-boilerplate template language to generate the create() overloads that take multiple stream components as input parameters.

Materialized values

In Akka Streams, materializing a constructed stream is the step of actually running the stream with the necessary resources. To run the stream, the implicitly passed factory method ActorMaterializer() is required to allocate the resources for stream execution. That includes starting up the underlying Akka actors to process the stream.

Every processing stage of the stream can produce a materialized value. By default, using the via(flow) and to(sink) functions, the materialized value of the left-most stage will be preserved. As in the following example, for graph1, the materialized value of the source is preserved:
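
The original snippet isn’t shown here; the following stand-in example with a hypothetical source, flow and sink illustrates the same point:

```scala
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

val source = Source(1 to 10)                   // materialized value: NotUsed
val flow   = Flow[Int].map(_ * 2)              // materialized value: NotUsed
val sink   = Sink.fold[Int, Int](0)(_ + _)     // materialized value: Future[Int]

// graph1: via/to preserve the left-most (source's) materialized value
val graph1: RunnableGraph[NotUsed] = source.via(flow).to(sink)

// graph2: viaMat(...)(Keep.right) preserves the flow's materialized value
val graph2: RunnableGraph[NotUsed] = source.viaMat(flow)(Keep.right).to(sink)

// graph3: preserve both the flow's and the sink's materialized values
val graph3: RunnableGraph[(NotUsed, Future[Int])] =
  source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)
```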

To allow one to selectively capture the materialized values of the specific stream components, Akka Streams provides functions viaMat(flow) and toMat(sink) along with a combiner function, Keep. As shown in the above example, for graph2, the materialized value of the flow is preserved, whereas for graph3, materialized values for both the flow and sink are preserved.

Back to our fileSink function as listed below, toMat(fileIOSink)(Keep.right) instructs Akka Streams to keep the materialized value of the fileIOSink as a Future value of type IOResult:
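
The exact implementation isn’t reproduced here; a sketch along these lines (the throttling delay and file handling are assumptions) captures the idea:

```scala
import java.nio.file.Paths
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.stream.{IOResult, ThrottleMode}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink}
import akka.util.ByteString

// Write each line to the file, deliberately throttled to simulate a slow outlet;
// Keep.right keeps the file IO sink's materialized Future[IOResult]
def fileSink(filename: String): Sink[String, Future[IOResult]] =
  Flow[String]
    .throttle(1, 100.millis, 1, ThrottleMode.shaping)
    .map(line => ByteString(line + "\n"))
    .toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
```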

Using the Graph DSL, as seen earlier in the signatures of the create() methods, one can select which materialized value is to be preserved by passing the associated stream components as the curried parameters.

In our case, we want the materialized value of fileSink, so fileSink is the stream component to pass in as the curried parameter, as shown in the graph definition in the next section.

Defining the stream graph

Akka Streams provides a number of functions for fan-out (e.g. Broadcast, Balance) and fan-in (e.g. Merge, Concat). In our example, we want a simple topology with a single text source and the same n-gram generator flow branching off to two sinks in parallel:
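
A sketch of that graph definition follows, reusing textSource, ngramFlow, consoleSink and fileSink from the earlier sketches (assumed to be in scope) and passing fileSink in as the curried parameter:

```scala
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph}

val graph = RunnableGraph.fromGraph(
  GraphDSL.create(fileSink("ngrams.txt")) { implicit builder => fsink =>
    import GraphDSL.Implicits._

    // Fan out the n-gram stream to the console sink and the (slower) file sink
    val bcast = builder.add(Broadcast[String](2))

    textSource ~> ngramFlow ~> bcast ~> consoleSink
                               bcast ~> fsink
    ClosedShape
  }
)

val ioResultFuture = graph.run()   // materializes to the file sink's Future[IOResult]
```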

Adding a message counter

Let’s further expand our n-gram extraction application to include displaying a count. A simple count-flow is created to map each message string to the numeric value 1, and a count-sink to sum up all these 1’s streamed to it. Adding them as a third flow and sink to the existing stream topology yields something similar to the following:
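
Below is a sketch of the expanded topology, again reusing the earlier components (names are assumptions):

```scala
import scala.concurrent.Future
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink}

// Map each n-gram to 1 and sum up the 1's to produce a total count
val countFlow = Flow[String].map(_ => 1)
val countSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val graphWithCount = RunnableGraph.fromGraph(
  GraphDSL.create(fileSink("ngrams.txt"), countSink)((_, _)) { implicit builder => (fsink, csink) =>
    import GraphDSL.Implicits._

    val bcast = builder.add(Broadcast[String](3))

    textSource ~> ngramFlow ~> bcast ~> consoleSink
                               bcast ~> fsink
                               bcast ~> countFlow ~> csink
    ClosedShape
  }
)

val (ioResultFuture, countFuture) = graphWithCount.run()

import system.dispatcher
countFuture.foreach(count => println(s"Total n-gram count: $count"))
```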

Source code: NgramStream_v03.scala

Full source code of the application is at GitHub.

Final thoughts

Having used Apache Storm, I see it as a rather different beast compared with Akka Streams. A full comparison between the two would obviously be an extensive exercise by itself, but suffice it to say that both are great platforms for streaming applications.

Perhaps one of the biggest differences between the two is that Storm provides granular message delivery options (at most / at least / exactly once, guaranteed message delivery) whereas Akka Streams by design questions the premise of reliable messaging on distributed systems. For instance, if guaranteed message delivery is a requirement, Akka Streams would probably not be the best choice.

Back-pressure has recently been added to Storm’s built-in feature list as of v1.0.x, so there is indeed some flavor of reactiveness in it. Aside from message delivery options, choosing between the two technologies might be a decision based more on other factors such as the engineering staff’s expertise, concurrency model preference, etc.

Outside of the turf of typical streaming systems, Akka Streams also plays a key role as the underlying platform for an emerging service stack. Viewed as the next generation of Spray.io, Akka HTTP is built on top of Akka Streams. Designed for building HTTP-based integration layers, Akka HTTP provides versatile streaming-oriented HTTP routing and request/response transformation mechanisms. Under the hood, Akka Streams’ back-pressure functionality regulates data streaming between the server and the remote client, consequently conserving memory utilization on the server.

Akka Persistence Journal Using Redis

If you’ve used Lightbend’s Scala-Akka templates that involve persisting Akka actor states, you’ll notice that LevelDB is usually configured as the default storage medium for persistence journals (and snapshots). In many of these templates, a single shared LevelDB journal is used by multiple actor systems. As reminded by the template documentation as well as code-level comments, such a setup isn’t suitable for production systems.

Thanks to the prospering Akka user community, there is a good list of community-maintained journal plugins you can pick from to suit your specific needs. Journal choices include Cassandra, HBase, Redis, PostgreSQL and others. In this blog post, I’m going to highlight how to set up an Akka persistence journal using a plugin for Redis, which is one of the most popular open-source key-value stores.

Redis client for Scala

First things first, you’ll need a Redis server running on a server node you want your actor systems to connect to. If you don’t already have one, download the server from the Redis website and install it on a designated server host. The installation should include a command-line interface tool, redis-cli, that comes in handy for ad-hoc data updates/lookups.

Next, you need a Redis client for Scala, Rediscala, which is a non-blocking Redis driver that wraps Redis requests/responses in Futures. To include Rediscala in the application, simply specify it as a library dependency in build.sbt.

Redis journal plugin

The Redis journal plugin is from Hootsuite. Similar to how Rediscala is set up in build.sbt, you can add the dependency for the Redis journal plugin. To tell sbt where to locate the Ivy repo for the journal plugin, you’ll also need to add a resolver. The build.sbt content should look something like the following:
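
An illustrative sketch is shown below; the resolver URL, artifact coordinates and versions are placeholders/assumptions, so check the Rediscala and journal plugin documentation for the exact values:

```scala
// Resolver for the Redis journal plugin's repo (URL is a placeholder)
resolvers += "redis-journal-repo" at "<journal-plugin-repo-url>"

libraryDependencies ++= Seq(
  "com.typesafe.akka"   %% "akka-actor"             % "2.4.9",
  "com.typesafe.akka"   %% "akka-persistence"       % "2.4.9",
  "com.etaty.rediscala" %% "rediscala"              % "1.6.0",   // Redis client (assumed coordinates)
  "com.hootsuite"       %% "akka-persistence-redis" % "0.6.0"    // Redis journal plugin (assumed coordinates)
)
```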

Alternatively, rather than specifying them as dependencies you can clone the git repos for the Redis client and journal plugin, use sbt to generate a jar file for each of them, and include them in your application library (e.g. under /activator-project-root/lib/).

Application configurations

Now that the library dependency setup for Redis journal and Redis client is taken care of, next in line is to update the configuration information in application.conf to replace LevelDB with Redis.

Besides Akka-related configuration, the Redis host and port information is specified in the configuration file. The Redis journal plugin has a RedisJournal class that extends trait DefaultRedisComponent, which in turn reads the Redis host/port information from the configuration file and overrides the default host/port (localhost/6379) in the RedisClient case class within Rediscala.

As for the Akka persistence configuration, simply remove all LevelDB related lines in the configuration file and add the Redis persistence journal (and snapshot) plugin information. The application.conf content now looks like the following:
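
An illustrative sketch is shown below; the exact plugin keys are assumptions, so consult the journal plugin’s reference configuration:

```
akka {
  persistence {
    journal.plugin = "akka-persistence-redis.journal"
    snapshot-store.plugin = "akka-persistence-redis.snapshot"
  }
}

# Redis host/port read by the journal plugin (defaults: localhost/6379)
redis {
  host = "localhost"
  port = 6379
}
```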

Onto the application source code

That’s all the configuration changes needed for using the Redis persistence journal. To retire LevelDB as the journal store from within the application, you can simply remove all code and imports that reference LevelDB for journal/snapshot setup. Any existing code logic you’ve developed to persist to LevelDB should now apply to the Redis journal without changes.

In other words, this LevelDB-to-Redis journal migration is almost entirely a configuration effort. For general-purpose persistence of actor states, Akka’s persist method abstracts you from having to deal directly with Redis-specific interactions. Tracing the source code of Akka’s PersistentActor.scala, the persist method is defined as follows:
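
Paraphrasing (simplified) from the Akka 2.4 source:

```scala
def persist[A](event: A)(handler: A => Unit): Unit
```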

For instance, a typical persist snippet might look like the following:
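
The actor and event names below are purely illustrative:

```scala
import akka.persistence.PersistentActor

// A hypothetical persistent actor (names are illustrative)
class CounterActor extends PersistentActor {
  override def persistenceId: String = "counter-actor-1"

  private var count = 0

  private def updateState(delta: Int): Unit = count += delta

  // Replay journaled events (now served from the Redis journal) on recovery
  override def receiveRecover: Receive = {
    case delta: Int => updateState(delta)
  }

  // Persist incoming commands as events; the handler runs after the event is journaled
  override def receiveCommand: Receive = {
    case delta: Int =>
      persist(delta) { evt =>
        updateState(evt)
        sender() ! s"count is now $count"
      }
  }
}
```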

In essence, as long as actor states are persisted with the proper method signature, any journal store specific interactions will be taken care of by the corresponding journal plugin.

Internet-of-Things And Akka Actors

IoT (Internet of Things) has recently been one of the most popular buzzwords. Despite being over-hyped, we’re indeed heading towards a foreseeable world in which all sorts of things are inter-connected. Before IoT became a hot acronym, I was heavily involved in building a Home-Area-Network SaaS platform over the course of 5 years in a previous startup I cofounded, so it’s no stranger to me.

At the low-level device network layer, there used to be platform service companies providing gateway hardware along with proprietary APIs for IoT devices running on sensor network protocols (such as ZigBee, Z-Wave). The landscape has been evolving over the past couple of years. As more and more companies begin to throw their weight behind building products in the IoT ecosystem, open standards for device connectivity emerge. One of them is MQTT (Message Queue Telemetry Transport).

Message Queue Telemetry Transport

MQTT had been relatively little-known until it was standardized at OASIS a couple of years ago. The lightweight publish-subscribe messaging protocol has since been increasingly adopted by major players, including Amazon, as the underlying connectivity protocol for IoT devices. It’s TCP/IP based, but its variant, MQTT-SN (MQTT for Sensor Networks), covers sensor network communication protocols such as ZigBee. There are also quite a few MQTT message brokers, including HiveMQ, Mosquitto and RabbitMQ.

IoT makes a great use case for Akka actor systems, which come with lightweight, loosely-coupled actors in decentralized clusters with robust routing, sharding and pub-sub features, as mentioned in a previous blog post. The actor model can rather easily be structured to emulate the operations of a typical IoT network that scales in device volume. In addition, the availability of MQTT clients for Akka such as Paho-Akka makes it easy to communicate with MQTT brokers.

A Scala-based IoT application

In this blog post, I’m going to illustrate how to build a scalable distributed worker system using Akka actors to service requests from an MQTT-based IoT system. A good portion of the Akka clustering setup is derived from Lightbend’s Akka distributed workers template. Below is a diagram of the application:

[Diagram: IoT with MQTT pub-sub and Akka actor systems]

As shown in the diagram, the application consists of the following components:

1. IoT

  • A DeviceRequest actor which:
    • simulates work requests from IoT devices
    • publishes requests to an MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from a topic subscriber
  • An IotAgent actor which:
    • subscribes to the mqtt-topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • sends the DeviceRequest actor a failure message upon receiving failure messages from the Master actor
  • An MQTT pub-sub client, MqttPubSub, for communicating with an MQTT broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • Serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A Master singleton actor which:
    • serves as the ClusterClientReceptionist to answer external connection requests
    • fails over to the next-oldest node
    • maintains work states using persistence journal
    • publishes work results to a work-results topic via Akka distributed pub-sub
    • acknowledges receipt of work requests to the IotAgent
    • registers Workers and distributes work to available Workers
  • A PostProcessor actor in the master cluster which:
    • simulates post-processing of the work results
    • subscribes to the work-results topic

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, and pulls work from, the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors which process the work requests

Source code is available at GitHub.

A few notes:

  1. Neither IotAgent nor Worker actor system is a part of the master cluster, hence both need to communicate with the Master via ClusterClient.
     
  2. Rather than having the Master actor spawn child Workers and push work over, the Workers are set up to register with the Master and pull work from it – a model similar to what Derek Wyatt advocated in his post.
     
  3. Paho-Akka is used as the MQTT pub-sub client with configuration information held within the helper object, MqttConfig.
     
  4. The helper object MqttConfig consists of MQTT pub-sub topic/broker information and methods to serialize/deserialize the Work objects which, in turn, contain Device objects. The explicit serializations are necessary since multiple JVMs will be at play if one launches the master cluster, IoT and worker actor systems on separate JVMs. A minimal sketch of these serialization helpers follows these notes.
     
  5. The test Mosquitto broker at tcp://test.mosquitto.org:1883 serves as the MQTT broker. An alternative is to install an MQTT broker (Mosquitto, HiveMQ, etc.) local to the IoT network.
     
  6. The IotAgent uses the Actor’s ask method (?), instead of the fire-and-forget tell method (!), to confirm message receipt by the Master via a returned Future. If the receipt confirmation isn’t important, the tell method would be the preferred choice for performance.
     
  7. This is primarily a proof-of-concept application of IoT using Akka actors, hence code performance optimization isn’t a priority. In addition, for production systems, a production-grade persistence journal (e.g. Redis, Cassandra) should be used and multiple-Master via sharding could be considered.
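
As a reference for note 4, here’s a minimal sketch of such serialization helpers; the object and member names are assumptions, and plain Java serialization is just one simple way to do it:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A minimal sketch of MqttConfig (names are assumptions)
object MqttConfig {
  val topic = "akka-iot-mqtt-topic"               // assumed topic name
  val broker = "tcp://test.mosquitto.org:1883"    // public test broker (see note 5)

  // Serialize a (serializable) object such as Work into an MQTT payload
  def writeToByteArray(obj: Any): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)
    try {
      oos.writeObject(obj)
      baos.toByteArray
    } finally oos.close()
  }

  // Deserialize an MQTT payload back into an object
  def readFromByteArray[A](bytes: Array[Byte]): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[A]
    finally ois.close()
  }
}
```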
     

Test-running

Similar to how you would test-run Lightbend’s distributed workers template, you may open up separate command-line terminals and run the different components on separate JVMs, adding and killing the launched components to observe how the systems scale out, fail over, persist work states, etc. Here’s an example of a test-run sequence:

Below is some sample console output.

Console Output: Master seed node with persistence journal:

Console Output: IotAgent-DeviceRequest node:

Console Output: Worker node:

Scala Distributed Systems With Akka

A recent R&D project prompted me to look into a programming platform for a distributed system. Storm coupled with Kafka popped up as a candidate, but as streaming wasn’t part of the system requirement, I decided to try out an actor-based system. Between Java and Scala, I had in mind Scala as the programming language primarily for its good mix of object-oriented and functional programming styles, as mentioned in a previous post about Scala-Play.

Naturally, Scala-Akka became a prime candidate. During the technology evaluation phase, I came across a couple of sample applications from Lightbend (formerly Typesafe) that I think are great tutorials for getting into Scala + Akka. Certain illustrated techniques for performance and scalability in some of the more comprehensive applications are particularly useful. Although the Play framework serves as a great application development platform, it’s of less interest from a product functionality perspective.

Akka Actor Systems

Akka is an open-source actor library/toolkit targeted for building scalable concurrent and distributed applications in Scala or Java. At the core of Akka are lightweight computational primitives called actors, each of which encapsulates state and behavior, and communicates via asynchronous immutable message passing.

It’s important to note that keeping messages immutable and communication non-blocking are among the fundamental best practices the Actor model is designed around. In other words, they aren’t enforced by the model itself, so it’s the developer’s responsibility to embrace them. The underlying shared-nothing principle of actors, with message passing as the only means of interaction, makes it an appealing concurrency model, as opposed to managing shared memory with locks in conventional thread-based concurrency.

Sample app #1: Pi approximation

To quickly grasp how to use Akka Actors to solve computational problems, it might be worth taking a look at a sample application for approximating the value of pi (i.e. 𝜋). Although the sample application consists of deprecated code, I still find it a nice example for understanding how to craft the computational components as actors and coordinate partial-result passing via messages among the actors.

Given the dated code, one might want to just skim through the source code of the application while skipping the syntactic details. It shows how easy it is to formulate a distributed computation scheme by modeling the computation workers (Worker), aggregator (Master) and output listener (Listener) as actors, each playing a different role. A couple of notes:

  1. In general, “tell” (i.e. fire-and-forget) is preferred to “ask” when sending messages, for performance reasons. It makes sense in this application since it’s an approximation task, so the rare failure of a worker isn’t the end of the world.
  2. Instead of having all actors defined in a single source file as in this example, actors are often defined separately in a slightly more complex application. It’s a common practice that actor classes are defined using companion objects in Scala. For instance, the Worker actor would be something like the following:
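
Here is a sketch; the message types and the pi-slice calculation are paraphrased from the tutorial:

```scala
import akka.actor.Actor

object Worker {
  // Message protocol defined in the companion object
  case class Work(start: Int, nrOfElements: Int)
  case class Result(value: Double)
}

class Worker extends Actor {
  import Worker._

  // Sum a slice of the Gregory-Leibniz series for pi
  def calculatePiFor(start: Int, nrOfElements: Int): Double =
    (start until (start + nrOfElements))
      .map(i => 4.0 * (1 - (i % 2) * 2) / (2 * i + 1))
      .sum

  def receive = {
    case Work(start, nrOfElements) =>
      sender() ! Result(calculatePiFor(start, nrOfElements))
  }
}
```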

Actor class and companion object

In a typical actor class in Scala, the receive method is implemented by pattern matching on the incoming message types, which in turn are defined in the actor’s companion object. Here’s another example, which is partial code of an actor in the next sample app:

Sample app #2: Reactive maps

Lightbend provides a functionality-rich sample application, reactive maps, that illustrates a number of features centered around an actor system, including:

  • GPS using HTML5 geolocation
  • Bot simulation on geospatial maps
  • Play’s handling of WebSockets with actors
  • Actor dependency injection
  • Akka’s peer-to-peer cluster
  • Distributed publish/subscribe in cluster
  • Akka persistence and journal
  • Akka cluster sharding
  • Reactive application deployment

Like most of their sample application templates, reactive-maps comes with a tutorial that walks through the application. What I like about this one is that it starts with a bare-bones working version and proceeds to enhance it with more robust features. In the second half of the application walk-thru, a new feature for tracking user travel distance is created from scratch and then rewritten to address scalability issues by means of an improved design of the associated actor as well as the use of Akka persistence/journal and cluster sharding.

Due to the rather wide range of features involved in the application, it might take some effort to go over the whole walk-thru. Nevertheless, I think it’s a worthy exercise to pick up some neat techniques in building a real-world application using Scala and Akka.

Deprecated Akka persistence interface

The source code seems to be rather up to date, although the deprecated Akka persistence interface EventsourcedProcessor does generate a compiler warning. To fix it, use trait PersistentActor instead and override the now-abstract persistenceId method. The relevant code after the fix should be as follows:

/app/backend/UserMetaData.scala:
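
A sketch of the fix is shown below; only the relevant parts are included, the event handling is elided, and the persistenceId scheme is an assumption:

```scala
import akka.persistence.PersistentActor

class UserMetaData extends PersistentActor {

  // persistenceId is now abstract in PersistentActor and must be overridden
  override def persistenceId: String = self.path.name

  override def receiveRecover: Receive = {
    case event =>
      // replay journaled events to rebuild the distance-tracking state (app-specific)
  }

  override def receiveCommand: Receive = {
    case command =>
      // persist(...) { event => ... } to handle distance updates (app-specific)
  }
}
```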

Issue with dependency injection and cluster sharding

There is a bug, caught at compile time, that arises from the binding for UserMetaData actors in the Play module, Actors.scala, which is responsible for initializing actors for the web frontend. Dependency injection is used in the module to bind actors of the backend role that need to be created from the backend. The cluster-sharded UserMetaData actors now need to be created with the ClusterSharding extension, hence requiring a special binding. This new binding causes an exception as follows:

It can be fixed by moving the ClusterSharding related code from class BackendActors into class UserMetaDataProvider, as follows:

/app/actors/Actors.scala:

Akka persistence journal dependency issue

Near the end of the example walk-thru is a custom journal setup using the key-value store LevelDB. However, the setup fails at run-time with errors as follows:

Some relevant bug reports in the Akka community suggest that it’s a problem with LevelDB’s dependency on a dated version of Google Guava. Since the default journal seems to work fine and the custom setup isn’t for a production-grade journal anyway, I’m going to skip it. Source code with the above changes can be found at GitHub. In a production environment, one would probably want to use Redis, PostgreSQL, HBase, etc., for the persistence journal.

Below is a screen-shot of the final version of the reactive-maps application.

[Screenshot: the final version of the Scala-Akka reactive-maps application]

Final thoughts

Despite the described glitches, Lightbend’s reactive-maps application is a well-thought-out tutorial, methodically stepping through the thought process from design to implementation, along with helpful remarks related to real-world performance and scalability. Even though the sample application is primarily for illustration, it’s no trivial hello-world, and many of the demonstrated techniques could be borrowed or repurposed in a production-grade actor system.

As mentioned earlier in the post, I think the Akka actor model, with its shared-nothing principle and non-blocking message-passing communication, is a great alternative to the thread-based shared-memory concurrency model, in which deadlocks and expensive context switching can be dreadful to deal with. However, building a well-designed actor-based application does require proper architectural work and the discipline to componentize tasks into lightweight actors that interact among themselves solely by means of immutable message passing.

On scalability, there are powerful features like actor cluster sharding and distributed publish/subscribe that allow one to build actor systems that scale horizontally. And last but not least, Scala’s deep roots in both object-oriented programming and functional programming make it an effective tool for the coding task.