Tag Archives: actor model

Akka Persistence Journal Using Redis

If you’ve used Lightbend’s Scala-Akka templates that involve persisting Akka actor states, you’ll notice that LevelDB is usually configured as the default storage medium for persistence journals (and snapshots). In many of these templates, a shared LevelDB journal is shared by multiple actor systems. As reminded by the template documentation as well as code-level comments, such setup isn’t suitable for production systems.

Thanks to the prospering Akka user community which maintains a good list of journal plugins you could pick from to suit your specific needs. Journal choices include Cassandra, HBase, Redis, PostgreSQL and others. In this blog post, I’m going to highlight how to set up Akka persistent journal using a plugin for Redis, which is one of the most popular open-source key-value stores.

Redis client for Scala

First thing first, you’ll need a Redis server running on a server node you want your actor systems to connect to. If you haven’t already had one, download the server from Redis website and install it on a designated server host. The installation should include a command-line interface tool, redis-cli, that comes in handy for ad-hoc data update/lookup.

Next, you need a Redis client for Scala, Rediscala, which is a non-blocking Redis driver that wraps Redis requests/responses in Futures. To include the Rediscala in the application, simply specify it as a library dependency in build.sbt.

Redis journal plugin

The Redis journal plugin is from Hootsuite. Similar to how Rediscala is set up in build.sbt, you can add the dependency for the Redis journal plugin. To tell sbt where to locate the Ivy repo for the journal plugin, you’ll also need to add a resolver as well. The build.sbt content should look something like the following:

Alternatively, rather than specifying them as dependencies you can clone the git repos for the Redis client and journal plugin, use sbt to generate a jar file for each of them, and include them in your application library (e.g. under /activator-project-root/lib/).

Application configurations

Now that the library dependency setup for Redis journal and Redis client is taken care of, next in line is to update the configuration information in application.conf to replace LevelDB with Redis.

Besides Akka related configuration, the Redis host and port information is specified in the configuration file. The Redis journal plugin has the RedisJournal class that extends trait DefaultRedisComponent, which in turn reads the Redis host/port information from the configuration file and overrides the default host/port (localhost/6379) in the RedisClient case class within Rediscala.

As for the Akka persistence configuration, simply remove all LevelDB related lines in the configuration file and add the Redis persistence journal (and snapshot) plugin information. The application.conf content now looks like the following:

Onto the application source code

That’s all the configuration changes needed for using Redis persistence journal. To retire LevelDB as the journal store from within the application, you can simply remove all code and imports that reference LevelDB for journal/snapshot setup. Any existing code logic you’ve developed to persist for LevelDB should now be applied to the Redis journal without changes.

In other words, this LevelDB to Redis journal migration is almost entirely a configurative effort. For general-purpose persistence of actor states, Akka’s persist method abstracts you from having to directly deal with Redis-specific interactions. Tracing the source code of Akka’s PersistentActor.scala, persist method is defined as follows:

For instance, a typical persist snippet might look like the following:

In essence, as long as actor states are persisted with the proper method signature, any journal store specific interactions will be taken care of by the corresponding journal plugin.

Internet-of-Things And Akka Actors

IoT (Internet of Things) has recently been one of the most popular buzzwords. Despite being over-hyped, we’re indeed heading towards a foreseeable world in which all sorts of things are inter-connected. Before IoT became a hot acronym, I was heavily involved in building a Home-Area-Network SaaS platform over the course of 5 years in a previous startup I cofounded, so it’s no stranger to me.

At the low-level device network layer, there used to be platform service companies providing gateway hardware along with proprietary APIs for IoT devices running on sensor network protocols (such as ZigBee, Z-Wave). The landscape has been evolving over the past couple of years. As more and more companies begin to throw their weight behind building products in the IoT ecosystem, open standards for device connectivity emerge. One of them is MQTT (Message Queue Telemetry Transport).

Message Queue Telemetry Transport

MQTT had been relatively little-known until it was standardized at OASIS a couple of years ago. The lightweight publish-subscribe messaging protocol, MQTT, has since been increasingly adopted by major players, including Amazon, as the underlying connectivity protocols for IoT devices. It’s TCP/IP based but its variant, MQTT-SN (MQTT for Sensor Networks), covers sensor network communication protocols such as ZigBee. There are also quite a few MQTT message brokers, including HiveMQ, Mosquitto and RabbitMQ.

IoT makes a great use case for Akka actor systems which come with lightweight loosely-coupled actors in decentralized clusters with robust routing, sharding and pub-sub features, as mentioned in a previous blog post. The actor model can be rather easily structured to emulate the operations of a typical IoT network that scales in device volume. In addition, availability of MQTT clients for Akka such as Paho-Akka makes it easy to communicate with MQTT brokers.

