Internet-of-Things And Akka Actors

IoT (Internet of Things) has recently been one of the most popular buzzwords. Despite the hype, we're indeed heading towards a world in which all sorts of things are interconnected. Before IoT became a hot acronym, I spent 5 years building a Home-Area-Network SaaS platform at a previous startup I cofounded, so the territory is no stranger to me.

At the low-level device network layer, there used to be platform service companies providing gateway hardware along with proprietary APIs for IoT devices running on sensor network protocols (such as ZigBee, Z-Wave). The landscape has been evolving over the past couple of years. As more and more companies throw their weight behind building products in the IoT ecosystem, open standards for device connectivity have emerged. One of them is MQTT (MQ Telemetry Transport, often written out as Message Queue Telemetry Transport).

Message Queue Telemetry Transport

MQTT was relatively little-known until it was standardized at OASIS a couple of years ago. The lightweight publish-subscribe messaging protocol has since been increasingly adopted by major players, including Amazon, as the underlying connectivity protocol for IoT devices. It's TCP/IP-based, but its variant, MQTT-SN (MQTT for Sensor Networks), covers sensor network communication protocols such as ZigBee. There are also quite a few MQTT message brokers, including HiveMQ, Mosquitto and RabbitMQ.
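To make the publish-subscribe idea concrete, here is a minimal in-memory sketch in Scala. It is not an MQTT client and involves no network or broker; TopicBus and its methods are hypothetical names chosen purely for illustration. Publishers and subscribers share only a topic name and never reference each other directly.

```scala
import scala.collection.mutable

// In-memory stand-in for a broker. Hypothetical names; this is not an MQTT client API.
final class TopicBus {
  private val subscribers = mutable.Map.empty[String, List[Array[Byte] => Unit]]

  // Register a callback that is invoked for every message published to `topic`.
  def subscribe(topic: String)(handler: Array[Byte] => Unit): Unit =
    subscribers.update(topic, handler :: subscribers.getOrElse(topic, Nil))

  // Deliver `payload` to each subscriber of `topic`; returns how many received it.
  def publish(topic: String, payload: Array[Byte]): Int = {
    val handlers = subscribers.getOrElse(topic, Nil)
    handlers.foreach(handler => handler(payload))
    handlers.size
  }
}
```

A real broker adds what this sketch leaves out: network transport, quality-of-service levels, retained messages and topic wildcards.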

IoT makes a great use case for Akka actor systems, which come with lightweight, loosely-coupled actors in decentralized clusters with robust routing, sharding and pub-sub features, as mentioned in a previous blog post. The actor model can be structured rather easily to emulate the operations of a typical IoT network that scales in device volume. In addition, the availability of MQTT clients for Akka, such as Paho-Akka, makes it easy to communicate with MQTT brokers.

A Scala-based IoT application

UPDATE: An expanded version of this application with individual actors representing each of the IoT devices, each of which maintains its own internal state and setting, is now available. Please see the Akka Actors IoT v.2 blog post for details.

In this blog post, I’m going to illustrate how to build a scalable distributed worker system using Akka actors to service requests from an MQTT-based IoT system. A good portion of the Akka clustering setup is derived from Lightbend’s Akka distributed workers template. Below is a diagram of the application:

IoT with MQTT and Akka Actor Systems

As shown in the diagram, the application consists of the following components:

1. IoT

  • A DeviceRequest actor which:
    • simulates work requests from IoT devices
    • publishes requests to an MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from a topic subscriber
  • An IotAgent actor which:
    • subscribes to the mqtt-topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • sends the DeviceRequest actor a failure message upon receiving failure messages from the Master actor
  • An MQTT pub-sub client, MqttPubSub, for communicating with an MQTT broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • Serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges work request reception with IotAgent
    • publishes work results to a work-results topic via Akka distributed pub-sub
    • maintains work states using a persistence journal
  • A PostProcessor actor in the master cluster which:
    • simulates post-processing of the work results
    • subscribes to the work-results topic

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, and pulls work from, the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors which process the work requests
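
The register-and-pull interaction between the Workers and the Master can be sketched without any actor machinery. The following is a single-threaded illustration under simplifying assumptions, not the application's code: MasterSketch, Work and every method name are hypothetical, and the real Master handles these steps as actor messages arriving via ClusterClient.

```scala
import scala.collection.immutable.Queue

object PullModelSketch {
  // Hypothetical message payload; the field names are assumptions.
  final case class Work(workId: String, deviceId: String)

  final class MasterSketch {
    private var pending = Queue.empty[Work]
    private var idleWorkers = Set.empty[String]
    private var busyWorkers = Map.empty[String, Work]

    // A worker announces itself; the master never creates workers on its own.
    def register(workerId: String): Unit =
      if (!busyWorkers.contains(workerId)) idleWorkers += workerId

    // Accept a request from the IoT side and queue it until a worker asks for it.
    def accept(work: Work): Unit = {
      pending = pending.enqueue(work)
    }

    // A registered, idle worker pulls the next queued item, if there is one.
    def requestWork(workerId: String): Option[Work] =
      if (!idleWorkers.contains(workerId)) None
      else pending.dequeueOption.map { case (work, rest) =>
        pending = rest
        idleWorkers -= workerId
        busyWorkers += (workerId -> work)
        work
      }

    // The worker reports completion and becomes idle again.
    def workDone(workerId: String, workId: String): Boolean =
      busyWorkers.get(workerId) match {
        case Some(work) if work.workId == workId =>
          busyWorkers -= workerId
          idleWorkers += workerId
          true
        case _ => false
      }
  }
}
```

Because work is handed out only when a worker asks for it, a slow or dead worker never accumulates a backlog; unclaimed work simply stays in the master's queue.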

Source code is available at GitHub.

A few notes:

  1. Neither the IotAgent nor the Worker actor system is part of the master cluster, hence both need to communicate with the Master via ClusterClient.
  2. Rather than having the Master actor spawn child Workers and push work over, the Workers are set up to register with the Master and pull work from it – a model similar to what Derek Wyatt advocated in his post.
  3. Paho-Akka is used as the MQTT pub-sub client with configuration information held within the helper object, MqttConfig.
  4. The helper object MqttConfig consists of MQTT pub-sub topic/broker information and methods to serialize/deserialize the Work objects which, in turn, contain Device objects. The explicit serializations are necessary since multiple JVMs will be at play if one launches the master cluster, IoT and worker actor systems on separate JVMs.
  5. The test Mosquitto broker at tcp://test.mosquitto.org:1883 serves as the MQTT broker. An alternative is to install an MQTT broker (Mosquitto, HiveMQ, etc) local to the IoT network.
  6. The IotAgent uses the ask method (?), instead of the fire-and-forget tell method (!), to confirm message receipt by the Master via a returned Future. If the receipt confirmation is not that important, the tell method is the preferred choice for performance.
  7. This is primarily a proof-of-concept application of IoT using Akka actors, hence code performance optimization isn’t a priority. In addition, for production systems, a production-grade persistence journal (e.g. Redis, Cassandra) should be used and multiple Masters via sharding could be considered.
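
As a rough illustration of the serialization helpers mentioned in note 4, the sketch below converts an object to a byte array and back using plain Java serialization. It is an assumption about what such helpers might look like, not the actual MqttConfig code: the Device and Work fields and the method names toBytes/fromBytes are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationSketch {
  // Hypothetical message types; case classes are serializable out of the box.
  final case class Device(deviceId: String, state: String)
  final case class Work(workId: String, device: Device)

  // Serialize any java.io.Serializable object into a byte array (e.g. an MQTT payload).
  def toBytes(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuild an object from bytes; the cast is unchecked, so the caller must know the type.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Java serialization keeps the sketch short, but it ties both ends to the same class definitions; a schema-based format would be a sturdier choice when the publisher and subscriber evolve independently.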

Test-running

Similar to how you would test-run Lightbend’s distributed workers template, you may open up separate command line terminals and run the different components on separate JVMs, adding and killing the launched components to observe how the systems scale out, fail over, persist work states, etc. Here’s an example test-run sequence:

Launch the master cluster seed node with persistence journal:
<project-root>/bin/activator "runMain worker.Main 2551"

Launch the IotAgent-DeviceRequest node:
<project-root>/bin/activator "runMain worker.Main 3001"

Launch an additional master cluster seed node:
<project-root>/bin/activator "runMain worker.Main 2552"

Launch the Worker node:
<project-root>/bin/activator "runMain worker.Main 0"

Launch an additional Worker node:
<project-root>/bin/activator "runMain worker.Main 0"
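
In the commands above, the port argument is what selects the component to launch. A minimal sketch of such a dispatch follows; the actor system names match those in the console output below, but the port ranges and the systemForPort helper are assumptions for illustration rather than the application's actual Main.

```scala
object LaunchSketch {
  // System names are taken from the console output; the port ranges are assumed.
  def systemForPort(port: Int): String =
    if (port >= 2000 && port <= 2999) "ClusterSystem"  // master cluster node, e.g. 2551 or 2552
    else if (port >= 3000 && port <= 3999) "IotSystem" // IotAgent-DeviceRequest node, e.g. 3001
    else "WorkerSystem"                                 // worker; port 0 gets a free port assigned at startup
}
```

Launching Workers with port 0 is what allows any number of them to run on one machine without port clashes.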

Below is some sample console output.

Console Output: Master seed node with persistence journal:

$ bin/activator "runMain worker.Main 2551"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project/project
[info] Updating {file:/Users/leo/apps/scala/akka-iot-mqtt/project/project/}akka-iot-mqtt-build-build...
...
...
[info] Done updating.
[info] Compiling 12 Scala sources to /Users/leo/apps/scala/akka-iot-mqtt/target/scala-2.11/classes...
background log: info: Running worker.Main 2551
background log: info: [INFO] [04/29/2016 08:54:19.120] [main] [akka.remote.Remoting] Starting remoting
background log: info: [INFO] [04/29/2016 08:54:21.627] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
background log: info: [INFO] [04/29/2016 08:54:21.628] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2551]
...
...
background log: info: [INFO] [04/29/2016 08:54:28.140] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [backend]
background log: info: [INFO] [04/29/2016 08:54:28.974] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
background log: info: [INFO] [04/29/2016 08:54:29.102] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master] Singleton manager starting singleton actor [akka://ClusterSystem/user/master/singleton]
background log: info: [INFO] [04/29/2016 08:54:29.103] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master] ClusterSingletonManager state change [Start -> Oldest]
...
...

<<< /bin/activator "runMain worker.Main 2552"

background log: info: [INFO] [04/29/2016 08:57:17.648] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [backend]
background log: info: [INFO] [04/29/2016 08:57:18.194] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
...
...
background log: info: [INFO] [04/29/2016 08:57:29.043] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking node(s) as REACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)]
...
...

<<< [while worker node 0-a is up]

background log: info: [INFO] [04/29/2016 09:05:09.448] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id 4fc64049-fa21-49c6-9d54-743eaa055373 | Device Id lamp-5095
background log: info: [INFO] [04/29/2016 09:05:09.930] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373 | Device Id lamp-5095
background log: info: [INFO] [04/29/2016 09:05:09.934] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id 4fc64049-fa21-49c6-9d54-743eaa055373 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:10.169] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result NO CHANGE | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373
background log: info: [INFO] [04/29/2016 09:05:15.011] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id a66d1079-5083-491e-9b5e-885e7914426f | Device Id security-alarm-9095
background log: info: [INFO] [04/29/2016 09:05:15.366] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id a66d1079-5083-491e-9b5e-885e7914426f | Device Id security-alarm-9095
background log: info: [INFO] [04/29/2016 09:05:15.370] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id a66d1079-5083-491e-9b5e-885e7914426f | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:15.542] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result Switch light to ON | Work Id a66d1079-5083-491e-9b5e-885e7914426f
background log: info: [INFO] [04/29/2016 09:05:22.412] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id bc502538-5baa-494d-8565-5cb91ad27001 | Device Id thermostat-1004
background log: info: [INFO] [04/29/2016 09:05:22.699] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id bc502538-5baa-494d-8565-5cb91ad27001 | Device Id thermostat-1004
background log: info: [INFO] [04/29/2016 09:05:22.703] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id bc502538-5baa-494d-8565-5cb91ad27001 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:22.798] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result LOWER temperature by 1F | Work Id bc502538-5baa-494d-8565-5cb91ad27001
background log: info: [INFO] [04/29/2016 09:05:31.779] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4 | Device Id thermostat-1044
background log: info: [INFO] [04/29/2016 09:05:32.065] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4 | Device Id thermostat-1044
background log: info: [INFO] [04/29/2016 09:05:32.070] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:32.257] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result RAISE temperature by 2F | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4
background log: info: [INFO] [04/29/2016 09:05:36.169] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9 | Device Id lamp-5471
background log: info: [INFO] [04/29/2016 09:05:36.359] [ClusterSystem-akka.actor.default-dispatcher-39] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9 | Device Id lamp-5471
background log: info: [INFO] [04/29/2016 09:05:36.363] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:36.496] [ClusterSystem-akka.actor.default-dispatcher-39] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result Switch light to ON | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9
...
...

<<< [downing worker node 54323]

background log: info: [WARN] [04/29/2016 09:12:43.526] [ClusterSystem-akka.remote.default-remote-dispatcher-34] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FWorkerSystem%40127.0.0.1%3A54323-3] Association with remote system [akka.tcp://WorkerSystem@127.0.0.1:54323] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
background log: info: [INFO] [04/29/2016 09:12:50.846] [ClusterSystem-akka.actor.default-dispatcher-49] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa | Device Id lamp-5210
background log: info: [INFO] [04/29/2016 09:12:50.884] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id 68a11934-16ca-4bbb-a8c9-27d5a4ac5ede | Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa | Device Id lamp-5210
background log: info: [INFO] [04/29/2016 09:12:51.009] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa | Worker Id 68a11934-16ca-4bbb-a8c9-27d5a4ac5ede
background log: info: [INFO] [04/29/2016 09:12:51.010] [ClusterSystem-akka.actor.default-dispatcher-39] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result Switch light to ON | Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa
...
...

<<< [downing worker node 54351]

background log: info: [WARN] [04/29/2016 09:18:07.161] [ClusterSystem-akka.remote.default-remote-dispatcher-38] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FWorkerSystem%40127.0.0.1%3A54351-5] Association with remote system [akka.tcp://WorkerSystem@127.0.0.1:54351] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
...
...

<<< [downing iot-device node 3001]

background log: info: [WARN] [04/29/2016 09:18:43.385] [ClusterSystem-akka.remote.default-remote-dispatcher-38] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FIotSystem%40127.0.0.1%3A3001-8] Association with remote system [akka.tcp://IotSystem@127.0.0.1:3001] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
...
...

<<< [downing cluster seed node 2552]

background log: info: [WARN] [04/29/2016 09:18:53.937] [ClusterSystem-akka.remote.default-remote-dispatcher-34] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
...
...
background log: info: [INFO] [04/29/2016 09:19:07.754] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552]
background log: info: [INFO] [04/29/2016 09:19:07.783] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552] as [Down]
background log: info: [INFO] [04/29/2016 09:19:08.775] [ClusterSystem-akka.actor.default-dispatcher-51] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552]
background log: info: [INFO] [04/29/2016 09:19:08.776] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master] Member removed [akka.tcp://ClusterSystem@127.0.0.1:2552]
background log: info: [WARN] [04/29/2016 09:19:08.861] [ClusterSystem-akka.remote.default-remote-dispatcher-38] [akka.remote.Remoting] Association to [akka.tcp://ClusterSystem@127.0.0.1:2552] having UID [113992052] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.

Console Output: IotAgent-DeviceRequest node:

$ bin/activator "runMain worker.Main 3001"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project/project
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt/)
background log: info: Running worker.Main 3001
background log: info: [INFO] [04/29/2016 09:00:42.224] [main] [akka.remote.Remoting] Starting remoting
background log: info: [INFO] [04/29/2016 09:00:42.664] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://IotSystem@127.0.0.1:3001]
background log: info: [INFO] [04/29/2016 09:00:42.665] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://IotSystem@127.0.0.1:3001]
background log: info: [INFO] [04/29/2016 09:00:44.563] [IotSystem-akka.actor.default-dispatcher-2] [akka.tcp://IotSystem@127.0.0.1:3001/user/clusterClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
background log: info: 09:00:45.632 [IotSystem-akka.actor.default-dispatcher-3] INFO  com.sandinh.paho.akka.MqttPubSub - connecting to tcp://test.mosquitto.org:1883..
background log: info: 09:00:46.330 [MQTT Call: paho329851808627927] INFO  com.sandinh.paho.akka.MqttPubSub - connected
background log: info: [INFO] [04/29/2016 09:00:46.334] [IotSystem-akka.actor.default-dispatcher-2] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> MQTT subscription to akka-iot-mqtt-topic acknowledged
background log: info: 09:00:46.513 [MQTT Call: paho329851808627927] INFO  com.sandinh.paho.akka.MqttPubSub - subscribed to [akka-iot-mqtt-topic]
background log: info: [INFO] [04/29/2016 09:00:49.413] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id security-alarm-9094 | Device State OFF
background log: info: [INFO] [04/29/2016 09:00:49.553] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id security-alarm-9094 | Device State OFF
background log: info: 09:00:49.555 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:00:49.736 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:00:49.738] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id security-alarm-9094 | Device State OFF | Work Id 9016bdbd-e70d-4546-8cc8-6719c02f8404
background log: info: [INFO] [04/29/2016 09:00:49.738] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
...
...
background log: info: [INFO] [04/29/2016 09:05:09.264] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id lamp-5095 | Device State ON
background log: info: [INFO] [04/29/2016 09:05:09.264] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id lamp-5095 | Device State ON
background log: info: 09:05:09.265 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:09.444 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:09.445] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id lamp-5095 | Device State ON | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373
background log: info: [INFO] [04/29/2016 09:05:09.445] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:14.674] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id security-alarm-9095 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:14.675] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id security-alarm-9095 | Device State OFF
background log: info: 09:05:14.675 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:15.008 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:15.008] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id security-alarm-9095 | Device State OFF | Work Id a66d1079-5083-491e-9b5e-885e7914426f
background log: info: [INFO] [04/29/2016 09:05:15.008] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:22.213] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id thermostat-1004 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:22.213] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id thermostat-1004 | Device State HEAT
background log: info: 09:05:22.229 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:22.409 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:22.409] [IotSystem-akka.actor.default-dispatcher-4] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id thermostat-1004 | Device State HEAT | Work Id bc502538-5baa-494d-8565-5cb91ad27001
background log: info: [INFO] [04/29/2016 09:05:22.409] [IotSystem-akka.actor.default-dispatcher-4] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:31.593] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id thermostat-1044 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:31.593] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id thermostat-1044 | Device State HEAT
background log: info: 09:05:31.593 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:31.776 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:31.777] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id thermostat-1044 | Device State HEAT | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4
background log: info: [INFO] [04/29/2016 09:05:31.777] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:35.983] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id lamp-5471 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:35.983] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id lamp-5471 | Device State OFF
background log: info: 09:05:35.984 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:36.166 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:36.166] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id lamp-5471 | Device State OFF | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9
background log: info: [INFO] [04/29/2016 09:05:36.166] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
...
...

Console Output: Worker node:

$ bin/activator "runMain worker.Main 0"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project/project
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt/)
background log: info: Running worker.Main 0
background log: info: [INFO] [04/29/2016 09:02:21.265] [main] [akka.remote.Remoting] Starting remoting
background log: info: [INFO] [04/29/2016 09:02:23.385] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://WorkerSystem@127.0.0.1:54323]
background log: info: [INFO] [04/29/2016 09:02:23.419] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://WorkerSystem@127.0.0.1:54323]
background log: info: [INFO] [04/29/2016 09:02:26.278] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/clusterClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
...
...
background log: info: [INFO] [04/29/2016 09:05:09.932] [WorkerSystem-akka.actor.default-dispatcher-16] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id lamp-5095 | Device State ON
background log: info: [INFO] [04/29/2016 09:05:09.932] [WorkerSystem-akka.actor.default-dispatcher-16] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action NO CHANGE | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373
background log: info: [INFO] [04/29/2016 09:05:15.368] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id security-alarm-9095 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:15.368] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action Switch light to ON | Work Id a66d1079-5083-491e-9b5e-885e7914426f
background log: info: [INFO] [04/29/2016 09:05:22.701] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id thermostat-1004 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:22.701] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action LOWER temperature by 1F | Work Id bc502538-5baa-494d-8565-5cb91ad27001
background log: info: [INFO] [04/29/2016 09:05:32.068] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id thermostat-1044 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:32.068] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action RAISE temperature by 2F | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4
background log: info: [INFO] [04/29/2016 09:05:36.361] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id lamp-5471 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:36.361] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action Switch light to ON | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9
...
...

Scala Distributed Systems With Akka

A recent R&D project prompted me to look into a programming platform for a distributed system. Storm coupled with Kafka popped up as a candidate, but as streaming wasn’t part of the system requirements, I decided to try out an actor-based system. Between Java and Scala, I had Scala in mind as the programming language, primarily for its good mix of object-oriented and functional programming styles, as mentioned in a previous post about Scala-Play.

Naturally, Scala-Akka became a prime candidate. During the technology evaluation phase, I came across a couple of sample applications from Lightbend (formerly Typesafe) that I think are great tutorials for getting into Scala + Akka. Certain techniques for performance and scalability illustrated in some of the more comprehensive applications are particularly useful. Although the Play framework serves as a great application development platform, it’s of less interest from a product functionality perspective.

Akka Actor Systems

Akka is an open-source actor library/toolkit targeted for building scalable concurrent and distributed applications in Scala or Java. At the core of Akka are lightweight computational primitives called actors, each of which encapsulates state and behavior, and communicates via asynchronous immutable message passing.

It’s important to note that keeping messages immutable and message handling non-blocking are fundamental best practices that the actor model is designed around. They aren’t enforced by the model itself, hence it’s the developer’s responsibility to follow them. The underlying shared-nothing principle of actors, with message passing as the only means of interaction, makes it an appealing concurrency model, as opposed to managing shared memory with locks in general thread-based concurrency.
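Since immutability is a convention rather than something the model enforces, here is a minimal plain-Scala sketch of what an immutable message might look like. The PartialResult type and the append helper are hypothetical names made up for illustration; they aren’t taken from any of the sample applications.

```scala
object ImmutableMessages {
  // Hypothetical message type: every field is an immutable val and the
  // collection is an immutable Vector, so receivers cannot alter it.
  final case class PartialResult(workerId: Int, values: Vector[Double])

  // "Changing" a message builds a new instance via copy; the original is
  // left untouched and can be shared safely between actors.
  def append(msg: PartialResult, value: Double): PartialResult =
    msg.copy(values = msg.values :+ value)

  def main(args: Array[String]): Unit = {
    val original = PartialResult(1, Vector(1.0))
    val extended = append(original, 2.0)
    println(original)
    println(extended)
  }
}
```

Because the sender keeps no mutable reference into the message, there is nothing to lock once the message has been handed off.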

Sample app #1: Pi approximation

To quickly grasp how to use Akka Actors to solve computational problems, it might be worth taking a look at a sample application for approximating the value of pi (i.e. π). Although the sample application consists of deprecated code, I still find it a nice example for understanding how to carve out the computational components as actors and coordinate partial result passing via messages among the actors.

Given the dated code, one might want to just skim through the source code of the application while skipping the syntactic details. It shows how easy it is to formulate a distributed computation scheme by modeling the computation workers (Worker), aggregator (Master) and output listener (Listener) as actors, each playing a different role. A couple of notes:

  1. In general, “tell” (i.e. fire-and-forget) is preferred to “ask” in sending messages for performance reasons. It makes sense in this application since it’s an approximation task, hence failure of a worker in a rare case isn’t the end of the world.
  2. Instead of having all actors defined in a single source file as in this example, actors are often defined separately in a slightly more complex application. It’s common practice in Scala to pair an actor class with a companion object that holds its message types. For instance, the Worker actor would be something like the following:
  import akka.actor.Actor

  object Worker {
    // Message protocol of the Worker actor
    sealed trait PiApprox
    case class Work(start: Int, nrOfElements: Int) extends PiApprox
    case class Result(value: Double) extends PiApprox
  }

  class Worker extends Actor {
    import Worker._  // bring Work and Result into scope

    def calculatePiFor(start: Int, nrOfElements: Int): Double = {
      ...
    }
    def receive = {
      case Work(start, nrOfElements) =>
        // Reply to the requester with the partial result
        sender ! Result(calculatePiFor(start, nrOfElements))
    }
  }
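To make the division of labor concrete without any actor plumbing, below is a plain-Scala sketch of the chunked computation. It assumes the series being summed is the Leibniz formula for pi; the body of calculatePiFor is my own fill-in under that assumption, and approximatePi is a hypothetical stand-in for what the Master does when it adds up the workers’ results.

```scala
object PiChunks {
  // Partial sum of the Leibniz series 4 * sum((-1)^i / (2i + 1))
  // over the index range [start, start + nrOfElements).
  def calculatePiFor(start: Int, nrOfElements: Int): Double =
    (start until start + nrOfElements)
      .map(i => 4.0 * (1 - (i % 2) * 2) / (2 * i + 1))
      .sum

  // Stand-in for the Master: split the series into nrOfMessages chunks,
  // compute each chunk independently, then add up the partial results.
  def approximatePi(nrOfMessages: Int, nrOfElements: Int): Double =
    (0 until nrOfMessages)
      .map(chunk => calculatePiFor(chunk * nrOfElements, nrOfElements))
      .sum

  def main(args: Array[String]): Unit =
    println(approximatePi(nrOfMessages = 100, nrOfElements = 1000))
}
```

Each chunk depends only on its own start index and size, which is why the work can be handed out to independent Worker actors and combined in any order.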

Sample app #2: Reactive maps

Lightbend provides a functionality-rich sample application, reactive maps, that illustrates a number of features centered around an actor system, including:

  • GPS using HTML5 geolocation
  • Bot simulation on geospatial maps
  • Play’s handling of WebSockets with actors
  • Actor dependency injection
  • Akka’s peer-to-peer cluster
  • Distributed publish/subscribe in cluster
  • Akka persistence and journal
  • Akka cluster sharding
  • Reactive application deployment

Like most of their sample application templates, reactive-maps comes with a tutorial that walks through the application. What I like about this one is that it starts with a bare-bones working version and proceeds to enhance it with more robust features. In the second half of the application walk-thru, a new feature for user travel distance tracking is created from scratch and then rewritten to address scalability issues by improving the design of the associated actor and by using Akka persistence/journal and cluster sharding.

Due to the rather wide range of features involved in the application, it might take some effort to go over the whole walk-thru. Nevertheless, I think it’s a worthy exercise to pick up some neat techniques in building a real-world application using Scala and Akka.

Deprecated Akka persistence interface

The source code seems to be rather up to date, although the deprecated Akka persistence interface EventsourcedProcessor does generate some compiler warnings. To fix it, use the trait PersistentActor instead and override its now-abstract persistenceId method. The relevant code after the fix should be as follows:

/app/backend/UserMetaData.scala:

...
// import akka.persistence.EventsourcedProcessor
import akka.persistence.PersistentActor
...
// class UserMetaData extends EventsourcedProcessor {
class UserMetaData extends PersistentActor {
  ...
  override def persistenceId = self.path.toStringWithoutAddress
  override def receiveRecover: Receive = {
    ...
  }
  ...

Issue with dependency injection and cluster sharding

There is a bug caught during compilation that arises from the binding for UserMetaData actors in the Play module, Actors.scala, responsible for initializing actors for the web frontend. Dependency injection is used in the module to bind actors of backend role that need to be created from the backend. The cluster-sharded UserMetaData actors now need to be created with the ClusterSharding extension hence requiring a special binding. This new binding causes an exception as follows:

Unexpected exception CreationException: Unable to create injector, see the following errors:

Error in custom provider, java.lang.IllegalArgumentException:
  Shard type [UserMetaData] must be started first while locating actors.UserMetaDataProvider
  at actors.Actors.configure(Actors.scala:26)
  (via modules: com.google.inject.util.Modules$OverrideModule -> actors.Actors)
  while locating akka.actor.ActorRef annotated with @com.google.inject.name.Named(value=userMetaData)
Caused by: java.lang.IllegalArgumentException: Shard type [UserMetaData] must be started first
	at akka.contrib.pattern.ClusterSharding.shardRegion(ClusterSharding.scala:337)
	at actors.UserMetaDataProvider.get$lzycompute(Actors.scala:36)
	at actors.UserMetaDataProvider.get(Actors.scala:36)
	at actors.UserMetaDataProvider.get(Actors.scala:35)
	at com.google.inject.internal.ProviderInternalFactory.provision(ProviderInternalFactory.java:81)
        ...

It can be fixed by moving the ClusterSharding related code from class BackendActors into class UserMetaDataProvider, as follows:

/app/actors/Actors.scala:

...
class UserMetaDataProvider @Inject() (system: ActorSystem) extends Provider[ActorRef] {
  lazy val get = {
    // Only nodes with a backend role host the sharded entries; other nodes
    // pass entryProps = None and merely route messages to the shard region.
    val isBackend = Cluster(system).selfRoles.exists(r => r.startsWith("backend"))
    ClusterSharding(system).start(
      typeName = UserMetaData.shardName,
      entryProps = if (isBackend) Some(UserMetaData.props) else None,
      idExtractor = UserMetaData.idExtractor,
      shardResolver = UserMetaData.shardResolver)
    // The shard type is started above, so the lookup no longer fails
    ClusterSharding(system).shardRegion(UserMetaData.shardName)
  }
}
...
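For readers wondering what the idExtractor and shardResolver arguments do, here is a rough plain-Scala sketch of the idea. The type aliases are local stand-ins that I believe mirror the shapes expected by the sharding extension, and the UpdateDistance message and the shard count are hypothetical; none of it is taken from the actual UserMetaData code.

```scala
object ShardingSketch {
  // Local stand-ins for the shapes of the two routing arguments;
  // defined here for illustration, not imported from Akka.
  type Msg = Any
  type IdExtractor = PartialFunction[Msg, (String, Msg)]
  type ShardResolver = Msg => String

  // Hypothetical message carrying the id of the user it concerns.
  final case class UpdateDistance(userId: String, meters: Double)

  val numberOfShards = 10 // assumed value, for illustration only

  // Picks the entry (actor) id out of a message and passes the message on.
  val idExtractor: IdExtractor = {
    case msg @ UpdateDistance(userId, _) => (userId, msg)
  }

  // Maps a message to a shard id; floorMod keeps the result non-negative
  // even when hashCode is negative.
  val shardResolver: ShardResolver = {
    case UpdateDistance(userId, _) =>
      Math.floorMod(userId.hashCode, numberOfShards).toString
    case _ => "0"
  }
}
```

The point of the pair is that every message for a given user resolves to the same entry id and the same shard, regardless of which node receives it first.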

Akka persistence journal dependency issue

Near the end of the example walk-thru is a custom journal setup using the key-value store LevelDB. However, the setup fails at run-time with errors as follows:

[ERROR] [application-akka.actor.default-dispatcher-2] [ActorSystem(application)] Uncaught fatal error from thread
  [application-akka.actor.default-dispatcher-2] shutting down ActorSystem [application]
java.lang.NoSuchMethodError: com.google.common.io.Closeables.closeQuietly(Ljava/io/Closeable;)V
	at org.iq80.leveldb.impl.MMapLogWriter.close(MMapLogWriter.java:83)
	at org.iq80.leveldb.impl.VersionSet.initializeIfNeeded(VersionSet.java:111)
	at org.iq80.leveldb.impl.VersionSet.<init>(VersionSet.java:91)
	at org.iq80.leveldb.impl.DbImpl.<init>(DbImpl.java:167)
	at org.iq80.leveldb.impl.Iq80DBFactory.open(Iq80DBFactory.java:59)
	at akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:112)
	at akka.persistence.journal.leveldb.SharedLeveldbStore.preStart(LeveldbStore.scala:127)
	...
[ERROR] [application-akka.actor.default-dispatcher-4] [akka://application/user/store] null
java.lang.NullPointerException
	at akka.persistence.journal.leveldb.LeveldbStore$class.postStop(LeveldbStore.scala:117)
	at akka.persistence.journal.leveldb.SharedLeveldbStore.postStop(LeveldbStore.scala:127)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
	at akka.persistence.journal.leveldb.SharedLeveldbStore.aroundPostStop(LeveldbStore.scala:127)
        ...

Some relevant bug reports in the Akka community suggest that it’s a problem with LevelDB’s dependency on an outdated version of Google Guava. Since the default journal seems to work fine and the custom setup isn’t meant to be a production-grade journal anyway, I’m going to skip it. Source code with the above changes can be found at GitHub. In a production environment, one would probably want to use Redis, PostgreSQL, HBase, etc., for the persistence journal.
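A NoSuchMethodError like the one above generally means the class was found on the classpath but in a version that lacks the expected method. As a quick diagnostic, a small reflection check along the following lines could confirm whether the Guava version actually loaded still has closeQuietly(Closeable). The ClasspathCheck object and hasMethod helper are hypothetical names of my own; only standard reflection calls are used.

```scala
import scala.util.Try

object ClasspathCheck {
  // True if className can be loaded and declares a public method named
  // methodName taking exactly paramTypes; false if either lookup fails.
  def hasMethod(className: String, methodName: String, paramTypes: Class[_]*): Boolean =
    Try(Class.forName(className).getMethod(methodName, paramTypes: _*)).isSuccess

  def main(args: Array[String]): Unit = {
    // Class, method and parameter type are taken from the error message above.
    val available = hasMethod(
      "com.google.common.io.Closeables",
      "closeQuietly",
      classOf[java.io.Closeable])
    println(s"Closeables.closeQuietly(Closeable) available: $available")
  }
}
```

Run with the application’s classpath, a false result would point to a Guava version mismatch (or Guava missing altogether) rather than to a problem in the journal configuration itself.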

Below is a screen-shot of the final version of the reactive-maps application.

Scala Akka Reactive Maps

Final thoughts

Despite the described glitches, Lightbend’s reactive-maps application is a well-thought-out tutorial, methodically stepping through the thought process from design to implementation, along with helpful remarks related to real-world performance and scalability. Even though the sample application is primarily for illustration, it’s no trivial hello-world and a lot of demonstrated techniques could be borrowed or repurposed in a production-grade actor system.

As mentioned earlier in the post, I think the Akka actor model, with its shared-nothing principle and non-blocking message-passing communication, is a great alternative to the thread-based shared-memory concurrency model, in which deadlocks and expensive context switching can be dreadful to deal with. However, building a well-designed actor-based application does require proper architectural work and the discipline to follow best practices, so that tasks are componentized into lightweight actors which interact among themselves by means of immutable message passing.

On scalability, there are powerful features like actor cluster sharding and distributed publish/subscribe that allow one to build actor systems that can scale horizontally. And last but not least, Scala’s deep roots in both object-oriented programming and functional programming make it an effective tool for the coding task.