Scala On Spark – Streak

This is yet another programming example in my Scala-on-Spark blog series. Again, while it starts with the same minuscule weather data used in previous examples of the blog series, it can be viewed as an independent programming exercise.

In this example, we’re going to create a table that shows the streaks of consecutive months with non-zero precipitation.

The result should be similar to the following:

We’ll explore using Spark’s window functions in this example. As a side note, some of the previous examples in the blog series could be resolved using window functions as well. By means of aggregating over partitioned sliding windows of data, Spark’s window functions readily perform certain kinds of complex aggregations which would otherwise require repetitive nested groupings. They are similar to how PostgreSQL’s window functions work.

Now, let’s load up the same old minuscule weather data.

First, we create a DataFrame of precipitation by weather station and month and filter it to keep only months with positive precipitation.
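A minimal sketch of this step, assuming the weather data sits in a DataFrame named weatherDF with columns station_id, start_date and total_precip (illustrative names, not necessarily those in the original code):

```scala
import org.apache.spark.sql.functions._

// Monthly precipitation per station, keeping only months with positive precipitation
val monthlyPrecipDF = weatherDF.
  withColumn("year_month", date_format($"start_date", "yyyy-MM")).
  groupBy($"station_id", $"year_month").
  agg(sum($"total_precip").as("monthly_precip")).
  where($"monthly_precip" > 0)
```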

Next, using a window function, we capture sequences of row numbers ordered by month over partitions by weather station. For each row, we then use a UDF to calculate the base date by dating back from the row's month in accordance with its row number. As shown in the following table, these base dates help trace chunks of contiguous months back to their common base dates.
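Continuing the sketch above (the baseDate UDF name is illustrative); the UDF simply dates the row's year-month back by its row number, so every row in a run of contiguous months ends up with the same base date:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val rowNumWindow = Window.partitionBy($"station_id").orderBy($"year_month")

// Date back from the row's year-month by its row number
val baseDate = udf{ (ym: String, rn: Int) =>
  java.time.YearMonth.parse(ym).minusMonths(rn.toLong).toString
}

val baseDateDF = monthlyPrecipDF.
  withColumn("row_num", row_number().over(rowNumWindow)).
  withColumn("base_date", baseDate($"year_month", $"row_num"))
```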

Finally, we apply another row-number window function, but this time, over partitions by weather station as well as base date. This partitioning allows contiguous common base dates to generate new row numbers as the wanted streaks.
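A sketch of this final step, continuing from the snippet above:

```scala
// A second row_number, now partitioned by station and base date: rows sharing a
// base date form one run of contiguous months, and the row number is the streak
val streakWindow = Window.partitionBy($"station_id", $"base_date").orderBy($"year_month")

val streakDF = baseDateDF.
  withColumn("streak", row_number().over(streakWindow)).
  orderBy($"station_id", $"year_month")
```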

Using the same logic flow, we can also generate similar streak reports for temperature highs/lows (e.g. a streak of temperature highs above 75F). I'll leave that as an exercise for the reader.

Scala On Spark – Sum Over Periods

This is another programming example in my Scala-on-Spark blog series. While it uses the same minuscule weather data created in the first example of the blog series, it can be viewed as an independent programming exercise.

In this example, we want a table of total precipitation over custom past periods by weather station. The specific periods in this example are the previous month, the previous 3 months, and all previous months. We have data from July through December, and let's say it's now January, hence the previous month is December.

The result should be like this:

User-defined functions (UDFs) will be used in this example. Spark's UDFs supplement its API by allowing the vast library of Scala functions (or those of any other supported language) to be used. That said, a method from Spark's API should be preferred over a UDF of the same functionality, as the former will likely perform better.

First, let's load up the weather data.

We first create a DataFrame of precipitation by weather station and month, along with the number of months each month lags behind the current month.
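A sketch of this step, assuming the same weatherDF and column names as in the earlier examples; months_between against the truncated current date gives the months-lagged figure:

```scala
import org.apache.spark.sql.functions._

val monthlyDF = weatherDF.
  withColumn("year_month", trunc($"start_date", "month")).
  groupBy($"station_id", $"year_month").
  agg(sum($"total_precip").as("monthly_precip")).
  // number of whole months this month lags behind the current month
  withColumn("months_lagged",
    months_between(trunc(current_date(), "month"), $"year_month").cast("int"))
```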

Next, we combine the list of months-lagged with monthly precipitation by means of a UDF to create a map column. To do that, we use Scala’s zip method within the UDF to create a list of tuples from the two input lists and convert the resulting list into a map.
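Continuing the sketch above (UDF and column names are illustrative):

```scala
// Zip the two collected lists into a months-lagged -> monthly-precipitation map
val listToMap = udf{ (lags: Seq[Int], precips: Seq[Double]) =>
  lags.zip(precips).toMap
}

val precipMapDF = monthlyDF.
  groupBy($"station_id").
  agg(collect_list($"months_lagged").as("lag_list"),
      collect_list($"monthly_precip").as("precip_list")).
  withColumn("precip_map", listToMap($"lag_list", $"precip_list"))
```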

Note that the map content might look different depending on when it is generated, as the months-lagged is relative to the current month when the application is run.

Using another UDF that sums precipitation counting backward from the previous month based on the number of months lagged, we create the result DataFrame.
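A sketch of an aggregation UDF in the spirit of the post's aggMapValues (the exact signature in the original code may differ): it sums the map values whose months-lagged keys fall within a given range.

```scala
val aggMapValues = udf{ (m: Map[Int, Double], from: Int, to: Int) =>
  m.collect{ case (lag, p) if lag >= from && lag <= to => p }.sum
}

val resultDF = precipMapDF.select($"station_id",
  aggMapValues($"precip_map", lit(1), lit(1)).as("prev_1_month"),
  aggMapValues($"precip_map", lit(1), lit(3)).as("prev_3_months"),
  aggMapValues($"precip_map", lit(1), lit(Int.MaxValue)).as("all_prev_months"))
```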

Again, note that the months-lagged is relative to the current month when the application is executed, hence the months-lagged parameters for the aggMapValues UDF should be adjusted accordingly.

We can use a similar approach to come up with a table for temperature high/low over the custom periods. Below are the steps for creating the result table for temperature high.
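As a compact sketch (same naming assumptions as above), the monthly aggregation switches from sum to max, with the rest of the flow mirroring the precipitation example:

```scala
val monthlyHighDF = weatherDF.
  withColumn("year_month", trunc($"start_date", "month")).
  groupBy($"station_id", $"year_month").
  agg(max($"temp_high").as("monthly_high")).
  withColumn("months_lagged",
    months_between(trunc(current_date(), "month"), $"year_month").cast("int"))
// The map-building and period-aggregation steps then follow the precipitation flow,
// with the aggregation UDF taking a max instead of a sum over the map values.
```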

I'll leave creating the temperature low result table as a programming exercise for the reader. Note that rather than calculating temperature high and low separately, one could aggregate both of them together in some of the steps with little code change. For those up for a slightly more challenging exercise, both temperature high and low can in fact be transformed together every step of the way.

Scala On Spark – Cumulative Pivot Sum

In a couple of recent R&D projects, I was using Apache Spark rather extensively to address some data processing needs on Hadoop clusters. Although there is an abundance of big data processing platforms these days, it didn't take long for me to settle on Spark. One of the main reasons is that the programming language for the R&D is Scala, which is what Spark itself is written in. In particular, Spark's inherent support for functional programming and compositional transformations on immutable data enables high performance at scale as well as readability. The other main reasons are very much in line with some of the key factors contributing to Spark's rising popularity.

I'm starting a mini blog series on Scala-on-Spark (SoS), with each blog post demonstrating a Scala programming example on Apache Spark. In the blog series, I'm going to illustrate how the functionality-rich SoS is able to resolve some non-trivial data processing problems with seemingly little effort. If nothing else, these are good brain-teasing programming exercises in Scala on Spark.

As the source data for the example, let’s consider a minuscule set of weather data stored in a DataFrame, which consists of the following columns:

  • Weather Station ID
  • Start Date of a half-month period
  • Temperature High (in Fahrenheit) over the period
  • Temperature Low (in Fahrenheit) over the period
  • Total Precipitation (in inches) over the period

Note that with a properly configured Spark cluster, the methods illustrated in the following example can be readily adapted to handle much more granular data at scale – e.g. down to sub-hourly weather data from tens of thousands of weather stations. It’s also worth mentioning that there can be other ways to solve the problems presented in the examples.

For illustration purposes, the following code snippets are executed in a Spark shell. The first thing to do is generate a DataFrame with the said columns of sample data, which will be used as the source data for this example and a couple of the following ones.
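As a hedged sketch (the station IDs, dates and readings below are made-up illustrative values, not the original sample data), such a DataFrame might be assembled in the shell along these lines:

```scala
import java.sql.Date

// Columns: station id, half-month start date, temperature high/low (F), total precipitation (in)
val weatherDF = Seq(
  ("station_1", Date.valueOf("2017-07-01"), 88, 65, 2.1),
  ("station_1", Date.valueOf("2017-07-16"), 90, 68, 0.0),
  ("station_1", Date.valueOf("2017-08-01"), 92, 70, 1.2),
  ("station_2", Date.valueOf("2017-07-01"), 82, 60, 3.5),
  ("station_2", Date.valueOf("2017-07-16"), 85, 62, 0.8),
  ("station_2", Date.valueOf("2017-08-01"), 87, 64, 0.0)
).toDF("station_id", "start_date", "temp_high", "temp_low", "total_precip")
```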

In this first example, the goal is to generate a table of cumulative precipitation by weather station in month-by-month columns. By 'cumulative sum', we mean the monthly precipitation is accumulated from one month to the next (i.e. a rolling sum). In other words, if July's precipitation is 2 inches and August's is 1 inch, the figure for August will be 3 inches. The result should look like the following table:

First, we transform the original DataFrame to include an additional year-month column, followed by using Spark’s groupBy, pivot and agg methods to generate the pivot table.
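A sketch of this step, under the same weatherDF naming assumptions as above:

```scala
import org.apache.spark.sql.functions._

// One row per station, one column per year-month, holding that month's total precipitation
val monthlyPrecipDF = weatherDF.
  withColumn("year_month", date_format($"start_date", "yyyy-MM")).
  groupBy($"station_id").
  pivot("year_month").
  agg(sum($"total_precip"))
```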

Next, we assemble a list of the year-month columns and traverse the list using the foldLeft method, which is one of the most versatile Scala functions for custom iterative transformations. In this particular case, the data to be transformed by foldLeft is a tuple of (DataFrame, Double). Normally, transforming the DataFrame alone would suffice, but in this case we need an additional value to address the rolling cumulation requirement.

The tuple's first DataFrame-type element, with monthlyPrecipDF as its initial value, will be transformed using the binary operator function specified as foldLeft's second argument (i.e. (acc, c) => …). As for the tuple's second Double-type element, with the first year-month as its initial value, it carries the current month's value over to the next iteration. The end result is a (DataFrame, Double) tuple successively transformed month by month.
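Below is a minimal sketch of this foldLeft pattern. For simplicity, the accumulator's second element in the sketch is just the previous month's column name rather than the Double carried in the original code, which is enough to roll each month's figure into the next column:

```scala
// Year-month column names in chronological order (everything except the station id)
val yearMonths = monthlyPrecipDF.columns.filter(_ != "station_id").sorted.toList

val (cumulativePrecipDF, _) = yearMonths.tail.foldLeft((monthlyPrecipDF, yearMonths.head)){
  case ((acc, prev), c) =>
    // add the prior month's cumulative figure into the current month's column
    (acc.withColumn(c, coalesce(col(c), lit(0.0)) + coalesce(col(prev), lit(0.0))), c)
}
```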

Similar pivot aggregations can be applied to temperature highs/lows as well, with the sum method replaced by max/min.

Finally, we compute cumulative temperature high/low like cumulative precipitation, by replacing method sum with iterative max/min using Spark’s when-otherwise method.
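For instance, assuming a monthlyHighPivotDF built like the precipitation pivot but with max as the aggregate, the cumulative high could be sketched as:

```scala
val highMonths = monthlyHighPivotDF.columns.filter(_ != "station_id").sorted.toList

val (cumulativeHighDF, _) = highMonths.tail.foldLeft((monthlyHighPivotDF, highMonths.head)){
  case ((acc, prev), c) =>
    // keep the larger of this month's high and the running high carried in the previous column
    (acc.withColumn(c,
      when(col(prev).isNull || col(c) > col(prev), col(c)).otherwise(col(prev))), c)
}
```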

Scala IoT Systems With Akka Actors II

In a previous blog post, I assembled a Scala application simplified from an IoT prototype using Akka Actors and MQTT to illustrate how an IoT system fits into the selected tech stack. The stripped-down application uses a single actor to simulate requests from a bunch of IoT devices.

In this post, I would like to share an expanded version of the previous application that uses loosely-coupled lightweight actors to simulate individual IoT devices, each of which maintains its own internal state and handles bidirectional communications via non-blocking message passing. Using a distributed workers system adapted from a Lightbend template along with a persistence journal, the end product is an IoT system equipped with a scalable fault-tolerant data processing system.

Main components

Below is a diagram and a summary of the revised Scala application which consists of 3 main components:

IoT with MQTT and Akka Actor Systems v.2

1. IoT

  • An IotManager actor which:
    • instantiates a specified number of devices upon start-up
    • subscribes to an MQTT pub-sub topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • notifies Device actors upon receiving failure messages from Master actor
    • forwards work results to the corresponding devices upon receiving them from ResultProcessor
  • Device actors each of which:
    • simulates a thermostat, lamp, or security alarm with random initial state and setting
    • maintains and updates internal state and setting upon receiving work results from IotManager
    • generates work requests and publishes them to the MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from IotManager
  • An MQTT pub-sub broker and an MQTT client for communicating with the broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges work request reception with IotManager
    • publishes work results from Workers to ‘work-results’ topic via Akka distributed pub-sub
    • maintains work states using persistence journal
  • A ResultProcessor actor in the master cluster which:
    • gets instantiated upon starting up the IoT system (more on this below)
    • consumes work results by subscribing to the ‘work-results’ topic
    • sends work results received from Master to IotManager

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with and pulls work from the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors each of which:
    • processes the work requests from its parent Worker
    • generates work results and sends them back to the Worker

Master-worker system with a ‘pull’ model

While significant changes have been made to the IoT actor system, much of the setup for the Master/Worker actor systems and MQTT pub-sub messaging remains largely unchanged from the previous version:

  • As separate independent actor systems, both the IoT and Worker systems communicate with the Master cluster via ClusterClient.
  • Using a ‘pull’ model which generally performs better at scale, the Worker actors register with the Master cluster and pull work when available.
  • Paho-Akka is used as the MQTT pub-sub messaging client.
  • A helper object, MqttConfig, encapsulates an MQTT pub-sub topic and broker information along with serialization methods to handle MQTT messaging using a test Mosquitto broker.

What’s new?

Now, let’s look at the major changes in the revised application:

First of all, Lightbend's Activator has been retired, so sbt is used instead.

For persisting actor state, a Redis data store is used as the persistence journal. In the previous version, the shared LevelDB journal was coupled with the first seed node, which became a single point of failure. With the Redis persistence journal decoupled from any specific cluster node, fault tolerance steps up a notch.

As mentioned earlier in the post, one of the key changes from the previous application is the use of actors to represent individual IoT devices, each with its own state and the ability to communicate with entities designated for interfacing with external actor systems. Actors, lightweight and loosely coupled by design, serve as an excellent vehicle for modeling individual IoT devices. In addition, non-blocking message passing among actors provides an efficient and economical means for communication and logic control of the device state.

The IotManager actor is responsible for creating and managing a specified number of Device actors. Upon startup, the IoT manager instantiates individual Device actors of random device type (thermostat, lamp or security alarm). These devices are maintained in an internal registry regularly updated by the IoT manager.

Each of the Device actors starts up with a random state and setting. For instance, a thermostat device may start with an ON state and a temperature setting of 68F whereas a lamp device might have an initial state of OFF and brightness setting of 2. Once instantiated, a Device actor will maintain its internal operational state and setting from then on and will report and update the state and setting per request.

Work and WorkResult

In this application, a Work object represents a request sent by a specific Device actor and carries the Device's Id along with its current state and setting data. A WorkResult object, on the other hand, represents a returned result telling the Device actor to update its state and setting to those stored within the object.
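A sketch of what such payloads might look like (the field names are assumptions for illustration, not the repo's exact definitions):

```scala
case class DeviceState(state: String, setting: Int)

// Request from a Device, carrying its id and current state/setting
case class Work(workId: String, deviceId: String, state: DeviceState)

// Returned result telling the Device what state/setting to update to
case class WorkResult(workId: String, deviceId: String, newState: DeviceState)
```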

Responsible for processing the WorkResult generated by the Worker actors, the ResultProcessor actor simulates the processing of work results – in this case it simply sends the work result, via the actorSelection method, back to the original Device actor through IotManager. Interacting only with the Master cluster system as a cluster client, the Worker actors have no knowledge of the ResultProcessor actor. ResultProcessor receives the work results by subscribing to the Akka distributed pub-sub topic to which the Master publishes.

While a participant in the Master cluster actor system, the ResultProcessor actor is instantiated when the IoT actor system starts up. This decoupling of ResultProcessor instantiation from the Master cluster ensures that no excess ResultProcessor instances get started when multiple Master cluster nodes start up.

Test running the application

Complete source code of the application is available at GitHub.

To run the application on a single JVM, just git-clone the repo, run the following command at a command line terminal and observe the console output:

The optional NumOfDevices parameter defaults to 20.

To run the application on separate JVMs, git-clone the repo to a local disk, open up separate command line terminals and launch the different components on separate terminals:

Sample console log

Below is filtered console log output tracing the evolving state and setting of a thermostat device:

The following annotated console log showcases fault-tolerance of the master cluster – how it fails over to the 2nd node upon detecting that the 1st node crashes:

Scaling for production

While the application has an underlying architecture that emphasizes scalability, it would require further effort in the following areas to make it production-ready:

  • IotManager uses the 'ask' method for message receipt confirmation via a Future returned by the Master. If business logic allows, using the fire-and-forget 'tell' method will be significantly more efficient, especially at scale.
  • The MQTT broker used in the application is a test broker provided by Mosquitto. A production version of the broker should be installed, preferably local to the IoT system. MQTT brokers from other vendors such as HiveMQ and RabbitMQ are also available.
  • As displayed in the console log when running the application, Akka's default Java serializer isn't known for its efficiency. Other serializers such as Kryo or Protocol Buffers should be considered.
  • The Redis data store for actor state persistence should be configured for a production environment.

Further code changes to be considered

A couple of changes to the current application might be worth considering:

Device types are currently represented as strings, and the code logic for device type-specific states and settings is repeated during instantiation of devices and processing of work requests. Such logic could be encapsulated within classes defined for the individual device types. The payload would probably be larger as a consequence, but it might be worthwhile for better code maintainability, especially if there are many device types.
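For instance, a sealed trait hierarchy along these lines (names and value ranges are purely illustrative) could hold the type-specific logic in one place:

```scala
import scala.util.Random

sealed trait DeviceType {
  def name: String
  def randomSetting: Int   // type-specific initial setting
}
case object Thermostat extends DeviceType {
  val name = "thermostat"; def randomSetting = 60 + Random.nextInt(21)  // 60F..80F
}
case object Lamp extends DeviceType {
  val name = "lamp"; def randomSetting = Random.nextInt(5)              // brightness 0..4
}
case object SecurityAlarm extends DeviceType {
  val name = "security alarm"; def randomSetting = 0                    // armed/disarmed only
}
```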

Another change to be considered is that Work and WorkResult could be generalized into a single class. Conversely, they could be further differentiated in accordance with specific business needs. A slightly more extensive change would be to retire ResultProcessor altogether and let Worker actors process WorkResult as well.

State mutation in Akka Actors

In this application, a few actors maintain mutable internal states using private variables (private var):

  • Master
  • IotManager
  • Device

As an actor, by design, will never be accessed by multiple threads concurrently, it's generally safe to use a 'private var' to store changing state. But if one prefers updating state in the manner of a state machine, Akka Actors provides a way to hot-swap an actor's internal state.

Hot-swapping like a state machine

Below is a sample snippet that illustrates how hot-swapping mimics a state machine without having to use any mutable variable for maintaining the actor state:
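The sketch below captures the idea; the message types (WorkIsReady, RequestWork, Work, WorkDone) and the masterProxy and workProcessor references are assumptions for illustration, not the repo's exact definitions.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical message types for the sketch
case object WorkIsReady
case object RequestWork
case class Work(workId: String, job: Any)
case class WorkDone(workId: String)

class Worker(masterProxy: ActorRef, workProcessor: ActorRef) extends Actor {

  def receive = idle                        // start in the idle behavior

  def idle: Receive = {
    case WorkIsReady =>
      masterProxy ! RequestWork             // pull model: ask the Master for work
    case work: Work =>
      workProcessor ! work                  // hand the work to the processing child
      context.become(working(work.workId))  // hot-swap to the 'working' behavior
  }

  def working(workId: String): Receive = {
    case WorkDone(`workId`) =>              // done notice from the work processor
      masterProxy ! WorkDone(workId)        // report completion back to the Master
      context.become(idle)                  // swap back to idle; no mutable var needed
  }
}
```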

Simplified for illustration, the above snippet depicts a Worker actor that pulls work from the Master cluster. The context.become method allows the actor to switch its internal state at run-time like a state machine. As shown in the simplified code, it takes an ‘Actor.Receive’ (which is a partial function) that implements a new message handler. Under the hood, Akka manages the hot-swapping via a stack. As a side note, according to the relevant source code, the stack for hot-swapping actor behavior is, ironically, a mutable ‘private var’ of List[Actor.Receive].

Recursive transformation of an immutable parameter

Another functional approach to mutating actor state is via recursive transformation of an immutable parameter. As an example, we can avoid using a mutable 'private var registry' as shown in the following ActorManager actor and use 'context.become' to recursively transform a registry passed as an immutable parameter to the updateState method:
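Below is a sketch of the pattern; the Register and Remove messages are hypothetical stand-ins for whatever the manager actually handles.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical messages for the sketch
case class Register(id: String, device: ActorRef)
case class Remove(id: String)

class ActorManager extends Actor {

  // No 'private var registry' here: the registry is an immutable Map handed
  // from one behavior to the next via context.become
  def receive = updateState(Map.empty[String, ActorRef])

  def updateState(registry: Map[String, ActorRef]): Receive = {
    case Register(id, device) =>
      context.become(updateState(registry + (id -> device)))
    case Remove(id) =>
      context.become(updateState(registry - id))
  }
}
```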