
Akka Dynamic Pub-Sub Service

As shown in a previous blog post illustrating with a simple text mining application, Akka Streams provides GraphDSL, a robust domain-specific language for assembling stream graphs using a mix of fan-in/fan-out junctions. While GraphDSL is a great tool, the supported fan-in/fan-out components are limited to a fixed number of inputs and outputs, as all connections of the graph must be known and wired up front.

To build a streaming service that allows new producers and consumers to be dynamically added, one would need to look outside of the GraphDSL. In this blog post, we’re going to look at how to build a dynamic publish-subscribe service.

MergeHub

Akka provides MergeHub, which serves as a dynamic fan-in junction in the form of a Source to which a single consumer is attached. Once materialized, multiple producers can be attached to the hub, and elements coming from these producers are emitted in a first-come-first-served fashion, with backpressure support.

MergeHub.source has the following method signature:
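```scala
// From the Akka Streams API (a no-argument overload with a default buffer size also exists)
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]]
```

The materialized value is a Sink[T, NotUsed] to which any number of producers can be attached once the stream is run.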

Example:
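Here's a minimal sketch (the consumer, producers and buffer size are illustrative, and Akka 2.6+ is assumed so the implicit ActorSystem provides the stream materializer):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{MergeHub, Sink, Source}

implicit val system: ActorSystem = ActorSystem("mergehub-demo")

// The single consumer attached to the hub
val consumer = Sink.foreach[String](println)

// Materializing MergeHub.source yields a Sink that any number of producers can attach to
val toConsumer: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 16).to(consumer).run()

// Dynamically attach producers; their elements are merged first-come-first-served
Source(List("a", "b", "c")).runWith(toConsumer)
Source(List("x", "y", "z")).runWith(toConsumer)
```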

BroadcastHub

BroadcastHub, on the other hand, serves as a dynamic fan-out junction in the form of a Sink to which a single producer is attached. Similarly, once materialized, multiple consumers can be attached to it, again with backpressure support.

BroadcastHub.sink has the following method signature:
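```scala
// From the Akka Streams API (a no-argument overload with a default buffer size also exists)
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]]
```

Here the materialized value is a Source[T, NotUsed] to which any number of consumers can be attached.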

Example:
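Again a minimal sketch (the producer, buffer size and consumer names are illustrative):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}

implicit val system: ActorSystem = ActorSystem("broadcasthub-demo")

// The single producer attached to the hub
val producer: Source[Int, NotUsed] = Source(1 to 100)

// Materializing BroadcastHub.sink yields a Source that any number of consumers can attach to
val fromProducer: Source[Int, NotUsed] =
  producer.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

// Dynamically attach consumers; each receives the broadcast elements
fromProducer.runForeach(i => println(s"consumer-1: $i"))
fromProducer.runForeach(i => println(s"consumer-2: $i"))
```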

It should be cautioned that if the source has fewer elements than the bufferSize specified in BroadcastHub.sink, none of the elements will be consumed by the attached consumers. It took me a while to realize it's fromProducer that "silently" consumes the elements when materialized, before the attached consumers have a chance to consume them. That, to me, is really an undocumented "bug". Using alsoToMat as shown below, one can uncover the seemingly "missing" elements in such a case:
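For instance, a sketch along these lines (building on the previous snippet; the numbers are illustrative) lets a side sink reveal the elements pulled into the hub's buffer:

```scala
import akka.stream.scaladsl.Sink

// A source with fewer elements than the hub's bufferSize
val smallProducer = Source(1 to 10)

// alsoToMat attaches a side sink, so elements "swallowed" by the hub's buffer remain visible
val fromSmallProducer: Source[Int, NotUsed] =
  smallProducer
    .alsoToMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
    .to(Sink.foreach(i => println(s"seen: $i")))
    .run()
```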

MergeHub + BroadcastHub

By connecting a MergeHub with a BroadcastHub, one can create a dynamic publish-subscribe “channel” in the form of a Flow via Flow.fromSinkAndSource:
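A sketch of the core wiring (element type String and the buffer sizes are illustrative; imports and setup are shown in the full sample below):

```scala
// Materialize both hubs in a single run, keeping both materialized values
val (psSink, psSource) =
  MergeHub.source[String](perProducerBufferSize = 16)
    .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both)
    .run()

// Whatever is pushed into psSink is broadcast out of psSource
val psChannel: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(psSink, psSource)
```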

Note that Keep.both in the above snippet produces a Tuple of materialized values (Sink[T, NotUsed], Source[T, NotUsed]) from MergeHub.source[T] and BroadcastHub.sink[T]. The pub-sub channel psChannel can be illustrated as follows:
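(A rough text sketch; each producer attaches to psSink and each consumer to psSource:)

```
producer 1 ─┐                                                 ┌─▶ consumer 1
producer 2 ─┼─▶ psSink [ MergeHub ──▶ BroadcastHub ] psSource ─┼─▶ consumer 2
producer n ─┘                                                 └─▶ consumer m
```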

Below is sample code for a simple pub-sub channel psChannel:
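A minimal, self-contained version (the element type, buffer sizes and names are illustrative; Akka 2.6+ is assumed so the ActorSystem provides the materializer):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Source}

implicit val system: ActorSystem = ActorSystem("pubsub")

val (psSink, psSource) =
  MergeHub.source[String](perProducerBufferSize = 16)
    .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both)
    .run()

val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)
```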

Serving as a pub-sub channel, psChannel publishes its input to all subscribers via psSink, while its output emits, through psSource, every element that has been published. For example:
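(The subscribers and published elements below are illustrative:)

```scala
// Two subscribers attach dynamically to psSource
psSource.runForeach(s => println(s"subscriber-1: $s"))
psSource.runForeach(s => println(s"subscriber-2: $s"))

// A publisher pushes elements into psSink; every subscriber receives all of them
Source(List("event-1", "event-2", "event-3")).runWith(psSink)
```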

Running psChannel as a Flow:
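For example (again with illustrative elements):

```scala
// Elements entering the flow are published to the channel, while the flow's
// output emits everything that has been published to the channel
Source(List("a", "b", "c"))
  .via(psChannel)
  .runForeach(s => println(s"via psChannel: $s"))
```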

Note that each of the input elements for psChannel gets consumed by every consumer.

Other relevant topics that might be of interest include KillSwitch for stream completion control and PartitionHub for routing Stream elements from a given producer to a dynamic set of consumers.

Akka Content-based Substreaming

In a previous blog post, a simple text mining application was developed using Akka Streams. In that application, the graph-building create() DSL method was used to build the stream topology, which encapsulates the routing logic among the various stream components.

This time, we’re going to try something a little different. Rather than using the create() graph-builder, we’ll directly use some fan-out/fan-in functions to process stream data. In particular, we’ll process a messaging stream and dynamically demultiplex the stream into substreams based on the group the messages belong to.

For illustration purposes, the messaging element in the stream source is modeled as a case class with group and content info. The substreams dynamically split by message group will go into individual file sinks, each of which is wrapped along with its file name in another case class.
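A sketch of the two case classes (the names Message and FileSink are illustrative):

```scala
import akka.stream.IOResult
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import scala.concurrent.Future

// A message tagged with the group it belongs to
final case class Message(group: String, content: String)

// A file sink paired with the name of the file it writes to
final case class FileSink(fileName: String, sink: Sink[ByteString, Future[IOResult]])
```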

Next, we create a map with message group as key and the corresponding sink (which has the actual file path) as value:
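For instance (the groups and file paths are illustrative; the sinks are opened in append mode here so successive writes accumulate):

```scala
import akka.stream.scaladsl.FileIO
import java.nio.file.{Paths, StandardOpenOption}

val groups = List("a", "b", "c")   // hypothetical message groups

val fileSinks: Map[String, FileSink] = groups.map { g =>
  val fileName = s"/tmp/messages-$g.txt"
  g -> FileSink(
    fileName,
    FileIO.toPath(
      Paths.get(fileName),
      Set(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)))
}.toMap
```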

Demultiplexing via Akka Streams groupBy()

We then use Akka Streams groupBy() to split the stream by message group into substreams:
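A sketch building on the definitions above (the sample messages and the parallelism value are illustrative):

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("substreams")

// Sample messages; in practice this would be a live messaging source
val messageSource: Source[Message, NotUsed] = Source(List(
  Message("a", "a1"), Message("b", "b1"), Message("a", "a2"), Message("c", "c1")
))

val done: Future[Done] =
  messageSource
    .groupBy(groups.size, _.group)        // one substream per message group
    .mapAsync(parallelism = 4) { msg =>
      // Append each message's content as a line to the file sink of its group
      Source.single(ByteString(msg.content + "\n")).runWith(fileSinks(msg.group).sink)
    }
    .mergeSubstreams                      // recombine so the graph can be run
    .runWith(Sink.ignore)
```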

Note that after applying groupBy() (see method signature), the split substreams are processed in parallel, one per message group, using mapAsync(), which transforms the stream by applying a given function to each of the elements. Since our goal is to create the individual files, the final mergeSubstreams simply recombines the substreams so the graph can be run with an unused sink.

Putting all the above pieces together:
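A consolidated sketch, runnable as a standalone app under the same illustrative assumptions:

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import java.nio.file.{Paths, StandardOpenOption}
import scala.concurrent.Future

object MessageDemux extends App {
  implicit val system: ActorSystem = ActorSystem("substreams")
  import system.dispatcher

  final case class Message(group: String, content: String)
  final case class FileSink(fileName: String, sink: Sink[ByteString, Future[IOResult]])

  val groups = List("a", "b", "c")

  // One append-mode file sink per message group
  val fileSinks: Map[String, FileSink] = groups.map { g =>
    val fileName = s"/tmp/messages-$g.txt"
    g -> FileSink(
      fileName,
      FileIO.toPath(
        Paths.get(fileName),
        Set(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)))
  }.toMap

  val messageSource: Source[Message, NotUsed] = Source(List(
    Message("a", "a1"), Message("b", "b1"), Message("a", "a2"),
    Message("c", "c1"), Message("b", "b2"), Message("a", "a3")
  ))

  // Split by group, write each message to its group's file, then recombine and run
  val done: Future[Done] = messageSource
    .groupBy(groups.size, _.group)
    .mapAsync(parallelism = 4) { msg =>
      Source.single(ByteString(msg.content + "\n")).runWith(fileSinks(msg.group).sink)
    }
    .mergeSubstreams
    .runWith(Sink.ignore)

  done.onComplete(_ => system.terminate())
}
```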

Merging stream sources with flatMapConcat

Conversely, given the split files, we can merge them back into a single stream using flatMapConcat. The word 'merge' is used loosely here, as flatMapConcat (see method signature) actually consumes the individual sources one after another, flattening their elements into the output stream by concatenation.
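Building on the fileSinks map above, a sketch might look like:

```scala
// Flatten the individual file sources, one after another, into a single stream
val merged: Source[String, NotUsed] =
  Source(fileSinks.values.map(_.fileName).toList)
    .flatMapConcat(fileName => FileIO.fromPath(Paths.get(fileName)).map(_.utf8String))

merged.runForeach(print)
```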

In case the files are large and processing them in measured chunks is preferred to conserve memory:
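One option is to cap the read chunk size and re-frame the bytes by line (the chunk and frame sizes below are illustrative):

```scala
import akka.stream.scaladsl.Framing

// Read each file in measured chunks and re-frame the bytes into lines
val mergedChunked: Source[String, NotUsed] =
  Source(fileSinks.values.map(_.fileName).toList)
    .flatMapConcat { fileName =>
      FileIO.fromPath(Paths.get(fileName), chunkSize = 1024)
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = true))
        .map(_.utf8String)
    }

mergedChunked.runForeach(println)
```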

Line-feed (i.e. “\n”) is being used as the delimiter for each frame here, but it can be set to anything else.

HTTPS Redirection With Akka HTTP

Akka HTTP is an HTTP toolkit built on top of Akka Streams. Rather than a framework for rapid web server development, it's principally designed as a suite of tools for building custom integration layers to wire potentially complex business logic with a REST/HTTP interface. Perhaps for that reason, one might be surprised that there isn't any example code for something as common as running an HTTPS-by-default web server.

Almost every major website operates over the HTTPS protocol by default these days for security purposes. Under the protocol, the required SSL certificate and the bidirectional encryption of communications between the web server and the client ensure the authenticity of the website and guard against man-in-the-middle attacks. It might be overkill for, say, an information-only website, but the 'lock' icon indicating a valid SSL certificate on the web browser address bar certainly makes site visitors feel more secure.

In this blog post, I'll assemble a snippet using Akka HTTP to illustrate how to set up a skeletal web server that redirects all plain-HTTP requests to the HTTPS listener. For testing in a development environment, I'll also include the steps for creating a self-signed SSL certificate. Note that such a self-signed certificate should only be used for internal testing.

HTTP and HTTPS cannot serve on the same port

Intuitively, one would consider binding both HTTP and HTTPS services to the same port on which all requests are processed by an HTTPS handler. Unfortunately, HTTPS uses the SSL/TLS protocol, which for security reasons can't simply be downgraded to HTTP upon detecting unencrypted requests. A straightforward solution is to bind the HTTP and HTTPS services to separate ports and redirect all requests coming into the HTTP port to the HTTPS port.

First let’s create ‘build.sbt’ with necessary library dependencies under the project root subdirectory:

Next, create the main application in, say, ${project-root}/src/main/scala/SecureServer.scala:
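A sketch of the server, assuming the Akka HTTP 10.2+ ServerBuilder API; the keystore password, bind interface and ports (8080 for HTTP, 8443 for HTTPS, matching the test URLs below) are illustrative:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._

import java.io.InputStream
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

object SecureServer extends App {
  // The ActorSystem also provides the default stream materializer Akka HTTP runs on (Akka 2.6+)
  implicit val system: ActorSystem = ActorSystem("secure-server")

  // --- HTTPS connection context built from the PKCS #12 keystore (server.p12) ---
  val password: Array[Char] = "changeit".toCharArray   // illustrative keystore password

  val ks: KeyStore = KeyStore.getInstance("PKCS12")
  val keystore: InputStream = getClass.getClassLoader.getResourceAsStream("server.p12")
  ks.load(keystore, password)

  val kmf: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509")
  kmf.init(ks, password)

  val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509")
  tmf.init(ks)

  val sslContext: SSLContext = SSLContext.getInstance("TLS")
  sslContext.init(kmf.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
  val httpsContext: HttpsConnectionContext = ConnectionContext.httpsServer(sslContext)

  // --- Routes: redirect anything arriving over plain HTTP to the HTTPS port ---
  val route =
    scheme("http") {
      extract(_.request.uri) { uri =>
        redirect(
          uri.withScheme("https").withAuthority(uri.authority.host.address, 8443),
          StatusCodes.MovedPermanently)
      }
    } ~
    path("hello") {
      get {
        complete("Hello from the HTTPS server!")
      }
    }

  // Bind the HTTPS listener and the plain-HTTP (redirect-only) listener on separate ports
  Http().newServerAt("0.0.0.0", 8443).enableHttps(httpsContext).bind(route)
  Http().newServerAt("0.0.0.0", 8080).bind(route)
}
```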

The top half of the main code consists of initialization routines for the Akka actor system and stream materialization (which Akka HTTP is built on top of), and for creating the HTTPS connection context. The rest of the code is a standard Akka HTTP snippet with URL routing and server port binding. A good portion of the code is borrowed from this Akka server-side HTTPS support link.

Within the ‘scheme(“http”)’ routing code block is the core logic for HTTPS redirection:
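That is, the fragment from the sketch above:

```scala
scheme("http") {
  extract(_.request.uri) { uri =>
    // Rewrite the scheme to https and point the authority at the HTTPS port
    redirect(
      uri.withScheme("https").withAuthority(uri.authority.host.address, 8443),
      StatusCodes.MovedPermanently)
  }
}
```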

Note that there is no need for applying ‘withAuthority()’ if you’re using standard HTTPS port (i.e. 443).

The next step is to put in place the PKCS #12 formatted file, 'server.p12', which contains the private key and the X.509 SSL certificate. It should be placed under ${project-root}/src/main/resources/. At the bottom of this blog post are the steps for creating the server key/certificate using the open-source OpenSSL toolkit.

Once the private key/certificate is in place, to run the server application from a Linux command prompt, simply use ‘sbt’ as below:
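For example:

```
cd ${project-root}
sbt run
```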

To test it out from a web browser, visit http://dev.genuine.com:8080/hello and you should see the URL get redirected to https://dev.genuine.com:8443/hello. The web browser will warn about the site's security simply because the SSL certificate is self-signed.

Generating server key and self-signed SSL certificate in PKCS #12 format
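A sketch of the OpenSSL commands (the subject, validity period, alias and password are illustrative; the export password must match the one used in SecureServer.scala):

```
# Generate a private key and a self-signed X.509 certificate (valid for 365 days)
openssl req -x509 -newkey rsa:2048 -keyout server.key -out server.crt \
  -days 365 -nodes -subj "/CN=dev.genuine.com"

# Bundle the key and certificate into a PKCS #12 keystore
openssl pkcs12 -export -in server.crt -inkey server.key \
  -out server.p12 -name genuine-server -password pass:changeit
```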