For high concurrency at scale, event-driven server design with non-blocking I/O has been one of the most popular server architectures. Nginx and Node.js, both leading server platforms in their own spaces, adopt this very approach. Among the various event-driven server implementations, the Reactor pattern remains a prominent design pattern that leverages an event loop equipped with a demultiplexer to efficiently select events that are ready to be processed by a set of event handlers.
Back in 2013, I wrote a blog post about building a bare-bones server that uses the Java NIO API to implement the Reactor pattern with non-blocking I/O. The goal here is to rewrite that NIO-based Reactor server in Scala.
Java NIO and Reactor pattern
A quick recap of Java NIO, which consists of the following key components:
- Buffer – a container of primitive typed data (e.g. Byte, Int) that can be optimized for native I/O operations with memory alignment and paging functionality
- Channel – a connector associated with an I/O entity (e.g. files, sockets) that supports non-blocking I/O operations
- Selector – a demultiplexer on an event loop that selects events which are ready for carrying out pre-registered I/O operations (e.g. read, write)
Note that network channels such as SocketChannel and ServerSocketChannel extend SelectableChannel, which can be registered with the Selector for any I/O operations of interest; each registration is represented by a SelectionKey. To handle a high volume of client connections to the server, channels can be configured via method configureBlocking(false) to support non-blocking I/O.
With NIO Buffers enabling optimal memory access and native I/O operations, Channels programmatically connecting I/O entities, and Selector serving as the demultiplexer on an event loop selecting ready-to-go I/O events to execute in a non-blocking fashion, the Java NIO API is a great fit for implementing an effective Reactor server.
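As a minimal sketch of how these pieces fit together, the snippet below registers a non-blocking ServerSocketChannel with a Selector and returns the resulting SelectionKey. The names RegisterDemo and register are hypothetical, and binding to port 0 (an ephemeral port) is an assumption made only so the sketch can run anywhere.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

// Hypothetical helper for illustration; not part of the Reactor server itself.
object RegisterDemo {
  def register(): (Selector, ServerSocketChannel, SelectionKey) = {
    val selector = Selector.open()
    val server = ServerSocketChannel.open()
    // Port 0 asks the OS for any free port (an assumption for this sketch).
    server.bind(new InetSocketAddress(0))
    // A channel must be non-blocking before it is registered;
    // registering a blocking channel throws IllegalBlockingModeException.
    server.configureBlocking(false)
    // Registration yields the SelectionKey that ties channel and selector together.
    val key = server.register(selector, SelectionKey.OP_ACCEPT)
    (selector, server, key)
  }
}
```

The returned key exposes both sides of the registration through key.channel() and key.selector(), which is what later allows a handler to reach its selector from the key alone.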
Reactor event loop
This Scala version of the NIO Reactor server consists of two main classes, NioReactor and Handler, along with a trait SelKeyAttm, which is the base type for objects that are to be coupled with individual selection-keys as their attachments (more on this later).
Central to the NioReactor class is the “perpetual” event loop performed by class method selectorLoop(). It’s a tail-recursive function that never returns (hence the return type Nothing), equivalent to the conventional infinite while(true){} loop. All it does is repeatedly check for the selection-keys whose corresponding channels are ready for the registered I/O operations and iterate through the keys to carry out the necessary work defined in the passed-in function iterFn().
    import java.util.{Iterator => JavaIter}

    @scala.annotation.tailrec
    final def selectorLoop(iterFn: (JavaIter[SelectionKey]) => Unit): Nothing = {
      selector.select()  // blocks until at least one channel is ready (or wakeup() is called)
      val it = selector.selectedKeys().iterator()
      iterFn(it)
      selectorLoop(iterFn)  // tail call, compiled into a loop
    }
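Because selectorLoop() never returns, it is awkward to exercise in isolation. The following is a hedged sketch of a bounded variant that makes a fixed number of passes using the non-blocking selectNow(); the names BoundedLoop and pollTimes are hypothetical and not part of the server code.

```scala
import java.nio.channels.Selector
import scala.annotation.tailrec

// Hypothetical bounded counterpart of selectorLoop(), for illustration only.
object BoundedLoop {
  // Polls the selector `remaining` times and returns the total number of
  // keys reported as newly ready across all passes.
  @tailrec
  def pollTimes(selector: Selector, remaining: Int, readyTotal: Int = 0): Int =
    if (remaining <= 0) readyTotal
    else {
      val ready = selector.selectNow()  // like select(), but never blocks
      selector.selectedKeys().clear()   // keys must be removed explicitly
      pollTimes(selector, remaining - 1, readyTotal + ready)
    }
}
```

The recursion has the same shape as the perpetual loop; the only difference is the counter that lets it terminate.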
Function iterateSelKeys, which is passed in as the parameter for the event loop function, holds the selection-keys iteration logic. While it’s tempting to convert the Java Iterator used in the original Java application to a Scala Iterator, the idea was scrapped due to the need to remove each iterated selection-key via remove(). The selector never removes keys from its selected-key set on its own, so a key that isn’t removed would show up again on the next pass. Scala Iterator (or Iterable) has no such method or equivalent.
    @scala.annotation.tailrec
    final def iterateSelKeys(it: JavaIter[SelectionKey]): Unit = {
      if (it.hasNext()) {
        val sk = it.next()
        it.remove()  // take the key out of the selector's selected-key set
        val attm: SelKeyAttm = sk.attachment().asInstanceOf[SelKeyAttm]
        if (attm != null)
          attm.run()
        iterateSelKeys(it)
      }
      else ()
    }
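To see the effect of remove() without a selector, here is a small sketch that drains a plain java.util.HashSet through its Java iterator in the same tail-recursive style. DrainDemo and drain are hypothetical names used only for this illustration.

```scala
import java.util.{Iterator => JavaIter}
import scala.annotation.tailrec

// Hypothetical helper mirroring the shape of iterateSelKeys.
object DrainDemo {
  // Visits every element and removes it from the backing collection,
  // returning the elements in the order they were visited.
  @tailrec
  def drain[A](it: JavaIter[A], seen: List[A] = Nil): List[A] =
    if (it.hasNext) {
      val a = it.next()
      it.remove()  // mutates the collection the iterator came from
      drain(it, a :: seen)
    } else seen.reverse
}
```

After a call such as DrainDemo.drain(set.iterator()), the set itself is empty, which is exactly the side effect the selected-key set needs after each pass.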
Unlike the selection-key attachments in the original version, which were of type Runnable, they’re now subtypes of SelKeyAttm, each of which implements method run() that gets called once its key is selected by the Selector. With Scala Futures taking care of the asynchronous work, Runnable is no longer needed as the attachment type. By making SelKeyAttm the base type for objects attached to the selection-keys, a slightly more specific “contract” (in the form of method specifications) is set up for those objects to adhere to.
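The trait itself is not shown in this post. Judging from the run() signatures of the Acceptor and Handler classes, it might look like the sketch below; the exact definition is an assumption, and Counter and dispatch are hypothetical names added purely to show the contract in use.

```scala
import scala.util.Try

// Assumed shape of the trait, inferred from how run() is implemented
// by the attachments; the actual definition lives in the full source.
trait SelKeyAttm {
  def run(): Try[Unit]
}

object AttmDemo {
  // Hypothetical attachment that just counts how often it is run.
  final class Counter extends SelKeyAttm {
    var calls = 0
    def run(): Try[Unit] = Try { calls += 1 }
  }

  // Mirrors the null check and dispatch done while iterating the keys:
  // a type pattern never matches null, so null falls through to the default.
  def dispatch(attachment: AnyRef): Unit = attachment match {
    case attm: SelKeyAttm => attm.run(); ()
    case _                => ()
  }
}
```

Matching on the type, rather than casting with asInstanceOf, is one possible way to make the dispatch tolerate both null and unexpected attachment types.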
Acceptor
The Acceptor, associated with the NIO ServerSocketChannel for the listener socket, is a subtype of SelKeyAttm. It’s responsible for accepting incoming connection requests.
    class Acceptor extends SelKeyAttm {
      def run(): Try[Unit] = Try {
          // accept() returns null on a non-blocking channel with no pending connection
          val channel: SocketChannel = serverChannel.accept()
          if (channel != null)
            new Handler(selector, channel)
          ()
        }
        .recover {
          case e: IOException => println(s"Acceptor: $e")
        }
    }
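One detail of the Try-recover idiom used above is worth spelling out: recover only handles the exceptions its partial function matches, so anything other than an IOException stays a Failure. A minimal sketch, with TryRecoverDemo and guarded as hypothetical names:

```scala
import java.io.IOException
import scala.util.Try

// Hypothetical wrapper using the same Try-recover shape as run().
object TryRecoverDemo {
  def guarded(body: => Unit): Try[Unit] =
    Try(body).recover {
      // Only IOException is turned back into a Success here.
      case e: IOException => println(s"recovered: $e")
    }
}
```

Calling guarded with a body that throws an IOException yields a Success, while a body that throws, say, an IllegalStateException yields a Failure that the caller is still free to inspect.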
Part of class NioReactor’s constructor routine is to bind the ServerSocketChannel to a specified port number. It’s also where the ServerSocketChannel is configured to be non-blocking and registered with the selector for the one operation it performs, accepting connections (OP_ACCEPT). The registration creates a selection-key, to which the Acceptor instance is then attached.
    class NioReactor(port: Int) {
      implicit val ec: ExecutionContext = NioReactor.ec

      val selector: Selector = Selector.open()
      val serverChannel: ServerSocketChannel = ServerSocketChannel.open()

      serverChannel.socket().bind(new InetSocketAddress(port))
      serverChannel.configureBlocking(false)

      val sk: SelectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
      sk.attach(new Acceptor())
      // ...
    }
The companion object of the NioReactor class is set up with a thread pool to run the Reactor server at a provided port number in a Scala Future.
    object NioReactor {
      val poolSize: Int = 10
      val workerPool = Executors.newFixedThreadPool(poolSize)
      implicit val ec = ExecutionContext.fromExecutorService(workerPool)

      def apply(port: Int = 9090): Future[Unit] = Future {
          (new NioReactor(port)).loop()
        }
        .recover {
          case e: IOException => println(s"Reactor($port): $e")
        }
      // ...
    }
Event handlers
As shown in the snippet of the Acceptor class, upon acceptance of a server connection, an instance of Handler is spawned. All events (in our case, reading requests from and writing responses to client sockets) are processed by those handlers, which are another subtype of SelKeyAttm.
The Handler class instance takes a Selector and a SocketChannel as parameters, initializes a couple of ByteBuffers for read/write, configures the SocketChannel to be non-blocking, registers it with the selector for I/O operation OP_READ, attaches the handler instance itself to the resulting selection-key, and finally nudges the selector via wakeup() so that a blocked select() returns immediately.
Method run(), upon being called, carries out the main read/write handling logic according to the readiness (readable or writable) reported by the selection-key that the passed-in SocketChannel is associated with.
    object Handler {
      val readBufSize: Int = 1024
      val writeBufSize: Int = 1024
    }

    class Handler(sel: Selector, channel: SocketChannel)(implicit ec: ExecutionContext) extends SelKeyAttm {
      import Handler._

      var selKey: SelectionKey = null
      val readBuf = ByteBuffer.allocate(readBufSize)
      var writeBuf = ByteBuffer.allocate(writeBufSize)

      channel.configureBlocking(false)
      selKey = channel.register(sel, SelectionKey.OP_READ)
      selKey.attach(this)
      sel.wakeup()  // make a blocked select() return so the new key is picked up

      def run(): Try[Unit] = Try {
          if (selKey.isReadable())
            read()
          else if (selKey.isWritable())
            write()
        }
        .recover {
          case e: IOException => println(s"Handler run(): $e")
        }

      def process(): Unit = ???
      def read(): Unit = ???
      def write(): Unit = ???
    }
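The register-then-select cycle that the Handler relies on can be tried without a network socket. The sketch below uses a java.nio.channels.Pipe as a stand-in for the client connection (an assumption made for the sake of a self-contained example); ReadinessDemo and roundTrip are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}
import java.nio.charset.StandardCharsets

// Hypothetical demo: a Pipe's source end plays the role of the SocketChannel.
object ReadinessDemo {
  def roundTrip(msg: String): String = {
    val selector = Selector.open()
    val pipe = Pipe.open()
    try {
      val source = pipe.source()
      source.configureBlocking(false)
      val key = source.register(selector, SelectionKey.OP_READ)

      // Writing to the sink end makes the source end readable.
      pipe.sink().write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.ISO_8859_1)))

      val buf = ByteBuffer.allocate(1024)
      // Wait up to one second for the key to become ready, then read.
      if (selector.select(1000L) > 0 && key.isReadable) {
        selector.selectedKeys().clear()
        source.read(buf)
      }
      buf.flip()
      new String(buf.array(), 0, buf.limit(), StandardCharsets.ISO_8859_1)
    } finally {
      pipe.sink().close()
      pipe.source().close()
      selector.close()
    }
  }
}
```

The same isReadable check drives Handler.run(); the only difference is that the real handler is reached through the key's attachment rather than a local variable.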
Processing read/write buffers
Method read() calls channel.read(readBuf), which reads whatever bytes are available from the channel (up to the remaining capacity of the buffer) into the readBuf ByteBuffer and returns the number of bytes read. If the channel has reached “end-of-stream”, in which case channel.read() returns -1, the corresponding selection-key is cancelled and the channel is closed; otherwise, processing work commences.
    def read(): Unit = synchronized {
      Try {
        val numBytes: Int = channel.read(readBuf)
        println("Handler read(): #bytes read into 'readBuf' buffer = " + numBytes)

        if (numBytes == -1) {
          // end-of-stream: the client has closed its side of the connection
          selKey.cancel()
          channel.close()
          println("Handler read(): client connection might have been dropped!")
        }
        else {
          Future {
              process()
            }
            .recover {
              case e: IOException => println(s"Handler process(): $e")
            }
        }
      }
      .recover {
        case e: IOException => println(s"Handler read(): $e")
      }
    }
Method process() does the actual post-read processing work. It’s supposed to do the heavy lifting (hence being wrapped in a Scala Future), although in this trivial server example all it does is echo: it reads whatever is in the readBuf ByteBuffer using the NIO Buffer API, writes it into the writeBuf ByteBuffer, and then switches the selection-key’s I/O operation of interest to OP_WRITE.
    def process(): Unit = synchronized {
      readBuf.flip()  // switch the buffer from being filled to being drained
      val bytes: Array[Byte] = Array.ofDim[Byte](readBuf.remaining())
      readBuf.get(bytes, 0, bytes.length)
      print("Handler process(): " + new String(bytes, Charset.forName("ISO-8859-1")))

      writeBuf = ByteBuffer.wrap(bytes)

      selKey.interestOps(SelectionKey.OP_WRITE)
      selKey.selector().wakeup()
    }
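The buffer handling in process() can be followed step by step in the sketch below, where a put() stands in for channel.read(readBuf). BufferDemo and echoBytes are hypothetical names; the returned tuple exposes the text plus the position and remaining count of the wrapped write buffer.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical walk-through of the flip/get/wrap sequence.
object BufferDemo {
  def echoBytes(input: String): (String, Int, Int) = {
    val readBuf = ByteBuffer.allocate(1024)
    // Stand-in for channel.read(readBuf): fills the buffer and advances position.
    readBuf.put(input.getBytes(StandardCharsets.ISO_8859_1))

    // flip() sets limit to the current position and position to 0,
    // so remaining() now equals the number of bytes just written.
    readBuf.flip()
    val bytes = new Array[Byte](readBuf.remaining())
    readBuf.get(bytes)

    // wrap() produces a buffer that is already positioned for draining.
    val writeBuf = ByteBuffer.wrap(bytes)
    (new String(bytes, StandardCharsets.ISO_8859_1), writeBuf.position(), writeBuf.remaining())
  }
}
```

For a 7-byte input such as "hello\r\n", the wrapped buffer starts at position 0 with 7 bytes remaining, ready to be handed to channel.write().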
Method write() calls channel.write(writeBuf) to write from the writeBuf ByteBuffer into the channel, followed by clearing both the read and write ByteBuffers and switching the selection-key’s I/O operation of interest back to OP_READ.
    def write(): Unit = {
      Try {
        val numBytes: Int = channel.write(writeBuf)
        println("Handler write(): #bytes read from 'writeBuf' buffer = " + numBytes)

        if (numBytes > 0) {
          // reset both buffers and go back to waiting for the next request
          readBuf.clear()
          writeBuf.clear()

          selKey.interestOps(SelectionKey.OP_READ)
          selKey.selector().wakeup()
        }
      }
      .recover {
        case e: IOException => println(s"Handler write(): $e")
      }
    }
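A write to a non-blocking channel may transfer fewer bytes than the buffer holds. For the short echo lines used here that is unlikely to matter, but a variant that only switches back to OP_READ once the buffer is fully drained could be sketched as follows. This is not what the server above does; WriteDemo, writeStep and ThrottledChannel are hypothetical names, with the throttled channel existing only to simulate partial writes.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object WriteDemo {
  // Writes what the channel will take and reports whether the buffer is
  // fully drained; a caller would keep OP_WRITE set until this returns true.
  def writeStep(channel: WritableByteChannel, buf: ByteBuffer): Boolean = {
    channel.write(buf)
    !buf.hasRemaining
  }

  // Hypothetical stub that accepts at most maxPerCall bytes per write.
  final class ThrottledChannel(maxPerCall: Int) extends WritableByteChannel {
    val sink = new ByteArrayOutputStream()
    def write(src: ByteBuffer): Int = {
      val n = math.min(maxPerCall, src.remaining())
      val chunk = new Array[Byte](n)
      src.get(chunk)  // advances the buffer's position by n
      sink.write(chunk, 0, n)
      n
    }
    def isOpen(): Boolean = true
    def close(): Unit = ()
  }
}
```

With a 5-byte buffer and a channel that takes 3 bytes per call, the first writeStep returns false and the second returns true, at which point it would be safe to clear the buffers and restore OP_READ.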
Final thoughts
In this code rewrite in Scala, the main changes include the replacement of:
- Java Runnable with Scala Future, along with the base type SelKeyAttm for the Acceptor and Handler objects that are to be attached to selection-keys
- while-loop with recursive functions
- try-catch with Try-recover
While Java NIO is a great API for building efficient I/O-heavy applications, its underlying design clearly favors the imperative programming style. Rewriting the NIO-based Reactor server application in a functional programming language like Scala doesn’t necessarily make the code easier to read or maintain, as many methods in the API return void (i.e. Scala Unit) and mutate the objects passed in as parameters, making a thoroughly idiomatic rewrite difficult.
Full source code of the Scala NIO Reactor server application is available at this GitHub repo.
To compile and run the Reactor server, git-clone the repo and run sbt from the project root in a terminal on the server host:
    $ sbt compile
    $ sbt "runMain reactor.NioReactor <port>"
Skipping the port number will bind the server to the default port 9090.
To connect to the Reactor server, use telnet from one or more client host terminals:
    telnet <server-host> <port>
    # e.g. telnet reactor.example.com 8080, telnet localhost 9090
Any text input from the client host(s) will be echoed back by the Reactor server, which itself will also report what has been processed. Below are sample input/output from a couple of client host terminals and the server terminal:
    ## Client host terminal #1:
    $ telnet 192.168.1.100 9090
    Trying ::1...
    Connected to 192.168.1.100.
    Escape character is '^]'.
    blah blah blah from term #1
    blah blah blah from term #1
    ^]
    telnet> quit
    Connection closed.

    ## Client host terminal #2:
    $ telnet 192.168.1.100 9090
    Trying ::1...
    Connected to 192.168.1.100.
    Escape character is '^]'.
    foo bar from term #2
    foo bar from term #2
    ^]
    telnet> quit
    Connection closed.

    ## Server host terminal:
    $ sbt "runMain reactor.NioReactor"
    [info] ...
    [info] running reactor.NioReactor
    Handler read(): #bytes read into 'readBuf' buffer = 29
    Handler write(): #bytes read from 'writeBuf' buffer = 29
    Handler read(): #bytes read into 'readBuf' buffer = 22
    Handler write(): #bytes read from 'writeBuf' buffer = 22
    Handler read(): #bytes read into 'readBuf' buffer = -1
    Handler read(): client connection might have been dropped!
    Handler read(): #bytes read into 'readBuf' buffer = -1
    Handler read(): client connection might have been dropped!
    ^C
    [warn] Canceling execution...
    Cancelled: runMain reactor.NioReactor
As a side note, the output from method Handler.process(), which is wrapped in a Scala Future, will be reported if the server is run from within an IDE like IntelliJ.