NIO-based Reactor In Scala

For high concurrency at scale, event-driven server design with non-blocking I/O has been one of the most popular server architectures. Nginx and Node.js, both leading server platforms in their own spaces, adopt this very approach. Among the various event-driven server implementations, the Reactor pattern remains a prominent design: an event loop equipped with a demultiplexer efficiently selects events that are ready to be processed by a set of event handlers.

Back in 2013, I wrote a blog post about building a bare-bones server in Java that uses the Java NIO API to implement the Reactor pattern with non-blocking I/O. The goal here is to rewrite that NIO-based Reactor server in Scala.

Java NIO and Reactor pattern

A quick recap of Java NIO, which consists of the following key components:

  • Buffer – a container of primitive typed data (e.g. Byte, Int) that can be optimized for native I/O operations with memory alignment and paging functionality
  • Channel – a connector associated with an I/O entity (e.g. files, sockets) that supports non-blocking I/O operations
  • Selector – a demultiplexer on an event loop that selects events which are ready for carrying out pre-registered I/O operations (e.g. read, write)
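
To make the Buffer component a little more concrete, here is a minimal sketch in Java (the language of the NIO API and of the original server) of the usual write, flip, read, clear cycle. The class name BufferRoundTrip and its method are made up for illustration and are not part of the Reactor server.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; the name and method are illustrative only.
final class BufferRoundTrip {

    // Writes text into a buffer, flips it, and reads the bytes back out.
    // Assumes the text fits into the 1024-byte buffer.
    static String roundTrip(String text) {
        byte[] input = text.getBytes(StandardCharsets.ISO_8859_1);
        ByteBuffer buf = ByteBuffer.allocate(1024);

        buf.put(input);   // write mode: position advances past the data
        buf.flip();       // read mode: limit = old position, position = 0

        byte[] output = new byte[buf.remaining()];
        buf.get(output);  // drain everything between position and limit
        buf.clear();      // reset position and limit so the buffer can be reused

        return new String(output, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello reactor"));
    }
}
```

The same flip-then-get sequence shows up later in the handler's processing step.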

Note that the NIO socket channels extend SelectableChannel, which can be registered with a Selector for the I/O operations of interest; each registration is represented by a SelectionKey. To handle high-volume client connections to the server, channels are switched to non-blocking mode via method configureBlocking(false), which is also a prerequisite for registering them with a Selector.
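
The registration step can be sketched in isolation as follows, again in Java. This is only an illustration under a few assumptions: the class name RegistrationSketch is hypothetical, port 0 is used so that the operating system picks any free port, and a plain Object stands in for the attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; not part of the Reactor server.
final class RegistrationSketch {

    // Registers a non-blocking listener channel and checks what the resulting key holds.
    static boolean registerListener() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(0));  // port 0: any free port (assumption for the demo)
            server.configureBlocking(false);         // must happen before register()

            Object attachment = new Object();        // stand-in for an Acceptor-like object
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT, attachment);

            return key.interestOps() == SelectionKey.OP_ACCEPT
                && key.attachment() == attachment
                && key.channel() == server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling register() on a channel that is still in blocking mode throws IllegalBlockingModeException, which is why configureBlocking(false) comes first.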

With NIO Buffers enabling optimal memory access and native I/O operations, Channels programmatically connecting I/O entities, and Selector serving as the demultiplexer on an event loop selecting ready-to-go I/O events to execute in a non-blocking fashion, the Java NIO API is a great fit for implementing an effective Reactor server.

Reactor event loop

This Scala version of the NIO Reactor server consists of two main classes, NioReactor and Handler, along with a trait SelKeyAttm, which is the base type for objects that are to be coupled with individual selection-keys as their attachments (more on this later).

Central to the NioReactor class is the “perpetual” event loop performed by method selectorLoop(). It’s a tail-recursive function that never returns (hence the return type Nothing), equivalent to the conventional infinite while(true){} loop. All it does is repeatedly block in select() until some registered channels are ready for their I/O operations of interest, then hand the iterator over the selected keys to the passed-in function iterFn() to carry out the necessary work.

  import java.util.{Iterator => JavaIter}

  @scala.annotation.tailrec
  final def selectorLoop(iterFn: (JavaIter[SelectionKey]) => Unit): Nothing = {
    selector.select()
    val it = selector.selectedKeys().iterator()
    iterFn(it)
    selectorLoop(iterFn)
  }

Function iterateSelKeys, which is passed in as the parameter for the event loop function, holds the selection-key iteration logic. While it’s tempting to convert the Java Iterator used in the original Java application to a Scala Iterator, the idea was scrapped because each iterated selection-key must be removed via remove(): the selector does not remove keys from its selected-key set after they have been handled, so a key left in the set would show up again in the next pass. Scala Iterator (or Iterable) has no such method or equivalent.

  @scala.annotation.tailrec
  final def iterateSelKeys(it: JavaIter[SelectionKey]): Unit = {
    if (it.hasNext()) {
      val sk = it.next()
      it.remove()
      val attm: SelKeyAttm = sk.attachment().asInstanceOf[SelKeyAttm]
      if (attm != null)
        attm.run()
      iterateSelKeys(it)
    }
    else ()
  }
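
The need for remove() can be observed with a small experiment, sketched here in Java. It uses a java.nio.channels.Pipe instead of a socket purely to keep the example self-contained; the class name SelectedKeyRemoval is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical demo class; a Pipe stands in for a client socket.
final class SelectedKeyRemoval {

    // Returns {size of the selected-key set after select(), size after iterating with remove()}.
    static int[] selectedSetSizes() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);

                // Make the source end readable, then let the selector notice it.
                pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
                selector.select(1000);  // timeout so the demo cannot hang

                int before = selector.selectedKeys().size();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    it.next();
                    it.remove();        // only this empties the selected-key set
                }
                int after = selector.selectedKeys().size();
                return new int[] {before, after};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The set holds the ready key until the iterator removes it, which is the behavior the Scala iterateSelKeys function relies on.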

Unlike in the original version, where the selection-key attachments were of type Runnable, they’re now subtypes of SelKeyAttm, each of which implements method run() that gets called once its key is selected by the Selector. With Scala Futures taking care of the asynchronous work, Runnable is no longer needed as the type of the selection-key attachments. By making SelKeyAttm the base type for objects attached to the selection-keys, a slightly more specific “contract” (in the form of method specifications) is set up for those objects to adhere to.

Acceptor

The Acceptor, associated with the NIO ServerSocketChannel for the listener socket, is a subtype of SelKeyAttm. It’s responsible for reception of server connection requests.

  class Acceptor extends SelKeyAttm {
    def run(): Try[Unit] = Try {
        val channel: SocketChannel = serverChannel.accept()
        if (channel != null)
          new Handler(selector, channel)
        ()
      }
      .recover {
        case e: IOException => println(s"Acceptor: $e")
      }
  }

Part of class NioReactor’s constructor routine is to bind the ServerSocketChannel to a specified port number. It’s also where the ServerSocketChannel is configured to be non-blocking and registered with the selector for the accept operation (OP_ACCEPT), which creates a selection-key that then gets the Acceptor instance as its attachment.

class NioReactor(port: Int) {
  implicit val ec: ExecutionContext = NioReactor.ec

  val selector: Selector = Selector.open()
  val serverChannel: ServerSocketChannel = ServerSocketChannel.open()

  serverChannel.socket().bind(new InetSocketAddress(port))
  serverChannel.configureBlocking(false)

  val sk: SelectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  sk.attach(new Acceptor())

  // ...
}

The companion object of the NioReactor class is set up with a thread pool to run the Reactor server at a provided port number in a Scala Future.

object NioReactor {
  val poolSize: Int = 10
  val workerPool = Executors.newFixedThreadPool(poolSize)
  implicit val ec = ExecutionContext.fromExecutorService(workerPool)

  def apply(port: Int = 9090): Future[Unit] = Future {
      (new NioReactor(port)).loop()
    }
    .recover {
      case e: IOException => println(s"Reactor($port): $e")
    }

  // ...
}

Event handlers

As shown in the snippet of the Acceptor class, upon acceptance of a server connection, an instance of Handler is spawned. All events (in our case, the reading requests from and writing responses to client sockets) are processed by those handlers, which are another subtype of SelKeyAttm.

The Handler class takes a Selector and a SocketChannel as parameters. On construction, it initializes a couple of ByteBuffers for read/write, configures the SocketChannel to be non-blocking, registers the channel with the selector for I/O operation OP_READ, attaches the handler instance itself to the resulting selection-key, and finally nudges the selector via wakeup() so that a blocked select() returns immediately.

When called, method run() carries out the main read/write handling logic according to which I/O operation the handler’s selection-key is ready for (read or write).

object Handler {
  val readBufSize: Int = 1024
  val writeBufSize: Int = 1024
}

class Handler(sel: Selector, channel: SocketChannel)(implicit ec: ExecutionContext) extends SelKeyAttm {
  import Handler._

  var selKey: SelectionKey = null
  val readBuf = ByteBuffer.allocate(readBufSize)
  var writeBuf = ByteBuffer.allocate(writeBufSize)

  channel.configureBlocking(false)

  selKey = channel.register(sel, SelectionKey.OP_READ)
  selKey.attach(this)
  sel.wakeup()

  def run(): Try[Unit] = Try {
      if (selKey.isReadable())
        read()
      else if (selKey.isWritable())
        write()
    }
    .recover {
      case e: IOException => println(s"Handler run(): $e")
    }

  def process(): Unit = ???

  def read(): Unit = ???

  def write(): Unit = ???

}

Processing read/write buffers

Method read() calls channel.read(readBuf), which reads from the channel up to as many bytes as remain in the readBuf ByteBuffer and returns the number of bytes read. If the channel has reached “end-of-stream”, in which case channel.read() returns -1, the corresponding selection-key is cancelled and the channel is closed; otherwise, processing work commences.

  def read(): Unit = synchronized {
      Try {
          val numBytes: Int = channel.read(readBuf)
          println("Handler read(): #bytes read into 'readBuf' buffer = " + numBytes)
  
          if (numBytes == -1) {
            selKey.cancel()
            channel.close()
            println("Handler read(): client connection might have been dropped!")
          }
          else {
            Future {
                process()
              }
              .recover {
                case e: IOException => println(s"Handler process(): $e")
              }
          }
        }
        .recover {
          case e: IOException => println(s"Handler read(): $e")
        }
    }
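
The two return values that read() distinguishes can be reproduced without a network connection. The Java sketch below uses a Pipe as a stand-in for the client socket (an assumption made only to keep it self-contained), and the class name ReadReturnValues is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; a Pipe stands in for a client connection.
final class ReadReturnValues {

    // Returns {result of the first read, result of a read after the writer closed}.
    static int[] readUntilEndOfStream() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.ISO_8859_1)));
                sink.close();  // the writer goes away, much like a client disconnecting

                ByteBuffer buf = ByteBuffer.allocate(1024);
                int first = source.read(buf);   // number of bytes actually read
                int second = source.read(buf);  // -1 once end-of-stream is reached
                return new int[] {first, second};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A return value of -1 is the signal on which the handler cancels its selection-key and closes the channel.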

Method process() does the actual post-read processing work. It’s supposed to do the heavy lifting (hence being wrapped in a Scala Future), although in this trivial server example all it does is echo: it reads whatever is in the readBuf ByteBuffer using the NIO Buffer API, writes it into the writeBuf ByteBuffer, and then switches the selection-key’s I/O operation of interest to OP_WRITE.

  def process(): Unit = synchronized {
      readBuf.flip()
      val bytes: Array[Byte] = Array.ofDim[Byte](readBuf.remaining())
      readBuf.get(bytes, 0, bytes.length)
      print("Handler process(): " + new String(bytes, Charset.forName("ISO-8859-1")))

      writeBuf = ByteBuffer.wrap(bytes)

      selKey.interestOps(SelectionKey.OP_WRITE)
      selKey.selector().wakeup()
    }

Method write() calls channel.write(writeBuf) to write the content of the writeBuf ByteBuffer to the channel, followed by clearing both the read and write ByteBuffers and switching the selection-key’s I/O operation of interest back to OP_READ.

  def write(): Unit = {
    Try {
        val numBytes: Int = channel.write(writeBuf)
        println("Handler write(): #bytes read from 'writeBuf' buffer = " + numBytes)

        if (numBytes > 0) {
          readBuf.clear()
          writeBuf.clear()

          selKey.interestOps(SelectionKey.OP_READ)
          selKey.selector().wakeup()
        }
      }
      .recover {
        case e: IOException => println(s"Handler write(): $e")
      }
  }
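
One caveat worth keeping in mind: on a non-blocking channel, write() may accept fewer bytes than the buffer holds. The echo server above does not deal with that case. The Java sketch below shows one possible way to handle it, namely switching back to OP_READ only once the buffer has been fully drained. Everything in it is hypothetical, including the ThrottledChannel class, which merely imitates a congested socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch; not part of the Reactor server.
final class PartialWriteSketch {

    // Imitates a congested socket: accepts at most maxPerCall bytes per write().
    static final class ThrottledChannel implements WritableByteChannel {
        private final int maxPerCall;
        private boolean open = true;

        ThrottledChannel(int maxPerCall) { this.maxPerCall = maxPerCall; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(maxPerCall, src.remaining());
            src.position(src.position() + n);  // consume n bytes, as a real channel would
            return n;
        }
        @Override public boolean isOpen() { return open; }
        @Override public void close() { open = false; }
    }

    // One write attempt; returns true only once the buffer is fully drained.
    // A handler would keep OP_WRITE as its interest while this returns false.
    static boolean writeOnce(WritableByteChannel ch, ByteBuffer buf) {
        try {
            ch.write(buf);
            return !buf.hasRemaining();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Counts the write attempts needed to flush size bytes (maxPerCall must be positive).
    static int writesNeeded(int size, int maxPerCall) {
        ThrottledChannel ch = new ThrottledChannel(maxPerCall);
        ByteBuffer buf = ByteBuffer.wrap(new byte[size]);
        int calls = 1;
        while (!writeOnce(ch, buf)) calls++;
        return calls;
    }
}
```

In a real handler, each attempt would be triggered by a separate write-ready event from the selector rather than by a loop.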

Final thoughts

In this code rewrite in Scala, the main changes include the replacement of:

  • Java Runnable with Scala Future along with the base type SelKeyAttm for the Acceptor and Handler objects that are to be attached to selection-keys
  • while-loop with recursive functions
  • try-catch with Try-recover

While Java NIO is a great API for building efficient I/O-heavy applications, its underlying design apparently favors the imperative programming style. Rewriting the NIO-based Reactor server application using a functional programming language like Scala doesn’t necessarily make the code easier to read or maintain, as many function calls in the API return void (i.e. Scala Unit) and mutate variables passed in as parameters, making it difficult to be thoroughly rewritten in an idiomatic fashion.

Full source code of the Scala NIO Reactor server application is available at this GitHub repo.

To compile and run the Reactor server, git-clone the repo and run sbt from the project-root at a terminal on the server host:

$ sbt compile
$ sbt "runMain reactor.NioReactor `port`"

Skipping the port number will bind the server to the default port 9090.

To connect to the Reactor server, use telnet from one or more client host terminals:

telnet `server-host` `port`
#  e.g. telnet reactor.example.com 8080, telnet localhost 9090

Any text input from the client host(s) will be echoed back by the Reactor server, which itself will also report what has been processed. Below are sample input/output from a couple of client host terminals and the server terminal:

## Client host terminal #1:

$ telnet 192.168.1.100 9090
Trying ::1...
Connected to 192.168.1.100.
Escape character is '^]'.
blah blah blah from term #1
blah blah blah from term #1
^]
telnet> quit
Connection closed.

## Client host terminal #2:

$ telnet 192.168.1.100 9090
Trying ::1...
Connected to 192.168.1.100.
Escape character is '^]'.
foo bar from term #2
foo bar from term #2
^]
telnet> quit
Connection closed.

## Server host terminal:

$ sbt "runMain reactor.NioReactor"
[info] ...
[info] running reactor.NioReactor 
Handler read(): #bytes read into 'readBuf' buffer = 29
Handler write(): #bytes read from 'writeBuf' buffer = 29
Handler read(): #bytes read into 'readBuf' buffer = 22
Handler write(): #bytes read from 'writeBuf' buffer = 22
Handler read(): #bytes read into 'readBuf' buffer = -1
Handler read(): client connection might have been dropped!
Handler read(): #bytes read into 'readBuf' buffer = -1
Handler read(): client connection might have been dropped!
^C
[warn] Canceling execution...
Cancelled: runMain reactor.NioReactor

As a side note, the output from method Handler.process() which is wrapped in a Scala Future will be reported if the server is being run from within an IDE like IntelliJ.

NIO-based Reactor

Over the past few years, event-based architecture with non-blocking operations has been the norm for high-concurrency server architecture. The per-connection threading (process-based) architecture is no longer favored as an efficient design, especially for handling a high volume of concurrent connections. The increasing popularity of Nginx and the relative decline of Apache httpd these days demonstrate the trend.

Java New I/O

Java’s NIO (New I/O, a.k.a. Non-blocking I/O) provides a set of APIs to efficiently handle I/O operations. The key ingredients of NIO include Buffer, Channel and Selector. An NIO Buffer is a container for primitive-type data of interest; when allocated as a direct buffer, its memory lives outside the Java heap so that native I/O operations can work on it with minimal copying. An NIO Channel then serves as the conduit for bulk data transfers between the Buffer and the associated entity (e.g. a socket).

A socket channel can be configured in non-blocking mode and events such as reading data from the associated socket no longer block the invoking thread for more time than necessary. Together with the NIO Selector responsible for selecting those of the concurrent events that are ready to be processed, NIO APIs are well equipped to handle event-based operations in an efficient fashion.

Non-blocking vs Asynchronous

Note that non-blocking mode is different from asynchronous mode. In non-blocking mode, a requested operation always returns immediately, whether or not any data could be transferred, thus freeing the invoking thread from being blocked. In asynchronous mode, the requested operation is carried out by a separate thread in parallel with the invoking thread, which is notified upon completion. Java 7 enhanced NIO to include support for asynchronous file and socket channels.
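
The "returns immediately" behavior of non-blocking mode can be seen in a few lines. The sketch below reads from an empty Pipe, used here only as a convenient selectable channel; the class name NonBlockingRead is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// Hypothetical demo class; a Pipe stands in for a socket.
final class NonBlockingRead {

    // Reads from a pipe that has no data in it, in non-blocking mode.
    static int readFromEmptyPipe() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                // Nothing has been written, so the call returns 0 at once
                // instead of waiting for data to arrive.
                return source.read(ByteBuffer.allocate(16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In blocking mode the same read() call would wait until data arrived or the writing end was closed.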

Reactor pattern

The Reactor pattern is a popular event-based architecture. Using NIO, implementing a basic event-based server on top of the Reactor pattern is pretty straightforward. Appended is a bare-minimum Reactor-pattern server consisting of a Reactor class and a Handler class.

The single-threaded Reactor class houses the main dispatcher loop responsible for selecting registered events that are ready for socket read/write operations. Registered with the dispatcher during initialization, the also single-threaded Acceptor is responsible for accepting socket connections requested by clients. Finally, the Handler class takes care of the actual events (read from socket, process data, write to socket) in accordance with its operational state.

Each Handler is associated with a SocketChannel and the Selector maintained by the Reactor class. Both variables are declared immutable for performance as well as to allow access by the inner Runnable class. The handler registers with the dispatcher, indicating its operation of interest (read or write), and gets dispatched when the associated socket is ready for the operation. The inner Runnable is submitted to the worker thread pool and is responsible for data processing (in this simple case, echoing), leaving the Handler itself responsible for just socket read/write.

To test the server, just launch it on a host (e.g. server1.example.com) and run a few telnet instances connecting to the server port (e.g. telnet server1.example.com 9090).

Source code: Reactor.java

package reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverChannel;
    static final int WORKER_POOL_SIZE = 10;
    static ExecutorService workerPool;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(port));
        serverChannel.configureBlocking(false);

        // Register the server socket channel with interest-set set to ACCEPT operation
        SelectionKey sk = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    public void run() {
        try {
            while (true) {

                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    it.remove();
                    Runnable r = (Runnable) sk.attachment();
                    if (r != null)
                        r.run();
                }
            }
        }
        catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    class Acceptor implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverChannel.accept();
                if (channel != null)
                    new Handler(selector, channel);
            }
            catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        workerPool = Executors.newFixedThreadPool(WORKER_POOL_SIZE);

        try {
            new Thread(new Reactor(9090)).start();
        }
        catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

Source code: Handler.java

package reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

class Handler implements Runnable {
    final SocketChannel channel;
    final SelectionKey selKey;

    static final int READ_BUF_SIZE = 1024;
    static final int WRITE_BUF_SIZE = 1024;
    ByteBuffer readBuf = ByteBuffer.allocate(READ_BUF_SIZE);
    ByteBuffer writeBuf = ByteBuffer.allocate(WRITE_BUF_SIZE);

    Handler(Selector sel, SocketChannel sc) throws IOException {
        channel = sc;
        channel.configureBlocking(false);

        // Register the socket channel with interest-set set to READ operation
        selKey = channel.register(sel, SelectionKey.OP_READ);
        selKey.attach(this);
        selKey.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    public void run() {
        try {
            if (selKey.isReadable())
                read();
            else if (selKey.isWritable())
                write();
        }
        catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    // Process data by echoing input to output
    synchronized void process() {
        byte[] bytes;

        readBuf.flip();
        bytes = new byte[readBuf.remaining()];
        readBuf.get(bytes, 0, bytes.length);
        System.out.print("process(): " + new String(bytes, Charset.forName("ISO-8859-1")));

        writeBuf = ByteBuffer.wrap(bytes);

        // Set the key's interest to WRITE operation
        selKey.interestOps(SelectionKey.OP_WRITE);
        selKey.selector().wakeup();
    }

    synchronized void read() throws IOException {
        int numBytes;

        try {
            numBytes = channel.read(readBuf);
            System.out.println("read(): #bytes read into 'readBuf' buffer = " + numBytes);

            if (numBytes == -1) {
                selKey.cancel();
                channel.close();
                System.out.println("read(): client connection might have been dropped!");
            }
            else {
                Reactor.workerPool.execute(new Runnable() {
                    public void run() {
                        process();
                    }
                });
            }
        }
        catch (IOException ex) {
            ex.printStackTrace();
            return;
        }
    }

    void write() throws IOException {
        int numBytes = 0;

        try {
            numBytes = channel.write(writeBuf);
            System.out.println("write(): #bytes read from 'writeBuf' buffer = " + numBytes);

            if (numBytes > 0) {
                readBuf.clear();
                writeBuf.clear();

                // Set the key's interest-set back to READ operation
                selKey.interestOps(SelectionKey.OP_READ);
                selKey.selector().wakeup();
            }
        }
        catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}