A recent R&D project prompted me to look into a programming platform for a distributed system. Storm coupled with Kafka popped up as a candidate, but as streaming wasn’t part of the system requirements, I decided to try out an actor-based system. Between Java and Scala, I had Scala in mind as the programming language, primarily for its good mix of object-oriented and functional programming styles, as mentioned in a previous post about Scala-Play.
Naturally, Scala-Akka became a prime candidate. During the technology evaluation phase, I came across a couple of sample applications from Lightbend (formerly Typesafe) that I think are great tutorials for getting into Scala + Akka. The techniques for performance and scalability illustrated in some of the more comprehensive applications are particularly useful. Although the Play framework serves as a great application development platform, it’s of less interest from a product functionality perspective.
Akka Actor Systems
Akka is an open-source actor library/toolkit targeted for building scalable concurrent and distributed applications in Scala or Java. At the core of Akka are lightweight computational primitives called actors, each of which encapsulates state and behavior, and communicates via asynchronous immutable message passing.
It’s important to note that keeping messages immutable and interactions non-blocking are fundamental best practices the Actor model is designed around, but they aren’t enforced by the model itself; it’s the developer’s responsibility to embrace them. The underlying shared-nothing principle of actors, with message passing as the only means of interaction, makes for an appealing concurrency model compared with managing shared memory and locks in conventional thread-based concurrency.
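To make that concrete, below is a minimal sketch of an actor with an immutable message and a fire-and-forget send. The Greeter/Greet names are made up for illustration, and the snippet assumes a recent Akka 2.x. The point isn’t the greeting but the shape: all mutable state lives inside the actor, and the only way in is an immutable message.

// Minimal sketch (hypothetical names): one actor, one immutable message, "tell" only
import akka.actor.{Actor, ActorSystem, Props}

// An immutable message: a case class with immutable fields only
case class Greet(name: String)

class Greeter extends Actor {
  // State private to the actor; never shared, never locked
  private var greetCount = 0

  def receive = {
    case Greet(name) =>
      greetCount += 1
      println(s"Hello, $name (greeting #$greetCount)")
  }
}

object GreeterApp extends App {
  val system = ActorSystem("demo")
  val greeter = system.actorOf(Props[Greeter], "greeter")

  greeter ! Greet("Akka")   // asynchronous, non-blocking send

  Thread.sleep(500)         // crude wait so the message gets processed in this tiny demo
  system.terminate()        // on Akka 2.3 this would be system.shutdown()
}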
Sample app #1: Pi approximation
To quickly grasp how to use Akka Actors to solve computational problems, it might be worth taking a look at a sample application for approximating the value of pi (i.e. π). Although the sample application contains deprecated code, I still find it a nice example for understanding how to carve out the computational components as actors and coordinate partial-result passing via messages among them.
Given the dated code, one might want to just skim through the source code of the application while skipping the syntactic details. It shows how easy it is to formulate a distributed computation scheme by modeling the computation workers (Worker), the aggregator (Master) and the output listener (Listener) as actors, each playing a different role. A couple of notes:
- In general, “tell” (i.e. fire-and-forget) is preferred to “ask” when sending messages, for performance reasons. It makes sense in this application since it’s an approximation task, so the rare failure of a worker isn’t the end of the world. (A small sketch contrasting the two styles follows the Worker snippet below.)
- Instead of having all actors defined in a single source file as in this example, actors in slightly more complex applications are often defined separately. It’s common practice in Scala to define an actor class alongside a companion object holding its messages. For instance, the Worker actor would look something like the following:
import akka.actor.Actor

object Worker {
  // Messages understood by the Worker actor
  sealed trait PiApprox
  case class Work(start: Int, nrOfElements: Int) extends PiApprox
  case class Result(value: Double) extends PiApprox
}

class Worker extends Actor {
  import Worker._

  def calculatePiFor(start: Int, nrOfElements: Int): Double = { ... }

  def receive = {
    case Work(start, nrOfElements) =>
      // Send the partial result back to the requesting Master
      sender ! Result(calculatePiFor(start, nrOfElements))
  }
}
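On the first note above, here is a rough sketch contrasting the two messaging styles, reusing the Worker messages from the snippet above (the wrapper object and helper names are made up for illustration):

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future

object MessagingStyles {
  // "tell": fire-and-forget; nothing is allocated or tracked for a reply
  def submitWork(worker: ActorRef): Unit =
    worker ! Worker.Work(start = 0, nrOfElements = 1000)

  // "ask": returns a Future and needs an implicit Timeout; heavier, so use it sparingly
  def submitWorkAndWait(worker: ActorRef)(implicit timeout: Timeout): Future[Any] =
    worker ? Worker.Work(start = 0, nrOfElements = 1000)
}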
Sample app #2: Reactive maps
Lightbend provides a functionality-rich sample application, reactive maps, that illustrates a number of features centered around an actor system, including:
- GPS using HTML5 geolocation
- Bot simulation on geospatial maps
- Play’s handling of WebSockets with actors
- Actor dependency injection
- Akka’s peer-to-peer cluster
- Distributed publish/subscribe in cluster (sketched briefly after this list)
- Akka persistence and journal
- Akka cluster sharding
- Reactive application deployment
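As a taste of the distributed publish/subscribe feature listed above, here is a rough sketch using the akka-contrib pub/sub mediator of the same vintage as the sample. The topic and message names are made up, and the actual reactive-maps code is organized quite differently:

import akka.actor.Actor
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator.{Publish, Subscribe, SubscribeAck}

// Hypothetical message type published across the cluster
case class PositionUpdate(userId: String, lat: Double, lon: Double)

class RegionSubscriber extends Actor {
  private val mediator = DistributedPubSubExtension(context.system).mediator

  // Register interest in a topic; the mediator gossips subscriptions across the cluster
  mediator ! Subscribe("region-updates", self)

  def receive = {
    case _: SubscribeAck =>
      println("subscribed to region-updates")
    case PositionUpdate(user, lat, lon) =>
      println(s"$user is now at ($lat, $lon)")
  }
}

// A publisher on any node in the cluster could then send:
//   mediator ! Publish("region-updates", PositionUpdate("alice", 51.5, -0.1))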
Like most of their sample application templates, reactive-maps comes with a tutorial that walks through the application. What I like about this one is that it starts with a barebones working version and progressively enhances it with more robust features. In the second half of the walk-thru, a new feature for tracking a user’s travel distance is built from scratch and then rewritten to address scalability issues, through an improved design of the associated actor as well as the use of Akka persistence/journal and cluster sharding.
Due to the rather wide range of features involved, it may take some effort to go over the whole walk-thru. Nevertheless, I think it’s a worthwhile exercise for picking up some neat techniques in building a real-world application with Scala and Akka.
Deprecated Akka persistence interface
The source code seems to be rather up to date, although the deprecated Akka persistence interface EventsourcedProcessor does generate a compiler warning. To fix it, use the trait PersistentActor instead and override the now-abstract persistenceId method. The relevant code after the fix should look as follows:
/app/backend/UserMetaData.scala:
...
// import akka.persistence.EventsourcedProcessor
import akka.persistence.PersistentActor
...
// class UserMetaData extends EventsourcedProcessor {
class UserMetaData extends PersistentActor {
  ...
  override def persistenceId = self.path.toStringWithoutAddress

  override def receiveRecover: Receive = {
    ...
  }
  ...
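For reference, here is a minimal, self-contained sketch of the PersistentActor pattern, a hypothetical counter rather than the sample’s UserMetaData actor, showing how commands, persisted events and recovery fit together:

import akka.persistence.PersistentActor

case object Increment                 // command
case object GetCount                  // query
case class Incremented(amount: Int)   // event written to the journal

class CounterActor extends PersistentActor {
  override def persistenceId = "counter-1"

  private var count = 0

  // Replayed events rebuild the state on recovery
  override def receiveRecover: Receive = {
    case Incremented(amount) => count += amount
  }

  // Commands are validated, persisted as events, then applied to the state
  override def receiveCommand: Receive = {
    case Increment =>
      persist(Incremented(1)) { evt =>
        count += evt.amount
      }
    case GetCount =>
      sender() ! count
  }
}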
Issue with dependency injection and cluster sharding
There is a bug, surfacing when Guice tries to create the injector, that arises from the binding for the UserMetaData actors in the Play module Actors.scala, which is responsible for initializing actors for the web frontend. Dependency injection is used in the module to bind actors of the backend role that need to be created on the backend. The cluster-sharded UserMetaData actors now need to be created through the ClusterSharding extension, hence requiring a special binding. This new binding causes an exception as follows:
Unexpected exception
CreationException: Unable to create injector, see the following errors:

Error in custom provider, java.lang.IllegalArgumentException: Shard type [UserMetaData] must be started first
  while locating actors.UserMetaDataProvider
    at actors.Actors.configure(Actors.scala:26) (via modules: com.google.inject.util.Modules$OverrideModule -> actors.Actors)
  while locating akka.actor.ActorRef annotated with @com.google.inject.name.Named(value=userMetaData)

Caused by: java.lang.IllegalArgumentException: Shard type [UserMetaData] must be started first
  at akka.contrib.pattern.ClusterSharding.shardRegion(ClusterSharding.scala:337)
  at actors.UserMetaDataProvider.get$lzycompute(Actors.scala:36)
  at actors.UserMetaDataProvider.get(Actors.scala:36)
  at actors.UserMetaDataProvider.get(Actors.scala:35)
  at com.google.inject.internal.ProviderInternalFactory.provision(ProviderInternalFactory.java:81)
  ...
It can be fixed by moving the ClusterSharding-related code from the class BackendActors into the class UserMetaDataProvider, as follows:
/app/actors/Actors.scala:
...
class UserMetaDataProvider @Inject() (system: ActorSystem) extends Provider[ActorRef] {
  lazy val get = {
    if (Cluster(system).selfRoles.exists(r => r.startsWith("backend"))) {
      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = Some(UserMetaData.props),
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
    } else {
      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = None,
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
    }
    ClusterSharding(system).shardRegion(UserMetaData.shardName)
  }
}
...
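For context, the provider above is then bound under a name in the Guice module. The sketch below is reconstructed from the stack trace (actors.Actors.configure and the @Named(value=userMetaData) annotation), so the sample’s actual Actors.scala may differ in detail:

// Sketch of how the provider might be wired up in the Play/Guice module
// (reconstructed from the stack trace; the sample's real code may differ)
import akka.actor.ActorRef
import com.google.inject.AbstractModule
import com.google.inject.name.Names

class Actors extends AbstractModule {
  override def configure(): Unit = {
    // Frontend code can then inject the shard region as
    //   @Inject @Named("userMetaData") userMetaData: ActorRef
    bind(classOf[ActorRef])
      .annotatedWith(Names.named("userMetaData"))
      .toProvider(classOf[UserMetaDataProvider])
  }
}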
Akka persistence journal dependency issue
Near the end of the example walk-thru is a custom journal setup using the key-value store LevelDB. However, the setup fails at run time with errors like the following:
[ERROR] [application-akka.actor.default-dispatcher-2] [ActorSystem(application)] Uncaught fatal error from thread [application-akka.actor.default-dispatcher-2] shutting down ActorSystem [application]
java.lang.NoSuchMethodError: com.google.common.io.Closeables.closeQuietly(Ljava/io/Closeable;)V
  at org.iq80.leveldb.impl.MMapLogWriter.close(MMapLogWriter.java:83)
  at org.iq80.leveldb.impl.VersionSet.initializeIfNeeded(VersionSet.java:111)
  at org.iq80.leveldb.impl.VersionSet.<init>(VersionSet.java:91)
  at org.iq80.leveldb.impl.DbImpl.<init>(DbImpl.java:167)
  at org.iq80.leveldb.impl.Iq80DBFactory.open(Iq80DBFactory.java:59)
  at akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:112)
  at akka.persistence.journal.leveldb.SharedLeveldbStore.preStart(LeveldbStore.scala:127)
  ...
[ERROR] [application-akka.actor.default-dispatcher-4] [akka://application/user/store] null
java.lang.NullPointerException
  at akka.persistence.journal.leveldb.LeveldbStore$class.postStop(LeveldbStore.scala:117)
  at akka.persistence.journal.leveldb.SharedLeveldbStore.postStop(LeveldbStore.scala:127)
  at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
  at akka.persistence.journal.leveldb.SharedLeveldbStore.aroundPostStop(LeveldbStore.scala:127)
  ...
Some relevant bug reports in the Akka community suggest that it’s a problem with LevelDB’s dependency on a dated version of Google Guava. Since the default journal seems to work fine and the custom setup isn’t meant to be a production-grade journal anyway, I’m going to skip it. Source code with the above changes can be found on GitHub. In a production environment, one would probably want to use Redis, PostgreSQL, HBase, etc., for the persistence journal.
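For completeness, swapping the journal backend is mostly a configuration concern. Below is a sketch of the relevant application.conf keys for the built-in LevelDB journal; the production plugin id in the comment is purely illustrative, not a real plugin name:

# Built-in LevelDB journal (fine for development)
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal"

# A production-grade journal plugin would be dropped in the same way, e.g.
# akka.persistence.journal.plugin = "my-production-journal"   # hypothetical plugin id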
Below is a screenshot of the final version of the reactive-maps application.
Final thoughts
Despite the glitches described above, Lightbend’s reactive-maps application is a well-thought-out tutorial, methodically stepping through the thought process from design to implementation, along with helpful remarks on real-world performance and scalability. Even though the sample application is primarily for illustration, it’s no trivial hello-world, and many of the demonstrated techniques could be borrowed or repurposed in a production-grade actor system.
As mentioned earlier in the post, I think the Akka actor model, with its shared-nothing principle and non-blocking message-passing communication, is a great alternative to the thread-based shared-memory concurrency model, in which deadlocks and expensive context switching can be dreadful to deal with. That said, building a well-designed actor-based application does require proper architectural work and the discipline to follow best practices: componentizing tasks into lightweight actors that interact solely through immutable message passing.
On scalability, powerful features like cluster sharding and distributed publish/subscribe allow one to build actor systems that scale horizontally. And last but not least, Scala’s deep roots in both object-oriented and functional programming make it an effective tool for the coding task.