
Real-time Big Data

Although demand for large-scale distributed computing solutions has existed for decades, the term Big Data did not get much public attention until Google published its data processing programming model, MapReduce, back in 2004. The Java-based Hadoop framework further popularized the term a couple of years later, partly due to the ubiquity of Java.

From Batch to Real-time

Hadoop has proven itself a great platform for running MapReduce applications on fault-tolerant HDFS clusters, typically composed of inexpensive servers. It does very well in the large-scale batch data processing domain. Adding a distributed database system such as HBase or Cassandra helps extend the platform to address the need for real-time (or near-real-time) access to structured data. But to get feature-rich messaging or streaming functionality, one needs additional systems that operate well on a distributed platform.

I remember feeling the need for such a real-time Big Data system a couple of years ago when I was with a cleantech startup, EcoFactor, looking for solutions to handle increasingly demanding time-series data processing in a near-real-time fashion. It would have saved me and my team a lot of internal development work had such a system been available. One of my recent R&D projects after I left the company was to adopt suitable software components to address such real-time distributed data processing needs. The following highlights the key pieces I picked and what prompted me to pick them for the task.

Over the past couple of years, Kafka and Storm have emerged as two maturing distributed data processing systems. Kafka was developed by LinkedIn for high-performance messaging, whereas Storm, developed by Twitter (through the acquisition of BackType), addresses the real-time streaming problem space. Although the two technologies were built independently, some real-time Big Data solution seekers see advantages in bringing them together by integrating Kafka as the source queue with a Storm streaming system. According to a blog post published earlier this year, Twitter also uses the Kafka-Storm combo to handle its demanding real-time search.

High-performance Distributed Messaging

Kafka is a distributed publish-subscribe messaging system that provides robust semantic partitioning and high messaging throughput. High performance is evidently a key goal of Kafka’s underlying architecture: it leverages kernel-managed page caching to minimize data copying and context switching, and it groups messages (MessageSet) to reduce network calls. At-least-once message processing is guaranteed. If exactly-once is a business requirement, one approach is to programmatically keep track of the messaging state by coupling the data with the message offset to eliminate duplication.
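
To make that last point a bit more concrete, below is a minimal sketch (plain Java, deliberately not tied to any particular Kafka API) of de-duplicating at-least-once deliveries: the processed result is persisted together with the message offset in one atomic write, and anything at or below the last committed offset is skipped. The OffsetStore interface is a hypothetical stand-in for whatever persistence layer one uses.

// Hypothetical sketch: turn at-least-once delivery into effectively exactly-once
// processing by tracking the message offset along with the processed data.
public class DeduplicatingHandler {

    /** Stand-in for a persistent store that writes data and offset atomically. */
    public interface OffsetStore {
        long lastCommittedOffset();
        void commit(String processedRecord, long offset);
    }

    private final OffsetStore store;

    public DeduplicatingHandler(OffsetStore store) {
        this.store = store;
    }

    public void handle(long offset, byte[] payload) {
        if (offset <= store.lastCommittedOffset()) {
            return;  // duplicate delivery of an already-processed message; skip it
        }
        String result = new String(payload);  // placeholder for real business logic
        // Persisting the result and its offset in a single atomic write is what
        // prevents a crash from leaving the data stored without its offset.
        store.commit(result, offset);
    }
}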

Kafka flows data by having publishers push to the brokers (Kafka servers) and subscribers pull from the brokers, which allows for a more diverse set of message consumers. It uses ZooKeeper for automatic broker discovery in non-static broker configurations, and also to maintain message topics and partitions. Messages can be programmatically partitioned over a server cluster and consumed with ordering preserved within individual partitions. There are two consumer APIs – a high-level consumer (ConsumerConnector) that heavily leverages ZooKeeper to handle broker discovery, consumer rebalancing and message offset tracking; and a low-level consumer (SimpleConsumer) that lets users manually customize all the key messaging features.
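
The partitioning-and-ordering point deserves a small illustration. The snippet below is not Kafka’s partitioner interface – just a hypothetical sketch of the idea behind it: a message key maps deterministically to a partition, so all messages sharing a key land in the same partition and are therefore consumed in the order they were produced.

// Hypothetical illustration of keyed partitioning (not the Kafka interface itself)
public final class KeyedPartitioning {

    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is always a valid partition index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 4;
        String[] keys = {"thermostat-17", "thermostat-42", "thermostat-17"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
        // "thermostat-17" always maps to the same partition, so its messages
        // are consumed in the order they were produced.
    }
}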

Setting up Kafka on a cluster is pretty straightforward. The version used in my project is 0.7.2. Each Kafka server is identified by a unique broker id. Each broker comes with tunable configurations for the socket server, logs and connections to a ZooKeeper ensemble (if enabled). There are also a handful of configurable properties that are producer-specific (e.g. producer.type, serializer.class, partitioner.class) and consumer-specific (e.g. autocommit.interval.ms).
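
For illustration, the producer- and consumer-specific properties mentioned above can be assembled into plain java.util.Properties objects as shown below. The values are placeholders, and com.example.MyEncoder / com.example.MyPartitioner are hypothetical application classes.

import java.util.Properties;

// Placeholder values only; com.example.MyEncoder and com.example.MyPartitioner
// are hypothetical application classes.
public class KafkaConfigSketch {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("producer.type", "async");                          // buffer and send asynchronously
        producerProps.put("serializer.class", "com.example.MyEncoder");       // how objects become Kafka messages
        producerProps.put("partitioner.class", "com.example.MyPartitioner");  // how a message key picks a partition

        Properties consumerProps = new Properties();
        consumerProps.put("autocommit.interval.ms", "10000");                 // commit consumed offsets every 10 seconds

        System.out.println(producerProps);
        System.out.println(consumerProps);
    }
}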

The following links detail Kafka’s core design principles:
http://kafka.apache.org/07/design.html
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

Real-time Distributed Streaming

Storm is a distributed streaming system that streams data through a customizable topology of data sources (spouts) and processors (bolts). It likewise uses ZooKeeper to coordinate among the master and worker nodes in a cluster and to manage transactional state as needed. Streams (composed of tuples), spouts and bolts constitute the core components of a Storm topology. A set of stream grouping methods is provided for partitioning a stream among consuming bolts in accordance with various use cases. The topology executes across multiple configurable worker processes, each of which is a JVM. A Thrift-based topology builder is used to define and submit a topology.
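
To put names to these pieces, here is a minimal, self-contained topology wired with the Thrift-based TopologyBuilder and run on an in-process LocalCluster, written against the 0.8-era backtype.storm API. NumberSpout and PrinterBolt are trivial stand-ins for real data sources and processors.

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class MinimalTopology {

    // A trivial spout that emits an ever-increasing reading every 100ms
    public static class NumberSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private long reading = 0;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            Utils.sleep(100);
            collector.emit(new Values(reading++));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("reading"));
        }
    }

    // A trivial terminal bolt that prints whatever it receives
    public static class PrinterBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            System.out.println("got reading: " + tuple.getLongByField("reading"));
            collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt; emits nothing
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("numbers", new NumberSpout(), 1);
        builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("numbers");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();  // in-process cluster, handy for experimenting
        cluster.submitTopology("minimal-topology", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}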

On reliability, Storm guarantees that every tuple from a spout gets fully processed. It manages the complete lifecycle of each input tuple by tracking its id (msgId) and using the methods ack(Object msgId) and fail(Object msgId) to report the processing result. By anchoring every tuple emitted in the consuming bolts to the input tuple from the spout, the spout can replay that tuple in the event of failure. This ensures at-least-once message processing.
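
In bolt code, anchoring and acknowledgment look roughly like the sketch below; the “normalized” output field and the string clean-up are made-up examples, but the anchored emit, ack and fail calls are the mechanism described above.

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Sketch of an anchoring bolt: each emitted tuple is anchored to the input tuple,
// and the input is explicitly acked or failed so the spout knows whether to replay.
public class AnchoringBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            String raw = input.getString(0);
            // Anchor the emitted tuple to the input so a downstream failure
            // propagates back and triggers a replay from the spout.
            collector.emit(input, new Values(raw.trim().toLowerCase()));
            collector.ack(input);   // fully processed
        } catch (Exception e) {
            collector.fail(input);  // tell the spout to replay this tuple
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("normalized"));
    }
}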

Transactional Stream Processing

Storm’s transactional topology goes a step further to ensure exactly-once message processing. It processes tuples in batches, each identified by a transaction id which is maintained in persistent storage. A transaction is composed of two phases – a processing phase and a commit phase. The processing phase allows batches to be processed in parallel (if the specific business logic warrants it), whereas the commit phase requires batches to be strongly ordered. For a given batch, any failure during the processing or commit phase results in a replay of the entire transaction. To avoid applying a replayed batch twice, the current transaction id is compared against the stored transaction id within the strongly ordered commit phase, and the persisted update takes place only when the ids differ.
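
Boiled down, the commit-phase check described above amounts to something like this conceptual sketch (plain Java, not the actual Storm transactional API); the counter fields stand in for state kept in a real persistent store.

// Conceptual sketch (not the actual Storm API) of the commit-phase check:
// apply the batch only if its transaction id differs from the stored one,
// so a replayed batch cannot be applied twice.
public class TransactionalCounter {

    private long storedCount = 0;   // stand-ins for values kept in persistent storage
    private long storedTxId = -1;

    public void commit(long txId, long batchCount) {
        if (txId == storedTxId) {
            return;  // this batch was already committed; a replay must not update again
        }
        storedCount += batchCount;  // apply the batch exactly once
        storedTxId = txId;          // remember which transaction produced this state
    }

    public long count() {
        return storedCount;
    }
}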

Then there is the high-level abstraction layer, the Trident API, on top of Storm that helps internalize some of the state management effort. It also introduces opaque transactional spouts to address failure cases in which partial loss of the data source prevents a batch from being replayed. It achieves such fault tolerance by maintaining in persistent storage a previous computed value (e.g. a word count) in addition to the current computed value and transaction id. The idea is to reliably carry partial value changes across strongly ordered batches, allowing the failed tuples in a partially failed batch to be processed in a subsequent batch.
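
Again as a conceptual sketch rather than Trident’s actual state API, the opaque-state bookkeeping looks roughly like this:

// Conceptual sketch of opaque-transactional state: keep the previous value,
// the current value and the transaction id, so a batch that gets replayed with
// different contents can be re-applied on top of the previous value.
public class OpaqueCounterState {

    private long prevCount = 0;   // value before the last committed batch
    private long currCount = 0;   // value after the last committed batch
    private long lastTxId = -1;   // id of the last committed batch

    public void applyBatch(long txId, long batchCount) {
        if (txId == lastTxId) {
            // Same transaction replayed, possibly with a different set of tuples:
            // recompute from the previous value instead of updating the current one again.
            currCount = prevCount + batchCount;
        } else {
            // New transaction: the current value becomes the new baseline.
            prevCount = currCount;
            currCount = currCount + batchCount;
            lastTxId = txId;
        }
    }

    public long count() {
        return currCount;
    }
}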

Deploying Storm on a production cluster requires a little extra effort. The version used in my project, 0.8.1, requires a dated version of ZeroMQ – a native socket/messaging library written in C++ – which in turn needs JZMQ for Java binding. To build ZeroMQ, the UUID library (libuuid-devel) is needed as well. Within the cluster, the master node runs the “Nimbus” daemon and each of the worker nodes runs a “Supervisor” daemon. Storm also comes with an administrative web UI that is handy for status monitoring.

The following links provide details on the topics of Storm’s message reliability, transactional topology and Trident’s essentials:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
https://github.com/nathanmarz/storm/wiki/Transactional-topologies
https://github.com/nathanmarz/storm/wiki/Trident-state

And, 1 + 1 = 1 …

Both Kafka and Storm are promising technologies in the real-time Big Data arena. While they work great individually, their functionalities also complement each other well. A commonly seen use case is to have Storm as the centerpiece of a streaming system, with Kafka spouts providing the queueing mechanism and data processing bolts carrying out specific business logic. If persistent storage is needed, which is often the case, one can develop a bolt to persist data into a NoSQL database such as HBase or Cassandra.
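
Wiring-wise, such a pipeline might be laid out as in the sketch below. The buildKafkaSpout(), businessLogicBolt() and persistenceBolt() factory methods are hypothetical placeholders – the spout would wrap a Kafka consumer (custom-built or from a Kafka-spout library), and the bolts would carry the application’s own processing logic and NoSQL persistence code.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

// Skeleton only: the spout and bolts below are hypothetical placeholders.
public class KafkaStormPipeline {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-source", buildKafkaSpout(), 4);
        builder.setBolt("business-logic", businessLogicBolt(), 8)
               .shuffleGrouping("kafka-source");
        builder.setBolt("nosql-writer", persistenceBolt(), 4)
               .fieldsGrouping("business-logic", new Fields("rowKey"));  // group writes by row key

        Config conf = new Config();
        conf.setNumWorkers(4);
        StormSubmitter.submitTopology("kafka-storm-pipeline", conf, builder.createTopology());
    }

    private static IRichSpout buildKafkaSpout() {
        // Plug in a Kafka spout implementation here (custom or from a library)
        throw new UnsupportedOperationException("Kafka spout not wired in this sketch");
    }

    private static IRichBolt businessLogicBolt() {
        // Application-specific processing bolt goes here
        throw new UnsupportedOperationException("business-logic bolt not wired in this sketch");
    }

    private static IRichBolt persistenceBolt() {
        // A bolt that writes results to HBase, Cassandra, etc. goes here
        throw new UnsupportedOperationException("persistence bolt not wired in this sketch");
    }
}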

These are all exciting technologies and are part of what makes contemporary open-source distributed computing software prosper. Even though they’re promising, that doesn’t mean they’re suitable for every company that wants to run a real-time Big Data system. At their current level of maturity, adopting them still requires hands-on software technologists to objectively assess needs, design, implement, and come up with an infrastructure support plan.

Programming Exercise – Sorting Algorithm

Unless algorithm development is part of the job, many software engineers use readily available algorithmic methods as needed and rarely need to develop algorithms themselves. I’m not talking about complicated statistical or machine learning algorithms – just simple, mundane ones such as a sorting algorithm. Even if you don’t need to code algorithms, going back to writing a sorting program can still be a good exercise to review certain basic skills that might not be frequently used in your current day job. It’s a good-sized programming exercise that isn’t too trivial and doesn’t take up too much time. It also reminds you of some clever (or dumb) tricks for performing sorting by means of recursive divide-and-merge, pivot partitioning, etc. And if nothing else, it might help you in your next technical job interview.

If you’re up for such an exercise, first look up a sorting algorithm of your choice (e.g. Merge Sort, Quick Sort) on Wikipedia or any other suitable source to re-familiarize yourself with its underlying mechanism. Next, decide on the scope of the application – for example, do you want an integer-sorting application or something more generic, etc. Then write some pseudo code, pick the programming language of your choice, and go for it.

Appended is a simple implementation of both Merge Sort and Quick Sort in Java. For the convenience of making method calls with varying data types and sorting algorithms, an interface (SimpleSort) and a wrapper (Sorter) are used. Java Generics is used to generalize the sorter for different data types. Adding Generics to a sorting application requires using either the Comparable or Comparator interface, as ordering is necessary in sorting. In this example application, the Comparable interface is used since the default ordering is good enough for basic sorting. The overall implementation code is pretty self-explanatory.

SimpleSort.java

package sorting;

public interface SimpleSort<E extends Comparable<E>> {

    public void sort(E[] list);
}

Sorter.java

package sorting;

public class Sorter<E extends Comparable<E>> {

    public void performSorting(E[] list, SimpleSort<E> sortingAlgo) {
        long startTime, elapsedTime;

        System.out.println("\nOriginal " + list.getClass().getSimpleName() + " list:  ...");
        for (int i = 0; i < list.length; i++) {
            System.out.print(list[i] + "  ");
        }
        System.out.println();

        startTime = System.currentTimeMillis();
        sortingAlgo.sort(list);
        elapsedTime = System.currentTimeMillis() - startTime;

        System.out.println("\nResult: " + sortingAlgo.getClass().getSimpleName() + " ...");
        for (int i = 0; i < list.length; i++) {
            System.out.print(list[i] + "  ");
        }
        System.out.println("\n\nTime taken: " + elapsedTime + "ms");
    }
}

MergeSort.java

package sorting;

public class MergeSort<E extends Comparable<E>> implements SimpleSort<E> {

    private E[] list;
    private E[] holder;

    @SuppressWarnings({"unchecked"})
    public void sort(E[] list) {
        this.list = list;
        int size = list.length;
        // Type erasure: first bound class of <E extends Comparable<E>>
        holder = (E[]) new Comparable[size];

        mergeSort(0, size - 1);
    }

    private void mergeSort(int left, int right) {
        if (left < right) {
            int center = left + (right - left) / 2;
            mergeSort(left, center);
            mergeSort(center + 1, right);
            merge(left, center, right);
        }
    }

    private void merge(int left, int center, int right) {

        for (int i = left; i <= right; i++) {
            holder[i] = list[i];
        }

        int i = left;
        int j = center + 1;
        int k = left;

        while (i <= center && j <= right) {

            if (holder[i].compareTo(holder[j]) <= 0) {
                list[k] = holder[i];  // Overwrite list[k] with element from the left list
                i++;
            }
            else {
                list[k] = holder[j];  // Overwrite list[k] with element from the right list
                j++;
            }
            k++;
        }

        // Overwrite remaining list[k] elements if the left list isn't exhausted
        // No need to do the same for the right list, as its elements are already correctly placed
        while (i <= center) {
            list[k] = holder[i];
            k++;
            i++;
        }
    }
}

QuickSort.java

package sorting;

import java.util.Random;

public class QuickSort<E extends Comparable<E>> implements SimpleSort<E> {

    public void sort(E[] list) {
        quickSort(list, 0, list.length - 1);
    }

    private void swapListElements(E[] list, int index1, int index2) {
        E tempValue = list[index1];
        list[index1] = list[index2];
        list[index2] = tempValue;
    }

    private int partitionIndex(E[] list, int left, int right, int pivot) {
        int pivotIndex;
        E pivotValue;

        pivotValue = list[pivot];
        swapListElements(list, pivot, right);  // Swap pivot element to rightmost index

        pivotIndex = left;  // Calculate pivot index starting from leftmost index
        for (int i = left; i < right; i++) {
            if (list[i].compareTo(pivotValue) < 0) {
                swapListElements(list, i, pivotIndex);
                pivotIndex++;
            }
        }
        swapListElements(list, pivotIndex, right);  // Swap pivot element back to calculated pivot index

        return pivotIndex;
    }

    private void quickSort(E[] list, int left, int right) {
        if (left < right) {
            // Pick a random pivot index between left and right (inclusive)
            int randomIndex = new Random().nextInt(right - left + 1) + left;

            int pivotIndex = partitionIndex(list, left, right, randomIndex);
            quickSort(list, left, pivotIndex - 1);
            quickSort(list, pivotIndex + 1, right);
        }
    }
}

SortingMain.java

package sorting;

public class SortingMain {

    @SuppressWarnings({"unchecked"})
    public static void main(String[] args) {

        Integer[] integerList = {8, 15, 6, 2, 11, 4, 7, 12, 3};
        Double[]  doubleList = {2.5, 8.0, 7.2, 4.9, 6.6, 1.8, 3.0, 7.5};
        Character[] characterList = {'M', 'X', 'B', 'C', 'T', 'R', 'F', 'S'};
        String[] stringList = {"lion", "tiger", "snow leopard", "jaguar", "cheetah", "cougar", "leopard"};

        Sorter sorter = new Sorter();

        sorter.performSorting(integerList, new MergeSort());
        sorter.performSorting(doubleList, new QuickSort());
        sorter.performSorting(characterList, new MergeSort());
        sorter.performSorting(stringList, new QuickSort());
    }
}

Yet Another Blogger

I read tech blogs from time to time but have never published one myself. So, why now? Short answer: Feeling a bit obligated …

Despite a much-needed reality check in 2000/2001 that resulted in a rather painful setback, the technology industry has managed to prosper over the past two decades. Thanks to a worldwide community’s relentless pursuit of ever-better software technologies in the form of open-source software and public knowledge bases, we all benefit tremendously from an abundant supply of tools and resources. Open-source software was mostly toy-ish or experimental stuff just 10-15 years ago; it now serves as the underlying technology foundation for many businesses. The vast knowledge base in the form of wikis, forums, blogs and Q&A sites has also made learning and solution seeking so much easier nowadays. It would have been beyond joyous had Wikipedia and StackOverflow been available back then.

Throughout my 20-year career playing both software developer and engineering management roles, I’ve learned a lot from my jobs and my colleagues, but even more so from the said technology community. It has almost become an obligation that I should try to pay back to the community, in however small a way. Besides some random thoughts on the state of the evolving technology industry or interesting things observed in my career, I’m hoping to also produce materials at the code level that are applicable to the blog topic at hand.

Since departing from my most recent cleantech startup venture (EcoFactor) as its founding technology head, I’ve been working on some R&D projects on my own and have been doing a bit more coding myself – which is usually fun except when wrestling with some stubborn bugs. The slightly heavier coding work is indeed a refreshing exercise and is going to better equip me to give topical code examples, should the need arise.