
Text Mining With Akka Streams

Reactive Systems, whose core characteristics are declared in the Reactive Manifesto, have emerged in recent years as message-driven systems that emphasize scalability, responsiveness and resilience. It's pretty clear from those requirements that a system can't simply be made Reactive after the fact. Rather, it has to be designed to be Reactive at the architectural level.

Akka's actor systems, which rely on asynchronous message-passing among lightweight, loosely coupled actors, serve as a great run-time platform for building Reactive Systems on the JVM (Java Virtual Machine). I have posted a few blogs along with sample code about Akka actors in the past. This time I'm going to talk about something different but closely related.

Reactive Streams

While bearing a similar name, Reactive Streams is a separate initiative that requires its implementations to process stream data asynchronously while automatically regulating stream flow in a non-blocking fashion.

Akka Streams, built on top of Akka actor systems, is an implementation of Reactive Streams. Equipped with back-pressure functionality, it eliminates the need to manually buffer stream flows or custom-build a buffering mechanism to avoid buffer overflow problems.

Extracting n-grams from text

In text mining, n-grams are a useful construct in NLP (natural language processing). In this blog post, I'll illustrate extracting n-grams from a stream of text messages using Akka Streams, with Scala as the programming language.

First things first, let's create an object with methods for generating random text content:

Source code: TextMessage.scala

Some minimal effort has been made to generate random clauses of likely pronounceable fake words along with punctuation. To make it a little more flexible, the lengths of individual words and clauses are supplied as parameters.
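
Below is a minimal stand-in for such an object (a sketch only; the names, word-generation logic and parameter defaults are assumptions, not the original TextMessage.scala):

import scala.util.Random

object TextMessage {
  private val rand = new Random

  // Build a fake "word" of random lowercase letters (no attempt at pronounceability here)
  private def randomWord(maxLen: Int): String =
    Seq.fill(1 + rand.nextInt(maxLen))(('a' + rand.nextInt(26)).toChar).mkString

  // Stitch a number of fake words into a clause terminated by a period
  def genRandomText(wordsPerClause: Int = 8, maxWordLen: Int = 7): String =
    Seq.fill(wordsPerClause)(randomWord(maxWordLen)).mkString(" ") + "."
}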

Next, create another object with text processing methods responsible for extracting n-grams from the input text, with n being an input parameter. Using Scala's sliding(size, step) iterator method with size n and step defaulting to 1, a sliding-window iterator is generated to produce the desired n-grams.

Source code: TextProcessor.scala
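
A sketch of the sliding-window approach (assumed names; the original TextProcessor.scala may normalize the text differently):

object TextProcessor {
  // Split the text into words and produce the n-grams via a sliding window of size n (step 1)
  def ngrams(text: String, n: Int): List[String] =
    text.toLowerCase
      .replaceAll("""\p{Punct}""", "")
      .split("\\s+")
      .filter(_.nonEmpty)
      .toList
      .sliding(n)
      .map(_.mkString(" "))
      .toList
}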

Now that the text processing tools are in place, we can focus on building the main streaming application in which Akka Streams plays the key role.

First, make sure we have the necessary library dependencies included in build.sbt:

Source code: build.sbt

Since Akka Streams is a relatively new development, a recent Akka version (2.4.9 or higher) should be used.
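
For reference, a minimal build.sbt might look like the following (the Scala and Akka versions are assumptions; anything at or above Akka 2.4.9 should do):

name := "akka-streams-ngrams"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % "2.4.10",
  "com.typesafe.akka" %% "akka-stream" % "2.4.10"
)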

Let’s start with a simple stream for this text mining application:

Source code: NgramStream_v01.scala

As shown in the source code, constructing a simple stream like this is just defining and chaining together the text-generating source, the text-processing flow and the text-display sink as follows:
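
A sketch of such a simple stream, reusing the TextMessage and TextProcessor sketches above (assumed names; not the original NgramStream_v01.scala):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

object NgramStreamSketch extends App {
  implicit val system = ActorSystem("ngram")
  implicit val materializer = ActorMaterializer()

  // Source: an endless stream of randomly generated text messages
  val textSource: Source[String, NotUsed] =
    Source.fromIterator(() => Iterator.continually(TextMessage.genRandomText()))

  // Flow: turn each message into its 3-grams
  val ngramFlow: Flow[String, String, NotUsed] =
    Flow[String].mapConcat(TextProcessor.ngrams(_, 3))

  // Sink: print each n-gram to the console
  val consoleSink = Sink.foreach[String](println)

  textSource.via(ngramFlow).to(consoleSink).run()
}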

Graph DSL

Akka Streams provides a Graph DSL (domain-specific language) that helps build the topology of stream flows using predefined fan-in/fan-out functions.

What Graph DSL does is somewhat similar to how Apache Storm's TopologyBuilder pieces together its spouts (i.e. stream sources), bolts (i.e. stream processors) and stream grouping/partitioning functions, as illustrated in a previous blog about HBase streaming.

Back-pressure

Now, let's branch off the stream using the Graph DSL to illustrate how the integral back-pressure feature comes into play.

Source code: NgramStream_v02.scala

Streaming to a file should be significantly slower than streaming to the console. To make the difference more noticeable, a delay is deliberately added to the streaming of each line of text in the file sink.

Run the application and you will notice that the console display slows down. That's the result of the upstream data flow being regulated to accommodate the relatively slow file I/O outlet, even though the console outlet could consume data faster – all conducted in a non-blocking fashion.

Graph DSL create() methods

To build a streaming topology using Graph DSL, you’ll need to use one of the create() methods defined within trait GraphApply, which is extended by object GraphDSL. Here are the signatures of the create() methods:
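
Abridged and simplified below; consult trait GraphApply in the Akka source for the exact overloads.

def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(
    buildBlock: GraphDSL.Builder[Mat] => g1.Shape => S): Graph[S, Mat]

def create[S <: Shape, Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(
    combineMat: (M1, M2) => Mat)(
    buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat]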

Note that the create() overloads that take multiple stream components as input parameters are generated with the sbt-boilerplate template language.

Materialized values

In Akka Streams, materializing a constructed stream is the step of actually running the stream with the necessary resources. To run the stream, an implicit ActorMaterializer (created via the factory method ActorMaterializer()) is required to allocate the resources for stream execution, which includes starting up the underlying Akka actors that process the stream.

Every processing stage of the stream can produce a materialized value. By default, using the via(flow) and to(sink) functions, the materialized value of the left-most stage will be preserved. As in the following example, for graph1, the materialized value of the source is preserved:
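
A sketch illustrating the combinators (types annotated for clarity; the flow and sink here are simple stand-ins, not the application's actual components):

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{ FileIO, Flow, Keep, RunnableGraph, Sink, Source }
import akka.util.ByteString
import scala.concurrent.Future

object MaterializedValuesSketch {
  val source: Source[String, NotUsed]            = Source(List("a", "b", "c"))
  val flow:   Flow[String, ByteString, NotUsed]  = Flow[String].map(s => ByteString(s + "\n"))
  val sink:   Sink[ByteString, Future[IOResult]] = FileIO.toPath(java.nio.file.Paths.get("out.txt"))

  // graph1: via/to keep the left-most (source) materialized value
  val graph1: RunnableGraph[NotUsed] = source.via(flow).to(sink)

  // graph2: viaMat(...)(Keep.right) keeps the flow's materialized value
  val graph2: RunnableGraph[NotUsed] = source.viaMat(flow)(Keep.right).to(sink)

  // graph3: keep both the flow's and the sink's materialized values
  val graph3: RunnableGraph[(NotUsed, Future[IOResult])] =
    source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)
}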

To allow one to selectively capture the materialized values of the specific stream components, Akka Streams provides functions viaMat(flow) and toMat(sink) along with a combiner function, Keep. As shown in the above example, for graph2, the materialized value of the flow is preserved, whereas for graph3, materialized values for both the flow and sink are preserved.

Back to our fileSink function as listed below, toMat(fileIOSink)(Keep.right) instructs Akka Streams to keep the materialized value of the fileIOSink as a Future value of type IOResult:
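
The original listing isn't reproduced here, but a sketch consistent with that description, defined inside the stream application, could look like this (the per-line delay value is an assumption):

import java.nio.file.Paths
import akka.stream.IOResult
import akka.stream.scaladsl.{ FileIO, Flow, Keep, Sink }
import akka.util.ByteString
import scala.concurrent.Future

def fileSink(fileName: String): Sink[String, Future[IOResult]] = {
  val fileIOSink = FileIO.toPath(Paths.get(fileName))   // Sink[ByteString, Future[IOResult]]
  Flow[String]
    .map { line =>
      Thread.sleep(50)               // deliberate per-line delay to make back-pressure observable
      ByteString(line + "\n")
    }
    .toMat(fileIOSink)(Keep.right)   // keep the file sink's Future[IOResult]
}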

Using Graph DSL, as seen earlier in the signature of the create() method, one can select what materialized value is to be preserved by specifying the associated stream components accordingly as the curried parameters:

In our case, we want the materialized value of fileSink, thus the curried parameters should look like this:
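
For example (a sketch; the graph body is elided here and shown in full in the next section):

import akka.stream.ClosedShape
import akka.stream.scaladsl.GraphDSL

val graph = GraphDSL.create(fileSink("ngrams.txt")) { implicit builder => fileOut =>
  // ... wire the source, flow, broadcast and sinks here (see the full graph below) ...
  ClosedShape
}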

Defining the stream graph

Akka Streams provides a number of functions for fan-out (e.g. Broadcast, Balance) and fan-in (e.g. Merge, Concat). In our example, we want a simple topology with a single text source and the same n-gram generator flow branching off to two sinks in parallel:
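
Putting it together, here is a sketch of the branched topology (assumed names, reusing the fileSink, TextMessage and TextProcessor sketches above; not the original NgramStream_v02.scala):

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ClosedShape, IOResult }
import akka.stream.scaladsl._
import scala.concurrent.Future

object NgramBroadcastSketch extends App {
  implicit val system = ActorSystem("ngram")
  implicit val materializer = ActorMaterializer()

  val textSource  = Source.fromIterator(() => Iterator.continually(TextMessage.genRandomText()))
  val ngramFlow   = Flow[String].mapConcat(TextProcessor.ngrams(_, 3))
  val consoleSink = Sink.foreach[String](println)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(fileSink("ngrams.txt")) { implicit builder => fileOut =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[String](2))

      // One source, one n-gram flow, two parallel outlets
      textSource ~> ngramFlow ~> broadcast ~> consoleSink
                                 broadcast ~> fileOut
      ClosedShape
    }
  )

  // Running the graph materializes the file sink's Future[IOResult]
  val ioResult: Future[IOResult] = graph.run()
}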

Adding a message counter

Let's further expand our n-gram extraction application to display a count of the extracted n-grams. A simple count-flow is created to map each message string to the numeric value 1, and a count-sink to sum up all the 1's streamed to it. Adding them as a third flow and sink to the existing stream topology yields something similar to the following:
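
A sketch of that third branch (not the original NgramStream_v03.scala; the Broadcast simply widens from two outlets to three):

import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Sink }
import scala.concurrent.Future

val countFlow: Flow[String, Int, NotUsed] = Flow[String].map(_ => 1)
val countSink: Sink[Int, Future[Int]]     = Sink.fold[Int, Int](0)(_ + _)

// Inside the GraphDSL block:
//   val broadcast = builder.add(Broadcast[String](3))
//   textSource ~> ngramFlow ~> broadcast ~> consoleSink
//                              broadcast ~> fileOut
//                              broadcast ~> countFlow ~> countSink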

Source code: NgramStream_v03.scala

The full source code of the application is on GitHub.

Final thoughts

Having used Apache Storm, I see it as a rather different beast from Akka Streams. A full comparison between the two would obviously be an extensive exercise by itself, but suffice it to say that both are great platforms for streaming applications.

Perhaps one of the biggest differences between the two is that Storm provides granular message delivery options (at most / at least / exactly once, guaranteed message delivery) whereas Akka Streams by design questions the premise of reliable messaging on distributed systems. For instance, if guaranteed message delivery is a requirement, Akka Streams would probably not be the best choice.

Back-pressure has recently been added to Storm's built-in feature list as of v1.0.x, so there is indeed some flavor of reactiveness in it. Aside from message delivery options, choosing between the two technologies might come down more to other factors such as the engineering staff's expertise, concurrency model preference, etc.

Outside the turf of typical streaming systems, Akka Streams also plays a key role as the underlying platform for an emerging service stack. Viewed as the next generation of Spray.io, Akka HTTP is built on top of Akka Streams. Designed for building HTTP-based integration layers, Akka HTTP provides versatile streaming-oriented HTTP routing and request/response transformation mechanisms. Under the hood, Akka Streams' back-pressure functionality regulates data streaming between the server and the remote client, thereby conserving memory utilization on the server.

Real-time Big Data Revisited

My previous blog post about real-time Big Data centered on some relevant open-source software (e.g. Storm, Kafka). This post shifts the focus towards reviewing the current state of real-time Big Data.

One thing the computing technology industry has never been starved of is the successive rise and fall of buzzwords – B2B, P2P, SOA, AOP, M2M, SaaS/PaaS, IoT, RWD (responsive web design), SDN (software-defined networking), … Recently, Big Data has been one of the few that have taken center stage.

How big is Big Data?

What is Big Data anyway? Typical structured data is in table format with columns and rows. For example, a dataset of 500,000 Web pages might be represented by 500,000 rows of data, each with 3 columns of text: URL, page title and page content. In general, people use the term Big Data to describe data with a large number of columns and/or rows. But how big is big?

The “yield point” at which a contemporary RDBMS (relational database management system) can no longer perform well on decent server hardware is often considered the starting point for a Big Data system. That's obviously a vague, unscientific reference. In a recent startup operation, we maintained a pretty massive transactional RDBMS (with fail-over) on a couple of ordinary quad-core Xeon server boxes stuffed with a bunch of RAID 0+1 disks. There were a couple of optimally tuned transactional tables at 400+ million rows with actively used queries outer-joining them, and the database performed just fine, showing no signs of yielding any time soon. On the other hand, I have also seen ordinary queries bring a database to a halt with transactional tables at just a few million rows.

Is Big Data for everyone?

Nevertheless, I've heard quite a few horror stories about companies delving into Big Data only to realize that the extensive (read: expensive) R&D work was unjustified. Some grudgingly returned to the relational database model after pouring tons of resources into building a column-oriented distributed database system. It's tempting to conclude that you need to switch from an RDBMS to a column-oriented database immediately when a projection shows that your dataset will grow to 1 petabyte in 3 years. That conclusion may be flawed if the underlying business requirement analysis isn't thorough. For instance, it could be that:

  • the dataset won't reach even a small fraction of the petabyte scale for the first 2+ years
  • data older than 3 months need not be kept in raw format and can be aggregated down to a fraction of the original volume
  • the petabyte-scale size comes from a few huge data fields, while the actual row count stays in the tens of millions, which a properly administered RDBMS can manage

There are a lot of tech discussions about the pros and cons of relational versus column-oriented databases, so I'm not going to repeat those arguments. It suffices to say that by switching from an RDBMS to a column-oriented database, you're trading away a whole bunch of good stuff that relational databases offer, primarily for high data capacity, fast writes and built-in fault tolerance.

Adding real-time into the mix

Real-time is a term subject to contextual interpretation. In a looser sense, response time in milliseconds to a few seconds is often regarded as real-time. As data volume increases, even such a loose requirement is no easy matter to meet.

Let's say it's objectively determined that a column-oriented database needs to be part of your Big Data system; the next question is probably how “real-time” you need the system to be when servicing data requests. Trying to make every bit of data in a Big Data system available for real-time (or near-real-time) random access is a difficult proposition. A more practical approach is to maintain a data warehouse with a set of updatable pre-computed views over all persisted data, augmented by a real-time subsystem that provides access to recently transacted data which hasn't yet made it to the warehouse. The real-time subsystem is kept relatively lean by regularly discarding data that has been secured in the warehouse.

Lambda Architecture

The Lambda Architecture advocated by Nathan Marz (the creator of Storm) proposes a Big Data system composed of a batch subsystem and a real-time subsystem that cooperatively serve real-time queries across the entire persisted dataset. Based on a preview of the early-access edition of Marz's book, my understanding of the architecture is that it consists of:

  • a Batch Layer that appends data to the immutable master dataset and continuously refreshes batch views (in the form of query functions) by recomputing arbitrary functions on the entire dataset
  • a Serving Layer that processes the batch views and provides query service
  • a Speed Layer that produces real-time views from newly acquired data and regularly rotates data off to the Batch Layer

Apparently, the architecture’s underlying design is oriented towards functional programming which is in principle rooted in Lambda Calculus. Under this computing paradigm, arbitrary data processing operations are expressed as compositions of functions which are program state-independent and operate on the entire immutable dataset.

The architecture also showcases the principle of separation of concerns, with each of the layers handling the specific Big Data tasks it is purposely designed for. The master dataset is maintained in the Batch Layer as append-only immutable raw data on a redundant distributed computing platform (e.g. Hadoop HDFS), allowing full data reprocessing in the event of major data processing errors. On the other hand, the Speed Layer would be better served by a real-time messaging or streaming system (e.g. Storm) backed by a random read-write capable persistent storage (e.g. HBase). It's an architecture that is elegant in principle and I look forward to seeing its final edition and real-world implementations.
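
As a toy illustration (purely hypothetical names and types, not from the book), a serving-layer query under this architecture might simply merge a pre-computed batch view with the speed layer's view of data not yet absorbed by the batch layer:

case class Count(key: String, value: Long)

// batchView:    periodically recomputed from the entire immutable master dataset
// realtimeView: covers only the recent data the batch layer hasn't absorbed yet
def query(key: String,
          batchView: Map[String, Long],
          realtimeView: Map[String, Long]): Count =
  Count(key, batchView.getOrElse(key, 0L) + realtimeView.getOrElse(key, 0L))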

Is real-time Big Data ripe for mainstream businesses?

Aside from distribution companies such as Cloudera and Hortonworks, there is a wide range of companies and startups building their entire business on providing Big Data services. Then there are the tech giants (e.g. EMC) which see Big Data as a significant part of their strategic direction. As to the need for real-time, there has been debate on whether the demand is imminent for businesses other than a handful of global real-time search/newsfeed services such as Twitter.

On one hand, a bunch of commercial products and open-source software frameworks have emerged to address that very need. On the other hand, businesses at large are still struggling to interpret the actual needs (i.e. how big and how real-time) of themselves and/or their customers. Here's one data point – I recently had a discussion with a founder of a Big Data platform provider who expressed skepticism about the imminent demand for real-time Big Data, based on what he heard from his customers.

My own observation (admittedly with a limited scope) is that the demand for real-time or near-real-time, perhaps in an obscure fashion, is already there today for many businesses. In other words, I think most Big Data companies already have something in their systems, as part of their business requirements, that addresses the real-time need to some extent. And I don't believe that observation is skewed by my own career experience. If you're maintaining a Big Data operation, chances are that you're already implementing some sort of real-time subsystem.

Today, short of a robust customizable framework, many businesses take a relatively simple approach: dump incoming data into a column-oriented database like HBase, perform filtering scans and output the selected data into a relational database for their real-time query needs. Until a readily customizable framework with a robust underlying architecture like the Lambda Architecture is available, these businesses will have to continue to exhaust engineering resources building their own real-time Big Data solutions.

Streaming Real-time Data Into HBase

Fast writes are generally a characteristic strength of distributed NoSQL databases such as HBase and Cassandra. Yet, for a distributed application that needs to capture rapid streams of data in a database, standard connection pooling provided by the database might not be up to the task. For instance, I didn't get the kind of performance I wanted when using HBase's HTablePool to accommodate real-time streaming of data from a high-parallelism data-dumping Storm bolt.

To dump rapid real-time streaming data into HBase, instead of HTablePool it might be more efficient to embed some queueing mechanism in the HBase storage module. An ex-colleague of mine, the architect at a VoIP service provider, employs this very mechanism in their production HBase database. Below is a simple implementation that has been tested to perform well with a good-sized Storm topology. The code is rather self-explanatory. The HBaseStreamers class consists of a threaded inner class, Streamer, which maintains a queue of HBase Puts using a LinkedBlockingQueue. The key parameters are in the HBaseStreamers constructor argument list, including the ZooKeeper quorum, HBase table name, HTable auto-flush switch, number of streaming queues and streaming queue capacity.
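
The original implementation isn't reproduced here; the following Scala sketch conveys the idea around those constructor parameters (everything beyond the parameters named above is an assumption):

import java.util.concurrent.LinkedBlockingQueue
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ HTable, Put }

class HBaseStreamers(zkQuorum: String, tableName: String,
                     autoFlush: Boolean, numQueues: Int, queueCapacity: Int) {

  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zkQuorum)

  // Each Streamer thread drains its own bounded queue of Puts into its own HTable instance
  private class Streamer extends Runnable {
    val queue = new LinkedBlockingQueue[Put](queueCapacity)
    private val table = new HTable(conf, tableName)
    table.setAutoFlush(autoFlush)

    def run(): Unit =
      while (!Thread.currentThread.isInterrupted) {
        val batch = new java.util.ArrayList[Put]()
        batch.add(queue.take())        // block until at least one Put is available
        queue.drainTo(batch)           // then grab whatever else has queued up
        table.put(batch)
        if (!autoFlush) table.flushCommits()
      }
  }

  private val streamers = Vector.fill(numQueues)(new Streamer)
  streamers.foreach(s => new Thread(s).start())

  // Round-robin incoming Puts across the queues; put() blocks when a queue is full (back-pressure)
  private var next = 0
  def write(put: Put): Unit = synchronized {
    streamers(next).queue.put(put)
    next = (next + 1) % numQueues
  }
}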

Next, write a wrapper class similar to the following to isolate HBase specifics from the streaming application.
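
Something along these lines (again, assumed names): the streaming code only ever passes row keys and values, never HBase classes.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

class HBaseStreamWriter(streamers: HBaseStreamers, family: String, qualifier: String) {
  // Convert a plain key/value pair into an HBase Put and hand it to a streaming queue
  def write(rowKey: String, value: String): Unit = {
    val put = new Put(Bytes.toBytes(rowKey))
    put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    streamers.write(put)
  }
}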

To test it with a distributed streaming application using Storm, write a bolt similar to the following skeleton. All that is needed is to initialize HBaseStreamers from within the bolt's prepare() method and dump data to HBase from within the bolt's execute().
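
A skeleton in Scala against the Storm 0.8.x API (field names, parallelism and queue settings are placeholders):

import java.util.{ Map => JMap }
import backtype.storm.task.{ OutputCollector, TopologyContext }
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.Tuple

class HBaseStreamingBolt(zkQuorum: String, tableName: String) extends BaseRichBolt {
  @transient private var writer: HBaseStreamWriter = _
  @transient private var collector: OutputCollector = _

  override def prepare(conf: JMap[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    this.collector = collector
    val streamers = new HBaseStreamers(zkQuorum, tableName, autoFlush = false,
                                       numQueues = 2, queueCapacity = 10000)
    writer = new HBaseStreamWriter(streamers, "cf", "msg")
  }

  override def execute(tuple: Tuple): Unit = {
    writer.write(tuple.getStringByField("id"), tuple.getStringByField("text"))
    collector.ack(tuple)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()  // terminal bolt, nothing emitted
}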

Finally, write a Storm spout to serve as the streaming data source and a Storm topology builder to put the spout and bolt together.
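
For example (a toy spout emitting random strings; a local cluster is used purely for testing, and all names and numbers are placeholders):

import backtype.storm.{ Config, LocalCluster }
import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.{ OutputFieldsDeclarer, TopologyBuilder }
import backtype.storm.topology.base.BaseRichSpout
import backtype.storm.tuple.{ Fields, Values }

class RandomTextSpout extends BaseRichSpout {
  @transient private var collector: SpoutOutputCollector = _

  override def open(conf: java.util.Map[_, _], context: TopologyContext,
                    collector: SpoutOutputCollector): Unit =
    this.collector = collector

  override def nextTuple(): Unit = {
    Thread.sleep(10)
    collector.emit(new Values(java.util.UUID.randomUUID.toString,
                              "message-" + scala.util.Random.nextInt(1000)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("id", "text"))
}

object StreamToHBaseTopology extends App {
  val builder = new TopologyBuilder
  builder.setSpout("text-spout", new RandomTextSpout, 2)
  builder.setBolt("hbase-bolt", new HBaseStreamingBolt("zk1,zk2,zk3", "stream_test"), 4)
         .shuffleGrouping("text-spout")

  val conf = new Config
  conf.setNumWorkers(2)
  // Use StormSubmitter.submitTopology(...) instead when deploying to a real cluster
  new LocalCluster().submitTopology("stream-to-hbase", conf, builder.createTopology())
}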

The parallelism/queue parameters are set to relatively small numbers in the above sample code. Once tested working, one can tweak the various dials (e.g. the spout/bolt parallelism hints, the number of streaming queues and the streaming queue capacity) in accordance with the server cluster capacity.

For simplicity, only HBase Put is handled in the above implementation. It can certainly be expanded to also handle HBase Increment so as to carry out aggregation functions such as count. The primary goal of this Storm-to-HBase streaming exercise is to showcase the use of a module equipped with some “elasticity” by means of configurable queues. The queueing mechanism within HBaseStreamers provides cushioned funnels for the data streams and helps optimize the overall data intake bandwidth. Keep in mind, though, that it doesn't remove the need for administration work on a properly configured HBase-Hadoop system.

Real-time Big Data

Although demand for large scale distributed computing solutions has existed for decades, the term Big Data did not get a lot of public attention till Google published its data processing programming model, MapReduce, back in 2004. The Java-based Hadoop framework further popularized the term a couple of years later, partly due to the ubiquitous popularity of Java.

From Batch to Real-time

Hadoop has proven itself a great platform for running MapReduce applications on fault-tolerant HDFS clusters, typically composed of inexpensive servers. It does very well in the large-scale batch data processing domain. Adding a distributed database system such as HBase or Cassandra helps extend the platform to address the need for real-time (or near-real-time) access to structured data. But to get feature-rich messaging or streaming functionality, one needs suitable systems that operate well on a distributed platform.

I remember feeling the need for such a real-time Big Data system when I was with a cleantech startup, EcoFactor, a couple of years ago, seeking solutions to handle increasingly demanding time-series data processing in a near-real-time fashion. It would have saved me and my team a lot of internal development work had such a system been available. One of my recent R&D projects after I left the company was to adopt suitable software components to address such real-time distributed data processing needs. The following highlights the key pieces I picked and what prompted me to pick them for the task.

Over the past couple of years, Kafka and Storm have emerged as two maturing distributed data processing systems. Kafka was developed by LinkedIn for high-performance messaging, whereas Storm, developed by Twitter (through the acquisition of BackType), addresses the real-time streaming problem space. Although the two technologies were built independently, some real-time Big Data solution seekers see advantages in bringing the two together, integrating Kafka as the source queue with a Storm streaming system. According to a blog published earlier this year, Twitter also uses the Kafka-Storm combo to handle its demanding real-time search.

High-performance Distributed Messaging

Kafka is a distributed publish-subscribe messaging system equipped with robust semantic partitioning and high messaging throughput. High performance is evidently a key goal of Kafka's underlying architecture, which leverages kernel page caching to minimize data copying and context switching. It also uses message grouping (MessageSet) to reduce network calls. At-least-once message processing is guaranteed. If exactly-once is a business requirement, one approach is to programmatically keep track of the messaging state by coupling the data with the message offset to eliminate duplication.

Kafka flows data by having the publishers push data to the brokers (Kafka servers) and subscribers pull from the brokers, giving the flexibility of a more diverse set of message consumers in the messaging system. It uses ZooKeeper for auto message broker discovery for non-static broker configurations. It also uses ZooKeeper to maintain message topics and partitions. Messages can be programmatically partitioned over a server cluster and consumed with ordering preserved within individual partitions. There are two APIs for the consumer – a high-level consumer (ConsumerConnector) that heavily leverages ZooKeeper to handle broker discovery, consumer rebalancing and message offset tracking; and a low-level consumer (SimpleConsumer) that allows users to manually customize all the key messaging features.

Setting up Kafka on a cluster is pretty straightforward. The version used in my project is 0.7.2. Each Kafka server is identified by a unique broker id. Each broker comes with tunable configurations for the socket server, logs and connections to a ZooKeeper ensemble (if enabled). There are also a handful of configurable properties that are producer-specific (e.g. producer.type, serializer.class, partitioner.class) and consumer-specific (e.g. autocommit.interval.ms).

The following links detail Kafka’s core design principles:
http://kafka.apache.org/07/design.html
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

Real-time Distributed Streaming

Storm is a distributed streaming system that streams data through a customizable topology of data sources (spouts) and processors (bolts). It uses ZooKeeper, as well, to coordinate among the master and worker nodes in a cluster and manage transactional state as needed. Streams (composed of tuples), spouts and bolts constitute the core components of a Storm topology. A set of stream grouping methods are provided for partitioning a stream among consuming bolts in accordance with various use cases. The Storm topology executes across multiple configurable worker processes, each of which is a JVM. A Thrift-based topology builder is used to define and submit a topology.

On reliability, Storm guarantees that every tuple emitted from a spout gets fully processed. It manages the complete lifecycle of each input tuple by tracking its id (msgId) and using the methods ack(Object msgId) and fail(Object msgId) to report the processing result. By anchoring the input tuple from the spout to every associated tuple emitted in the consuming bolts, the spout can replay the tuple in the event of failure. This ensures at-least-once message processing.
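
An illustrative bolt (a sketch against the 0.8.x API, not from any particular project) that anchors its emitted tuple to the input and then acks or fails it:

import backtype.storm.task.{ OutputCollector, TopologyContext }
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.{ Fields, Tuple, Values }

class AnchoringBolt extends BaseRichBolt {
  @transient private var collector: OutputCollector = _

  override def prepare(conf: java.util.Map[_, _], context: TopologyContext, c: OutputCollector): Unit =
    collector = c

  override def execute(input: Tuple): Unit =
    try {
      collector.emit(input, new Values(input.getString(0).toUpperCase))  // anchored to the input tuple
      collector.ack(input)                                               // report success upstream
    } catch {
      case _: Exception => collector.fail(input)                         // ask the spout to replay
    }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("upper"))
}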

Transactional Stream Processing

Storm's transactional topology goes a step further to ensure exactly-once message processing. It processes tuples by batches, each identified by a transaction id which is maintained in persistent storage. A transaction is composed of two phases – a processing phase and a commit phase. The processing phase allows batches to be processed in parallel (if the specific business logic warrants it), whereas the commit phase requires batches to be strongly ordered. For a given batch, any failure during the processing or commit phase results in a replay of the entire transaction. To avoid over-updating a replayed batch, the current transaction id is examined against the stored transaction id within the strongly ordered commit phase, and the persisted update takes place only when the ids differ.

Then there is the high-level abstraction layer, the Trident API, on top of Storm that helps internalize some of the state management effort. It also introduces opaque transactional spouts to address failure cases in which partial loss of the data source prevents replaying the batch. It achieves such fault tolerance by maintaining in persistent storage a previous-state computed value (e.g. a word count) in addition to the current computed value and transaction id. The idea is to reliably carry partial value changes across strongly ordered batches, allowing the failed tuples in a partially failed batch to be processed in a subsequent batch.

Deploying Storm on a production cluster requires a little extra effort. The version used in my project, 0.8.1, requires a dated version of ZeroMQ – a native socket/messaging library in C++ – which in turn needs JZMQ for Java binding. To build ZeroMQ, the UUID library (libuuid-devel) is needed as well. Within the cluster, the master node runs the “Nimbus” daemon and each of the worker nodes runs a “Supervisor” daemon. Storm also comes with an administrative web UI that is handy for status monitoring.

The following links provide details on the topics of Storm’s message reliability, transactional topology and Trident’s essentials:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
https://github.com/nathanmarz/storm/wiki/Transactional-topologies
https://github.com/nathanmarz/storm/wiki/Trident-state

And, 1 + 1 = 1 …

Both Kafka and Storm are promising technologies in the real-time Big Data arena. While they work great individually, their functionalities also complement each other well. A commonly seen use case is to have Storm as the centerpiece of a streaming system, with Kafka spouts providing the queueing mechanism and data-processing bolts carrying out the specific business logic. If persistent storage is needed, which is often the case, one can develop a bolt to persist data into a NoSQL database such as HBase or Cassandra.

These are all exciting technologies and are part of what makes contemporary open-source distributed computing software prosper. Even though they're promising, that doesn't mean they're suitable for every company that wants to run a real-time Big Data system. At their current level of maturity, adopting them still requires hands-on software technologists to objectively assess needs, design and implement the system, and come up with an infrastructure support plan.