Scala On Spark – Cumulative Pivot Sum

In a couple of recent R&D projects, I was using Apache Spark rather extensively to address some data processing needs on Hadoop clusters. Although there is an abundance of big data processing platforms these days, it didn’t take long for me to settle on Spark. One of the main reasons is that the programming language for the R&D is Scala, which is what Spark itself is written in. In particular, Spark’s inherent support for functional programming and compositional transformations on immutable data enables high performance at scale as well as readability. Other main reasons are very much in line with some of the key factors contributing to Spark’s rising popularity.

I’m starting a mini blog series on Scala-on-Spark (SoS), with each blog post demonstrating some Scala programming examples on Apache Spark. In the series, I’m going to illustrate how the functionality-rich SoS is able to resolve some non-trivial data processing problems with seemingly little effort. If nothing else, they are good brain-teasing programming exercises in Scala on Spark.

As the source data for the example, let’s consider a minuscule set of weather data stored in a DataFrame, which consists of the following columns:

  • Weather Station ID
  • Start Date of a half-month period
  • Temperature High (in Fahrenheit) over the period
  • Temperature Low (in Fahrenheit) over the period
  • Total Precipitation (in inches) over the period

Note that with a properly configured Spark cluster, the methods illustrated in the following example can be readily adapted to handle much more granular data at scale – e.g. down to sub-hourly weather data from tens of thousands of weather stations. It’s also worth mentioning that there can be other ways to solve the problems presented in the examples.

For illustration purposes, the following code snippets are executed on a Spark Shell. The first thing to do is generate a DataFrame with the said columns of sample data, which will be used as the source data for this example and a couple of subsequent ones.

import java.sql.Date

val weatherDataDF = Seq(
  (100, Date.valueOf("2017-07-01"), 75, 59, 0.0),
  (100, Date.valueOf("2017-07-16"), 77, 59, 0.5),
  (100, Date.valueOf("2017-08-01"), 80, 63, 1.0),
  (100, Date.valueOf("2017-08-16"), 78, 62, 1.0),
  (100, Date.valueOf("2017-09-01"), 74, 59, 0.0),
  (100, Date.valueOf("2017-09-16"), 72, 57, 0.0),
  (100, Date.valueOf("2017-10-01"), 68, 54, 0.0),
  (100, Date.valueOf("2017-10-16"), 66, 54, 0.0),
  (100, Date.valueOf("2017-11-01"), 64, 50, 0.5),
  (100, Date.valueOf("2017-11-16"), 61, 48, 1.0),
  (100, Date.valueOf("2017-12-01"), 59, 46, 2.0),
  (100, Date.valueOf("2017-12-16"), 57, 45, 1.5),
  (115, Date.valueOf("2017-07-01"), 76, 57, 0.0),
  (115, Date.valueOf("2017-07-16"), 76, 56, 1.0),
  (115, Date.valueOf("2017-08-01"), 78, 57, 0.0),
  (115, Date.valueOf("2017-08-16"), 81, 57, 0.0),
  (115, Date.valueOf("2017-09-01"), 77, 54, 0.0),
  (115, Date.valueOf("2017-09-16"), 72, 50, 0.0),
  (115, Date.valueOf("2017-10-01"), 65, 45, 0.0),
  (115, Date.valueOf("2017-10-16"), 59, 40, 1.5),
  (115, Date.valueOf("2017-11-01"), 55, 37, 1.0),
  (115, Date.valueOf("2017-11-16"), 52, 35, 2.0),
  (115, Date.valueOf("2017-12-01"), 45, 30, 3.0),
  (115, Date.valueOf("2017-12-16"), 41, 28, 1.5)
).toDF("station", "start_date", "temp_high", "temp_low", "total_precip")

In this first example, the goal is to generate a table of cumulative precipitation by weather station in month-by-month columns. By ‘cumulative sum’, I mean the monthly precipitation rolls over from one month to the next (i.e. a running sum). In other words, if July’s precipitation is 2 inches and August’s is 1 inch, the figure for August will be 3 inches. The result should look like the following table:

+-------+-------+-------+-------+-------+-------+-------+
|station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
+-------+-------+-------+-------+-------+-------+-------+
|    100|    0.5|    2.5|    2.5|    2.5|    4.0|    7.5|
|    115|    1.0|    1.0|    1.0|    2.5|    5.5|   10.0|
+-------+-------+-------+-------+-------+-------+-------+

First, we transform the original DataFrame to include an additional year-month column, then use Spark’s groupBy, pivot and agg methods to generate the pivot table.

import org.apache.spark.sql.functions._

val monthlyData = weatherDataDF.
  withColumn("year_mo", concat(
    year($"start_date"), lit("-"), lpad(month($"start_date"), 2, "0")
  )).
  groupBy("station").pivot("year_mo")
  
val monthlyPrecipDF = monthlyData.agg(sum($"total_precip"))

monthlyPrecipDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|    1.0|    0.0|    0.0|    1.5|    3.0|    4.5|
// |    100|    0.5|    2.0|    0.0|    0.0|    1.5|    3.5|
// +-------+-------+-------+-------+-------+-------+-------+

Next, we assemble a list of the year-month columns and traverse the list using method foldLeft, which is one of the most versatile Scala functions for custom iterative transformations. In this particular case, the data to be transformed by foldLeft is a tuple of (DataFrame, String). Normally, transforming the DataFrame alone should suffice, but in this case we need an additional value to address the rolling cumulation requirement.

The tuple’s first DataFrame-type element, with monthlyPrecipDF as its initial value, will be transformed using the binary operator function specified as foldLeft’s second argument (i.e. (acc, c) => …). As for the tuple’s second String-type element, with the first year-month as its initial value, it carries the current month’s column name over to the next iteration. The end result is a (DataFrame, String) tuple successively transformed month by month.

val yearMonths = monthlyPrecipDF.columns.filter(_ != "station")

val cumulativePrecipDF = yearMonths.drop(1).
  foldLeft((monthlyPrecipDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, col(acc._2) + col(c)), c )
)._1

cumulativePrecipDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|    1.0|    1.0|    1.0|    2.5|    5.5|   10.0|
// |    100|    0.5|    2.5|    2.5|    2.5|    4.0|    7.5|
// +-------+-------+-------+-------+-------+-------+-------+

Similar pivot aggregations can be applied to the temperature highs/lows as well, with method sum replaced by max/min.

val monthlyHighDF = monthlyData.agg(max($"temp_high").as("high"))

monthlyHighDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     76|     81|     77|     65|     55|     45|
// |    100|     77|     80|     74|     68|     64|     59|
// +-------+-------+-------+-------+-------+-------+-------+

val monthlyLowDF = monthlyData.agg(min($"temp_low").as("low"))

monthlyLowDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     56|     57|     50|     40|     35|     28|
// |    100|     59|     62|     57|     54|     48|     45|
// +-------+-------+-------+-------+-------+-------+-------+

Finally, we compute the cumulative temperature high/low in the same manner as the cumulative precipitation, replacing method sum with an iterative max/min built from Spark’s when-otherwise construct.

val cumulativeHighDF = yearMonths.drop(1).
  foldLeft((monthlyHighDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, when(col(acc._2) > col(c), col(acc._2)).
      otherwise(col(c))), c )
)._1

cumulativeHighDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     76|     81|     81|     81|     81|     81|
// |    100|     77|     80|     80|     80|     80|     80|
// +-------+-------+-------+-------+-------+-------+-------+

val cumulativeLowDF = yearMonths.drop(1).
  foldLeft((monthlyLowDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, when(col(acc._2) < col(c), col(acc._2)).
      otherwise(col(c))), c )
)._1

cumulativeLowDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     56|     56|     50|     40|     35|     28|
// |    100|     59|     59|     57|     54|     48|     45|
// +-------+-------+-------+-------+-------+-------+-------+
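
As noted at the outset, there can be other ways to solve the same problem. For instance, the when-otherwise comparisons above can be expressed more compactly with the built-in greatest and least functions from org.apache.spark.sql.functions (already imported above):

// Alternative: replace when-otherwise with the built-in greatest/least functions
val cumulativeHighDF2 = yearMonths.drop(1).
  foldLeft((monthlyHighDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, greatest(col(acc._2), col(c))), c )
)._1

val cumulativeLowDF2 = yearMonths.drop(1).
  foldLeft((monthlyLowDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, least(col(acc._2), col(c))), c )
)._1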

Scala IoT Systems With Akka Actors II

Back in 2016, I built an Internet-of-Things (IoT) prototype system leveraging the “minimalist” design principle of the Actor model to simulate low-cost, low-powered IoT devices. A simplified version of the prototype was published in a previous blog post. The stripped-down application was written in Scala along with the Akka Actors run-time library, which is arguably the predominant Actor model implementation at present. Message Queue Telemetry Transport (MQTT) was used as the publish-subscribe messaging protocol for the simulated IoT devices. For simplicity, a single actor was used to simulate requests from a bunch of IoT devices.

In this blog post, I would like to share a version closer to the design of the full prototype system. With the same tech stack used in the previous application, it’s an expanded version (hence, II) that uses loosely-coupled lightweight actors to simulate individual IoT devices, each of which maintains its own internal state and handles bidirectional communications via non-blocking message passing. Using a distributed workers system adapted from a Lightbend template along with a persistence journal, the end product is an IoT system equipped with a scalable fault-tolerant data processing system.

Main components

Below is a diagram and a summary of the revised Scala application which consists of 3 main components:

IoT with MQTT and Akka Actor Systems v.2

1. IoT

  • An IotManager actor which:
    • instantiates a specified number of devices upon start-up
    • subscribes to an MQTT pub-sub topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • notifies Device actors upon receiving failure messages from Master actor
    • forwards work results to the corresponding devices upon receiving them from ResultProcessor
  • Device actors each of which:
    • simulates a thermostat, lamp, or security alarm with random initial state and setting
    • maintains and updates internal state and setting upon receiving work results from IotManager
    • generates work requests and publishes them to the MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from IotManager
  • An MQTT pub-sub broker and an MQTT client for communicating with the broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • serialization methods to convert objects to byte arrays, and vice versa (see the sketch after this components list)

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges work request reception with IotManager
    • publishes work results from Workers to ‘work-results’ topic via Akka distributed pub-sub
    • maintains work states using persistence journal
  • A ResultProcessor actor in the master cluster which:
    • gets instantiated upon starting up the IoT system (more on this below)
    • consumes work results by subscribing to the ‘work-results’ topic
    • sends work results received from Master to IotManager

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, and pulls work from, the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors each of which:
    • processes the work requests from its parent Worker
    • generates work results and sends them back to the Worker
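
As referenced in the IoT components list above, below is a minimal sketch of what the MqttConfig helper might look like. This is an assumption pieced together from the description rather than the repo’s actual code; the topic name is taken from the console log further below, and the broker URL assumes the public Mosquitto test broker mentioned later in this post.

// A hedged sketch of MqttConfig (assumed, not the repo's actual code)
import java.io._

object MqttConfig {
  // Topic name as seen in the console log below
  val topic = "akka-iot-mqtt-topic"

  // Assumed URL of the public Mosquitto test broker
  val brokerUrl = "tcp://test.mosquitto.org:1883"

  // Serialize an object into a byte array for the MQTT payload
  def writeToByteArray(obj: Any): Array[Byte] = {
    val baos = new ByteArrayOutputStream
    val oos = new ObjectOutputStream(baos)
    try oos.writeObject(obj) finally oos.close()
    baos.toByteArray
  }

  // Deserialize a byte array received via MQTT back into an object
  def readFromByteArray[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject.asInstanceOf[T]
    finally ois.close()
  }
}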

Master-worker system with a ‘pull’ model

While significant changes have been made to the IoT actor system, much of the setup for the Master/Worker actor systems and MQTT pub-sub messaging remains largely unchanged from the previous version:

  • As separate independent actor systems, both the IoT and Worker systems communicate with the Master cluster via ClusterClient.
  • Using a ‘pull’ model which generally performs better at scale, the Worker actors register with the Master cluster and pull work when available.
  • Paho-Akka is used as the MQTT pub-sub messaging client.
  • A helper object, MqttConfig, encapsulates an MQTT pub-sub topic and broker information along with serialization methods to handle MQTT messaging using a test Mosquitto broker.

What’s new?

Now, let’s look at the major changes in the revised application:

First of all, Lightbend’s Activator has been retired, so sbt is used directly instead.

On persisting actor state, a Redis data store is used as the persistence journal. In the previous version, the shared LevelDB journal was coupled with the first seed node, making that node a single point of failure. With the Redis persistence journal decoupled from any specific cluster node, fault tolerance steps up a notch.

As mentioned earlier in the post, one of the key changes from the previous application is the use of actors representing individual IoT devices, each with its own state and the capability of communicating with entities designated for interfacing with external actor systems. Actors, lightweight and loosely-coupled by design, serve as an excellent vehicle for modeling individual IoT devices. In addition, non-blocking message passing among actors provides an efficient and economical means for communication and logic control of the device state.

The IotManager actor is responsible for creating and managing a specified number of Device actors. Upon startup, the IoT manager instantiates individual Device actors of random device type (thermostat, lamp or security alarm). These devices are maintained in an internal registry regularly updated by the IoT manager.

Each of the Device actors starts up with a random state and setting. For instance, a thermostat device may start with an ON state and a temperature setting of 68F, whereas a lamp device might have an initial state of OFF and a brightness setting of 2. Once instantiated, a Device actor will maintain its internal operational state and setting from then on and will report and update them per request.

Work and WorkResult

In this application, a Work object represents a request sent by a specific Device actor and carries the Device’s Id along with its current state and setting data. A WorkResult object, on the other hand, represents the returned result, carrying the updated state and setting for the Device actor to apply.
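
As a rough sketch (with field names assumed from the descriptions above rather than taken from the repo), the two message types might look like this:

// Hedged sketch of the two message types; the exact fields in the repo may differ
case class Work(workId: String, deviceType: String, deviceId: String,
                state: Int, setting: Int)

case class WorkResult(workId: String, deviceType: String, deviceId: String,
                      nextState: Int, nextSetting: Int)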

Responsible for processing the WorkResult generated by the Worker actors, the ResultProcessor actor simulates the handling of a work result – in this case, it simply sends the work result back to the original Device actor through IotManager using the actorSelection method. Interacting with only the Master cluster system as a cluster client, the Worker actors have no knowledge of the ResultProcessor actor. ResultProcessor receives the work results by subscribing to the Akka distributed pub-sub topic to which the Master publishes.

While a participant in the Master cluster actor system, the ResultProcessor actor gets instantiated when the IoT actor system starts up. Decoupling the ResultProcessor instantiation from the Master cluster ensures that no redundant ResultProcessor instances get started when multiple Master cluster nodes start up.
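
Putting the above together, here is a minimal sketch of what the ResultProcessor actor might look like, assuming a WorkResult message type like the one sketched earlier and a known actor path for IotManager:

// A hedged sketch of ResultProcessor (assumed, not the repo's actual code)
import akka.actor.{Actor, ActorLogging}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}

class ResultProcessor(iotManagerPath: String) extends Actor with ActorLogging {
  // Subscribe to the 'work-results' topic the Master publishes to
  private val mediator = DistributedPubSub(context.system).mediator
  mediator ! DistributedPubSubMediator.Subscribe("work-results", self)

  def receive = {
    case _: DistributedPubSubMediator.SubscribeAck =>
      log.info("Result Processor -> Subscribed to work-results")

    case result: WorkResult =>
      // Forward the work result to IotManager via actorSelection
      context.actorSelection(iotManagerPath) ! result
  }
}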

Test running the application

Complete source code of the application is available at GitHub.

To run the application on a single JVM, just git-clone the repo, run the following commands at a command-line terminal and observe the console output:

# Start a Redis server accessible to the master cluster to serve as the persistence journal:
$ nohup redis-server /path/to/conf &

# Launch the master cluster with 2 seed nodes, IoT actor system and Worker actor system:
$ cd {project-root}
$ bin/sbt "runMain akkaiot.Main [NumOfDevices]"

The optional NumOfDevices parameter defaults to 20.

To run the application on separate JVMs, git-clone the repo to a local disk, open up multiple command-line terminals and launch the different components in separate terminals:

# Start a Redis server accessible to the master cluster to serve as the persistence journal:
$ nohup redis-server /path/to/conf &

$ cd {project-root}
# Launch the master cluster seed node with persistence journal:
$ bin/sbt "runMain akkaiot.Main 2551"

# Launch additional master cluster seed node:
$ bin/sbt "runMain akkaiot.Main 2552"

# Launch the IoT node:
$ bin/sbt "runMain akkaiot.Main 3001 [NumOfDevices]"

# Launch a Worker node:
$ bin/sbt "runMain akkaiot.Main 0"

# Launch additional Worker node:
$ bin/sbt "runMain akkaiot.Main 0"

Sample console log

Below is filtered console log output tracing the evolving state and setting of a thermostat device:

#
### Console log filtered for device thermostat-1015
#
[info] [INFO] [07/12/2017 14:38:44.707] [IotSystem-akka.actor.default-dispatcher-21] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 started
[info] [INFO] [07/12/2017 14:38:49.726] [IotSystem-akka.actor.default-dispatcher-29] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 0 created work (Id: 69d82872-a9f4-492e-8ae4-28229b797994) 
[info] [INFO] [07/12/2017 14:38:49.726] [IotSystem-akka.actor.default-dispatcher-29] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:38:49.915] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 0 | Setting 70
[info] [INFO] [07/12/2017 14:38:50.261] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.265] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.488] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-2] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 0 | Setting 70
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Switch to COOL | LOWER temperature by -2
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.493] [ClusterSystem-akka.actor.default-dispatcher-27] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:50.493] [ClusterSystem-akka.actor.default-dispatcher-27] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-33] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id 69d82872-a9f4-492e-8ae4-28229b797994.
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 2 and setting 68.
[info] [INFO] [07/12/2017 14:38:55.275] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 2 created work (Id: df9622cd-6b80-42e9-be1b-f0a92d002d75) 
[info] [INFO] [07/12/2017 14:38:55.275] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:38:55.578] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:55.580] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.583] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.586] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Keep state COOL | LOWER temperature by -2
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.590] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:38:55.590] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-18] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id df9622cd-6b80-42e9-be1b-f0a92d002d75.
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 2 and setting 66.
[info] [INFO] [07/12/2017 14:39:01.596] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 2 created work (Id: c57d21af-3957-43ba-a995-f4f558900fa3) 
[info] [INFO] [07/12/2017 14:39:01.597] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:39:01.752] [IotSystem-akka.actor.default-dispatcher-17] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:39:01.753] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.755] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.757] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Switch to OFF | RAISE temperature by 2
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.760] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 0 | Setting 68
[info] [INFO] [07/12/2017 14:39:01.761] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-18] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id c57d21af-3957-43ba-a995-f4f558900fa3.
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 0 and setting 68.
....
....

The following annotated console log showcases the fault tolerance of the master cluster – how it fails over to the 2nd node upon detecting that the 1st node has crashed:

#
### Annotated console log from the 2nd cluster node
#
<<<
<<<--- 2nd cluster node (with port# 2552) starts shortly after 1st node (with port# 2551) has started.
<<<
Leos-MBP:akka-iot-mqtt-v2 leo$ bin/sbt "runMain akkaiot.Main 2552"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt-v2/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt-v2/)
[info] Running akkaiot.Main 2552
[info] [INFO] [07/12/2017 14:24:16.491] [main] [akka.remote.Remoting] Starting remoting
[info] [INFO] [07/12/2017 14:24:16.661] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[info] [INFO] [07/12/2017 14:24:16.662] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2552]
[info] [INFO] [07/12/2017 14:24:16.675] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[info] [INFO] [07/12/2017 14:24:16.790] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [INFO] [07/12/2017 14:24:16.790] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
[info] [INFO] [07/12/2017 14:24:16.794] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[info] [INFO] [07/12/2017 14:24:16.797] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[info] [WARN] [07/12/2017 14:24:16.822] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon/downingProvider] Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
<<<
<<<--- Gets 'welcome' acknowledgement from 1st cluster node.
<<<
[info] [INFO] [07/12/2017 14:24:17.234] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:24:17.697] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] ClusterSingletonManager state change [Start -> Younger]
<<<
<<<--- 1st cluster node crashes after running for about a minute!
<<<
[info] [WARN] [07/12/2017 14:25:17.329] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
[info] [INFO] [07/12/2017 14:25:17.336] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter#1930129898] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter#1930129898] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:17.828] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:18.257] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.ClusterHeartbeatSender$Heartbeat] from Actor[akka://ClusterSystem/system/cluster/core/daemon/heartbeatSender#-975290267] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:18.826] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
....
....
[info] [WARN] [07/12/2017 14:25:20.828] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)]. Node roles [backend]
[info] [INFO] [07/12/2017 14:25:21.258] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.ClusterHeartbeatSender$Heartbeat] from Actor[akka://ClusterSystem/system/cluster/core/daemon/heartbeatSender#-975290267] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:21.827] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [WARN] [07/12/2017 14:25:23.260] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with java.net.ConnectException: Connection refused: /127.0.0.1:2551
[info] [WARN] [07/12/2017 14:25:23.262] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[info] [WARN] [07/12/2017 14:25:28.829] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with java.net.ConnectException: Connection refused: /127.0.0.1:2551
[info] [WARN] [07/12/2017 14:25:28.830] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:25:30.845] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551]. Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
[info] [INFO] [07/12/2017 14:25:30.846] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551] as [Down]
<<<
<<<--- 1st cluster node (with port# 2551) is marked as 'down'.
<<<
[info] [INFO] [07/12/2017 14:25:31.830] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:25:31.832] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Previous oldest removed [akka.tcp://ClusterSystem@127.0.0.1:2551]
<<<
<<<--- 2nd cluster node (with port# 2552) replaces 1st node to start a new master singleton actor.
<<<
[info] [INFO] [07/12/2017 14:25:31.833] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Younger observed OldestChanged: [None -> myself]
[info] [WARN] [07/12/2017 14:25:31.833] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.remote.Remoting] Association to [akka.tcp://ClusterSystem@127.0.0.1:2551] having UID [1664659180] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[info] [INFO] [07/12/2017 14:25:31.877] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Singleton manager starting singleton actor [akka://ClusterSystem/user/master/singleton]
[info] [INFO] [07/12/2017 14:25:31.878] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] ClusterSingletonManager state change [Younger -> Oldest]
[info] [INFO] [07/12/2017 14:25:31.994] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$a] Connect to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.015] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$a] Connected to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.093] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$b] Connect to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.095] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$b] Connected to localhost/127.0.0.1:6379
<<<
<<<--- The new master singleton actor starts replaying events stored in the persistence journal.
<<<
[info] [INFO] [07/12/2017 14:25:32.485] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.486] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.486] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
....
....
[info] [INFO] [07/12/2017 14:25:32.532] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkStarted
[info] [INFO] [07/12/2017 14:25:32.532] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkCompleted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkStarted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkCompleted
<<<
<<<--- The new master singleton actor starts resuming live operation
<<<
[info] [INFO] [07/12/2017 14:25:32.863] [ClusterSystem-akka.actor.default-dispatcher-40] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1013 : Work Id f2a1e777-1e35-4b79-9acb-87477358adc1
[info] [WARN] [SECURITY][07/12/2017 14:25:32.874] [ClusterSystem-akka.persistence.dispatchers.default-plugin-dispatcher-26] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [akkaiot.WorkQueue$WorkAccepted] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[info] [WARN] [SECURITY][07/12/2017 14:25:32.908] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [akkaiot.Master$Ack] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[info] [INFO] [07/12/2017 14:25:33.694] [ClusterSystem-akka.actor.default-dispatcher-40] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1001 : Work Id cfe1ecb3-683d-46f4-ab7e-d8c66fa88755
[info] [INFO] [07/12/2017 14:25:33.851] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1002 : Work Id 469f7148-a6f8-43e6-aa2e-764ac911c251
[info] [INFO] [07/12/2017 14:25:34.014] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id 92cb01ea-bb90-482c-b20b-accd4770f21a
[info] [INFO] [07/12/2017 14:25:34.172] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1005 : Work Id 73ccc2c1-ceb7-4923-b705-8fe82c666bc5
[info] [INFO] [07/12/2017 14:25:34.614] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1012 : Work Id 711b212c-c53d-4f41-a9fa-e8f5ad44ad55
[info] [INFO] [07/12/2017 14:25:35.040] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1009 : Work Id 435264de-5891-4ee9-a7f8-352f7c0abc11
[info] [INFO] [07/12/2017 14:25:35.533] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1008 : Work Id 2873a66e-b04f-40d0-be49-0500449f54f4
[info] [INFO] [07/12/2017 14:25:35.692] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1020 : Work Id eefc57d2-8571-4820-8a6e-a7b083fdbaca
[info] [INFO] [07/12/2017 14:25:35.849] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1007 : Work Id bdcb4f22-4ca9-4b76-9f43-68a1168637cf
[info] [INFO] [07/12/2017 14:25:36.009] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1006 : Work Id 01999269-3608-4877-85ef-6c75f39f7db9
[info] [INFO] [07/12/2017 14:25:37.024] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1004 : Work Id ccbc9edb-7a60-4f65-8121-261928f01ff2
....
....

Scaling for production

The Actor model is well suited for building scalable distributed systems. While the application has an underlying architecture that emphasizes scalability, it would require further effort in the following areas to make it production-ready:

  • IotManager uses the ‘ask’ method for message receipt confirmation via a Future returned by the Master. If business logic allows, using the fire-and-forget ‘tell’ method would be significantly more efficient, especially at scale.
  • The MQTT broker used in the application is a test broker provided by Mosquitto. A production version of the broker should be installed, preferably local to the IoT system. MQTT brokers from other vendors such as HiveMQ and RabbitMQ are also available.
  • As displayed in the console log when running the application, Akka’s default Java serializer is not known for its efficiency. Other serializers such as Kryo or Protocol Buffers should be considered.
  • The Redis data store for actor state persistence should be configured for a production environment.

Further code changes to be considered

A couple of changes to the current application might be worth considering:

Device types are currently represented as strings, and code logic for device type-specific states and settings is repeated during instantiation of devices and processing of work requests. Such logic could be encapsulated within classes defined for individual device types, as sketched below. The payload would probably be larger as a consequence, but it might be worth it for better code maintainability, especially if there are many device types.
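
For instance (a hypothetical sketch, with names and value ranges invented for illustration), the device types could be modeled as a sealed trait hierarchy:

// Hypothetical encapsulation of device type-specific logic
sealed trait DeviceType {
  def name: String
  def randomInitState: Int
  def randomInitSetting: Int
}

case object Thermostat extends DeviceType {
  val name = "thermostat"
  def randomInitState = scala.util.Random.nextInt(3)          // e.g. OFF/HEAT/COOL
  def randomInitSetting = 60 + scala.util.Random.nextInt(21)  // e.g. 60F to 80F
}

case object Lamp extends DeviceType {
  val name = "lamp"
  def randomInitState = scala.util.Random.nextInt(2)          // e.g. OFF/ON
  def randomInitSetting = scala.util.Random.nextInt(4)        // e.g. brightness 0 to 3
}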

Another change to be considered is that Work and WorkResult could be generalized into a single class. Conversely, they could be further differentiated in accordance with specific business needs. A slightly more extensive change would be to retire ResultProcessor altogether and let Worker actors process WorkResult as well.

State mutation in Akka Actors

In this application, a few actors maintain mutable internal states using private variables (private var):

  • Master
  • IotManager
  • Device

Since an actor, by design, processes its mailbox one message at a time and is never accessed by multiple threads concurrently, it’s generally safe to use ‘private var’ to store changing state. But if one prefers state transitioning (as opposed to updating in place), Akka Actors provides a mechanism to hot-swap an actor’s internal behavior.

Hot-swapping an actor’s state

Below is a sample snippet that illustrates how hot-swapping mimics a state machine without having to use any mutable variable for maintaining the actor state:

// An Actor that mutates internal state by hot-swapping
object Worker {
  def props(workerId: String, clusterClient: ActorRef) = // ...

  // ...
}

class Worker(workerId: String, clusterClient: ActorRef) extends Actor {
  import Worker._

  override def preStart(): Unit = // Initialize worker
  override def postStop(): Unit = // Terminate worker

  def sendToMaster(msg: Any): Unit = {
    clusterClient ! SendToAll("/user/master/singleton", msg)
  }

  // ...

  def receive = idle

  def idle: Receive = {
    case WorkIsReady =>
      // Tell Master it is free
      sendToMaster(WorkerIsFree(workerId))

    case work: Work =>
      // Send work to a work processing actor
      workProcessor ! work
      context.become(working)
  }

  def working: Receive = {
    case WorkProcessed(workResult) =>
      // Tell Master work is done
      sendToMaster(WorkIsDone(workerId, workResult))
      context.become(idle)

    case _: Work =>
      // Tell Master it is busy
      sendToMaster(WorkerIsBusy(workerId))
  }
}

Simplified for illustration, the above snippet depicts a Worker actor that pulls work from the Master cluster. The context.become method allows the actor to switch its internal state at run-time like a state machine. As shown in the simplified code, it takes an ‘Actor.Receive’ (which is a partial function) that implements a new message handler. Under the hood, Akka manages the hot-swapping via a stack. As a side note, according to the relevant source code, the stack for hot-swapping actor behavior is, ironically, a mutable ‘private var’ of List[Actor.Receive].

Recursive transformation of immutable parameter

Another functional approach to mutating actor state is via recursive transformation of an immutable parameter. As an example, we can avoid the mutable ‘private var’ registry shown in the first ActorManager below by using ‘context.become’ to recursively transform the registry, passed as an immutable parameter to the updateState method:

// Approach #1: Store map entries in a mutable registry
class ActorManager extends Actor {
  private var registry = Map.empty[String, ActorRef]
  def receive = {
    case Add(id, ref) =>
      registry += id -> ref
    // Other cases
  }
}

// Approach #2: Store map entries in a recursively transformed immutable registry
class ActorManager extends Actor {
  def receive = updateState(Map.empty[String, ActorRef])
  def updateState(registry: Map[String, ActorRef]): Receive = {
    case Add(id, ref) =>
      context.become(updateState(registry + (id -> ref)))
    // Other cases
  }
}

Text Mining With Akka Streams

Reactive Systems, whose core characteristics are declared in the Reactive Manifesto, have started to emerge in recent years as message-driven systems that emphasize scalability, responsiveness and resilience. It’s pretty clear from the requirements that a system can’t simply be made Reactive after the fact. Rather, it should be built from the architectural level up to be Reactive.

Akka’s actor systems, which rely on asynchronous message-passing among lightweight, loosely-coupled actors, serve as a great run-time platform for building Reactive Systems on the JVM (Java Virtual Machine). I posted a few blogs along with sample code about Akka actors in the past. This time I’m going to talk about something different but closely related.

Reactive Streams

While bearing a similar name, Reactive Streams is a separate initiative that requires its implementations to be capable of processing stream data asynchronously while automatically regulating the stream flows in a non-blocking fashion.

Akka Streams, built on top of Akka actor systems, is an implementation of Reactive Streams. Equipped with back-pressure functionality, it eliminates the need to manually buffer stream flows or custom-build a stream buffering mechanism to avoid buffer overflow problems.

Extracting n-grams from text

In text mining, n-grams are a useful construct in the area of NLP (natural language processing). In this blog post, I’ll illustrate extracting n-grams from a stream of text messages using Akka Streams, with Scala as the programming language.

First things first, let’s create an object with methods for generating random text content:

Source code: TextMessage.scala

Some minimal effort has been made to generate random clauses of likely pronounceable fake words along with punctuation. To make it a little more flexible, the lengths of individual words and clauses are supplied as parameters.
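
As a minimal sketch of what such a generator might look like (an assumption for illustration; the actual TextMessage.scala is linked above):

    // A hedged sketch of random text generation, alternating consonants and
    // vowels to fake pronounceable words
    import scala.util.Random

    object TextMessage {
      private val consonants = "bcdfghjklmnprstvw"
      private val vowels = "aeiou"

      private def randWord(len: Int): String =
        (0 until len).map { i =>
          if (i % 2 == 0) consonants(Random.nextInt(consonants.length))
          else vowels(Random.nextInt(vowels.length))
        }.mkString

      def genRandText(minWords: Int, maxWords: Int): String = {
        val numWords = minWords + Random.nextInt(maxWords - minWords + 1)
        (1 to numWords).map(_ => randWord(2 + Random.nextInt(5))).mkString(" ") + "."
      }
    }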

Next, create another object with text processing methods responsible for extracting n-grams from input text, with n being an input parameter. Using Scala’s sliding(size, step) iterator method with size n and step defaulting to 1, a sliding-window iterator is generated to produce the wanted n-grams.

Source code: TextProcessor.scala
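
Along the lines described above, here is a sketch of the n-gram extraction (again an assumption; the flow below expects the method to take a text string and return a string of n-grams):

    // A hedged sketch of n-gram extraction using Scala's sliding iterator
    object TextProcessor {
      def genNgrams(text: String, n: Int): String = {
        val words = text.toLowerCase.
          split("\\s+").map(_.filter(_.isLetter)).filter(_.nonEmpty)
        words.sliding(n).               // sliding window of size n, step 1
          filter(_.length == n).        // drop a trailing short window, if any
          map(_.mkString(" ")).
          mkString(" | ")
      }
    }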

Now that the text processing tools are in place, we can focus on building the main streaming application in which Akka Streams plays the key role.

First, make sure we have the necessary library dependencies included in build.sbt:

Source code: build.sbt

As Akka Streams is a relatively new development, more recent Akka versions (2.4.9 or higher) should be used.

Let’s start with a simple stream for this text mining application:

Source code: NgramStream_v01.scala

As shown in the source code, constructing a simple stream like this is just defining and chaining together the text-generating source, the text-processing flow and the text-display sink as follows:

    val textSource: Source[String, NotUsed] =
      Source((1 to numMessages).map(_ =>
        TextMessage.genRandText(minWordsInText, maxWordsInText))
      )

    def ngramFlow(n: Int): Flow[String, String, NotUsed] =
      Flow[String].map(text => TextProcessor.genNgrams(text, n))

    val textSink = Sink.foreach[String](println)

    textSource.via(ngramFlow(nVal)).runWith(textSink).
      onComplete {
        ...
      }
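
Not shown in the snippet is the boilerplate needed before the stream can run – an ActorSystem, an ActorMaterializer and an ExecutionContext for the onComplete callback – along these lines (the system name here is an arbitrary choice):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer

    implicit val system = ActorSystem("NgramStream")  // name assumed
    implicit val materializer = ActorMaterializer()
    import system.dispatcher  // ExecutionContext for the onComplete callback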

Graph DSL

Akka Streams provides a Graph DSL (domain-specific language) that helps build the topology of stream flows using predefined fan-in/fan-out functions.

What Graph DSL does is somewhat similar to how Apache Storm‘s TopologyBuilder pieces together its spouts (i.e. stream sources), bolts (i.e. stream processors) and stream grouping/partitioning functions, as illustrated in a previous blog about HBase streaming.

Back-pressure

Now, let’s branch off the stream using Graph DSL to illustrate how the integral back-pressure feature is at play.

Source code: NgramStream_v02.scala

Streaming to a file should be significantly slower than streaming to the console. To make the difference more noticeable, a delay is deliberately added to streaming each line of text in the file sink.

Run the application and you will notice that the console display is slowed down. That’s the result of the upstream data flow being regulated to accommodate the relatively slow file I/O outlet, even though the other outlet, the console, is able to consume faster – all conducted in a non-blocking fashion.

Graph DSL create() methods

To build a streaming topology using Graph DSL, you’ll need to use one of the create() methods defined within trait GraphApply, which is extended by object GraphDSL. Here are the signatures of the create() methods:

trait GraphApply {
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = {
    val builder = new GraphDSL.Builder
    val s = buildBlock(builder)
    val mod = builder.module.replaceShape(s)
    new GraphApply.GraphImpl(s, mod)
  }
  ...

  [2..#
  def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(
    buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = {
    val builder = new GraphDSL.Builder
    val curried = combineMat.curried
    val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1))
    [2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))#
    ]
    val s = buildBlock(builder)([#s1#])
    val mod = builder.module.replaceShape(s)
    new GraphApply.GraphImpl(s, mod)
  }#
  ]
}

Note that the second create() signature above is written in the sbt-boilerplate template language, which expands it into the concrete overloads – including the one used in this application – that take multiple stream components as input parameters.

Materialized values

In Akka Streams, materializing a constructed stream is the step of actually running the stream with the necessary resources. To run the stream, the implicitly passed factory method ActorMaterializer() is required to allocate the resources for stream execution. That includes starting up the underlying Akka actors to process the stream.

Every processing stage of the stream can produce a materialized value. By default, using the via(flow) and to(sink) functions, the materialized value of the left-most stage will be preserved. As in the following example, for graph1, the materialized value of the source is preserved:

    val source: Source[String, Future[String]] = ...
    val flow: Flow[String, String, Future[Int]] = ...
    val sink: Sink[String, Future[IOResult]] = ...

    val graph1: RunnableGraph[Future[String]] = source.via(flow).to(sink)
    val graph2: RunnableGraph[Future[Int]] = source.viaMat(flow)(Keep.right).to(sink)
    val graph3: RunnableGraph[(Future[String], Future[IOResult])] = source.via(flow).toMat(sink)(Keep.both)

To allow one to selectively capture the materialized values of the specific stream components, Akka Streams provides functions viaMat(flow) and toMat(sink) along with a combiner function, Keep. As shown in the above example, for graph2, the materialized value of the flow is preserved, whereas for graph3, materialized values for both the flow and sink are preserved.

Back to our fileSink function as listed below: toMat(fileIOSink)(Keep.right) instructs Akka Streams to keep the materialized value of the file I/O sink as a Future of type IOResult:

    def fileSink(filename: String): Sink[String, Future[IOResult]] =
      Flow[String].
        map(line => { Thread.sleep(500); ByteString(line + "\n") }).
        toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
        // sleep() delay added to illustrate back-pressure

Using Graph DSL, as seen earlier in the signature of the create() method, one can select what materialized value is to be preserved by specifying the associated stream components accordingly as the curried parameters:

    ([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)

In our case, we want the materialized value of fileSink, thus the curried parameters should look like this:

    (consoleSink, fileSink(filename))((_, file) => file)

Defining the stream graph

Akka Streams provides a number of functions for fan-out (e.g. Broadcast, Balance) and fan-in (e.g. Merge, Concat). In our example, we want a simple topology with a single text source and the same n-gram generator flow branching off to two sinks in parallel:

          val bcast = builder.add(Broadcast[String](2))
          textSource ~> ngramFlow(nVal) ~> bcast.in
          bcast.out(0) ~> console
          bcast.out(1) ~> file
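
Pieced together with the create() method and the curried parameters discussed above, the complete runnable graph might look like this (a sketch assuming the component names used throughout this post):

    import akka.stream.{ClosedShape, IOResult}
    import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph}
    import scala.concurrent.Future

    // A sketch of the assembled graph; textSource, ngramFlow, consoleSink,
    // fileSink, nVal and filename are as named elsewhere in this post
    val graph: RunnableGraph[Future[IOResult]] = RunnableGraph.fromGraph(
      GraphDSL.create(consoleSink, fileSink(filename))((_, file) => file) {
        implicit builder => (console, file) =>
          import GraphDSL.Implicits._
          val bcast = builder.add(Broadcast[String](2))
          textSource ~> ngramFlow(nVal) ~> bcast.in
          bcast.out(0) ~> console
          bcast.out(1) ~> file
          ClosedShape
      }
    )

    graph.run()  // materializes the graph; returns the file sink's Future[IOResult]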

Adding a message counter

Let’s further expand our n-gram extraction application to include displaying a message count. A simple count-flow is created to map each message string to the numeric value 1, and a count-sink to sum up all these 1’s streamed to the sink. Adding them as the third flow and sink to the existing stream topology (definitions sketched after the snippet) yields something similar to the following:

          val bcast = builder.add(Broadcast[String](3))
          val ngFlow = ngramFlow(nVal)
          textSource ~> bcast.in
          bcast.out(0) ~> ngFlow ~> console
          bcast.out(1) ~> ngFlow ~> file
          bcast.out(2) ~> countFlow ~> count
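
The count-flow and count-sink referenced above might be defined along these lines (hypothetical definitions matching the names in the snippet):

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, Sink}
    import scala.concurrent.Future

    // Map each message to 1, then sum the 1's; the sink materializes a Future[Int]
    val countFlow: Flow[String, Int, NotUsed] = Flow[String].map(_ => 1)
    val count: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)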

Source code: NgramStream_v03.scala

Full source code of the application is at GitHub.

Final thoughts

Having used Apache Storm, I see it as a rather different beast compared with Akka Streams. A full comparison between the two would obviously be an extensive exercise by itself, but it suffices to say that both are great platforms for streaming applications.

Perhaps one of the biggest differences between the two is that Storm provides granular message delivery options (at most / at least / exactly once, guaranteed message delivery) whereas Akka Streams by design questions the premise of reliable messaging on distributed systems. For instance, if guaranteed message delivery is a requirement, Akka Streams would probably not be the best choice.

Back-pressure has recently been added to Storm’s v.1.0.x built-in feature list, so there is indeed some flavor of reactiveness in it. Aside from message delivery options, choosing between the two technologies might be a decision based more on other factors such as the engineering staff’s expertise, concurrency model preference, etc.

Outside of the turf of typical streaming systems, Akka Streams also plays a key role as the underlying platform for an emerging service stack. Viewed as the next generation of Spray.io, Akka HTTP is built on top of Akka Streams. Designed for building HTTP-based integration layers, Akka HTTP provides versatile streaming-oriented HTTP routing and request/response transformation mechanisms. Under the hood, Akka Streams’ back-pressure functionality regulates data streaming between the server and the remote client, consequently conserving memory utilization on the server.