Tag Archives: scala

Scala IoT Systems With Akka Actors II

In a previous blog post, I assembled a Scala application simplified from an IoT prototype using Akka Actors and MQTT to illustrate how an IoT system fits into the selected tech stack. The stripped-down application uses an actor to simulate device requests.

In this post, I would like to expand the previous application a little to showcase how the loosely-coupled lightweight Actors serve as individual IoT devices, each of which maintains its internal state and handles bidirectional communications via non-blocking message passing. Using a distributed workers system adapted from a Lightbend template along with a persistence journal, the end product is an IoT system equipped with a scalable fault-tolerant data processing system.

Main components

Below is a diagram and a summary of the revised Scala application which consists of 3 main components:

[Diagram: IoT with MQTT and Akka Actor Systems v.2]

1. IoT

  • An IotManager actor which:
    • instantiates a specified number of devices upon start-up
    • subscribes to a MQTT pub-sub topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • notifies Device actors upon receiving failure messages from Master actor
    • forwards work results to the corresponding devices upon receiving them from ResultProcessor
  • Device actors each of which:
    • simulates a thermostat, lamp, or security alarm with random initial state and setting
    • maintains and updates internal state and setting upon receiving work results from IotManager
    • generates work requests and publishes them to the MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from IotManager
  • A MQTT pub-sub broker and a MQTT client for communicating with the broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges work request reception with IotManager
    • publishes work results from Workers to ‘work-results’ topic via Akka distributed pub-sub
    • maintains work states using persistence journal
  • A ResultProcessor actor in the master cluster which:
    • gets instantiated upon starting up the IoT system (more on this below)
    • consumes work results by subscribing to the ‘work-results’ topic
    • sends work results received from Master to IotManager

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with, and pulls work from, the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors each of which:
    • processes the work requests from its parent Worker
    • generates work results and sends them back to the Worker

Master-worker system with a ‘pull’ model

While significant changes have been made to the IoT actor system, much of the setup for the Master/Worker actor systems and MQTT pub-sub messaging remains largely unchanged from the previous version:

  • As separate independent actor systems, both the IoT and Worker systems communicate with the Master cluster via ClusterClient.
  • Using a ‘pull’ model which generally performs better at scale, the Worker actors register with the Master cluster and pull work when available.
  • Paho-Akka is used as the MQTT pub-sub messaging client.
  • A helper object, MqttConfig, encapsulates a MQTT pub-sub topic and broker information along with serialization methods to handle MQTT messaging using a test Mosquitto broker.
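The serialization helpers mentioned in the last item can be sketched with plain JDK object streams. This is a minimal sketch, not the actual MqttConfig code: the object name SerializationSketch and the method names serialize and deserialize are assumptions, and the tuple payload in the usage note merely stands in for a Work message.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper; names are illustrative, not taken from the application.
object SerializationSketch {
  // Convert a serializable object into a byte array usable as an MQTT payload.
  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuild an object from a byte array; the caller states the expected type.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

For example, SerializationSketch.deserialize[(String, Int, Int)](SerializationSketch.serialize(("thermostat-1", 1, 68))) should give back an equal tuple. The cast in deserialize is unchecked, so a mismatched type parameter only fails when the value is used.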

What’s new?

Now, let’s look at the major changes in the revised application:

First of all, Lightbend’s Activator has been retired and sbt is used instead.

For persisting actor state, a Redis data store is used as the persistence journal. In the previous version, the shared LevelDB journal was coupled with the first seed node, which became a single point of failure. With the Redis persistence journal decoupled from any specific cluster node, fault tolerance steps up a notch.

As mentioned earlier in the post, one of the key changes to the previous application is the use of actors to represent individual IoT devices, each with its own state and the capability of communicating with entities designated for interfacing with external actor systems. Actors, lightweight and loosely-coupled by design, serve as an excellent vehicle for modeling individual IoT devices. In addition, non-blocking message passing among actors provides an efficient and economical means for communication and logic control of the device state.

The IotManager actor is responsible for creating and managing a specified number of Device actors. Upon startup, the IoT manager instantiates individual Device actors of random device type (thermostat, lamp or security alarm). These devices are maintained in an internal registry regularly updated by the IoT manager.

Each of the Device actors starts up with a random state and setting. For instance, a thermostat device may start with an ON state and a temperature setting of 68F whereas a lamp device might have an initial state of OFF and brightness setting of 2. Once instantiated, a Device actor will maintain its internal operational state and setting from then on and will report and update the state and setting per request.

Work and WorkResult

In this application, a Work object represents a request sent by a specific Device actor and carries the Device’s Id and its current state and setting data. A WorkResult object, on the other hand, represents a returned request for the Device actor to update its state and setting stored within the object.

Responsible for processing the WorkResult generated by the Worker actors, the ResultProcessor actor simulates the processing of work results. In this case it simply sends the work result, via the actorSelection method, back to the original Device actor through IotManager. Interacting with only the Master cluster system as a cluster client, the Worker actors have no knowledge of the ResultProcessor actor. ResultProcessor receives the work result by subscribing to the Akka distributed pub-sub topic to which the Master publishes.

While a participant of the Master cluster actor system, the ResultProcessor actor gets instantiated when the IoT actor system starts up. The decoupling of ResultProcessor instantiation from the Master cluster ensures that no excessive ResultProcessor instances get started when multiple Master cluster nodes start up.

Test running the application

Complete source code of the application is available at GitHub.

To run the application on a single JVM, just git-clone the repo, run the following command at a command line terminal and observe the console output:

The optional NumOfDevices parameter defaults to 20.

To run the application on separate JVMs, git-clone the repo to a local disk, open up separate command line terminals and launch the different components on separate terminals:

Sample console log

Below is filtered console log output from the console tracing the evolving state and setting of a thermostat device:

The following annotated console log showcases fault-tolerance of the master cluster – how it fails over to the 2nd node upon detecting that the 1st node crashes:

Scaling for production

While the application has an underlying architecture that emphasizes scalability, it would require further effort in the following areas to make it production ready:

  • IotManager uses the ‘ask’ method for message receipt confirmation via a Future returned by the Master. If business logic allows, using the fire-and-forget ‘tell’ method will be significantly more efficient, especially at scale.
  • The MQTT broker used in the application is a test broker provided by Mosquitto. A production version of the broker should be installed, preferably local to the IoT system. MQTT brokers from other vendors, such as HiveMQ and RabbitMQ, are also available.
  • As displayed in the console log when running the application, Akka’s default Java serializer isn’t known for its efficiency. Other serializers such as Kryo or Protocol Buffers should be considered.
  • The Redis data store for actor state persistence should be configured for a production environment.

Further code changes to be considered

A couple of changes to the current application might be worth considering:

Device types are currently represented as strings, and code logic for device type-specific states and settings is repeated during instantiation of devices and processing of work requests. Such logic could be encapsulated within classes defined for individual device types. The payload would probably be larger as a consequence, but it might be worth it for better code maintainability, especially if there are many device types.
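One way to picture that encapsulation is a sealed trait with one case object per device type. This is only a sketch of the idea: the names DeviceTypeSketch, DeviceType, randomState, randomSetting and isValid, as well as the state codes and setting ranges, are all assumptions made for illustration and are not taken from the application.

```scala
import scala.util.Random

// Hypothetical model; the states and setting ranges below are illustrative assumptions.
object DeviceTypeSketch {
  sealed trait DeviceType {
    def name: String
    def states: Vector[Int] // allowed operational states, e.g. 0 = OFF, 1 = ON
    def settings: Range     // allowed setting values
    def randomState(rnd: Random): Int = states(rnd.nextInt(states.size))
    def randomSetting(rnd: Random): Int = settings(rnd.nextInt(settings.size))
    def isValid(state: Int, setting: Int): Boolean =
      states.contains(state) && settings.contains(setting)
  }

  case object Thermostat extends DeviceType {
    val name = "thermostat"; val states = Vector(0, 1, 2); val settings = 60 to 75
  }
  case object Lamp extends DeviceType {
    val name = "lamp"; val states = Vector(0, 1); val settings = 1 to 3
  }
  case object SecurityAlarm extends DeviceType {
    val name = "security-alarm"; val states = Vector(0, 1); val settings = 1 to 5
  }

  val all: Vector[DeviceType] = Vector(Thermostat, Lamp, SecurityAlarm)

  // Bridge from the current string representation to the typed one.
  def fromName(name: String): Option[DeviceType] = all.find(_.name == name)
}
```

With such a model, both device instantiation and work processing could call randomSetting or isValid on the device type instead of repeating string matches.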

Another change to be considered is that Work and WorkResult could be generalized into a single class. Conversely, they could be further differentiated in accordance with specific business needs. A slightly more extensive change would be to retire ResultProcessor altogether and let Worker actors process WorkResult as well.

State mutation in Akka Actors

In this application, a few actors maintain mutable internal states using private variables (private var):

  • Master
  • IotManager
  • Device

As an actor by design will never be accessed by multiple threads at once, it’s generally safe enough to use ‘private var’ to store changed states. But if one prefers updating states in the manner of a state machine, Akka Actors provides a ‘context.become’ method to hot-swap the message loop within an actor.

As an example, we can avoid using a mutable ‘private var registry’ as shown in the following ActorManager actor and use ‘context.become’ to recursively transform a registry as an immutable parameter passed to the updateState method:
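A minimal sketch of that pattern with classic Akka actors might look like the following. The message types Register, Unregister and GetRegistry and the use of a Set[String] as the registry are assumptions made for illustration; only the ActorManager and updateState names come from the text.

```scala
import akka.actor.{Actor, Props}

// Hypothetical message protocol; not the application's actual messages.
object ActorManager {
  final case class Register(deviceId: String)
  final case class Unregister(deviceId: String)
  case object GetRegistry
  def props: Props = Props(new ActorManager)
}

class ActorManager extends Actor {
  import ActorManager._

  // Start with an empty registry; no `private var` is needed.
  def receive: Receive = updateState(Set.empty[String])

  // Each call returns a new behavior closed over an immutable registry.
  def updateState(registry: Set[String]): Receive = {
    case Register(id)   => context.become(updateState(registry + id))
    case Unregister(id) => context.become(updateState(registry - id))
    case GetRegistry    => sender() ! registry
  }
}
```

Because context.become replaces the current behavior by default (rather than pushing onto the behavior stack), the actor always holds exactly one registry value.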

Under the hood, Akka Actors maintains the hot-swapped code using a stack. Ironically though, digging into the relevant source code, the stack for hot-swapping actor behavior is in fact a mutable ‘private var’ of List[Actor.Receive].

Text Mining With Akka Streams

Reactive Systems, whose core characteristics are declared in the Reactive Manifesto, have started to emerge in recent years as message-driven systems that emphasize scalability, responsiveness and resilience. It’s pretty clear from the requirements that a system can’t be simply made Reactive. Rather, it should be built from the architectural level to be Reactive.

Akka’s actor systems, which rely on asynchronous message-passing among lightweight loosely-coupled actors, serve as a great run-time platform for building Reactive Systems on the JVM (Java Virtual Machine). I posted a few blog posts along with sample code about Akka actors in the past. This time I’m going to talk about something different but closely related.

Reactive Streams

While bearing a similar name, Reactive Streams is a separate initiative that mandates its implementations to be capable of processing stream data asynchronously and at the same time automatically regulating the stream flows in a non-blocking fashion.

Akka Streams, built on top of Akka actor systems, is an implementation of Reactive Streams. Equipped with the back-pressure functionality, it eliminates the need of manually buffering stream flows or custom-building stream buffering mechanism to avoid buffer overflow problems.

Extracting n-grams from text

In text mining, n-grams are useful data in the area of NLP (natural language processing). In this blog post, I’ll illustrate extracting n-grams from a stream of text messages using Akka Streams with Scala as the programming language.

First things first, let’s create an object with methods for generating random text content:

Source code: TextMessage.scala

Some minimal effort has been made to generate random clauses of likely pronounceable fake words along with punctuation. To make it a little more flexible, lengths of individual words and clauses are supplied as parameters.

Next, create another object with text processing methods responsible for extracting n-grams from input text, with n being an input parameter. Using Scala’s sliding(size, step) iterator method with size n and step defaulting to 1, a new iterator of sliding window views is generated to produce the wanted n-grams.

Source code: TextProcessor.scala
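The sliding-window idea can be sketched in a few lines. This is not the original TextProcessor.scala: the object name NgramSketch, the method name ngrams, the lower-casing and the split on non-word characters are all assumptions. Note that this simplified version lets n-grams span punctuation boundaries.

```scala
// Hypothetical n-gram extractor built on the standard library's sliding().
object NgramSketch {
  def ngrams(text: String, n: Int): List[String] =
    text.toLowerCase
      .split("\\W+")           // tokenize on runs of non-word characters
      .filter(_.nonEmpty)      // a leading separator yields an empty first token
      .sliding(n)              // windows of size n, step 1
      .filter(_.length == n)   // drop the short window produced for inputs with fewer than n words
      .map(_.mkString(" "))
      .toList
}
```

For instance, NgramSketch.ngrams("The quick brown fox.", 2) yields List("the quick", "quick brown", "brown fox"), while an input with fewer than n words yields an empty list.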

Now that the text processing tools are in place, we can focus on building the main streaming application in which Akka Streams plays the key role.

First, make sure we have the necessary library dependencies included in build.sbt:

Source code: build.sbt

As Akka Streams is relatively new development work, more recent Akka versions (2.4.9 or higher) should be used.

Let’s start with a simple stream for this text mining application:

Source code: NgramStream_v01.scala

As shown in the source code, constructing a simple stream like this is just defining and chaining together the text-generating source, the text-processing flow and the text-display sink as follows:

Graph DSL

Akka Streams provides a Graph DSL (domain-specific language) that helps build the topology of stream flows using predefined fan-in/fan-out functions.

What Graph DSL does is somewhat similar to how Apache Storm’s TopologyBuilder pieces together its spouts (i.e. stream sources), bolts (i.e. stream processors) and stream grouping/partitioning functions, as illustrated in a previous blog about HBase streaming.

Back-pressure

Now, let’s branch off the stream using Graph DSL to illustrate how the integral back-pressure feature is at play.

Source code: NgramStream_v02.scala

Streaming to a file should be significantly slower than streaming to the console. To make the difference more noticeable, a delay is deliberately added to streaming each line of text in the file sink.

Run the application and you will notice that the console display is slowed down. It’s the result of the upstream data flow being regulated to accommodate the relatively slow file I/O outlet, even though the other console outlet is able to consume faster. All of that is conducted in a non-blocking fashion.

Graph DSL create() methods

To build a streaming topology using Graph DSL, you’ll need to use one of the create() methods defined within trait GraphApply, which is extended by object GraphDSL. Here are the signatures of the create() methods:

Note that the sbt-boilerplate template language is needed to interpret the create() method being used in the application that takes multiple stream components as input parameters.

Materialized values

In Akka Streams, materializing a constructed stream is the step of actually running the stream with the necessary resources. To run the stream, a materializer created by the factory method ActorMaterializer() must be available implicitly; it allocates the resources for stream execution, which includes starting up the underlying Akka actors that process the stream.

Every processing stage of the stream can produce a materialized value. By default, using the via(flow) and to(sink) functions, the materialized value of the left-most stage will be preserved. As in the following example, for graph1, the materialized value of the source is preserved:
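A minimal sketch of three such graphs is given here, assuming a toy integer source, a doubling flow and a summing sink (none of which come from the n-gram application). Since the source and the flow both materialize to NotUsed, the difference between graph1 and graph2 is in which stage the value comes from rather than in its type.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stream components used only to illustrate materialized values.
object MatValueSketch {
  val source: Source[Int, NotUsed] = Source(1 to 5)
  val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // via/to keep the left-most materialized value: that of the source.
  val graph1: RunnableGraph[NotUsed] = source.via(flow).to(sink)

  // viaMat with Keep.right keeps the flow's value; to() then keeps it again.
  val graph2: RunnableGraph[NotUsed] = source.viaMat(flow)(Keep.right).to(sink)

  // Keep.both pairs the flow's value with the sink's Future[Int].
  val graph3: RunnableGraph[(NotUsed, Future[Int])] =
    source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)

  // Materialize graph3 and wait for the sink's result.
  def runGraph3(): Int = {
    implicit val system: ActorSystem = ActorSystem("mat-value-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (_, futureSum) = graph3.run()
      Await.result(futureSum, 5.seconds)
    } finally system.terminate()
  }
}
```

ActorMaterializer() is used because the text refers to it; in Akka 2.6 and later it is deprecated and an implicit ActorSystem alone is enough to run a graph.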

To allow one to selectively capture the materialized values of the specific stream components, Akka Streams provides functions viaMat(flow) and toMat(sink) along with a combiner function, Keep. As shown in the above example, for graph2, the materialized value of the flow is preserved, whereas for graph3, materialized values for both the flow and sink are preserved.

Back to our fileSink function as listed below, toMat(fileIOSink)(Keep.right) instructs Akka Streams to keep the materialized value of the fileIOSink as a Future value of type IOResult:

Using Graph DSL, as seen earlier in the signature of the create() method, one can select what materialized value is to be preserved by specifying the associated stream components accordingly as the curried parameters:

In our case, we want the materialized value of fileSink, thus the curried parameters should look like this:

Defining the stream graph

Akka Streams provides a number of functions for fan-out (e.g. Broadcast, Balance) and fan-in (e.g. Merge, Concat). In our example, we want a simple topology with a single text source and the same n-gram generator flow branching off to two sinks in parallel:

Adding a message counter

Let’s further expand our n-gram extraction application to include displaying a count. A simple count-flow is created to map each message string into the number 1, and a count-sink to sum up all these 1s streamed to the sink. Adding them as the third flow and sink to the existing stream topology yields something similar to the following:

Source code: NgramStream_v03.scala

Full source code of the application is at GitHub.

Final thoughts

Having used Apache Storm, I see it as a rather different beast compared with Akka Streams. A full comparison between the two would obviously be an extensive exercise by itself, but it suffices to say that both are great platforms for streaming applications.

Perhaps one of the biggest differences between the two is that Storm provides granular message delivery options (at most / at least / exactly once, guaranteed message delivery) whereas Akka Streams by design questions the premise of reliable messaging on distributed systems. For instance, if guaranteed message delivery is a requirement, Akka Streams would probably not be the best choice.

Back-pressure has recently been added to Storm’s v.1.0.x built-in feature list, so there is indeed some flavor of reactiveness in it. Aside from message delivery options, choosing between the two technologies might be a decision based more on other factors such as the engineering staff’s expertise, concurrency model preference, etc.

Outside of the turf of typical streaming systems, Akka Streams also plays a key role as the underlying platform for an emerging service stack. Viewed as the next-generation of Spray.io, Akka HTTP is built on top of Akka Streams. Designed for building HTTP-based integration layers, Akka HTTP provides versatile streaming-oriented HTTP routing and request/response transformation mechanism. Under the hood, Akka Streams’ back-pressure functionality regulates data streaming between the server and the remote client, consequently conserving memory utilization on the server.

Generic Merge Sort In Scala

Many software engineers may not need to explicitly deal with type parameterization or generic types in their day-to-day job, but it’s very likely that the libraries and frameworks that they’re heavily using have already done their duty to ensure static type-safety via such parametric polymorphism feature.

In a statically typed functional programming language like Scala, such a feature would often need to be used first-hand in order to create useful functions that ensure type-safety while keeping the code lean and versatile. Generics are evidently taken seriously in Scala’s inherent language design. That, coupled with Scala’s implicit conversion, constitutes a signature feature of Scala. Given Scala’s love of “smileys”, a few of them are designated for the relevant functionalities.

Merge Sort

Merge Sort is a popular textbook sorting algorithm that I think also serves as a great brain-teasing programming exercise. I have an old blog post about implementing Merge Sort using Java Generics. In this post, I’m going to use Merge Sort again to illustrate Scala’s type parameterization.

By means of a merge function which combines the recursively merge-sorted left and right halves of a partitioned list, a basic Merge Sort function for integer sorting might be something similar to the following:

A quick test …
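A minimal sketch of such an integer merge sort, together with a quick check, could look like this. The wrapper object name IntMergeSortSketch is an assumption; the lHead and rHead names follow the ones referred to later in the post. The inner merge is not tail-recursive, so this version is meant for modest list sizes.

```scala
object IntMergeSortSketch {
  def mergeSort(ls: List[Int]): List[Int] = {
    // Merge two already-sorted lists into one sorted list.
    def merge(l: List[Int], r: List[Int]): List[Int] = (l, r) match {
      case (Nil, _) => r
      case (_, Nil) => l
      case (lHead :: lTail, rHead :: rTail) =>
        if (lHead < rHead) lHead :: merge(lTail, r)
        else rHead :: merge(l, rTail)
    }

    val n = ls.length / 2
    if (n == 0) ls // zero or one element: already sorted
    else {
      val (left, right) = ls.splitAt(n)
      merge(mergeSort(left), mergeSort(right))
    }
  }

  // Quick check: evaluates to List(1, 2, 3, 5, 7, 8, 9)
  val sorted: List[Int] = mergeSort(List(3, 5, 1, 9, 2, 8, 7))
}
```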

Contrary to Java Generics’ MyClass<T> notation, Scala’s generic types are in the form of MyClass[T]. Let’s generalize the integer Merge Sort as follows:

The compiler immediately complains about the ‘<’ comparison, since T might not be a type that supports ordering for ‘<’ to make any sense. To generalize the Merge Sort function for any list type that supports ordering, we can supply an Ordering[T] as a parameter in a curried form as follows:

Another quick test ...

That works well, but it's cumbersome that one needs to supply the corresponding Ordering[T] for the list type. That's where implicit parameter can help:

Testing again ...
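The generic version with an implicit Ordering[T] can be sketched as follows (the wrapper object name GenericMergeSortSketch is an assumption). Because the ordering sits in its own curried parameter list, it can still be passed explicitly, which is what the version without the implicit keyword would always require.

```scala
object GenericMergeSortSketch {
  def mergeSort[T](ls: List[T])(implicit order: Ordering[T]): List[T] = {
    // Merge two already-sorted lists using the supplied ordering.
    def merge(l: List[T], r: List[T]): List[T] = (l, r) match {
      case (Nil, _) => r
      case (_, Nil) => l
      case (lHead :: lTail, rHead :: rTail) =>
        if (order.lt(lHead, rHead)) lHead :: merge(lTail, r)
        else rHead :: merge(l, rTail)
    }

    val n = ls.length / 2
    if (n == 0) ls
    else {
      val (left, right) = ls.splitAt(n)
      // The implicit `order` in scope is picked up by the recursive calls.
      merge(mergeSort(left), mergeSort(right))
    }
  }

  val ints: List[Int] = mergeSort(List(3, 1, 2))                            // Ordering[Int] found implicitly
  val words: List[String] = mergeSort(List("pear", "apple", "fig"))         // Ordering[String] found implicitly
  val descending: List[Int] = mergeSort(List(3, 1, 2))(Ordering[Int].reverse) // ordering passed explicitly
}
```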

Note that the 'if (lHead < rHead)' condition is now replaced with 'if (order.lt(lHead, rHead))'. That's because math.Ordering defines its own less-than method for generic types.

Let's dig a little deeper into how it works. Scala's math.Ordering extends Java’s Comparator interface and implements method compare(x: T, y: T) for all the common types, Int, Long, Float, Double, String, etc. It then provides all these lt(x: T, y: T), gt(x: T, y: T), …, methods that know how to perform all the less-than, greater-than comparisons for various types.

The following are highlights of math.Ordering’s partial source code:

Context Bound

Scala provides a typeclass pattern called Context Bound which represents such a common pattern of passing in an implicit value:

With the context bound syntactic sugar, it becomes:

The mergeSort function using context bound looks as follows:

Note that ‘implicitly[Ordering[T]]’ is there for access to methods in math.Ordering which is no longer passed in with a parameter name.
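A sketch of the context-bound form, under the same assumptions as before (the wrapper object name ContextBoundSketch is hypothetical), shows how the signature shrinks while the body retrieves the ordering with implicitly:

```scala
object ContextBoundSketch {
  // [T: Ordering] is shorthand for an extra parameter list
  // (implicit ev: Ordering[T]) whose name is not accessible in the body.
  def mergeSort[T: Ordering](ls: List[T]): List[T] = {
    val order = implicitly[Ordering[T]] // summon the unnamed implicit value

    def merge(l: List[T], r: List[T]): List[T] = (l, r) match {
      case (Nil, _) => r
      case (_, Nil) => l
      case (lHead :: lTail, rHead :: rTail) =>
        if (order.lt(lHead, rHead)) lHead :: merge(lTail, r)
        else rHead :: merge(l, rTail)
    }

    val n = ls.length / 2
    if (n == 0) ls
    else {
      val (left, right) = ls.splitAt(n)
      merge(mergeSort(left), mergeSort(right))
    }
  }
}
```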

Scala’s math.Ordered versus math.Ordering

One noteworthy thing about math.Ordering is that it does not itself overload comparison operators such as ‘<’ and ‘>’, which is why method lt(x: T, y: T) is used in mergeSort in place of the ‘<’ operator. To use comparison operators like ‘<’, one would need to import order.mkOrderingOps (or order._) within the mergeSort function. That’s because in math.Ordering, the comparison operators are all defined in the inner class Ops, which is instantiated by calling method mkOrderingOps.
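To illustrate the import on something smaller than a full merge sort, here is a sketch with two hypothetical helper functions, smaller and isAscending, which are not part of the post’s code:

```scala
object OrderingOpsSketch {
  // Return the smaller of two values using the `<` operator.
  def smaller[T](a: T, b: T)(implicit order: Ordering[T]): T = {
    import order.mkOrderingOps // brings <, <=, >, >= into scope for values of type T
    if (a < b) a else b
  }

  // Check that every element is <= its successor.
  def isAscending[T](ls: List[T])(implicit order: Ordering[T]): Boolean = {
    import order.mkOrderingOps
    ls.zip(ls.drop(1)).forall { case (a, b) => a <= b }
  }
}
```

The same import placed at the top of mergeSort would allow the original ‘if (lHead < rHead)’ condition to be kept as is.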

Scala’s math.Ordered extends Java’s Comparable interface (instead of Comparator) and implements method compareTo(y: T), derived from math.Ordering’s compare(x: T, y: T) via implicit parameter. One nice thing about math.Ordered is that it consists of overloaded comparison operators.

The following highlights partial source code of math.Ordered:

Using math.Ordered, an implicit method, implicit orderer: T => Ordered[T], (as opposed to an implicit value when using math.Ordering) is passed to the mergeSort function as a curried parameter. As illustrated in a previous blog post, it’s an implicit conversion rule for the compiler to fall back to when encountering a problem associated with type T.

Below is a version of generic Merge Sort using math.Ordered:
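A sketch of that version is shown here, assuming Scala 2, where an implicit parameter of function type acts as an implicit conversion inside the method body (the wrapper object name OrderedMergeSortSketch is hypothetical):

```scala
// Scala 2 sketch: relies on an implicit function parameter acting as a view.
object OrderedMergeSortSketch {
  def mergeSort[T](ls: List[T])(implicit orderer: T => Ordered[T]): List[T] = {
    def merge(l: List[T], r: List[T]): List[T] = (l, r) match {
      case (Nil, _) => r
      case (_, Nil) => l
      case (lHead :: lTail, rHead :: rTail) =>
        // lHead is converted to Ordered[T] by `orderer`, which provides `<`.
        if (lHead < rHead) lHead :: merge(lTail, r)
        else rHead :: merge(l, rTail)
    }

    val n = ls.length / 2
    if (n == 0) ls
    else {
      val (left, right) = ls.splitAt(n)
      merge(mergeSort(left), mergeSort(right))
    }
  }
}
```

At the call site the compiler has to find a conversion from the element type to Ordered, which the standard library supplies for the common types.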

View Bound

A couple of notes:

  1. The implicit method ‘implicit orderer: T => Ordered[T]’ is passed into the mergeSort function also as an implicit parameter.
  2. Function mergeSort has a signature of the following common form:

Such a pattern of an implicit method passed in as an implicit parameter is so common that it’s given the name View Bound and awarded a designated smiley ‘<%’. Using view bound, it can be expressed as:

Applied to the mergeSort function, it gives a slightly more lean and mean look:

As a side note, while the view bound looks like the other smiley '<:' (Upper Bound), they represent very different things. An upper bound is commonly seen in the following form:

This means someFunction takes only input parameter of type T that is a sub-type of (or the same as) type S. While at it, a Lower Bound represented by the ‘>:’ smiley in the form of [T >: S] means the input parameter can only be a super-type of (or the same as) type S.
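Both bounds can be tried out with a small class hierarchy. Everything in this sketch (Animal, Dog, describe, prepend) is hypothetical and only serves to show where the bounds bite:

```scala
object BoundsSketch {
  class Animal(val name: String)
  class Dog(name: String) extends Animal(name)

  // Upper bound: T must be Animal or one of its sub-types,
  // so members of Animal can be used on x.
  def describe[T <: Animal](x: T): String = x.name

  // Lower bound: T must be Dog or one of its super-types,
  // so the resulting list is widened to List[T].
  def prepend[T >: Dog](x: T, dogs: List[Dog]): List[T] = x :: dogs

  val rex = new Dog("Rex")
  val cat = new Animal("Tom")

  val described: String = describe(rex)                // T inferred as Dog
  val widened: List[Animal] = prepend(cat, List(rex))  // T inferred as Animal
  // describe("Rex") would not compile: String is not a sub-type of Animal.
}
```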

Implicit Conversion In Scala

These days, software engineers with knowledge of robust frameworks/libraries are abundant, but those who fully command the core basics of a language platform remain scarce. When required to come up with coding solutions to perform, scale or resolve tricky bugs, a good understanding of the programming language’s core features is often the real deal.

Scala’s signature strengths

Having been immersed in a couple of R&D projects using Scala (along with Akka actors) over the past 6 months, I’ve come to appreciate quite a few things it offers. Aside from an obvious signature strength of being a good hybrid of functional programming and object-oriented programming, others include implicit conversion, type parameterization and futures/promises. In addition, Akka actors coupled with Scala make a highly scalable concurrency solution applicable to many distributed systems including IoT systems.

In this blog post, I’m going to talk about Scala’s implicit conversion which I think is part of the language’s core basics. For illustration purposes, simple arithmetic on complex numbers will be implemented using this very feature.

A basic complex-number class would probably look something like the following:

Since a complex number can have zero imaginary component leaving only the real component, it’s handy to have an auxiliary constructor for those real-only cases as follows:

Just a side note, an auxiliary constructor must invoke another constructor of the class as its first action and cannot invoke a superclass constructor.

Next, let’s override method toString to cover various cases of how an x + yi complex number would look:

Let’s also fill out the section for the basic arithmetic operations:

Testing it out …
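Putting the pieces described so far together, a sketch of the class with its auxiliary constructor, toString override and arithmetic operations might read as follows. The field names real and imag, the wrapper object name ComplexSketch and the exact toString formatting are assumptions.

```scala
object ComplexSketch {
  class Complex(val real: Double, val imag: Double) {
    // Auxiliary constructor for real-only numbers; it must call another constructor first.
    def this(real: Double) = this(real, 0.0)

    def +(that: Complex): Complex = new Complex(real + that.real, imag + that.imag)
    def -(that: Complex): Complex = new Complex(real - that.real, imag - that.imag)
    def *(that: Complex): Complex =
      new Complex(real * that.real - imag * that.imag, real * that.imag + imag * that.real)
    def /(that: Complex): Complex = {
      val d = that.real * that.real + that.imag * that.imag
      new Complex((real * that.real + imag * that.imag) / d, (imag * that.real - real * that.imag) / d)
    }

    override def toString: String =
      if (imag == 0.0) s"$real"
      else if (real == 0.0) s"${imag}i"
      else if (imag < 0.0) s"$real - ${-imag}i"
      else s"$real + ${imag}i"
  }

  val a = new Complex(1.0, 2.0)
  val b = new Complex(3.0, -4.0)
  val sum = a + b     // prints as 4.0 - 2.0i
  val product = a * b // prints as 11.0 + 2.0i
}
```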

So far so good. But what about this?

The compiler complains because it does not know how to handle arithmetic operations between a Complex and a Double. With the auxiliary constructor, ‘a + new Complex(1.0)’ will compile fine, but it’s cumbersome to have to represent every real-only complex number that way. We could resolve the problem by adding methods like the following for the ‘+’ method:

But then what about this?

The compiler interprets ‘a + 1.0’ as a.+(1.0). Since a is a Complex, the proposed new ‘+’ method in the Complex class can handle it. But ‘2.0 + b’ will fail because there isn’t a ‘+’ method in Double that can handle Complex. This is where implicit conversion shines.
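A minimal sketch of the idea, using a cut-down Complex class with only a ‘+’ method (the wrapper object name ImplicitConversionSketch and the field names are assumptions), defines the conversion in the same scope as the expressions that need it:

```scala
import scala.language.implicitConversions

object ImplicitConversionSketch {
  class Complex(val real: Double, val imag: Double) {
    def +(that: Complex): Complex = new Complex(real + that.real, imag + that.imag)
  }

  // Fallback rule: whenever a Double appears where a Complex is needed, wrap it.
  implicit def realToComplex(x: Double): Complex = new Complex(x, 0.0)

  val a = new Complex(1.0, 2.0)
  val sum1 = a + 1.0 // argument converted: a.+(realToComplex(1.0))
  val sum2 = 2.0 + a // receiver converted: realToComplex(2.0).+(a)
}
```

No overload of ‘+’ taking a Double is needed any more, since both operand positions are covered by the single conversion.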

The implicit method realToComplex hints the compiler to fall back to using the method when it encounters a compilation problem associated with type Double. In many cases, the implicit methods would never be explicitly called thus their name can be pretty much arbitrary. For instance, renaming realToComplex to foobar in this case would get the same job done.

As a bonus, arithmetic operations between Complex and Int (or Long, Float) would work too. That’s because Scala already has, for instance, integer-to-double covered internally using implicit conversion in the companion object of class Int, and in version 2.9.x or older, object Predef:

Testing again …

Implicit conversion scope

To ensure the implicit conversion rule takes effect when we use the Complex class, we need to keep it in scope. Defining the implicit method in the current scope, or importing a snippet containing the method into it, will certainly serve us well. An alternative is to define it in a companion object as follows:

As a final note, if a factory method is preferred, thus removing the need for the ‘new’ keyword in instantiation, we could slightly modify the companion object/class as follows:

Another quick test …
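A sketch combining both ideas, with the conversion and the factory methods living in the companion object, is shown here. As before, the cut-down class, the wrapper object name CompanionSketch and the field names are assumptions.

```scala
import scala.language.implicitConversions

object CompanionSketch {
  class Complex(val real: Double, val imag: Double) {
    def +(that: Complex): Complex = Complex(real + that.real, imag + that.imag)
  }

  object Complex {
    // Factory methods: Complex(1.0, 2.0) instead of new Complex(1.0, 2.0).
    def apply(real: Double, imag: Double): Complex = new Complex(real, imag)
    def apply(real: Double): Complex = new Complex(real, 0.0)

    // Found by the compiler without an import whenever a Complex is expected.
    implicit def realToComplex(x: Double): Complex = Complex(x, 0.0)
  }

  val a = Complex(1.0, 2.0)
  val b = a + 3.0        // 3.0 is converted through the companion's implicit
  val c: Complex = 2.5   // conversion driven by the expected type
}
```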