Over the past few years, event-based architecture with non-blocking operations has been the norm for high-concurrency server architecture. The per-connection threading (process-based) architecture is no longer favored as an efficient design, especially for handling high volume of concurrent connections. The increasing popularity of Nginx and the relative decline of Apache httpd these days demonstrated the trend.
Java New I/O
Java’s NIO (New I/O, a.k.a. Non-blocking I/O) provides a set of APIs to efficiently handle I/O operations. The key ingredients of NIO include Buffer, Channel and Selector. A NIO Buffer virtually provides direct access to the operating system’s physical memory along with a rich set of methods for alignment and paging of the selected memory that stores any primitive-type data of interest. A NIO Channel then serves as the conduit for bulk data transfers between the Buffer and the associated entity (e.g. a socket).
A socket channel can be configured in non-blocking mode and events such as reading data from the associated socket no longer block the invoking thread for more time than necessary. Together with the NIO Selector responsible for selecting those of the concurrent events that are ready to be processed, NIO APIs are well equipped to handle event-based operations in an efficient fashion.
Non-blocking vs Asynchronous
Note that non-blocking mode is different from asynchronous mode. In non-blocking mode, a requested operation always returns the result immediately regardless of success or failure, thus freeing the invoking thread from being blocked. In asynchronous mode, a separate thread is used to carry out the requested operation in parallel with the invoking thread. Java 7 enhanced NIO to include support for asynchronous file and socket channels.
Reactor pattern
The Reactor pattern is a popular event-based architecture. Using NIO, implementing a basic event-based server on top of Reactor pattern is pretty straight forward. Appended is a bare minimal Reactor-pattern server consisting of a Reactor class and a Handler class.
The single-threaded Reactor class houses the main dispatcher loop responsible for selecting registered events that are ready for socket read/write operations. Registered with the dispatcher during initialization, the also single-threaded Acceptor is responsible for accepting socket connections requested by clients. Finally, the Handler class takes care of the actual events (read from socket, process data, write to socket) in accordance with its operational state.
Each Handler is associated with a SocketChannel and the Selector maintained by the Reactor class. Both variables are declared immutable for performance as well as allowing access by the inner Runnable class. The handler registers with the dispatcher indicating its interested operation (read or write) and gets dispatched when the associated socket is ready for the operation. The Runnable class forms the worker thread pool and is responsible for data processing (in this simple case, echoing), leaving the Handler thread responsible for just socket read/write.
To test the server, just launch it on a host (e.g. server1.example.com) and run a few telnet instances connecting to the server port (e.g. telnet server1.example.com 9090).
Source code: Reactor.java
package reactor; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverChannel; static final int WORKER_POOL_SIZE = 10; static ExecutorService workerPool; Reactor(int port) throws IOException { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.configureBlocking(false); // Register the server socket channel with interest-set set to ACCEPT operation SelectionKey sk = serverChannel.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } public void run() { try { while (true) { selector.select(); Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = (SelectionKey) it.next(); it.remove(); Runnable r = (Runnable) sk.attachment(); if (r != null) r.run(); } } } catch (IOException ex) { ex.printStackTrace(); } } class Acceptor implements Runnable { public void run() { try { SocketChannel channel = serverChannel.accept(); if (channel != null) new Handler(selector, channel); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) { workerPool = Executors.newFixedThreadPool(WORKER_POOL_SIZE); try { new Thread(new Reactor(9090)).start(); } catch (IOException ex) { ex.printStackTrace(); } } }
Source code: Handler.java
package reactor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; class Handler implements Runnable { final SocketChannel channel; final SelectionKey selKey; static final int READ_BUF_SIZE = 1024; static final int WRiTE_BUF_SIZE = 1024; ByteBuffer readBuf = ByteBuffer.allocate(READ_BUF_SIZE); ByteBuffer writeBuf = ByteBuffer.allocate(WRiTE_BUF_SIZE); Handler(Selector sel, SocketChannel sc) throws IOException { channel = sc; channel.configureBlocking(false); // Register the socket channel with interest-set set to READ operation selKey = channel.register(sel, SelectionKey.OP_READ); selKey.attach(this); selKey.interestOps(SelectionKey.OP_READ); sel.wakeup(); } public void run() { try { if (selKey.isReadable()) read(); else if (selKey.isWritable()) write(); } catch (IOException ex) { ex.printStackTrace(); } } // Process data by echoing input to output synchronized void process() { byte[] bytes; readBuf.flip(); bytes = new byte[readBuf.remaining()]; readBuf.get(bytes, 0, bytes.length); System.out.print("process(): " + new String(bytes, Charset.forName("ISO-8859-1"))); writeBuf = ByteBuffer.wrap(bytes); // Set the key's interest to WRITE operation selKey.interestOps(SelectionKey.OP_WRITE); selKey.selector().wakeup(); } synchronized void read() throws IOException { int numBytes; try { numBytes = channel.read(readBuf); System.out.println("read(): #bytes read into 'readBuf' buffer = " + numBytes); if (numBytes == -1) { selKey.cancel(); channel.close(); System.out.println("read(): client connection might have been dropped!"); } else { Reactor.workerPool.execute(new Runnable() { public void run() { process(); } }); } } catch (IOException ex) { ex.printStackTrace(); return; } } void write() throws IOException { int numBytes = 0; try { numBytes = channel.write(writeBuf); System.out.println("write(): #bytes read from 'writeBuf' buffer = " + numBytes); if (numBytes > 0) { readBuf.clear(); writeBuf.clear(); // Set the key's interest-set back to READ operation selKey.interestOps(SelectionKey.OP_READ); selKey.selector().wakeup(); } } catch (IOException ex) { ex.printStackTrace(); } } }