A Scala-based IoT application

In this blog post, I’m going to illustrate how to build a scalable distributed worker system using Akka actors to service requests from a MQTT-based IoT system. A good portion of the Akka clustering setup is derived from Lightbend’s Akka distributed workers template. Below is a diagram of the application:

IoT with MQTT pub-sub and Akka Actor Systems
IoT Akka Actor Systems

As shown in the diagram, the application consists of the following components:

1. IoT

  • A DeviceRequest actor which:
    • simulates work requests from IoT devices
    • publishes requests to a MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from a topic subscriber
  • An IotAgent actor which:
    • subscribes to the mqtt-topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • sends DeviceRequest actor a failure message upon receiving failure messages from Master actor
  • A MQTT pub-sub client, MqttPubSub, for communicating with a MQTT broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • Serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A Master singleton actor which:
    • serves as the ClusterClientReceptionist to answer external connection requests
    • fails over to the next-oldest node
    • maintains work states using persistence journal
    • publishes work results to a work-results topic via Akka distributed pub-sub
    • acknowledges work request reception with IotAgent
    • registers Workers and distributes work to available Workers
  • A PostProcessor actor in the master cluster which:
    • simulates post-processing of the work results
    • subscribes to the work-results topic

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, pulls work from the Master actor
    • reports work status with the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors which process the work requests

Source code is available at GitHub.

A few notes:

  1. Neither IotAgent nor Worker actor system is a part of the master cluster, hence both need to communicate with the Master via ClusterClient.
     
  2. Rather than having the Master actor spawn child Workers and push work over, the Workers are set up to register with the Master and pull work from it – a model similar to what Derek Wyatt advocated in his post.
     
  3. Paho-Akka is used as the MQTT pub-sub client with configuration information held within the helper object, MqttConfig.
     
  4. The helper object MqttConfig consists of MQTT pub-sub topic/broker information and methods to serialize/deserialize the Work objects which, in turn, contains Device objects. The explicit serializations are necessary since multiple JVMs will be at play if one launches the master cluster, IoT and worker actor systems on separate JVMs.
     
  5. The test Mosquitto broker at tcp://test.mosquitto.org:1883 serves as the MQTT broker. An alternative is to install a MQTT broker (Mosquitto, HiveMQ, etc) local to the IoT network.
     
  6. The IotAgent uses Actor’s ask method (?), instead of the fire-and-forget tell method (!), to confirm message receipt by the Master via a Future return. If the receipt confirmation is not so important, using the tell method will be a much preferred choice for performance.
     
  7. This is primarily a proof-of-concept application of IoT using Akka actors, hence code performance optimization isn’t a priority. In addition, for production systems, a production-grade persistence journal (e.g. Redis, Cassandra) should be used and multiple-Master via sharding could be considered.
     

Test-running

Similar to how you would test-run Lightbend’s distributed workers template, you may open up separate command line terminals and run the different components on separate JVMs, adding and killing the launched components to observe how the systems scale out, fail over, persist work states, etc. Here’s an example of test-run sequence:

Below are some sample console output.

Console Output: Master seed node with persistence journal:

Console Output: IotAgent-DeviceRequest node:

Console Output: Worker node:

Scala Distributed Systems With Akka

A recent R&D project prompted me to look into a programming platform for a distributed system. Storm coupled with Kafka popped up as a candidate, but as streaming wasn’t part of the system requirement, I decided to try out an actor-based system. Between Java and Scala, I had in mind Scala as the programming language primarily for its good mix of object-oriented and functional programming styles, as mentioned in a previous post about Scala-Play.

Naturally, Scala-Akka became a prime candidate. During the technology evaluation phase, I came across a couple of sample applications from Lightbend (formerly Typesafe) that I think are great tutorials for getting into Scala + Akka. Certain illustrated techniques for performance and scalability in some of the more comprehensive applications are particularly useful. Although the Play framework serves a great application development platform, it’s of less interest from a product functionality perspective.

Akka Actor Systems

Akka is an open-source actor library/toolkit targeted for building scalable concurrent and distributed applications in Scala or Java. At the core of Akka are lightweight computational primitives called actors, each of which encapsulates state and behavior, and communicates via asynchronous immutable message passing.

It’s important to note that keeping messages immutable and non-blocking are some of the fundamental best-practices that the Actor model is designed for. In other words, they aren’t enforced by the model itself, hence it’s the developer’s responsibility to embrace the best practices. The underlying shared-nothing principle of actors with message passing the only means of interactions makes it an appealing concurrency model, as opposed to managing shared memory with locks in general thread-based concurrency.

Sample app #1: Pi approximation

