Programming Exercise – Binary Tree

Like sorting algorithms, binary tree implementation is another good programming exercise. In particular, methods for traversing a tree, searching nodes in a binary search tree and many other binary tree operations form a great recipe for refreshing one’s programming skills. Appended is an implementation of a pretty comprehensive set of binary tree (and binary search tree) operations in Java.

Iterative and recursive methods for each of the operations are developed and grouped in two separate classes, BinTreeI and BinTreeR. In general, most operations are easier to implement using recursive methods. For instance, calculating tree height iteratively is a rather non-trivial exercise, whereas it’s almost a one-liner using recursion. Some of the operations, such as searching for and inserting a tree node, are only applicable to a binary search tree (BST), for which in-order traversal visits the nodes in sorted order. For generality, pre-order and post-order traversal methods are also included in the code.
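To illustrate that contrast, here is a minimal standalone sketch of both approaches to tree height. The HeightSketch class and its TreeNode type are made up for this example and are not part of the classes listed later. The iterative version here uses a level-order (breadth-first) queue, which is a different technique from the stack-based post-order logic used in BinTreeI.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class HeightSketch {
    // Hypothetical minimal node type, for illustration only
    static class TreeNode {
        int value;
        TreeNode left, right;
        TreeNode(int value) { this.value = value; }
    }

    // Recursive height: an empty tree has height -1, a single node has height 0
    static int heightRecursive(TreeNode node) {
        return (node == null) ? -1
            : 1 + Math.max(heightRecursive(node.left), heightRecursive(node.right));
    }

    // Iterative height: process the tree one level at a time and count the levels
    static int heightIterative(TreeNode root) {
        if (root == null) return -1;
        Deque<TreeNode> queue = new ArrayDeque<>();
        queue.add(root);
        int height = -1;
        while (!queue.isEmpty()) {
            int levelSize = queue.size();  // Number of nodes on the current level
            for (int i = 0; i < levelSize; i++) {
                TreeNode node = queue.remove();
                if (node.left != null) queue.add(node.left);
                if (node.right != null) queue.add(node.right);
            }
            height++;
        }
        return height;
    }

    public static void main(String[] args) {
        TreeNode root = new TreeNode(5);
        root.left = new TreeNode(3);
        root.left.left = new TreeNode(1);
        root.right = new TreeNode(8);
        // Both methods agree: prints "2 2"
        System.out.println(heightRecursive(root) + " " + heightIterative(root));
    }
}
```

The queue-based version trades the explicit stack bookkeeping for a per-level loop, at the cost of holding a whole level of nodes in memory at once.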

Similar to the implementation of sorting algorithms in a previous blog post, Java Generics and the Comparable interface are used. If desired, the underlying tree node could be expanded to hold more complex node data such as map entries (e.g. with class type <Key extends Comparable<Key>, Value>, and Map.Entry<K,V> data).
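As a rough sketch of that extension, a key/value node might look like the following. The names EntryTreeSketch, EntryNode, put and get are hypothetical and not part of the code in this post. Ordering is by key only, and unlike the insert method in this post (which allows duplicates), this sketch assumes that inserting an existing key should overwrite its value.

```java
import java.util.AbstractMap;
import java.util.Map;

public class EntryTreeSketch {
    // Hypothetical node holding a map entry; the tree is ordered by key only
    static class EntryNode<K extends Comparable<K>, V> {
        Map.Entry<K, V> entry;
        EntryNode<K, V> left, right;
        EntryNode(K key, V value) {
            entry = new AbstractMap.SimpleEntry<>(key, value);
        }
    }

    // Insert a key/value pair, or overwrite the value if the key already exists;
    // returns the (possibly new) root of the subtree
    static <K extends Comparable<K>, V> EntryNode<K, V> put(EntryNode<K, V> node, K key, V value) {
        if (node == null) return new EntryNode<>(key, value);
        int cmp = key.compareTo(node.entry.getKey());
        if (cmp < 0) node.left = put(node.left, key, value);
        else if (cmp > 0) node.right = put(node.right, key, value);
        else node.entry.setValue(value);
        return node;
    }

    // Look up the value stored under a key; returns null if the key is absent
    static <K extends Comparable<K>, V> V get(EntryNode<K, V> node, K key) {
        while (node != null) {
            int cmp = key.compareTo(node.entry.getKey());
            if (cmp == 0) return node.entry.getValue();
            node = (cmp < 0) ? node.left : node.right;
        }
        return null;
    }

    public static void main(String[] args) {
        EntryNode<String, Integer> root = null;
        root = put(root, "E", 5);
        root = put(root, "C", 3);
        root = put(root, "H", 8);
        root = put(root, "C", 30);  // Overwrites the value stored under "C"
        // Prints "30 null"
        System.out.println(get(root, "C") + " " + get(root, "Z"));
    }
}
```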

Node.java – binary tree node

package bintree;

class Node<E extends Comparable<E>> {
    public E element;
    public Node<E> left;
    public Node<E> right;

    public Node() {
        this(null);
    }

    public Node(E e) {
        element = e;
        left = null;
        right = null;
    }

    boolean isLeaf() {
        return (left == null && right == null);
    }

    public void printNode() {
        if (isLeaf())
            System.out.print(element + "* -> ");
        else
            System.out.print(element + " -> ");
    }
}

BinTree.java – base binary tree class

package bintree;

// Abstract base class for binary trees
@SuppressWarnings({"unchecked"})
abstract class BinTree<E extends Comparable<E>> {
    public Node root;

    public BinTree() {
        root = null;
    }

    public BinTree(E e) {
        root = new Node(e);
    }

    public boolean isEmpty() {
        return root == null;
    }

    public void addLeft(Node currNode, E e) {
        // CAUTION: this method overwrites left subtree, if exists
        currNode.left = new Node(e);
    }

    public void addRight(Node currNode, E e) {
        // CAUTION: this method overwrites right subtree, if exists
        currNode.right = new Node(e);
    }

    public void removeLeft(Node currNode) {
        if (currNode.left != null) {
            currNode.left = null;
        }
    }

    public void removeRight(Node currNode) {
        if (currNode.right != null) {
            currNode.right = null;
        }
    }

    // Search the subtree under the current node for matching data
    // (Applicable to Binary Search Tree only)
    public abstract boolean search(Node currNode, E e);

    // Insert a node into the subtree under the current node
    // (Applicable to Binary Search Tree only)
    public abstract void insert(Node currNode, E e);

    // Delete the first node found with matching data by replacing it with the
    // max-value node in the branch to the left of the found node
    // (Applicable to Binary Search Tree only)
    public abstract boolean deleteFirst(Node currNode, E e);

    public abstract int size(Node currNode);

    public abstract int height(Node currNode);

    public Node rotateLeft(Node currNode) {
        Node tempNode = currNode.right;
        currNode.right = tempNode.left;
        tempNode.left = currNode;
        return tempNode;  // New root node
    }

    public Node rotateRight(Node currNode) {
        Node tempNode = currNode.left;
        currNode.left = tempNode.right;
        tempNode.right = currNode;
        return tempNode;  // New root node
    }

    // In-order traversal
    public abstract void traverseIn(Node currNode);

    // Pre-order traversal
    public abstract void traversePre(Node currNode);

    // Post-order traversal
    public abstract void traversePost(Node currNode);

    public void printNodeEnd() {
        System.out.println("(end)");
    }
}  

BinTreeR.java – binary tree class using recursive methods

package bintree;

import java.util.LinkedList;

// BinTree class using recursive methods
@SuppressWarnings({"unchecked"})
class BinTreeR<E extends Comparable<E>> extends BinTree<E> {
    // Uses the root field inherited from BinTree; redeclaring it here would hide it

    public BinTreeR() {
        super();
    }

    public BinTreeR(E e) {
        super(e);
    }

    public boolean isEmpty() {
        return super.isEmpty();
    }

    public void addLeft(Node currNode, E e) {
        super.addLeft(currNode, e);
    }

    public void addRight(Node currNode, E e) {
        super.addRight(currNode, e);
    }

    public void removeLeft(Node currNode) {
        super.removeLeft(currNode);
    }

    public void removeRight(Node currNode) {
        super.removeRight(currNode);
    }

    // Search the subtree under the current node for matching data
    // (Applicable to Binary Search Tree only)
    public boolean search(Node currNode, E e) {
        if (currNode == null) {
            return false;
        }
        else {
            if (((Comparable) e).compareTo(currNode.element) == 0) {
                return true;
            }
            else {
                if (((Comparable) e).compareTo(currNode.element) < 0)
                    return search(currNode.left, e);            
                else
                    return search(currNode.right, e);         
            }
        }
    }

    // Insert a node into the subtree under the current node
    // (Applicable to Binary Search Tree only)
    public void insert(Node currNode, E e) {
        if (currNode == null)  // Do nothing if the input node is null
            return;

        if (((Comparable) e).compareTo(currNode.element) <= 0) {
            if (currNode.left == null)
                currNode.left = new Node(e);
            else
                insert(currNode.left, e);            
        }
        else {
            if (currNode.right == null)
                currNode.right = new Node(e);
            else
                insert(currNode.right, e);            
        }
    }

    // Delete the first node found with matching data in the subtree under the current node
    // by replacing it with the max-value node in the branch to the left of the found node
    // (Applicable to Binary Search Tree only)
    public boolean deleteFirst(Node currNode, E e) {
        return deleteFirst(currNode, null, true, e);
    }

    // deleteFirst helper recursive method carrying parentNode
    public boolean deleteFirst(Node currNode, Node parentNode, boolean onLeft, E e) {
        if (currNode == null) {
            return false;
        }
        else {
            if (((Comparable) e).compareTo(currNode.element) == 0) {
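                if (currNode.left == null || currNode.right == null) {
                    // At most one child: splice currNode out of the tree
                    Node child = (currNode.left != null) ? currNode.left : currNode.right;
                    if (parentNode != null) {
                        if (onLeft)
                            parentNode.left = child;
                        else
                            parentNode.right = child;
                    }
                    else if (child != null) {
                        // No parent (currNode is the starting node): absorb the only child
                        currNode.element = child.element;
                        currNode.left = child.left;
                        currNode.right = child.right;
                    }
                    else if (currNode == super.root) {
                        super.root = null;  // Deleted the only node in the tree
                    }
                    else {
                        return false;  // A parentless leaf that is not the tree root cannot be unlinked here
                    }
                }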
                else {
                    // currNode takes data of maxNode on currNode's left branch and remove maxNode
                    Node maxNode = currNode.left;
                    Node maxNodeParent = currNode;

                    if (maxNode.right == null) {
                        currNode.element = maxNode.element;
                        maxNodeParent.left = maxNode.left;
                    }
                    else {
                        while (maxNode.right != null) {
                            maxNodeParent = maxNode;
                            maxNode = maxNode.right;
                        }
                        currNode.element = maxNode.element;
                        maxNodeParent.right = maxNode.left;  // Keep maxNode's left subtree, if any
                    }
                }
                return true;
            }
            else {
                if (((Comparable) e).compareTo(currNode.element) < 0) {
                    onLeft = true;
                    return deleteFirst(currNode.left, currNode, onLeft, e);
                }
                else {
                    onLeft = false;
                    return deleteFirst(currNode.right, currNode, onLeft, e);
                }
            }
        }
    }

    public int size(Node currNode) {
        if (currNode == null)
            return 0;
        else
            return (size(currNode.left) + size(currNode.right) + 1);
    }

    public int height(Node currNode) {
        if (currNode == null)
            return -1;
        else
            return (Math.max(height(currNode.left), height(currNode.right)) + 1);
    }

    public Node rotateLeft(Node currNode) {
        return super.rotateLeft(currNode);
    }

    public Node rotateRight(Node currNode) {
        return super.rotateRight(currNode);
    }

    // In-order traversal (recursive)
    public void traverseIn(Node currNode) {
        if(currNode == null) {
            return;
        }
        else {
            traverseIn(currNode.left);
            currNode.printNode();
            traverseIn(currNode.right);
        }
    }

    // Pre-order traversal (recursive)
    public void traversePre(Node currNode) {
        if(currNode == null) {
            return;
        }
        else {
            currNode.printNode();
            traversePre(currNode.left);
            traversePre(currNode.right);
        }
    }

    // Post-order traversal (recursive)
    public void traversePost(Node currNode) {
        if(currNode == null) {
            return;
        }
        else {
            traversePost(currNode.left);
            traversePost(currNode.right);
            currNode.printNode();
        }
    }

    public void printNodeEnd() {
        super.printNodeEnd();
    }
}  

BinTreeI.java – binary tree class using iterative methods

package bintree;

import java.util.LinkedList;

// BinTree class using iterative methods
@SuppressWarnings({"unchecked"})
class BinTreeI<E extends Comparable<E>> extends BinTree<E> {
    // Uses the root field inherited from BinTree; redeclaring it here would hide it

    public BinTreeI() {
        super();
    }

    public BinTreeI(E e) {
        super(e);
    }

    public boolean isEmpty() {
        return super.isEmpty();
    }

    public void addLeft(Node currNode, E e) {
        super.addLeft(currNode, e);
    }

    public void addRight(Node currNode, E e) {
        super.addRight(currNode, e);
    }

    public void removeLeft(Node currNode) {
        super.removeLeft(currNode);
    }

    public void removeRight(Node currNode) {
        super.removeRight(currNode);
    }

    // Search the subtree under the current node for matching data
    // (Applicable to Binary Search Tree only)
    public boolean search(Node currNode, E e) {
        if (currNode == null)
            return false;

        while (currNode != null) {
            if (((Comparable) e).compareTo(currNode.element) == 0) {
                return true;
            }
            else {
                if (((Comparable) e).compareTo(currNode.element) < 0) {
                    currNode = currNode.left;
                }
                else {
                    currNode = currNode.right;
                }
            }
        }
        return false;
    }
    // Insert a node into the subtree under the current node
    // (Applicable to Binary Search Tree only)
    public void insert(Node currNode, E e) {
        if (currNode == null)  // Do nothing if the input node is null
            return;

        while (currNode != null) {
            if (((Comparable) e).compareTo(currNode.element) <= 0) {
                if (currNode.left != null)
                    currNode = currNode.left;
                else {
                    currNode.left = new Node(e);
                    break;
                }
            }
            else {
                if (currNode.right != null)
                    currNode = currNode.right;
                else {
                    currNode.right = new Node(e);
                    break;
                }
            }
        }
    }

    // Delete the first node found with matching data in the subtree under the current node
    // by replacing it with the max-value node in the branch to the left of the found node
    // (Applicable to Binary Search Tree only)
    public boolean deleteFirst(Node currNode, E e) {
        if (currNode == null)  // Do nothing if the input node is null
            return false;

        boolean onLeft = true;
        Node parentNode = null;

        // Search for the first node with matching data
        while (currNode != null) {
            if (((Comparable) e).compareTo(currNode.element) == 0) {
                break;
            }
            else {
                parentNode = currNode;
                if (((Comparable) e).compareTo(currNode.element) < 0) {
                    onLeft = true;
                    currNode = currNode.left;
                }
                else {
                    onLeft = false;
                    currNode = currNode.right;
                }
            }
        }

        if (currNode == null) {  // No node with matching data
            return false;
        }
        else {
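            if (currNode.left == null || currNode.right == null) {
                // At most one child: splice currNode out of the tree
                Node child = (currNode.left != null) ? currNode.left : currNode.right;
                if (parentNode != null) {
                    if (onLeft)
                        parentNode.left = child;
                    else
                        parentNode.right = child;
                }
                else if (child != null) {
                    // No parent (currNode is the starting node): absorb the only child
                    currNode.element = child.element;
                    currNode.left = child.left;
                    currNode.right = child.right;
                }
                else if (currNode == super.root) {
                    super.root = null;  // Deleted the only node in the tree
                }
                else {
                    return false;  // A parentless leaf that is not the tree root cannot be unlinked here
                }
            }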
            else {
                // currNode takes data of maxNode on currNode's left branch and remove maxNode
                Node maxNode = currNode.left;
                Node maxNodeParent = currNode;

                if (maxNode.right == null) {
                    currNode.element = maxNode.element;
                    maxNodeParent.left = maxNode.left;
                }
                else {
                    while (maxNode.right != null) {
                        maxNodeParent = maxNode;
                        maxNode = maxNode.right;
                    }
                    currNode.element = maxNode.element;
                    maxNodeParent.right = maxNode.left;  // Keep maxNode's left subtree, if any
                }
            }
        }
        return true;
    }

    // Calculate tree size using in-order traversal logic
    public int size(Node currNode) {
        LinkedList<Node> stack = new LinkedList<Node>();
        int nodeCount = 0;

        while (currNode != null || !stack.isEmpty()) {
            if (currNode != null) {
                stack.push(currNode);
                currNode = currNode.left;
            }
            else {
                currNode = stack.pop();
                nodeCount++;
                currNode = currNode.right;
            }
        }
        return nodeCount;
    }

    // Calculate tree height using post-order traversal logic
    public int height(Node currNode) {
        LinkedList<Node> stack = new LinkedList<Node>();
        Node prevNode = null;
        int maxCount = 0;

        if (currNode == null)
            return -1;
        stack.push(currNode);

        while (!stack.isEmpty()) {
            currNode = stack.peek();
            // Traversing down the tree from left or right
            if (prevNode == null || currNode == prevNode.left || currNode == prevNode.right) {
                if (currNode.left != null)
                    stack.push(currNode.left);
                else if (currNode.right != null)
                    stack.push(currNode.right);
            }
            // Traversing up the tree from the left
            else if (currNode.left == prevNode) {
                if (currNode.right != null)
                    stack.push(currNode.right);
            }
            // Traversing up the tree from the right, at a leaf node or otherwise
            else {
                currNode = stack.pop();
            }
            prevNode = currNode;
            if (stack.size() > maxCount)
                maxCount = stack.size();
        }
        return maxCount - 1;
    }

    public Node rotateLeft(Node currNode) {
        return super.rotateLeft(currNode);
    }

    public Node rotateRight(Node currNode) {
        return super.rotateRight(currNode);
    }

    // In-order traversal
    public void traverseIn(Node currNode) {
        LinkedList<Node> stack = new LinkedList<Node>();

        while (currNode != null || !stack.isEmpty()) {
            if (currNode != null) {
                stack.push(currNode);
                currNode = currNode.left;
            }
            else {
                currNode = stack.pop();
                currNode.printNode();
                currNode = currNode.right;
            }
        }
        printNodeEnd();
    }

    // Pre-order traversal
    public void traversePre(Node currNode) {
        LinkedList<Node> stack = new LinkedList<Node>();

        while (currNode != null || !stack.isEmpty()) {
            if (currNode != null) {
                stack.push(currNode);
                currNode.printNode();
                currNode = currNode.left;
            }
            else {
                currNode = stack.pop();
                currNode = currNode.right;
            }
        }
        printNodeEnd();
    }

    // Post-order traversal using a single stack
    public void traversePost(Node currNode) {
        LinkedList<Node> stack = new LinkedList<Node>();
        Node prevNode = null;

        if (currNode == null)
            return;
        stack.push(currNode);

        while (!stack.isEmpty()) {
            currNode = stack.peek();
            // Traversing down the tree from left or right
            if (prevNode == null || currNode == prevNode.left || currNode == prevNode.right) {
                if (currNode.left != null)
                    stack.push(currNode.left);
                else if (currNode.right != null)
                    stack.push(currNode.right);
            }
            // Traversing up the tree from the left
            else if (currNode.left == prevNode) {
                if (currNode.right != null)
                    stack.push(currNode.right);
            }
            // Traversing up the tree from the right or otherwise
            else {
                currNode.printNode();
                currNode = stack.pop();
            }
            prevNode = currNode;
        }
        printNodeEnd();
    }

    /*
    // Post-order traversal using two stacks
    public void traversePost(Node currNode) {
        LinkedList<Node> inStack = new LinkedList<Node>();
        LinkedList<Node> outStack = new LinkedList<Node>();

        if (currNode == null)
            return;
        inStack.push(currNode);

        while (!inStack.isEmpty()) {
            currNode = inStack.pop();
            outStack.push(currNode);
            if (currNode.left != null)
                inStack.push(currNode.left);
            if (currNode.right != null)
                inStack.push(currNode.right);
        }

        while (!outStack.isEmpty()) {
            currNode = outStack.pop();
            currNode.printNode();
        }
        printNodeEnd();
    }
    */

    public void printNodeEnd() {
        super.printNodeEnd();
    }
}  

BinTreeMain.java – test application

package bintree;

import java.io.*;
import java.util.*;

class BinTreeMain {

    @SuppressWarnings({"unchecked"})
    public static void main(String[] args) {
        if (args.length != 1 || !args[0].matches("[iIrR]")) {
            System.out.println("Usage: java -cp build/bintree-r1.0.0.jar bintree.BinTreeMain I|R (Iterative or Recursive)");
            System.out.println("  e.g. java -cp build/bintree-r1.0.0.jar bintree.BinTreeMain R");
            System.exit(1);  // Non-zero exit status signals incorrect usage
        }

        BinTree tree = null;

        boolean useRecurMethods = args[0].matches("[rR]");
        if (!useRecurMethods) {
            tree = new BinTreeI();
        }
        else {
            tree = new BinTreeR();
        }

        /* Tree content

                      (E)
                     /   \
                   /       \
                (C)         (H)
               /   \       /   \
            (B)    (D)  (G)     (J)
           /           /       /
        (A)         (F)     (I)
                           /   \
                       (HH)     (II)

        */

        tree.root = new Node("E");

        // Test insert
        tree.insert(tree.root, "H");
        tree.insert(tree.root, "C");
        tree.insert(tree.root, "D");
        tree.insert(tree.root, "G");
        tree.insert(tree.root, "B");
        tree.insert(tree.root, "J");
        tree.insert(tree.root, "F");
        tree.insert(tree.root, "A");
        tree.insert(tree.root, "I");
        tree.insert(tree.root, "HH");
        tree.insert(tree.root, "II");

        // Test search
        System.out.println("Result: " + (tree.search(tree.root, "G") ? "Found!" : "Not found!"));
        System.out.println("Result: " + (tree.search(tree.root, "FF") ? "Found!" : "Not found!"));

        System.out.println("\nTraversing tree from root ...");
        tree.traverseIn(tree.root);
        if (useRecurMethods) tree.printNodeEnd();
        System.out.println("\nSize of tree: " + tree.size(tree.root));
        System.out.println("Height of tree: " + tree.height(tree.root));

        // Test delete
        System.out.println("\nDeleting first node containing B from root ...");
        System.out.println("Result: " + (tree.deleteFirst(tree.root, "B") ? "Found and deleted!" : "Not found!"));

        System.out.println("\nDeleting first node containing CC from root ...");
        System.out.println("Result: " + (tree.deleteFirst(tree.root, "CC") ? "Found and deleted!" : "Not found!"));

        System.out.println("\nDeleting first node containing H from root ...");
        System.out.println("Result: " + (tree.deleteFirst(tree.root, "H") ? "Found and deleted!" : "Not found!"));

        System.out.println("\nDeleting first node containing J from root ...");
        System.out.println("Result: " + (tree.deleteFirst(tree.root, "J") ? "Found and deleted!" : "Not found!"));

        System.out.println("\nDeleting first node containing E (root node) from root ...");
        System.out.println("Result: " + (tree.deleteFirst(tree.root, "E") ? "Found and deleted!" : "Not found!"));
        System.out.println("Root node now has data " + tree.root.element);

        System.out.println("\nTraversing tree from root ...");
        tree.traverseIn(tree.root);
        if (useRecurMethods) tree.printNodeEnd();
        System.out.println("\nSize of tree: " + tree.size(tree.root));
        System.out.println("Height of tree: " + tree.height(tree.root));
    }
}
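The test application does not exercise rotateLeft and rotateRight. As a small standalone sketch, the following shows that a left rotation changes the subtree's root while leaving the in-order sequence intact. The RotateSketch class and its N node type are made up for illustration; the rotateLeft method mirrors the logic of BinTree.rotateLeft.

```java
public class RotateSketch {
    // Hypothetical minimal node type, for illustration only
    static class N {
        String element;
        N left, right;
        N(String element) { this.element = element; }
    }

    // Same logic as BinTree.rotateLeft: the right child becomes the new subtree root
    static N rotateLeft(N node) {
        N pivot = node.right;
        node.right = pivot.left;
        pivot.left = node;
        return pivot;
    }

    // Append the elements of the subtree to out in in-order sequence
    static void inOrder(N node, StringBuilder out) {
        if (node == null) return;
        inOrder(node.left, out);
        out.append(node.element).append(' ');
        inOrder(node.right, out);
    }

    // Return the in-order sequence as a space-separated string
    static String inOrder(N node) {
        StringBuilder out = new StringBuilder();
        inOrder(node, out);
        return out.toString().trim();
    }

    public static void main(String[] args) {
        // B has left child A and right child D; D has children C and E
        N b = new N("B");
        b.left = new N("A");
        b.right = new N("D");
        b.right.left = new N("C");
        b.right.right = new N("E");

        String before = inOrder(b);
        N newRoot = rotateLeft(b);
        String after = inOrder(newRoot);
        // Prints "D | A B C D E | A B C D E"
        System.out.println(newRoot.element + " | " + before + " | " + after);
    }
}
```

Note that the rotation methods only return the new subtree root. The caller is responsible for re-attaching it to the parent node, or to the tree's root field when the rotated node was the root.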

The War For Software Talent

Having built software engineering teams for startups throughout my recent career, I must say that it’s not the easiest thing to do. It has gotten even tougher in the past few years as competition for software veterans has been more fierce than ever.

A mundane supply-and-demand issue

Prominent technology companies such as Google, Twitter and Amazon have evolved over the past decade to become powerful titans by all standards. They have been vacuuming the talent space like black holes. Their continually sky-high stock value has led to spin-offs of child startups by their departed alumni, who are oftentimes some of the best talents themselves. These spun-off entrepreneurs were once key contributors but have since become founders, which removes them from the pool of talent available for hire.

The spin-offs leave the mother companies no choice but to step up their vacuum machine power. Meanwhile, these spin-offs create their own vacuum machines and, with some elite company names in their profiles, instantly become additional black holes that scoop up talent. That’s all good in helping the technology industry prosper. But it inconveniently starves the average startup of desperately needed veterans, and running the code below is not going to help.

if (demand > supply) {
    throw new InsufficientResourceException();
}

Because of the imbalance between supply and demand for top talent, compensation packages have skyrocketed to the point that the average startup has a hard time justifying them. Unlike established and well-funded companies, these startups must watch where every single dime goes, and a veteran engineer’s salary in the range of a small startup CEO’s can be a deal breaker.

Product Managers, QA, DevOps, …

For any given software engineering team, focus is often put on the engineers responsible for the coding itself. But any company that has ever built real-world products realizes how tough it would be to deliver them without an able product manager. Star product managers with adequate technical background and product management skills are equally hard to find. In all my previous startup ventures, product manager has always been one of the hardest roles to fill with the right people.

Good QA engineers are more difficult to find than R&D engineers. Many engineers simply lack the attributes, including patience and a detective mind, required to perform well in quality assurance. In addition, most people with strong programming skills prefer R&D jobs to QA jobs. The end result is that many QA engineers come from a background of less stringent training in programming. In a technologically demanding engineering team, that can be a barrier to carrying out quality QA tasks.

Experienced operations staff who handle systems and database administration have always been hard to find. The fact that I’ve been forced to play those roles myself on and off since my first startup venture in the ’90s tells a lot. With more and more cloud-based services arising and blurring the line between the software development and network operations worlds, DevOps engineers were born. Instead of supporting non-technical users, they support highly technical software engineers. So, now we’re talking about a hard-to-find sys op with some software engineering background. These DevOps engineers are like network engineers in the old days. They are an endangered species.

What about quants?

The recent Big Data analytics movement has sparked a sudden demand for data scientists, a.k.a. quants. Quants possess domain expertise in quantitative and statistical analysis and in machine learning algorithms, which are crucial to businesses that need to digest their ever-growing Big Data. Oftentimes, an advanced degree in a natural science discipline such as physics or mathematics is required to qualify for such jobs, although other disciplines like mechanical engineering and operations research also prove highly applicable.

Despite the demanding requirements, I had less trouble finding qualified quants than veteran software engineers. Perhaps demand for quants is still relatively fresh and not many companies know how best to tackle it yet. Coming from a natural science academic background myself, I also found it a bit troubling that a quant with a PhD in Physics from MIT and 5 years of postdoctoral work costs less than a software engineer with a BS in Computer Science from an average college and 5 years of programming experience.

Where can I find them?

To build or grow your engineering team, if you do not already have at least a couple of trusted lieutenants and engineers as part of the team’s backbone, you will surely be up against a pretty big challenge. Unfortunately, that’s not an uncommon situation. As you advance your career over the years, those who were once your star team members might have grown into roles similar to yours or started their own ventures (and be competing with you for talent). So, chances are that there are some critical roles you need to fill from time to time.

Conventional wisdom suggests that hiring through internal referrals is always preferred. That is as true as ever. It also makes sense to expand further and connect with your friends, alumni, ex-colleagues, advisory board and board of directors for more leads. While it’s hard to guarantee success, posting jobs on job boards and professional social networks such as Monster and LinkedIn remains a logical step to advertise your hiring needs. You can also try local community networks like Craigslist, especially if you only want local candidates.

For projects with well-defined specs and clear metrics for measuring success, using less expensive off-shore resources may make sense. The cost of a near-shore full-time engineer is between one-third and half of that of a local engineer, but one should factor in the extra management cost incurred. Near-shore has the advantage of more closely aligned time zones. Trusted references about a provider’s service quality are essential in your evaluation process.

As for quants, there might not be many candidates available for hiring through the above channels. Because academic specialty weighs heavily in a quant job’s requirements, it makes sense to try to acquire that talent directly from academia. NACELink (http://www.nacelink.com/) is a great starting point for advertising your need through their extensive school network.

Recruiters?

If you foresee an ongoing hiring need, you should get help from technical recruiters. They work on a contingency or retained basis. The latter is often preferred when you have a relatively large number of job openings to be filled in the short term. It’s recommended that a goal with a timeline be set upfront to measure success so as to keep precious time and budget under control. On the other hand, there is no reason for you to limit your hiring channel to only one type of recruiter.

Finding competent technical recruiters is tricky, as you won’t know whether a recruiter is good or not until you have had the chance to go through at least a couple of leads from him/her. For each job opening, it’s recommended that you always provide your recruiters with a carefully thought-out set of technical questions for them to go over as the initial filter. A couple of reasons for that:

1. Nobody knows better than you exactly what expertise you want from the candidate
2. Many technical recruiters don’t necessarily have a strong technical background, despite their technical title

Conclusion: No magic pills

Finding top talents to join your engineering team can be an exhausting effort, fruitless at times. You have to invest a huge amount of your own time and effort, even if you use recruiters. The fact is that there are great techies who aren’t good at marketing themselves and mediocre ones with excellent profiles on paper. People talk about the 80-20 rule, but you’ll be considered very lucky if 20% of your team are truly top talents.

A significant part of the process of acquiring talent involves some selling effort. While selling the company’s prospects and evangelizing the adopted technologies help, many engineering veterans nowadays are sophisticated enough to have done their homework. So, the key is not to oversell, and to be consistent among the “sellers”. It’s analogous to reporting to your investors in a board meeting: just highlight the key achievements that count and back them with unambiguous metrics. They have already done their homework. Even if they haven’t, you should assume they have.

Streaming Real-time Data Into HBase

Fast writes are generally a characteristic strength of distributed NoSQL databases such as HBase and Cassandra. Yet, for a distributed application that needs to capture rapid streams of data in a database, the standard connection pooling provided by the database might not be up to the task. For instance, I didn’t get the performance I wanted when using HBase’s HTablePool to accommodate real-time streaming of data from a high-parallelism, data-dumping Storm bolt.

To dump rapid real-time streaming data into HBase, it might be more efficient to embed a queueing mechanism in the HBase storage module instead of using HTablePool. An ex-colleague of mine, who is the architect at a VoIP service provider, employs this very mechanism in their production HBase database. Below is a simple implementation that has been tested and performs well with a good-sized Storm topology. The code is rather self-explanatory. The HBaseStreamers class contains a threaded inner class, Streamer, which maintains a queue of HBase Put objects in a LinkedBlockingQueue. The key parameters are in the HBaseStreamers constructor argument list, including the ZooKeeper quorum, HBase table name, HTable auto-flush switch, number of streaming queues and streaming queue capacity.

package hbstream;

import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseStreamers {
    private Configuration hbaseConfig;
    private Streamer[] streamers;
    private boolean started = false;

    private class Streamer implements Runnable {
        private LinkedBlockingQueue<Put> queue;
        private HTable table;
        private String tableName;
        // Written only by this streamer's thread; volatile so that counter() sees fresh values.
        private volatile int counter = 0;

        public Streamer(String tableName, boolean autoFlush, int capacity) throws Exception {
            table = new HTable(hbaseConfig, tableName);
            table.setAutoFlush(autoFlush);
            this.tableName = tableName;
            queue = new LinkedBlockingQueue<Put>(capacity);
        }

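        public void run() {
            // Drain the queue until this thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Put put = queue.take();
                    // HTable is not thread-safe, so guard it against a concurrent flush().
                    synchronized (table) {
                        table.put(put);
                    }
                    counter++;
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so that the loop exits.
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public void write(Put put) throws Exception {
            // Blocks while the queue is full, which throttles the caller.
            queue.put(put);
        }

        public void flush() {
            synchronized (table) {
                if (!table.isAutoFlush()) {
                    try {
                        table.flushCommits();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }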

        public int size() {
            return queue.size();
        }

        public int counter() {
            return counter;
        }
    }

    public HBaseStreamers(String quorum, String port, String tableName, boolean autoFlush, int numOfStreamers, int capacity) throws Exception {
        hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", quorum);
        hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
        streamers = new Streamer[numOfStreamers];
        for (int i = 0; i < streamers.length; i++) {
            streamers[i] = new Streamer(tableName, autoFlush, capacity);
        }
    }

    public Runnable[] getStreamers() {
        return streamers;
    }

    public synchronized void start() {
        if (started) {
            return;
        }
        started = true;
        int count = 1;
        for (Streamer streamer : streamers) {
            new Thread(streamer, streamer.tableName + " HBStreamer " + count).start();
            count++;
        }
    }

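    public void write(Put put) throws Exception {
        // Pick a queue from the current millisecond; calls made within the same
        // millisecond therefore land on the same queue.
        int i = (int) (System.currentTimeMillis() % streamers.length);
        streamers[i].write(put);
    }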

    public void flush() {
        for (Streamer streamer : streamers) {
            streamer.flush();
        }
    }

    public int size() {
        int size = 0;
        for (Streamer st : streamers) {
            size += st.size();
        }
        return size;
    }

    public int counter() {
        int counter = 0;
        for (Streamer st : streamers) {
            counter += st.counter();
        }
        return counter;
    }
}
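One possible refinement, which is not part of the original code, is to pick the target queue with a shared counter rather than the clock, so that bursts arriving within the same millisecond are spread evenly across the queues. The following is a minimal sketch of that idea with no HBase dependency. The class name RoundRobinStreamers and the use of String items in place of Put are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for HBaseStreamers: same queue-per-streamer layout,
// but the target queue is chosen round-robin and items are plain Strings.
final class RoundRobinStreamers {
    private final List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>();
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinStreamers(int numOfStreamers, int capacity) {
        for (int i = 0; i < numOfStreamers; i++) {
            queues.add(new LinkedBlockingQueue<String>(capacity));
        }
    }

    // Chooses the next queue index from a shared counter instead of the clock.
    // floorMod keeps the index non-negative even after the counter overflows.
    int pickQueue() {
        return Math.floorMod(next.getAndIncrement(), queues.size());
    }

    // Blocks while the chosen queue is full; returns false if interrupted.
    boolean write(String item) {
        try {
            queues.get(pickQueue()).put(item);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Number of items currently waiting in the given queue.
    int size(int queueIndex) {
        return queues.get(queueIndex).size();
    }
}
```

With four queues, eight consecutive writes leave two items in each queue, regardless of how quickly the writes arrive.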

Next, write a wrapper class similar to the following to isolate HBase specifics from the streaming application.

package hbstream;
....

public class StreamToHBase {
    private static final String tableName = "stormhbtest";
    private static final byte[] colFamily = Bytes.toBytes("data");
    private static final byte[] colQualifier = Bytes.toBytes("message");
    private static boolean isInit = false;
    // volatile so that threads calling writeMessage() see the instance set by init()
    private static volatile HBaseStreamers hbStreamers = null;
    ....

    public static synchronized void init(String zkQuorum, String zkPort, boolean autoFlush, int numOfStreamers, int queueCapacity)
        throws Exception {

        if (isInit)
            return;
        HBaseStreamers streamers = new HBaseStreamers(zkQuorum, zkPort, tableName, autoFlush, numOfStreamers, queueCapacity);
        streamers.start();
        hbStreamers = streamers;
        // Mark as initialized only after the streamers have started, so that a failed attempt can be retried.
        isInit = true;
        ....
    }

    public static void writeMessage(String message) throws Exception {
        byte[] value = Bytes.toBytes(message);
        byte[] rowIdBytes = Bytes.toBytes(UUID.randomUUID().toString());
        Put p = new Put(rowIdBytes);
        p.add(colFamily, colQualifier, value);
        if (hbStreamers != null) {
            hbStreamers.write(p);
        }
    }
    ....
}

To test it with a distributed streaming application using Storm, write a bolt similar to the following skeleton. All that is needed is to initialize HBaseStreamers, via StreamToHBase.init(), from within the bolt’s prepare() method and to dump data to HBase from within the bolt’s execute() method.

package hbstream;
....

public class HBStreamTestBolt extends BaseRichBolt {
    OutputCollector _collector;
    ....

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        ....
        try {
            StreamToHBase.init("172.16.47.101,172.16.47.102,172.16.47.103", "2181", false, 4, 1000);
        }
        catch (Exception e) {
            ....
        }
        ....
    }

    @Override
    public void execute(Tuple tuple) {
        ....
        try {
            StreamToHBase.writeMessage(message);
        }
        catch (Exception e) {
            ....
        }
        ....
    }
    ....
}

Finally, write a Storm spout to serve as the streaming data source and a Storm topology builder to put the spout and bolt together.

package hbstream;
....

public class HBStreamTestSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    ....

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        words = new ArrayList<String>();
        ....
    }

    @Override
    public void nextTuple() {
        int rand = (int) (Math.random() * 1000);
        String word = words.get(rand % words.size());
        _collector.emit(new Values(word));
        ....
    }
    ....
}

package hbstream;
....

public class HBStreamTopology {
    ....

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("testSpout", new HBStreamTestSpout(), 4);
        builder.setBolt("testBolt", new HBStreamTestBolt(), 6)
               .shuffleGrouping("testSpout");

        Config conf = new Config();
        conf.setNumWorkers(2);
        StormSubmitter.submitTopology("HBStreamTopology", conf, builder.createTopology());
        ....
    }
}

The parallelism and queue parameters are set to relatively small numbers in the sample code above. Once the setup has been verified to work, one can tune the various dials in accordance with the capacity of the server cluster. These dials include the following:

StreamToHBase.init():
    - boolean autoFlush
    - int numOfStreamers
    - int queueCapacity

TopologyBuilder.setSpout():
    - Number parallelismHint

TopologyBuilder.setBolt():
    - Number parallelismHint

Config.setNumWorkers():
    - int workers
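
To avoid recompiling whenever a dial changes, the three StreamToHBase.init() dials could be read from a configuration map, such as the conf map passed to the bolt’s prepare() method. Below is a minimal sketch under that assumption. The StreamerDials class and the "hbstream.*" keys are hypothetical names invented for this example, and the defaults simply mirror the values used in the sample bolt above.

```java
import java.util.Map;

// Hypothetical holder for the three StreamToHBase.init() dials.
final class StreamerDials {
    final boolean autoFlush;
    final int numOfStreamers;
    final int queueCapacity;

    StreamerDials(boolean autoFlush, int numOfStreamers, int queueCapacity) {
        if (numOfStreamers < 1 || queueCapacity < 1) {
            throw new IllegalArgumentException("numOfStreamers and queueCapacity must be >= 1");
        }
        this.autoFlush = autoFlush;
        this.numOfStreamers = numOfStreamers;
        this.queueCapacity = queueCapacity;
    }

    // Returns the value stored under key as a trimmed string, or fallback if absent.
    private static String read(Map<String, Object> conf, String key, String fallback) {
        Object value = conf.get(key);
        return value == null ? fallback : value.toString().trim();
    }

    // The "hbstream.*" keys are made up for this sketch; missing keys fall back
    // to the values used in the sample bolt (false, 4, 1000).
    static StreamerDials fromConf(Map<String, Object> conf) {
        boolean autoFlush = Boolean.parseBoolean(read(conf, "hbstream.autoFlush", "false"));
        int numOfStreamers = Integer.parseInt(read(conf, "hbstream.numOfStreamers", "4"));
        int queueCapacity = Integer.parseInt(read(conf, "hbstream.queueCapacity", "1000"));
        return new StreamerDials(autoFlush, numOfStreamers, queueCapacity);
    }
}
```

The resulting values would then be handed to StreamToHBase.init() in place of the hard-coded literals.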

For simplicity, only HBase Put is handled in the above implementation. It can certainly be expanded to also handle HBase Increment so as to carry out aggregation functions such as count. The primary goal of this Storm-to-HBase streaming exercise is to showcase the use of a module equipped with some “elasticity” by means of configurable queues. The queueing mechanism within HBaseStreamers provides cushioned funnels for the data streams and helps optimize the overall data intake bandwidth. Keep in mind, though, that it doesn’t remove the need for administration work to keep the HBase-Hadoop system properly configured.
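
The “cushion” comes from the bounded queue: it absorbs bursts up to its capacity and pushes back on the producer once it is full. The sketch below illustrates that behavior on its own, without HBase or Storm. BoundedFunnel is a hypothetical class, and it uses a timed offer() rather than the blocking put() of the Streamer class, so that the caller can see when the queue is saturated.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a bounded "funnel" between a fast producer
// and a slower consumer, built on the same LinkedBlockingQueue as Streamer.
final class BoundedFunnel {
    private final LinkedBlockingQueue<String> queue;

    BoundedFunnel(int capacity) {
        queue = new LinkedBlockingQueue<String>(capacity);
    }

    // Waits up to timeoutMillis for free space; returns false if the queue
    // is still full (or the thread is interrupted), signalling back-pressure.
    boolean offer(String item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Removes and returns the oldest item, or null if the queue is empty.
    String poll() {
        return queue.poll();
    }

    // Number of additional items the queue can accept right now.
    int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
```

A larger capacity tolerates longer bursts at the cost of more memory and more unwritten data held in flight, which is the trade-off behind the queueCapacity dial.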