
Scala IoT Systems With Akka Actors II

Back in 2016, I built an Internet-of-Things (IoT) prototype system leveraging the “minimalist” design principle of the Actor model to simulate low-cost, low-powered IoT devices. A simplified version of the prototype was published in a previous blog post. The stripped-down application was written in Scala with the Akka Actors run-time library, which is arguably the predominant Actor model implementation at present. Message Queuing Telemetry Transport (MQTT) was used as the publish-subscribe messaging protocol for the simulated IoT devices. For simplicity, a single actor was used to simulate requests from a bunch of IoT devices.

In this blog post, I would like to share a version closer to the design of the full prototype system. With the same tech stack used in the previous application, it’s an expanded version (hence, II) that uses loosely-coupled lightweight actors to simulate individual IoT devices, each of which maintains its own internal state and handles bidirectional communications via non-blocking message passing. Using a distributed workers system adapted from a Lightbend template along with a persistence journal, the end product is an IoT system equipped with a scalable fault-tolerant data processing system.

Main components

Below is a diagram and a summary of the revised Scala application which consists of 3 main components:

[Diagram: IoT with MQTT and Akka Actor Systems v.2]

1. IoT

  • An IotManager actor which:
    • instantiates a specified number of devices upon start-up
    • subscribes to an MQTT pub-sub topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • notifies Device actors upon receiving failure messages from Master actor
    • forwards work results to the corresponding devices upon receiving them from ResultProcessor
  • Device actors each of which:
    • simulates a thermostat, lamp, or security alarm with random initial state and setting
    • maintains and updates internal state and setting upon receiving work results from IotManager
    • generates work requests and publishes them to the MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from IotManager
  • An MQTT pub-sub broker and an MQTT client for communicating with the broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • serialization methods to convert objects to byte arrays, and vice versa
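The serialization helpers in MqttConfig can be pictured roughly as follows. This is only a minimal sketch using plain Java serialization from the standard library. The object name `MqttConfigSketch`, the method names and the broker URL are assumptions for illustration (the topic name is the one that shows up in the sample console log), and the actual code in the repo may well differ.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper object, for illustration only.
object MqttConfigSketch {
  // Topic name as seen in the sample console log; the broker URL is an assumption.
  val topic = "akka-iot-mqtt-topic"
  val brokerUrl = "tcp://test.mosquitto.org:1883"

  // Convert a serializable object into a byte array usable as an MQTT payload.
  def writeToByteArray(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Convert a received MQTT payload back into an object of the expected type.
  def readFromByteArray[A](payload: Array[Byte]): A = {
    val in = new ObjectInputStream(new ByteArrayInputStream(payload))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

A publisher would call `writeToByteArray` on the outgoing message, and the subscriber would call `readFromByteArray` with the type it expects on the payload it receives.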

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges receipt of work requests to IotManager
    • publishes work results from Workers to ‘work-results’ topic via Akka distributed pub-sub
    • maintains work states using persistence journal
  • A ResultProcessor actor in the master cluster which:
    • gets instantiated upon starting up the IoT system (more on this below)
    • consumes work results by subscribing to the ‘work-results’ topic
    • sends work results received from Master to IotManager

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, and pulls work from, the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors each of which:
    • processes the work requests from its parent Worker
    • generates work results and sends them back to its Worker

Master-worker system with a ‘pull’ model

While significant changes have been made to the IoT actor system, the setup for the Master/Worker actor systems and MQTT pub-sub messaging remains largely unchanged from the previous version:

  • As separate independent actor systems, both the IoT and Worker systems communicate with the Master cluster via ClusterClient.
  • Using a ‘pull’ model which generally performs better at scale, the Worker actors register with the Master cluster and pull work when available.
  • Paho-Akka is used as the MQTT pub-sub messaging client.
  • A helper object, MqttConfig, encapsulates an MQTT pub-sub topic and broker information along with serialization methods to handle MQTT messaging using a test Mosquitto broker.
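To make the ‘pull’ idea a bit more concrete, here is a minimal sketch of the bookkeeping a master might do. It is plain Scala without Akka, and the names `WorkItem` and `MasterState` are hypothetical simplifications rather than the classes used in the application. The point it illustrates is that the master only queues work and records registrations, while a worker takes the next item when it asks for one.

```scala
import scala.collection.immutable.Queue

// Hypothetical, simplified stand-in for a unit of work.
final case class WorkItem(workId: String, deviceId: String)

// Master-side bookkeeping: pending work plus the set of registered workers.
final case class MasterState(
    pending: Queue[WorkItem] = Queue.empty,
    workers: Set[String] = Set.empty) {

  // A worker announces itself; the master records it and pushes nothing to it.
  def register(workerId: String): MasterState = copy(workers = workers + workerId)

  // New work is queued until some worker asks for it.
  def accept(work: WorkItem): MasterState = copy(pending = pending.enqueue(work))

  // A registered worker pulls the next item, if any; unknown workers get nothing.
  def pull(workerId: String): (Option[WorkItem], MasterState) =
    if (!workers.contains(workerId)) (None, this)
    else pending.dequeueOption match {
      case Some((work, rest)) => (Some(work), copy(pending = rest))
      case None               => (None, this)
    }
}
```

Because work leaves the queue only when a worker asks for it, a slow worker simply pulls less often instead of accumulating a backlog in its mailbox.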

What’s new?

Now, let’s look at the major changes in the revised application:

First of all, Lightbend’s Activator has been retired and sbt is used instead.

For persisting actor state, a Redis data store is used as the persistence journal. In the previous version, the shared LevelDB journal was coupled with the first seed node, which became a single point of failure. With the Redis persistence journal decoupled from any specific cluster node, fault tolerance steps up a notch.
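The reason a journal that any node can reach matters is that the Master’s work state is rebuilt by replaying stored events. The following is a minimal sketch of that idea in plain Scala, without Akka Persistence. The event names mirror those visible in the fail-over console log, but the fields, the `WorkState` class and the `replay` helper are assumptions for illustration only.

```scala
// Hypothetical, simplified events; only the names are taken from the console log.
sealed trait WorkEvent
final case class WorkAccepted(workId: String) extends WorkEvent
final case class WorkStarted(workId: String) extends WorkEvent
final case class WorkCompleted(workId: String) extends WorkEvent

// Work state derived purely from events, so it can be recovered from the journal.
final case class WorkState(
    pending: Vector[String] = Vector.empty,
    inProgress: Set[String] = Set.empty,
    done: Set[String] = Set.empty) {

  // Apply a single event to produce the next state.
  def updated(event: WorkEvent): WorkState = event match {
    case WorkAccepted(id)  => copy(pending = pending :+ id)
    case WorkStarted(id)   => copy(pending = pending.filterNot(_ == id), inProgress = inProgress + id)
    case WorkCompleted(id) => copy(inProgress = inProgress - id, done = done + id)
  }
}

object WorkState {
  // Replaying the journal is a left fold over the stored events, in order.
  def replay(events: Seq[WorkEvent]): WorkState =
    events.foldLeft(WorkState())((state, event) => state.updated(event))
}
```

A newly started Master on a different node ends up with the same state as the one it replaces, as long as it can read the same sequence of events.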

As mentioned earlier in the post, one of the key changes from the previous application is the use of actors to represent individual IoT devices, each with its own state and the capability of communicating with entities designated for interfacing with external actor systems. Actors, lightweight and loosely-coupled by design, serve as an excellent vehicle for modeling individual IoT devices. In addition, non-blocking message passing among actors provides an efficient and economical means of communication and of controlling device state.

The IotManager actor is responsible for creating and managing a specified number of Device actors. Upon startup, the IoT manager instantiates individual Device actors of random device type (thermostat, lamp or security alarm). These devices are maintained in an internal registry regularly updated by the IoT manager.

Each of the Device actors starts up with a random state and setting. For instance, a thermostat device may start with an ON state and a temperature setting of 68F whereas a lamp device might have an initial state of OFF and brightness setting of 2. Once instantiated, a Device actor will maintain its internal operational state and setting from then on and will report and update the state and setting per request.
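As a rough illustration of the random start-up, the sketch below picks a device type and an initial state and setting. It is plain Scala; the type names, the numeric state encoding and all value ranges are assumptions made for the example, not values taken from the application.

```scala
import scala.util.Random

// Hypothetical device model; encodings and ranges below are assumed.
sealed trait DeviceType
case object Thermostat extends DeviceType
case object Lamp extends DeviceType
case object SecurityAlarm extends DeviceType

final case class DeviceState(deviceType: DeviceType, state: Int, setting: Int)

object DeviceState {
  private val allTypes: Vector[DeviceType] = Vector(Thermostat, Lamp, SecurityAlarm)

  // Pick one of the three device types at random.
  def randomType(rnd: Random): DeviceType = allTypes(rnd.nextInt(allTypes.size))

  // Pick a random starting state and setting within an assumed per-type range.
  def randomInitial(deviceType: DeviceType, rnd: Random): DeviceState = deviceType match {
    case Thermostat    => DeviceState(deviceType, rnd.nextInt(3), 60 + rnd.nextInt(16)) // state 0..2, 60..75 F
    case Lamp          => DeviceState(deviceType, rnd.nextInt(2), 1 + rnd.nextInt(3))   // state 0..1, brightness 1..3
    case SecurityAlarm => DeviceState(deviceType, rnd.nextInt(2), rnd.nextInt(6))       // state 0..1, level 0..5
  }
}
```

Passing the `Random` instance in explicitly keeps the helper easy to test with a fixed seed.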

Work and WorkResult

In this application, a Work object represents a request sent by a specific Device actor and carries the Device’s Id along with its current state and setting. A WorkResult object, on the other hand, represents the response returned to the Device actor: it carries the new state and setting that the Device should update itself to.
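The shape of the two message types might look something like the sketch below. The field names and the `DeviceSnapshot` helper are hypothetical; the actual case classes in the repo may carry different fields.

```scala
// Hypothetical field names; the real classes in the repo may differ.
final case class Work(workId: String, deviceId: String, currState: Int, currSetting: Int)
final case class WorkResult(workId: String, deviceId: String, nextState: Int, nextSetting: Int)

// Minimal view of a device's internal state.
final case class DeviceSnapshot(deviceId: String, state: Int, setting: Int) {
  // Build a work request carrying the device's Id and its current state and setting.
  def toWork(workId: String): Work = Work(workId, deviceId, state, setting)

  // Apply a result only if it is addressed to this device; otherwise keep the state as-is.
  def applyResult(result: WorkResult): DeviceSnapshot =
    if (result.deviceId == deviceId) copy(state = result.nextState, setting = result.nextSetting)
    else this
}
```

For instance, a snapshot of `thermostat-1015` with state 0 and setting 70 that receives a result with state 2 and setting 68 becomes a snapshot with state 2 and setting 68, which matches the first round trip in the sample console log.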

Responsible for processing the WorkResult generated by the Worker actors, the ResultProcessor actor simulates the processing of work results. In this case, it simply uses the actorSelection method to send the work result back to the original Device actor through IotManager. Since the Worker actors interact only with the Master cluster system as cluster clients, they have no knowledge of the ResultProcessor actor. ResultProcessor receives the work results by subscribing to the Akka distributed pub-sub topic to which the Master publishes.

Although it is a participant in the Master cluster actor system, the ResultProcessor actor gets instantiated when the IoT actor system starts up. Decoupling ResultProcessor instantiation from the Master cluster ensures that no excess ResultProcessor instances get started when multiple Master cluster nodes start up.

Test running the application

Complete source code of the application is available at GitHub.

To run the application on a single JVM, just git-clone the repo, run the following commands at a command line terminal and observe the console output:

# Start a Redis server accessible to the master cluster to serve as the persistence journal:
$ nohup redis-server /path/to/conf &

# Launch the master cluster with 2 seed nodes, IoT actor system and Worker actor system:
$ cd {project-root}
$ bin/sbt "runMain akkaiot.Main [NumOfDevices]"

The optional NumOfDevices parameter defaults to 20.

To run the application on separate JVMs, git-clone the repo to a local disk, open up separate command line terminals and launch the different components on separate terminals:

# Start a Redis server accessible to the master cluster to serve as the persistence journal:
$ nohup redis-server /path/to/conf &

$ cd {project-root}
# Launch the master cluster seed node with persistence journal:
$ bin/sbt "runMain akkaiot.Main 2551"

# Launch additional master cluster seed node:
$ bin/sbt "runMain akkaiot.Main 2552"

# Launch the IoT node:
$ bin/sbt "runMain akkaiot.Main 3001 [NumOfDevices]"

# Launch a Worker node:
$ bin/sbt "runMain akkaiot.Main 0"

# Launch additional Worker node:
$ bin/sbt "runMain akkaiot.Main 0"

Sample console log

Below is filtered console output tracing the evolving state and setting of a thermostat device:

#
### Console log filtered for device thermostat-1015
#
[info] [INFO] [07/12/2017 14:38:44.707] [IotSystem-akka.actor.default-dispatcher-21] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 started
[info] [INFO] [07/12/2017 14:38:49.726] [IotSystem-akka.actor.default-dispatcher-29] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 0 created work (Id: 69d82872-a9f4-492e-8ae4-28229b797994) 
[info] [INFO] [07/12/2017 14:38:49.726] [IotSystem-akka.actor.default-dispatcher-29] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:38:49.915] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 0 | Setting 70
[info] [INFO] [07/12/2017 14:38:50.261] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.265] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.488] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-2] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 0 | Setting 70
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Switch to COOL | LOWER temperature by -2
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.493] [ClusterSystem-akka.actor.default-dispatcher-27] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:50.493] [ClusterSystem-akka.actor.default-dispatcher-27] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-33] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id 69d82872-a9f4-492e-8ae4-28229b797994.
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 2 and setting 68.
[info] [INFO] [07/12/2017 14:38:55.275] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 2 created work (Id: df9622cd-6b80-42e9-be1b-f0a92d002d75) 
[info] [INFO] [07/12/2017 14:38:55.275] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:38:55.578] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:55.580] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.583] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.586] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Keep state COOL | LOWER temperature by -2
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.590] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:38:55.590] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-18] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id df9622cd-6b80-42e9-be1b-f0a92d002d75.
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 2 and setting 66.
[info] [INFO] [07/12/2017 14:39:01.596] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 2 created work (Id: c57d21af-3957-43ba-a995-f4f558900fa3) 
[info] [INFO] [07/12/2017 14:39:01.597] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:39:01.752] [IotSystem-akka.actor.default-dispatcher-17] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:39:01.753] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.755] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.757] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Switch to OFF | RAISE temperature by 2
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.760] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 0 | Setting 68
[info] [INFO] [07/12/2017 14:39:01.761] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-18] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id c57d21af-3957-43ba-a995-f4f558900fa3.
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 0 and setting 68.
....
....

The following annotated console log demonstrates the fault tolerance of the master cluster, i.e. how it fails over to the 2nd node after detecting that the 1st node has crashed:

#
### Annotated console log from the 2nd cluster node
#
<<<
<<<--- 2nd cluster node (with port# 2552) starts shortly after 1st node (with port# 2551) has started.
<<<
Leos-MBP:akka-iot-mqtt-v2 leo$ bin/sbt "runMain akkaiot.Main 2552"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt-v2/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt-v2/)
[info] Running akkaiot.Main 2552
[info] [INFO] [07/12/2017 14:24:16.491] [main] [akka.remote.Remoting] Starting remoting
[info] [INFO] [07/12/2017 14:24:16.661] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[info] [INFO] [07/12/2017 14:24:16.662] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2552]
[info] [INFO] [07/12/2017 14:24:16.675] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[info] [INFO] [07/12/2017 14:24:16.790] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [INFO] [07/12/2017 14:24:16.790] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
[info] [INFO] [07/12/2017 14:24:16.794] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[info] [INFO] [07/12/2017 14:24:16.797] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[info] [WARN] [07/12/2017 14:24:16.822] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon/downingProvider] Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
<<<
<<<--- Gets 'welcome' acknowledgement from 1st cluster node.
<<<
[info] [INFO] [07/12/2017 14:24:17.234] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:24:17.697] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] ClusterSingletonManager state change [Start -> Younger]
<<<
<<<--- 1st cluster node crashes after running for about a minute!
<<<
[info] [WARN] [07/12/2017 14:25:17.329] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
[info] [INFO] [07/12/2017 14:25:17.336] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter#1930129898] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter#1930129898] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:17.828] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:18.257] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.ClusterHeartbeatSender$Heartbeat] from Actor[akka://ClusterSystem/system/cluster/core/daemon/heartbeatSender#-975290267] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:18.826] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
....
....
[info] [WARN] [07/12/2017 14:25:20.828] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)]. Node roles [backend]
[info] [INFO] [07/12/2017 14:25:21.258] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.ClusterHeartbeatSender$Heartbeat] from Actor[akka://ClusterSystem/system/cluster/core/daemon/heartbeatSender#-975290267] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:21.827] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [WARN] [07/12/2017 14:25:23.260] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with java.net.ConnectException: Connection refused: /127.0.0.1:2551
[info] [WARN] [07/12/2017 14:25:23.262] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[info] [WARN] [07/12/2017 14:25:28.829] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with java.net.ConnectException: Connection refused: /127.0.0.1:2551
[info] [WARN] [07/12/2017 14:25:28.830] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:25:30.845] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551]. Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
[info] [INFO] [07/12/2017 14:25:30.846] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551] as [Down]
<<<
<<<--- 1st cluster node (with port# 2551) is marked as 'down'.
<<<
[info] [INFO] [07/12/2017 14:25:31.830] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:25:31.832] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Previous oldest removed [akka.tcp://ClusterSystem@127.0.0.1:2551]
<<<
<<<--- 2nd cluster node (with port# 2552) replaces 1st node to start a new master singleton actor.
<<<
[info] [INFO] [07/12/2017 14:25:31.833] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Younger observed OldestChanged: [None -> myself]
[info] [WARN] [07/12/2017 14:25:31.833] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.remote.Remoting] Association to [akka.tcp://ClusterSystem@127.0.0.1:2551] having UID [1664659180] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[info] [INFO] [07/12/2017 14:25:31.877] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Singleton manager starting singleton actor [akka://ClusterSystem/user/master/singleton]
[info] [INFO] [07/12/2017 14:25:31.878] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] ClusterSingletonManager state change [Younger -> Oldest]
[info] [INFO] [07/12/2017 14:25:31.994] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$a] Connect to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.015] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$a] Connected to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.093] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$b] Connect to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.095] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$b] Connected to localhost/127.0.0.1:6379
<<<
<<<--- The new master singleton actor starts replaying events stored in the persistence journal.
<<<
[info] [INFO] [07/12/2017 14:25:32.485] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.486] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.486] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
....
....
[info] [INFO] [07/12/2017 14:25:32.532] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkStarted
[info] [INFO] [07/12/2017 14:25:32.532] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkCompleted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkStarted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkCompleted
<<<
<<<--- The new master singleton actor resumes live operation.
<<<
[info] [INFO] [07/12/2017 14:25:32.863] [ClusterSystem-akka.actor.default-dispatcher-40] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1013 : Work Id f2a1e777-1e35-4b79-9acb-87477358adc1
[info] [WARN] [SECURITY][07/12/2017 14:25:32.874] [ClusterSystem-akka.persistence.dispatchers.default-plugin-dispatcher-26] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [akkaiot.WorkQueue$WorkAccepted] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[info] [WARN] [SECURITY][07/12/2017 14:25:32.908] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [akkaiot.Master$Ack] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[info] [INFO] [07/12/2017 14:25:33.694] [ClusterSystem-akka.actor.default-dispatcher-40] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1001 : Work Id cfe1ecb3-683d-46f4-ab7e-d8c66fa88755
[info] [INFO] [07/12/2017 14:25:33.851] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1002 : Work Id 469f7148-a6f8-43e6-aa2e-764ac911c251
[info] [INFO] [07/12/2017 14:25:34.014] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id 92cb01ea-bb90-482c-b20b-accd4770f21a
[info] [INFO] [07/12/2017 14:25:34.172] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1005 : Work Id 73ccc2c1-ceb7-4923-b705-8fe82c666bc5
[info] [INFO] [07/12/2017 14:25:34.614] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1012 : Work Id 711b212c-c53d-4f41-a9fa-e8f5ad44ad55
[info] [INFO] [07/12/2017 14:25:35.040] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1009 : Work Id 435264de-5891-4ee9-a7f8-352f7c0abc11
[info] [INFO] [07/12/2017 14:25:35.533] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1008 : Work Id 2873a66e-b04f-40d0-be49-0500449f54f4
[info] [INFO] [07/12/2017 14:25:35.692] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1020 : Work Id eefc57d2-8571-4820-8a6e-a7b083fdbaca
[info] [INFO] [07/12/2017 14:25:35.849] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1007 : Work Id bdcb4f22-4ca9-4b76-9f43-68a1168637cf
[info] [INFO] [07/12/2017 14:25:36.009] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1006 : Work Id 01999269-3608-4877-85ef-6c75f39f7db9
[info] [INFO] [07/12/2017 14:25:37.024] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1004 : Work Id ccbc9edb-7a60-4f65-8121-261928f01ff2
....
....
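
The replay phase in the log above can be pictured as a fold over the journaled events. Below is a minimal plain-Scala sketch of that idea, not the application's actual code: the event names mirror those in the log, but the fields of the events and of WorkState are assumptions made for illustration.

```scala
// Hypothetical, simplified events; the application's own events carry more data
sealed trait WorkEvent { def workId: String }
final case class WorkAccepted(workId: String) extends WorkEvent
final case class WorkStarted(workId: String) extends WorkEvent
final case class WorkCompleted(workId: String) extends WorkEvent

// Hypothetical immutable work state rebuilt from the journal
final case class WorkState(
    pending: Vector[String] = Vector.empty,
    inProgress: Set[String] = Set.empty,
    done: Set[String] = Set.empty
) {
  // Apply a single event and return the next state
  def updated(event: WorkEvent): WorkState = event match {
    case WorkAccepted(id) =>
      copy(pending = pending :+ id)
    case WorkStarted(id) =>
      copy(pending = pending.filterNot(_ == id), inProgress = inProgress + id)
    case WorkCompleted(id) =>
      copy(inProgress = inProgress - id, done = done + id)
  }
}

object ReplayDemo {
  // Replaying a journal amounts to folding its events over an empty state
  def replay(journal: Seq[WorkEvent]): WorkState =
    journal.foldLeft(WorkState())((state, event) => state.updated(event))

  def main(args: Array[String]): Unit = {
    val journal = Seq(
      WorkAccepted("w1"), WorkAccepted("w2"),
      WorkStarted("w1"), WorkCompleted("w1")
    )
    // After the replay, w2 is still pending and w1 is done
    println(replay(journal))
  }
}
```

Because the state is derived purely from the events, a new singleton on another node can rebuild it from the journal before accepting new work.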

Scaling for production

The Actor model is well suited for building scalable distributed systems. While the application has an underlying architecture that emphasizes scalability, it would require further effort in the following areas to make it production-ready:

  • IotManager uses the ‘ask’ method for message receipt confirmation via a Future returned by the Master. If business logic allows, the fire-and-forget ‘tell’ method is more efficient, especially at scale, since it avoids creating a Future and a temporary actor for every message.
  • The MQTT broker used in the application is a test broker provided by Mosquitto. A production version of the broker should be installed, preferably local to the IoT system. MQTT brokers from other vendors, such as HiveMQ and RabbitMQ, are also available.
  • As the warnings in the console log show, Akka’s default Java serializer is not recommended for performance reasons. Other serializers such as Kryo or Protocol Buffers should be considered.
  • The Redis data store for actor state persistence should be configured for a production environment.
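
To make the trade-off between ‘ask’ and ‘tell’ concrete, here is a minimal sketch using classic Akka Actors. The Receiver actor and the Job and Receipt messages are hypothetical stand-ins rather than classes from the application, and the 3-second timeout is an arbitrary choice.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical messages, for illustration only
final case class Job(id: String)
final case class Receipt(id: String)

// Hypothetical actor that confirms receipt of every Job
class Receiver extends Actor {
  def receive: Receive = {
    case Job(id) => sender() ! Receipt(id)
  }
}

object AskVsTell {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    val receiver = system.actorOf(Props(new Receiver), "receiver")

    // tell (!): fire-and-forget. Nothing is returned to the caller, and the
    // Receipt sent back here ends up in dead letters as there is no sender.
    receiver ! Job("job-1")

    // ask (?): returns a Future completed by the reply, or failed on timeout
    implicit val timeout: Timeout = Timeout(3.seconds)
    val receipt: Future[Receipt] = (receiver ? Job("job-2")).mapTo[Receipt]

    // Blocking with Await is for demonstration only
    println(Await.result(receipt, 3.seconds))
    system.terminate()
  }
}
```

If confirmation is still needed without ‘ask’, one option is to have the sending actor handle the Receipt as an ordinary message in its own receive block.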

Further code changes to be considered

A couple of changes to the current application might be worth considering:

Device types are currently represented as strings, and code logic for device type-specific states and settings is repeated during instantiation of devices and processing of work requests. Such logic could be encapsulated within classes defined for individual device types. The payload would probably be larger as a consequence, but it might be worth it for better code maintainability, especially if there are many device types.
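
One way to picture this change is a sealed trait with one object per device type, so that the type-specific logic lives in a single place. The sketch below is only an assumption about how it might look: the names, the Int encoding of states and settings, and the value ranges are all hypothetical and not taken from the application.

```scala
import scala.util.Random

// Hypothetical device-type hierarchy; names, fields and ranges are illustrative
sealed trait DeviceType {
  def name: String
  // Each type owns the logic for generating its own initial state and setting
  def initialState(rnd: Random): Int
  def initialSetting(rnd: Random): Int
}

case object Thermostat extends DeviceType {
  val name = "thermostat"
  def initialState(rnd: Random): Int = rnd.nextInt(3)         // 0 = off, 1 = heat, 2 = cool
  def initialSetting(rnd: Random): Int = 60 + rnd.nextInt(21) // temperature, 60 to 80
}

case object Lamp extends DeviceType {
  val name = "lamp"
  def initialState(rnd: Random): Int = rnd.nextInt(2)         // 0 = off, 1 = on
  def initialSetting(rnd: Random): Int = 1 + rnd.nextInt(3)   // brightness, 1 to 3
}

case object SecurityAlarm extends DeviceType {
  val name = "security-alarm"
  def initialState(rnd: Random): Int = rnd.nextInt(2)         // 0 = off, 1 = on
  def initialSetting(rnd: Random): Int = rnd.nextInt(6)       // arming delay, 0 to 5
}

object DeviceType {
  val all: Seq[DeviceType] = Seq(Thermostat, Lamp, SecurityAlarm)

  // Pick a random device type as a typed value instead of a string
  def random(rnd: Random): DeviceType = all(rnd.nextInt(all.size))
}
```

With a sealed hierarchy, the compiler can also warn about a pattern match that forgets a device type, which string comparisons cannot offer.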

Another change to be considered is that Work and WorkResult could be generalized into a single class. Conversely, they could be further differentiated in accordance with specific business needs. A slightly more extensive change would be to retire ResultProcessor altogether and let Worker actors process WorkResult as well.

State mutation in Akka Actors

In this application, a few actors maintain mutable internal states using private variables (private var):

  • Master
  • IotManager
  • Device

As an actor by design processes one message at a time and is never accessed by multiple threads concurrently, it’s generally safe enough to use ‘private var’ to store changed states. But if one prefers state transitioning (as opposed to updating), Akka Actors provides a method to hot-swap an actor’s internal state.

Hot-swapping an actor’s state

Below is a sample snippet that illustrates how hot-swapping mimics a state machine without having to use any mutable variable for maintaining the actor state:

// An Actor that mutates internal state by hot-swapping
import akka.actor.{Actor, ActorRef}
import akka.cluster.client.ClusterClient.SendToAll

object Worker {
  def props(workerId: String, clusterClient: ActorRef) = // ...

  // ...
}

class Worker(workerId: String, clusterClient: ActorRef) extends Actor {
  import Worker._

  override def preStart(): Unit = // Initialize worker
  override def postStop(): Unit = // Terminate worker

  def sendToMaster(msg: Any): Unit = {
    clusterClient ! SendToAll("/user/master/singleton", msg)
  }

  // ...

  def receive = idle

  def idle: Receive = {
    case WorkIsReady =>
      // Tell Master it is free
      sendToMaster(WorkerIsFree(workerId))

    case work: Work =>
      // Send work to a work processing actor
      workProcessor ! work
      context.become(working)
  }

  def working: Receive = {
    case WorkProcessed(workResult) =>
      // Tell Master work is done
      sendToMaster(WorkIsDone(workerId, workResult))
      context.become(idle)

    case _: Work =>
      // Tell Master it is busy
      sendToMaster(WorkerIsBusy(workerId))
  }
}

Simplified for illustration, the above snippet depicts a Worker actor that pulls work from the Master cluster. The context.become method allows the actor to switch its message handler at run-time like a state machine. As shown in the simplified code, it takes an ‘Actor.Receive’ (a partial function from Any to Unit) that implements the new message handler. Under the hood, Akka manages the hot-swapping via a stack: by default, context.become replaces the handler at the top of the stack. As a side note, according to the relevant source code, the stack for hot-swapping actor behavior is, ironically, a mutable ‘private var’ of List[Actor.Receive].
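
The stack becomes visible when context.become is called with discardOld = false, which pushes the new handler instead of replacing the current one, and context.unbecome then pops it. Below is a minimal sketch of that behavior using classic Akka Actors. PausableWorker and its messages are hypothetical and unrelated to the application's Worker.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages, for illustration only
case object Pause
case object Resume
final case class Task(name: String)

class PausableWorker extends Actor {
  def receive: Receive = active

  def active: Receive = {
    case Task(name) => sender() ! s"processed $name"
    case Pause =>
      // discardOld = false pushes 'paused' on top of 'active' rather than replacing it
      context.become(paused, discardOld = false)
      sender() ! "paused"
  }

  def paused: Receive = {
    case Task(name) => sender() ! s"ignored $name"
    case Resume =>
      // unbecome() pops 'paused' and restores the handler beneath it ('active')
      context.unbecome()
      sender() ! "resumed"
  }
}

object BecomeStackDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val worker = system.actorOf(Props(new PausableWorker), "worker")

    // Blocking with Await is for demonstration only
    def send(msg: Any): Any = Await.result(worker ? msg, 3.seconds)

    println(send(Task("a"))) // handled by 'active'
    println(send(Pause))
    println(send(Task("b"))) // handled by 'paused'
    println(send(Resume))
    println(send(Task("c"))) // handled by 'active' again
    system.terminate()
  }
}
```

When pushing handlers this way, every become should eventually be matched by an unbecome; otherwise the stack keeps growing.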

Recursive transformation of immutable parameter

Another functional approach to mutating actor state is via recursive transformation of an immutable parameter. As an example, we can avoid using a mutable ‘private var registry’ as shown in the following ActorManager actor and use ‘context.become’ to recursively transform a registry as an immutable parameter passed to the updateState method:

// Approach #1: Store map entries in a mutable registry
class ActorManager extends Actor {
  private var registry = Map.empty[String, ActorRef]
  def receive = {
    case Add(id, ref) =>
      registry += id -> ref
    // Other cases
  }
}
// Approach #2: Store map entries in a recursively transformed immutable registry
class ActorManager extends Actor {
  def receive = updateState(Map.empty[String, ActorRef])
  def updateState(registry: Map[String, ActorRef]): Receive = {
    case Add(id, ref) =>
      context.become(updateState(registry + (id -> ref)))
    // Other cases
  }
}

Internet-of-Things And Akka Actors

IoT (Internet of Things) has recently been one of the most popular buzzwords. Despite being over-hyped, we’re indeed heading towards a world in which all sorts of things are inter-connected. Before IoT became a hot acronym, I was heavily involved in building a Home-Area-Network SaaS platform over the course of 5 years in a previous startup I cofounded, so the concept is no stranger to me.

At the low-level device network layer, there used to be platform service companies providing gateway hardware along with proprietary APIs for IoT devices running on sensor network protocols (such as ZigBee, Z-Wave). The landscape has been evolving over the past couple of years. As more and more companies begin to throw their weight behind building products in the IoT ecosystem, open standards for device connectivity have emerged. One of them is MQTT (Message Queue Telemetry Transport).

Message Queue Telemetry Transport

MQTT had been relatively little-known until it was standardized at OASIS a couple of years ago. The lightweight publish-subscribe messaging protocol has since been increasingly adopted by major players, including Amazon, as the underlying connectivity protocol for IoT devices. It’s TCP/IP based, but its variant, MQTT-SN (MQTT for Sensor Networks), covers sensor network communication protocols such as ZigBee. There are also quite a few MQTT message brokers, including HiveMQ, Mosquitto and RabbitMQ.

IoT makes a great use case for Akka actor systems which come with lightweight loosely-coupled actors in decentralized clusters with robust routing, sharding and pub-sub features, as mentioned in a previous blog post. The actor model can be rather easily structured to emulate the operations of a typical IoT network that scales in device volume. In addition, availability of MQTT clients for Akka such as Paho-Akka makes it easy to communicate with MQTT brokers.

A Scala-based IoT application

UPDATE: An expanded version of this application with individual actors representing each of the IoT devices, each of which maintains its own internal state and setting, is now available. Please see the Akka Actors IoT v.2 blog post for details.

In this blog post, I’m going to illustrate how to build a scalable distributed worker system using Akka actors to service requests from a MQTT-based IoT system. A good portion of the Akka clustering setup is derived from Lightbend’s Akka distributed workers template. Below is a diagram of the application:

IoT with MQTT and Akka Actor Systems

As shown in the diagram, the application consists of the following components:

1. IoT

  • A DeviceRequest actor which:
    • simulates work requests from IoT devices
    • publishes requests to a MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from a topic subscriber
  • An IotAgent actor which:
    • subscribes to the mqtt-topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • sends DeviceRequest actor a failure message upon receiving failure messages from Master actor
  • A MQTT pub-sub client, MqttPubSub, for communicating with a MQTT broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • Serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges work request reception with IotAgent
    • publishes work results to a work-results topic via Akka distributed pub-sub
    • maintains work states using persistence journal
  • A PostProcessor actor in the master cluster which:
    • simulates post-processing of the work results
    • subscribes to the work-results topic

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, and pulls work from, the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors which process the work requests

Source code is available at GitHub.

A few notes:

  1. Neither IotAgent nor Worker actor system is a part of the master cluster, hence both need to communicate with the Master via ClusterClient.
  2. Rather than having the Master actor spawn child Workers and push work over, the Workers are set up to register with the Master and pull work from it – a model similar to what Derek Wyatt advocated in his post.
  3. Paho-Akka is used as the MQTT pub-sub client with configuration information held within the helper object, MqttConfig.
  4. The helper object MqttConfig consists of MQTT pub-sub topic/broker information and methods to serialize/deserialize the Work objects which, in turn, contain Device objects. The explicit serializations are necessary since multiple JVMs will be at play if one launches the master cluster, IoT and worker actor systems on separate JVMs.
  5. The test Mosquitto broker at tcp://test.mosquitto.org:1883 serves as the MQTT broker. An alternative is to install a MQTT broker (Mosquitto, HiveMQ, etc) local to the IoT network.
  6. The IotAgent uses Actor’s ask method (?), instead of the fire-and-forget tell method (!), to confirm message receipt by the Master via a returned Future. If the receipt confirmation is not so important, the tell method is the preferred choice for performance.
  7. This is primarily a proof-of-concept application of IoT using Akka actors, hence code performance optimization isn’t a priority. In addition, for production systems, a production-grade persistence journal (e.g. Redis, Cassandra) should be used and multiple-Master via sharding could be considered.
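
As a rough illustration of the object-to-byte-array conversion mentioned in note 4, here is a minimal sketch based on plain Java serialization. The Device and Work case classes are hypothetical stand-ins with made-up fields, ByteArrayCodec is a hypothetical name, and the actual methods in MqttConfig may be implemented differently.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for the application's Device and Work classes
final case class Device(deviceId: String, state: Int)
final case class Work(workId: String, device: Device)

object ByteArrayCodec {
  // Serialize a java.io.Serializable object (case classes qualify) to bytes
  def toBytes(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Deserialize bytes back into an object; the caller states the expected type
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object CodecDemo {
  def main(args: Array[String]): Unit = {
    val work = Work("work-1", Device("lamp-1002", 1))
    val payload = ByteArrayCodec.toBytes(work) // bytes suitable for an MQTT payload
    val restored = ByteArrayCodec.fromBytes[Work](payload)
    println(restored == work)
  }
}
```

Java serialization keeps the sketch dependency-free, but for production traffic a more compact format would be worth considering.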

Test-running

Similar to how you would test-run Lightbend’s distributed workers template, you may open up separate command line terminals and run the different components on separate JVMs, adding and killing the launched components to observe how the systems scale out, fail over, persist work states, etc. Here’s an example test-run sequence:

Launch the master cluster seed node with persistence journal:
<project-root>/bin/activator "runMain worker.Main 2551"

Launch the IotAgent-DeviceRequest node:
<project-root>/bin/activator "runMain worker.Main 3001"

Launch additional master cluster seed node:
<project-root>/bin/activator "runMain worker.Main 2552"

Launch the Worker node:
<project-root>/bin/activator "runMain worker.Main 0"

Launch additional Worker node:
<project-root>/bin/activator "runMain worker.Main 0"

Below is some sample console output.

Console Output: Master seed node with persistence journal:

$ bin/activator "runMain worker.Main 2551"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project/project
[info] Updating {file:/Users/leo/apps/scala/akka-iot-mqtt/project/project/}akka-iot-mqtt-build-build...
...
...
[info] Done updating.
[info] Compiling 12 Scala sources to /Users/leo/apps/scala/akka-iot-mqtt/target/scala-2.11/classes...
background log: info: Running worker.Main 2551
background log: info: [INFO] [04/29/2016 08:54:19.120] [main] [akka.remote.Remoting] Starting remoting
background log: info: [INFO] [04/29/2016 08:54:21.627] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
background log: info: [INFO] [04/29/2016 08:54:21.628] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2551]
...
...
background log: info: [INFO] [04/29/2016 08:54:28.140] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles [backend]
background log: info: [INFO] [04/29/2016 08:54:28.974] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
background log: info: [INFO] [04/29/2016 08:54:29.102] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master] Singleton manager starting singleton actor [akka://ClusterSystem/user/master/singleton]
background log: info: [INFO] [04/29/2016 08:54:29.103] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master] ClusterSingletonManager state change [Start -> Oldest]
...
...

<<< /bin/activator "runMain worker.Main 2552"

background log: info: [INFO] [04/29/2016 08:57:17.648] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles [backend]
background log: info: [INFO] [04/29/2016 08:57:18.194] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
...
...
background log: info: [INFO] [04/29/2016 08:57:29.043] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking node(s) as REACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2552, status = Up)]
...
...

<<< [while worker node 0-a is up]

background log: info: [INFO] [04/29/2016 09:05:09.448] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id 4fc64049-fa21-49c6-9d54-743eaa055373 | Device Id lamp-5095
background log: info: [INFO] [04/29/2016 09:05:09.930] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373 | Device Id lamp-5095
background log: info: [INFO] [04/29/2016 09:05:09.934] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id 4fc64049-fa21-49c6-9d54-743eaa055373 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:10.169] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result NO CHANGE | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373
background log: info: [INFO] [04/29/2016 09:05:15.011] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id a66d1079-5083-491e-9b5e-885e7914426f | Device Id security-alarm-9095
background log: info: [INFO] [04/29/2016 09:05:15.366] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id a66d1079-5083-491e-9b5e-885e7914426f | Device Id security-alarm-9095
background log: info: [INFO] [04/29/2016 09:05:15.370] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id a66d1079-5083-491e-9b5e-885e7914426f | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:15.542] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result Switch light to ON | Work Id a66d1079-5083-491e-9b5e-885e7914426f
background log: info: [INFO] [04/29/2016 09:05:22.412] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id bc502538-5baa-494d-8565-5cb91ad27001 | Device Id thermostat-1004
background log: info: [INFO] [04/29/2016 09:05:22.699] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id bc502538-5baa-494d-8565-5cb91ad27001 | Device Id thermostat-1004
background log: info: [INFO] [04/29/2016 09:05:22.703] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id bc502538-5baa-494d-8565-5cb91ad27001 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:22.798] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result LOWER temperature by 1F | Work Id bc502538-5baa-494d-8565-5cb91ad27001
background log: info: [INFO] [04/29/2016 09:05:31.779] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4 | Device Id thermostat-1044
background log: info: [INFO] [04/29/2016 09:05:32.065] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4 | Device Id thermostat-1044
background log: info: [INFO] [04/29/2016 09:05:32.070] [ClusterSystem-akka.actor.default-dispatcher-45] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:32.257] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result RAISE temperature by 2F | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4
background log: info: [INFO] [04/29/2016 09:05:36.169] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9 | Device Id lamp-5471
background log: info: [INFO] [04/29/2016 09:05:36.359] [ClusterSystem-akka.actor.default-dispatcher-39] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316 | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9 | Device Id lamp-5471
background log: info: [INFO] [04/29/2016 09:05:36.363] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9 | Worker Id f3c61905-f624-46ca-9e4e-b3b0374fa316
background log: info: [INFO] [04/29/2016 09:05:36.496] [ClusterSystem-akka.actor.default-dispatcher-39] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result Switch light to ON | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9
...
...

<<< [downing worker node 54323]

background log: info: [WARN] [04/29/2016 09:12:43.526] [ClusterSystem-akka.remote.default-remote-dispatcher-34] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FWorkerSystem%40127.0.0.1%3A54323-3] Association with remote system [akka.tcp://WorkerSystem@127.0.0.1:54323] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
background log: info: [INFO] [04/29/2016 09:12:50.846] [ClusterSystem-akka.actor.default-dispatcher-49] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work: Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa | Device Id lamp-5210
background log: info: [INFO] [04/29/2016 09:12:50.884] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work to: Worker Id 68a11934-16ca-4bbb-a8c9-27d5a4ac5ede | Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa | Device Id lamp-5210
background log: info: [INFO] [04/29/2016 09:12:51.009] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Acknowledged work done: Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa | Worker Id 68a11934-16ca-4bbb-a8c9-27d5a4ac5ede
background log: info: [INFO] [04/29/2016 09:12:51.010] [ClusterSystem-akka.actor.default-dispatcher-39] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/postprocessor] Post-processor -> Got work result Switch light to ON | Work Id ea8f92ae-776f-4bd0-87b4-5ad3992b1daa
...
...

<<< [downing worker node 54351]

background log: info: [WARN] [04/29/2016 09:18:07.161] [ClusterSystem-akka.remote.default-remote-dispatcher-38] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FWorkerSystem%40127.0.0.1%3A54351-5] Association with remote system [akka.tcp://WorkerSystem@127.0.0.1:54351] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
...
...

<<< [downing iot-device node 3001]

background log: info: [WARN] [04/29/2016 09:18:43.385] [ClusterSystem-akka.remote.default-remote-dispatcher-38] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FIotSystem%40127.0.0.1%3A3001-8] Association with remote system [akka.tcp://IotSystem@127.0.0.1:3001] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
...
...

<<< [downing cluster seed node 2552]

background log: info: [WARN] [04/29/2016 09:18:53.937] [ClusterSystem-akka.remote.default-remote-dispatcher-34] [akka.tcp://ClusterSystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2552-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
...
...
background log: info: [INFO] [04/29/2016 09:19:07.754] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552]
background log: info: [INFO] [04/29/2016 09:19:07.783] [ClusterSystem-akka.actor.default-dispatcher-43] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552] as [Down]
background log: info: [INFO] [04/29/2016 09:19:08.775] [ClusterSystem-akka.actor.default-dispatcher-51] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2552]
background log: info: [INFO] [04/29/2016 09:19:08.776] [ClusterSystem-akka.actor.default-dispatcher-41] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master] Member removed [akka.tcp://ClusterSystem@127.0.0.1:2552]
background log: info: [WARN] [04/29/2016 09:19:08.861] [ClusterSystem-akka.remote.default-remote-dispatcher-38] [akka.remote.Remoting] Association to [akka.tcp://ClusterSystem@127.0.0.1:2552] having UID [113992052] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.

Console Output: IotAgent-DeviceRequest node:

$ bin/activator "runMain worker.Main 3001"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project/project
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt/)
background log: info: Running worker.Main 3001
background log: info: [INFO] [04/29/2016 09:00:42.224] [main] [akka.remote.Remoting] Starting remoting
background log: info: [INFO] [04/29/2016 09:00:42.664] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://IotSystem@127.0.0.1:3001]
background log: info: [INFO] [04/29/2016 09:00:42.665] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://IotSystem@127.0.0.1:3001]
background log: info: [INFO] [04/29/2016 09:00:44.563] [IotSystem-akka.actor.default-dispatcher-2] [akka.tcp://IotSystem@127.0.0.1:3001/user/clusterClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
background log: info: 09:00:45.632 [IotSystem-akka.actor.default-dispatcher-3] INFO  com.sandinh.paho.akka.MqttPubSub - connecting to tcp://test.mosquitto.org:1883..
background log: info: 09:00:46.330 [MQTT Call: paho329851808627927] INFO  com.sandinh.paho.akka.MqttPubSub - connected
background log: info: [INFO] [04/29/2016 09:00:46.334] [IotSystem-akka.actor.default-dispatcher-2] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> MQTT subscription to akka-iot-mqtt-topic acknowledged
background log: info: 09:00:46.513 [MQTT Call: paho329851808627927] INFO  com.sandinh.paho.akka.MqttPubSub - subscribed to [akka-iot-mqtt-topic]
background log: info: [INFO] [04/29/2016 09:00:49.413] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id security-alarm-9094 | Device State OFF
background log: info: [INFO] [04/29/2016 09:00:49.553] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id security-alarm-9094 | Device State OFF
background log: info: 09:00:49.555 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:00:49.736 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:00:49.738] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id security-alarm-9094 | Device State OFF | Work Id 9016bdbd-e70d-4546-8cc8-6719c02f8404
background log: info: [INFO] [04/29/2016 09:00:49.738] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
...
...
background log: info: [INFO] [04/29/2016 09:05:09.264] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id lamp-5095 | Device State ON
background log: info: [INFO] [04/29/2016 09:05:09.264] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id lamp-5095 | Device State ON
background log: info: 09:05:09.265 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:09.444 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:09.445] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id lamp-5095 | Device State ON | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373
background log: info: [INFO] [04/29/2016 09:05:09.445] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:14.674] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id security-alarm-9095 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:14.675] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id security-alarm-9095 | Device State OFF
background log: info: 09:05:14.675 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:15.008 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:15.008] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id security-alarm-9095 | Device State OFF | Work Id a66d1079-5083-491e-9b5e-885e7914426f
background log: info: [INFO] [04/29/2016 09:05:15.008] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:22.213] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id thermostat-1004 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:22.213] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id thermostat-1004 | Device State HEAT
background log: info: 09:05:22.229 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:22.409 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:22.409] [IotSystem-akka.actor.default-dispatcher-4] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id thermostat-1004 | Device State HEAT | Work Id bc502538-5baa-494d-8565-5cb91ad27001
background log: info: [INFO] [04/29/2016 09:05:22.409] [IotSystem-akka.actor.default-dispatcher-4] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:31.593] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id thermostat-1044 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:31.593] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id thermostat-1044 | Device State HEAT
background log: info: 09:05:31.593 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:31.776 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:31.777] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id thermostat-1044 | Device State HEAT | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4
background log: info: [INFO] [04/29/2016 09:05:31.777] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
background log: info: [INFO] [04/29/2016 09:05:35.983] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Device Id lamp-5471 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:35.983] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/devicerequest] Device Request -> Publishing MQTT Topic akka-iot-mqtt-topic: Device Id lamp-5471 | Device State OFF
background log: info: 09:05:35.984 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - delivery complete [akka-iot-mqtt-topic]
background log: info: 09:05:36.166 [MQTT Call: paho329851808627927] DEBUG com.sandinh.paho.akka.MqttPubSub - message arrived akka-iot-mqtt-topic
background log: info: [INFO] [04/29/2016 09:05:36.166] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Received MQTT message: Device Id lamp-5471 | Device State OFF | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9
background log: info: [INFO] [04/29/2016 09:05:36.166] [IotSystem-akka.actor.default-dispatcher-14] [akka.tcp://IotSystem@127.0.0.1:3001/user/iotagent] IoT Agent -> Sending work to cluster master
...
...

Console Output: Worker node:

$ bin/activator "runMain worker.Main 0"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project/project
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt/)
background log: info: Running worker.Main 0
background log: info: [INFO] [04/29/2016 09:02:21.265] [main] [akka.remote.Remoting] Starting remoting
background log: info: [INFO] [04/29/2016 09:02:23.385] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://WorkerSystem@127.0.0.1:54323]
background log: info: [INFO] [04/29/2016 09:02:23.419] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://WorkerSystem@127.0.0.1:54323]
background log: info: [INFO] [04/29/2016 09:02:26.278] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/clusterClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2552/system/receptionist]
...
...
background log: info: [INFO] [04/29/2016 09:05:09.932] [WorkerSystem-akka.actor.default-dispatcher-16] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id lamp-5095 | Device State ON
background log: info: [INFO] [04/29/2016 09:05:09.932] [WorkerSystem-akka.actor.default-dispatcher-16] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action NO CHANGE | Work Id 4fc64049-fa21-49c6-9d54-743eaa055373
background log: info: [INFO] [04/29/2016 09:05:15.368] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id security-alarm-9095 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:15.368] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action Switch light to ON | Work Id a66d1079-5083-491e-9b5e-885e7914426f
background log: info: [INFO] [04/29/2016 09:05:22.701] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id thermostat-1004 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:22.701] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action LOWER temperature by 1F | Work Id bc502538-5baa-494d-8565-5cb91ad27001
background log: info: [INFO] [04/29/2016 09:05:32.068] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id thermostat-1044 | Device State HEAT
background log: info: [INFO] [04/29/2016 09:05:32.068] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action RAISE temperature by 2F | Work Id 1c815602-9666-4c5d-9029-4fdd719f48b4
background log: info: [INFO] [04/29/2016 09:05:36.361] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Received work request: Job Adjust device | Device Id lamp-5471 | Device State OFF
background log: info: [INFO] [04/29/2016 09:05:36.361] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:54323/user/worker] Worker -> Finished work: Action Switch light to ON | Work Id fadc48cf-a8c7-4942-bd35-d97b7acf8ad9
...
...

Challenges Of Big Data + SaaS + HAN

This is the second part of a previous post about building and operating a Big Data SaaS for Home Area Network devices during my 5-year tenure with EcoFactor. Simply put, our main goal was to add “smarts” to residential heating and cooling systems (i.e. heaters and air conditioners, a.k.a. HVAC) via ordinary thermostats. That focus led some people to the superficial perception that we were a smart thermostat device company. In actuality, we have always been a software service, virtually agnostic to both hardware and communications protocol. It’s more of an IoT version of the “Intel Inside” business model.

Challenges from all fronts

As with building any startup company, a wide spectrum of challenges confronted us, and they are the subject of this post. The funding environment was pretty hellish, as we started shortly before the financial crisis of 2007-2008. The failure of some high-profile solar companies in subsequent years certainly didn’t help make the once-hyped cleantech sector favorable to investors.

The ever-fiercer competition for software engineering talent was, and still is, a big challenge for pretty much every startup in Silicon Valley. On the technology front, production-grade open-source Big Data technologies weren’t there yet, so individual companies had to put a lot of effort into internal R&D. That in turn required domain experts in both development and operations, who were scarce back then, thus completing a vicious circle that starts with the hiring difficulty.

Operational processes

On the operational front, there was a long list of processes that needed to be carefully established and managed, from user acquisition, on-boarding, device installer training, scheduling coordination for on-site device installation and technical support for installers to customer service. Getting into the details of how all that was done would warrant writing a book. Scott Hublou, a co-founder of the company in charge of product and marketing, owned the “horrendous” list.

Many of the items in the list are interrelated. For instance, getting HVAC technicians to create a HAN network and pair up thermostats with the HAN gateway during an on-site installation required not only a custom-built software tool with a well-thought-out workflow and an easy UI, but also thorough training and a knowledgeable support team to back them up for ad-hoc troubleshooting.

Back on the engineering side of the world, a key piece of operations is the technology infrastructure, which needs to cope with future business growth. That includes systems hosting, network and data architecture, server clusters for distributed computing, load balancing systems, fail-over and monitoring mechanisms, firewalls, etc. As a startup company, we started with something simple but expandable to conserve cash, and scaled up as quickly as necessary. That’s also a practical approach from the design point of view to avoid over-engineering.

State of WPAN

On the hardware side, applicable HAN communications protocols and HAN device hardware were far from ready for mass deployment when we started exploring that space. That’s a non-trivial challenge for anybody who wants to get into the space. On the other hand, if done right, it represents an opportunity to pioneer in a relatively new arena.

ZigBee, a WPAN (Wireless Personal Area Network) protocol built on the IEEE 802.15.4 standard, was our selected communications protocol for scaled deployment. While it’s a robust protocol compared with others such as Z-Wave, its specifications were still undergoing changes and few real-world implementations had ever exploited its full features.

The protocol comes with a few predefined application profiles including Energy Efficiency and Home Automation profiles. Part of our core business is about translating HVAC operations data via thermostats into actionable business intelligence, hence the ability to acquire key attributes from these devices is crucial. We quickly discovered that some attributes as basic as HVAC state were missing in certain application profiles, so we had to not only utilize multiple profiles but also resort to custom attributes in ZCL (ZigBee Cluster Library).

Working with technology partners

Working with hardware technology partners presents some other challenges. HAN device firmware and embedded software development is a totally different beast from SaaS/server application development. Python on Linux is a prominent embedded software platform. While that’s also a popular combo for server software development, the two worlds bear little resemblance to each other. Building a system that bridges the two worlds takes learning and collaborative effort from both camps.

Some of our HAN device partners were quick to realize the need to back their gateway devices with a scalable PaaS infrastructure, and invested significant effort in M2M (Machine-to-Machine) through acquisition and internal development. But coming from a hardware background, our partners inevitably faced a non-trivial learning curve in areas such as software service scalability. Leveraging our internal scalable SaaS development experience and our partners’ embedded software engineering expertise, we managed to bring the best ingredients from both worlds into the cooperative work.

OTA firmware update

OTA (Over-the-Air) firmware update generally refers to wireless firmware update. Our devices run on a WPAN protocol and the firmware is OTA-able. It’s probably one of the operations that create the most anxiety, as an update failure may result in “bricking” the devices in volume, leading to the worst user experience. A bricked thermostat that results in an inoperable HVAC (i.e. heater / air conditioner) would be the last thing the home occupant wants to deal with on a 105F Summer day, or worse, a potentially life-threatening hazard on a 10F Winter night.

This critical task is all about making sure the entire update procedure is foolproof from end to end. The important thing is to go through lots of rehearsals in advance. In addition, the ability to roll back to the previous firmware version is as critical as the forward update, so that the update can be undone should unforeseen issues arise afterwards. Startups typically work at such a cut-throat pace that it’s tempting to circumvent pre-production tests whenever possible. But this is one of those operations where even a minor compromise on stringent testing could mean the end of the business.
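To make the forward-update and rollback pairing concrete, here is a minimal sketch in Scala. It is not the procedure we actually ran: `Device`, `Outcome`, `flash` and `healthy` are hypothetical names, with `flash` and `healthy` standing in for whatever device-specific operations a real OTA pipeline would provide.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical device model, for illustration only.
final case class Device(id: String, firmware: String)

sealed trait Outcome
final case class Updated(device: Device) extends Outcome
final case class RolledBack(device: Device) extends Outcome
final case class Failed(deviceId: String, reason: String) extends Outcome

object OtaUpdate {
  /** Flashes `target`, then verifies the result with `healthy`.
    * If flashing fails or the health check does not pass, the previous
    * firmware version is flashed again.
    * `flash` and `healthy` are assumed hooks, not a real device API.
    */
  def updateWithRollback(
      device: Device,
      target: String,
      flash: (Device, String) => Try[Device],
      healthy: Device => Boolean
  ): Outcome = {
    val previous = device.firmware
    flash(device, target) match {
      case Success(updated) if healthy(updated) => Updated(updated)
      case _ =>
        // Forward update failed or left the device unhealthy: undo it.
        flash(device, previous) match {
          case Success(restored) => RolledBack(restored)
          case Failure(e)        => Failed(device.id, s"rollback failed: ${e.getMessage}")
        }
    }
  }
}
```

The point of the sketch is that the rollback path is a first-class outcome that can be rehearsed and tested just like the forward path, rather than an afterthought.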

Pull vs Push

The around-the-clock time series data acquisition from a growing volume of primitive HAN devices is a capacity-intensive requirement. Understanding that it was going to be a temporary method for smaller-scale deployments, we started out using a simplistic pull model to mechanically acquire data from the HAN gateway devices. These devices gather data serially from their associated thermostat devices, making a single trip to a gateway-connected thermostat device cost a few seconds to tens of seconds. To come up with a data acquisition method that could scale, we needed something that is at least an order of magnitude faster.
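A back-of-the-envelope sketch shows why serial pulling stops scaling. The function names and any numbers fed into them are assumptions for illustration, not measurements from our deployment.

```scala
object PullSweep {
  /** Seconds one gateway needs to poll all of its thermostats serially. */
  def sweepSeconds(thermostatsPerGateway: Int, secondsPerTrip: Double): Double =
    thermostatsPerGateway * secondsPerTrip

  /** True if a full serial sweep fits inside the desired sampling interval. */
  def fitsInterval(
      thermostatsPerGateway: Int,
      secondsPerTrip: Double,
      intervalSeconds: Double
  ): Boolean =
    sweepSeconds(thermostatsPerGateway, secondsPerTrip) <= intervalSeconds
}
```

For example, with a hypothetical 4 thermostats behind a gateway, a 10-second trip gives a 40-second sweep, which fits a 60-second sampling interval, whereas a 20-second trip gives an 80-second sweep, which does not.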

With larger-scale deployments in the pipeline, we didn’t waste any time and worked collaboratively with all involved parties early on to build a scalable solution. We went back to the drawing board to scrutinize the various data communication methods supported by the WPAN specifications and laid out a few architectural changes. First, we switched the data acquisition model from pull to push. This change affected not only data communications within our internal SaaS applications but also the end-to-end data flow spanning our partners’ PaaS systems.

One of the key changes was to come up with standards-compliant methods that minimize necessary data retrievals via unexploited features such as attribute grouping and differential reporting under the push model. Attribute grouping allows selected attributes to be bundled into a single packet for delivery instead of sending individual attributes serially in multiple deliveries. Differential reporting helps minimize necessary data deliveries by triggering a data transfer only when at least one of the selected attributes has changed. All that meant lots of extra work for everybody in the short term, but in exchange for a scalable solution in the long run.
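The combined effect of attribute grouping and differential reporting can be sketched as follows. This is a simplified model in Scala, not ZCL code: `Report`, `DifferentialReporter` and the attribute names are made up for illustration, and attribute values are plain strings.

```scala
// Hypothetical report packet: all grouped attributes of one device, bundled together.
final case class Report(deviceId: String, attributes: Map[String, String])

object DifferentialReporter {
  /** Decides whether a device should push a report.
    *
    * @param group        names of the attributes selected for grouped reporting
    * @param lastReported grouped attribute values sent in the previous report
    * @param current      all current attribute values on the device
    * @return the state to remember for next time, plus a report if one is due
    */
  def step(
      deviceId: String,
      group: Set[String],
      lastReported: Map[String, String],
      current: Map[String, String]
  ): (Map[String, String], Option[Report]) = {
    // Attribute grouping: only the selected attributes travel, as one packet.
    val snapshot = current.filter { case (name, _) => group.contains(name) }
    // Differential reporting: push only if at least one grouped attribute changed.
    if (snapshot == lastReported) (lastReported, None)
    else (snapshot, Some(Report(deviceId, snapshot)))
  }
}
```

In this sketch, a change to an attribute outside the group (say, a signal-strength reading) triggers nothing, while a change to any grouped attribute sends the whole group in one delivery.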

Collaborative work pays off

The challenges mentioned above wouldn’t have been resolvable had there not been a cross-functional team of technologists working diligently and creatively to make it happen. Performance was boosted by orders of magnitude after implementing the new data acquisition method. More importantly, the collective work in some way set a standard for large-scale data acquisition from SaaS-managed HAN devices. It was an invaluable experience being a part of the endeavor.