To quickly grasp how to use Akka Actors to solve computational problems, it might be worth taking a look at a sample application for approximating value of pi (i.e. 𝜋). Although the sample application consists of deprecated code, I still find it a nice example for understanding how to craft out the computational components as actors and coordinate partial result passing via messages among the actors.

Given the dated code, one might want to just skim through the source code of the application while skipping the syntactic details. It shows how easy it is to formulate a distributed computation scheme by making the computation workers (Worker), aggregator (Master) and output listener (Listener) as actors, each playing different roles. A couple of notes:

  1. In general, “tell” (i.e. fire-and-forget) is preferred to “ask” in sending messages for performance reason. It makes sense in this application since it’s an approximation task, hence failure of a worker in a rare case isn’t end of the world.
  2. Instead of having all actors defined in a single source file as in this example, actors are often defined separately in a slightly more complex application. It’s a common practice that actor classes are defined using companion objects in Scala. For instance, the Worker actor would be something like the following:

Actor class and companion object

In a typical actor class in Scala, the receive method is often implemented with messages composed and sent by pattern matching their message types, which in turn are defined in the actor companion object. Here’s another example, which is partial code of an actor in the next sample app:

Sample app #2: Reactive maps

Lightbend provides a functionality-rich sample application, reactive maps, that illustrates a number of features centered around an actor system, including:

  • GPS using HTML5 geolocation
  • Bot simulation on geospatial maps
  • Play’s handling WebSockets with actors
  • Actor dependency injection
  • Akka’s peer-to-peer cluster
  • Distributed publish/subscribe in cluster
  • Akka persistence and journal
  • Akka cluster sharding
  • Reactive application deployment

Like most of their sample application templates, reactive-maps comes with a tutorial that walks through the application. What I like about this one is that it starts with a more barebone working version and proceeds to enhance with more robust features. In the second half of the application walk-thru, a new feature for user travel distance tracking is created from scratch and then rewritten to address scalability issue by means of improved design of the associated actor as well as using of Akka persistence/journal and cluster sharding.

Due to the rather wide range of features involved in the application, it might take some effort to go over the whole walk-thru. Nevertheless, I think it’s a worthy exercise to pick up some neat techniques in building a real-world application using Scala and Akka.

Deprecated Akka persistence interface

The source code seems to be rather up to date, although the deprecated Akka persistence interface EventsourcedProcessor does generate some compiler warning. To fix it, use trait PersistenActor instead and override the now abstract PersistenceId method. The relevant code after the fix should be as follows:

/app/backend/UserMetaData.scala:

Issue with dependency injection and cluster sharding

There is a bug caught during compilation that arises from the binding for UserMetaData actors in the Play module, Actors.scala, responsible for initializing actors for the web frontend. Dependency injection is used in the module to bind actors of backend role that need to be created from the backend. The cluster-sharded UserMetaData actors now need to be created with the ClusterSharding extension hence requiring a special binding. This new binding causes an exception as follows:

It can be fixed by moving the ClusterSharding related code from class BackendActors into class UserMetaDataProvider, as follows:

/app/actors/Actors.scala:

Akka persistence journal dependency issue

Near the end of the example walk-thru is a custom journal setup using the key-value store Leveldb. However the setup fails at run-time with errors as follows:

Some relevant bug reports in the Akka community suggests that it’s a problem with LevelDB’s dependency on some dated version of Google Guava. Since the default journal seems to work fine and the custom setup isn’t for production-grade journal anyway, I’m going to skip it. Source code with the above changes can be found at GitHub. In a production environment, one would probably want to use Redis, PostgreSQL, HBase, etc, for the persistence journal.

Below is a screen-shot of the final version of the reactive-maps application.

Scala Akka Reactive Maps

Final thoughts

Despite the described glitches, Lightbend’s reactive-maps application is a well-thought-out tutorial, methodically stepping through the thought process from design to implementation, along with helpful remarks related to real-world performance and scalability. Even though the sample application is primarily for illustration, it’s no trivial hello-world and a lot of demonstrated techniques could be borrowed or repurposed in a production-grade actor system.

As mentioned earlier in the post, I think Akka actor with its shared-nothing principle and non-blocking message-passing communication is a great alternative to the thread-based shared-memory concurrency model in which deadlocks and expensive context switching could be dreadful to deal with. However, in building a well-designed actor-based application, it does require proper architectural work and discipline for best-practices to componentize tasks into lightweight actors which interact among themselves by means of immutable message passing.

On scalability, there are powerful features like actor cluster sharding and distributed publish/subscribe that allow one to build actor systems that can scale horizontally. And last but not least, Scala’s deep root in both object-oriented programming and functional programming makes it a effective tool for the coding task.