Tag Archives: akka streams

Akka Dynamic Pub-Sub Service

As shown in a previous blog post illustrating with a simple text mining application, Akka Stream provides a robust `GraphDSL` domain-specific language for assembling stream graphs using a mix of fan-in/fan-out junctions. While `GraphDSL` is a great tool, the supported fan-in/fan-out components are limited to a fixed number of inputs and outputs, as all connections of the graph must be known and wired up front.

To build a streaming service that allows new producers and consumers to be added dynamically, one needs to look outside of `GraphDSL`. In this blog post, we're going to look at how to build a dynamic publish-subscribe service.

MergeHub

Akka provides MergeHub, which serves as a dynamic fan-in junction in the form of a Source to which a single consumer is attached. Once materialized, multiple producers can be attached to the hub, and elements coming from those producers are emitted in a first-come, first-served fashion, with backpressure support.

MergeHub.source has the following method signature:

// MergeHub.source
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]]

Example:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import scala.concurrent.duration._  // needed for `2.seconds` below

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val consumer = Sink.foreach(println)

val toConsumer: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 32).to(consumer).run()

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  runWith(toConsumer)

Source(4 to 5).map(i => s"Batch-immediate-$i").
  runWith(toConsumer)

// Batch-immediate-4
// Batch-immediate-5
// Batch-delayed-1
// Batch-delayed-2
// Batch-delayed-3

BroadcastHub

BroadcastHub, on the other hand, serves as a dynamic fan-out junction in the form of a Sink to which a single producer is attached. Similarly, once materialized, multiple consumers can be attached to it, again with backpressure support.

BroadcastHub.sink has the following method signature:

// BroadcastHub.sink
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]]

Example:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val producer = Source(1 to 20).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

fromProducer.runForeach{ case s: String => println("Consumer-A-" + s) }
fromProducer.runForeach{ case s: String => println("Consumer-B-" + s) }

// Consumer-A-Element-1
// Consumer-B-Element-1
// Consumer-A-Element-2
// Consumer-B-Element-2
// Consumer-B-Element-3
// Consumer-A-Element-3
// ...
// Consumer-B-Element-19
// Consumer-A-Element-18
// Consumer-B-Element-20
// Consumer-A-Element-19
// Consumer-A-Element-20

It should be cautioned that if the source has fewer elements than the `bufferSize` specified in BroadcastHub.sink, none of the elements will be consumed by the attached consumers. It took me a while to realize that it's `fromProducer` that "silently" consumes the elements upon materialization, before the attached consumers have a chance to consume them. That, to me, is really an undocumented "bug". Using `alsoToMat` as shown below, one can uncover the seemingly "missing" elements in such a case:

// Testing for Source with elements fewer than `bufferSize`:
val producer = Source(1 to 5).map(i => s"Element-$i")

val fromProducer: Source[String, NotUsed] =
  producer.
    alsoToMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).
    toMat(Sink.foreach{ case s: String => println("Consumer-" + s) })(Keep.left).
    run()
// Consumer-Element-1
// Consumer-Element-2
// Consumer-Element-3
// Consumer-Element-4
// Consumer-Element-5

MergeHub + BroadcastHub

By connecting a `MergeHub` with a `BroadcastHub`, one can create a dynamic publish-subscribe “channel” in the form of a Flow via Flow.fromSinkAndSource:

// Creating a pub-sub channel as a Flow
val (psSink, psSource) = MergeHub.source[String](perProducerBufferSize).
  toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).run()

val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Note that `Keep.both` in the above snippet produces a Tuple of materialized values `(Sink[T, NotUsed], Source[T, NotUsed])` from `MergeHub.source[T]` and `BroadcastHub.sink[T]`. The pub-sub channel `psChannel` can be illustrated as follows:

    p                           psChannel                          c
    r  \                                                        /  o
    o   \    -------------------        -------------------    /   n
    d    \  |  MergeHub.source  |      | BroadcastHub.sink |  /    s
    u  ---  |                   | ---> |                   |  ---  u
    c    /  |      (psSink)     |      |     (psSource)    |  \    m
    e   /    -------------------        -------------------    \   e
    r  /                                                        \  r
    s                                                              s
         Source[T,Sink[T,NotUsed]]    Sink[T,Source[T,NotUsed]]

Below is sample code for a simple pub-sub channel `psChannel`:

// A simple pub-sub service example
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import scala.concurrent.duration._  // needed for `2.seconds` below

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (psSink, psSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).run()

// Optional: avoid building up backpressure in the absence of subscribers
psSource.runWith(Sink.ignore)

Serving as a pub-sub channel, the input of `psChannel` is published via `psSink` to all subscribers, while its output streams all the published elements through `psSource`. For example:

// Serving as a pub-sub service
val p1 = Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure)

val p2 = Source(4 to 5).map(i => s"Batch-immediate-$i")

p1.runWith(psSink)
p2.runWith(psSink)

val s1 = psSource
val s2 = psSource

s1.runForeach(s => println("Consumer-A-" + s))
s2.runForeach(s => println("Consumer-B-" + s))

// Consumer-A-Batch-immediate-4
// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3
// Consumer-A-Batch-delayed-3

Running `psChannel` as a `Flow`:

// Using the pub-sub service as a Flow
val psChannel: Flow[String, String, NotUsed] =
  Flow.fromSinkAndSource(psSink, psSource)

Source(1 to 3).map(i => s"Batch-delayed-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-A-" + s) }).
  run()

Source(4 to 5).map(i => s"Batch-immediate-$i").
  viaMat(psChannel)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer-B-" + s) }).
  run()

// Consumer-B-Batch-immediate-4
// Consumer-B-Batch-immediate-5
// Consumer-A-Batch-immediate-4
// Consumer-A-Batch-immediate-5
// Consumer-A-Batch-delayed-1
// Consumer-A-Batch-delayed-2
// Consumer-B-Batch-delayed-1
// Consumer-A-Batch-delayed-3
// Consumer-B-Batch-delayed-2
// Consumer-B-Batch-delayed-3

Note that each of the input elements for `psChannel` gets consumed by every consumer.

Other relevant topics that might be of interest include KillSwitch for stream completion control and PartitionHub for routing Stream elements from a given producer to a dynamic set of consumers.
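As a quick sketch of the former (parameter values and names are my own, following the pattern from the Akka documentation), a `KillSwitch` can be inserted between the two hubs so the whole channel can be completed externally:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

// Keep.both twice yields ((producer-facing Sink, KillSwitch), consumer-facing Source)
val ((psSink, killSwitch), psSource) =
  MergeHub.source[String](perProducerBufferSize = 32).
    viaMat(KillSwitches.single)(Keep.both).
    toMat(BroadcastHub.sink[String](bufferSize = 32))(Keep.both).
    run()

val done = psSource.runWith(Sink.ignore)  // attach a consumer
Source.single("bye").runWith(psSink)      // attach a producer

// Completing the switch cancels attached producers and completes attached consumers
killSwitch.shutdown()
```

Calling `killSwitch.abort(...)` with an exception instead would fail the channel with that error rather than completing it normally.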

Akka Content-based Substreaming

In a previous blog post, a simple text mining application was developed using Akka Streams. In that application, the graph-building create() DSL method was used to build the stream topology that encodes the routing logic of the various stream components.

This time, we’re going to try something a little different. Rather than using the create() graph-builder, we’ll directly use some fan-out/fan-in functions to process stream data. In particular, we’ll process a messaging stream and dynamically demultiplex the stream into substreams based on the group the messages belong to.

For illustration purposes, the messaging element in the stream source is modeled as a case class with group and content info. The substreams dynamically split by message group will go into individual file sinks, each of which is wrapped, along with the file name, in another case class.

    val groups = List("a", "b", "c")

    case class Message(group: String, content: String)
    case class FSink(name: String, sink: Sink[ByteString, Future[IOResult]])

Next, we create a map with message group as key and the corresponding sink (which has the actual file path) as value:

    val fileSinks = groups.map{ g =>
      val sink = FSink(
        s"file-${g}.txt",
        FileIO.toPath(Paths.get(s"/path/to/file-${g}.txt"))
      )
      (g, sink)
    }.toMap

Demultiplexing via Akka Streams groupBy()

We then use Akka Streams groupBy() to split the stream by message group into substreams:

    val messageSource: Source[Message, NotUsed] = Source(List(
      // ...
    ))

    messageSource.map(m => (m.group, m)).
      groupBy(groups.size, _._1).
      fold(("", List.empty[Message])) {
        case ((_, list), (g, m)) => (g, m :: list)
      }.
      mapAsync(parallelism = groups.size) {
        case (g: String, list: List[Message]) =>
          Source(list.reverse).map(_.content).map(ByteString(_)).
            runWith(fileSinks(g).sink)
      }.
      mergeSubstreams.
      runWith(Sink.ignore)

Note that after applying groupBy() (see method signature), the split substreams are processed in parallel, one per message group, using mapAsync(), which transforms the stream by applying a specified function to each element. Since our goal is just to write the individual files, the final `mergeSubstreams` merely recombines the substreams so the graph can be run with a throwaway `Sink.ignore`.

Putting all the pieces together:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import akka.util.ByteString
    import akka.{NotUsed, Done}
    import akka.stream.IOResult
    import scala.concurrent.Future
    import java.nio.file.Paths

    implicit val system = ActorSystem("sys")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val groups = List("a", "b", "c")

    case class Message(group: String, content: String)
    case class FSink(name: String, sink: Sink[ByteString, Future[IOResult]])

    val messageSource: Source[Message, NotUsed] = Source(List(
      Message("a", "Got an apple.\n"),
      Message("a", "A couple of avocados.\n"),
      Message("a", "Here's a dozen of apricots.\n"),
      Message("b", "Brought a banana.\n"),
      Message("b", "A lot of black-eyed peas.\n"),
      Message("b", "Here's some broccoli.\n"),
      Message("c", "Lots of cherries.\n"),
      Message("c", "A big pile of chilli peppers over here.\n")
    ))

    val fileSinks = groups.map{ g =>
      val sink = FSink(
        s"file-${g}.txt",
        FileIO.toPath(Paths.get(s"/path/to/file-${g}.txt"))
      )
      (g, sink)
    }.toMap

    messageSource.map(m => (m.group, m)).
      groupBy(groups.size, _._1).
      fold(("", List.empty[Message])) {
        case ((_, list), (g, m)) => (g, m :: list)
      }.
      mapAsync(parallelism = groups.size) {
        case (g: String, list: List[Message]) =>
          Source(list.reverse).map(_.content).map(ByteString(_)).
            runWith(fileSinks(g).sink)
      }.
      mergeSubstreams.
      runWith(Sink.ignore)

Merging stream sources with flatMapConcat

Conversely, given the split files, we can merge them back into a single stream using flatMapConcat. The word 'merge' is being used loosely here, as flatMapConcat (see method signature) actually consumes the individual sources one after another, flattening the stream of sources by concatenation.

    val files = groups.map(g => s"/path/to/file-${g}.txt")

    val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))

    val sink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(
      Paths.get("/path/to/file-merged.txt")
    )

    source.runWith(sink)

In case the files are large, processing them in measured chunks to conserve memory may be preferred:

    val chunking = Framing.delimiter(
      ByteString("\n"), maximumFrameLength = 10240, allowTruncation = true
    )

    // Framing.delimiter strips the delimiter from each frame,
    // so re-append it before writing to the merged file
    source.via(chunking).map(_ ++ ByteString("\n")).runWith(sink)

A line feed (i.e. "\n") is used as the frame delimiter here, but it can be set to any byte sequence.
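For instance, a comma-delimited source can be framed the same way. This little sketch (example data is my own) splits `a,b,c` into three elements, relying on `allowTruncation` to emit the final frame that lacks a trailing delimiter:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

implicit val system = ActorSystem("sys")
implicit val materializer = ActorMaterializer()

// Frame on commas instead of line feeds
val frames = Source.single(ByteString("a,b,c")).
  via(Framing.delimiter(ByteString(","), maximumFrameLength = 256, allowTruncation = true)).
  map(_.utf8String).
  runWith(Sink.seq)
// frames eventually completes with Seq("a", "b", "c")
```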

HTTPS Redirection With Akka HTTP

Akka HTTP is an HTTP toolkit built on top of Akka Stream. Rather than a framework for rapid web-server development, it's principally designed as a suite of tools for building custom integration layers that wire potentially complex business logic to a REST/HTTP interface. Perhaps for that reason, one might be surprised that there isn't any example code for something as common as running an HTTPS-by-default web server.

Almost every major website operates over the HTTPS protocol by default these days for security purposes. Under the protocol, the required SSL certificate and the bidirectional encryption of the communications between the web server and client ensure the authenticity of the website and guard against man-in-the-middle attacks. It might be overkill for, say, an information-only website, but the 'lock' icon indicating a valid SSL certificate in the web browser's address bar certainly makes site visitors feel more secure.

In this blog post, I'll assemble a snippet using Akka HTTP to illustrate how to set up a skeletal web server that redirects all plain-HTTP requests to the HTTPS listener. For testing purposes in a development environment, I'll also include steps for creating a self-signed SSL certificate. Note that such a self-signed certificate should only be used for internal testing.

HTTP and HTTPS cannot serve on the same port

Intuitively, one might consider binding both the HTTP and HTTPS services to the same port, with all requests processed by an HTTPS handler. Unfortunately, HTTPS uses the SSL/TLS protocol, which for security reasons can't simply be downgraded to HTTP upon detecting unencrypted requests. A straightforward solution is to bind the HTTP and HTTPS services to separate ports and redirect all requests coming into the HTTP port to the HTTPS port.

First let’s create ‘build.sbt’ with necessary library dependencies under the project root subdirectory:

name := "akka-http-secureserver"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.20",
  "com.typesafe.akka" %% "akka-stream" % "2.4.20",
  "com.typesafe.akka" %% "akka-http" % "10.0.11"
)

Next, create the main application in, say, ${project-root}/src/main/scala/SecureServer.scala:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model.{HttpEntity, ContentTypes, StatusCodes}
import akka.http.scaladsl.{Http, ConnectionContext, HttpsConnectionContext}
import akka.http.scaladsl.server.Directives._
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import java.security.{SecureRandom, KeyStore}
import javax.net.ssl.{SSLContext, KeyManagerFactory, TrustManagerFactory}
import java.io.InputStream
import scala.io.StdIn

object SecureServer {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    val password: Array[Char] = "mypassword".toCharArray  // Unsafe to provide password here!

    val ks: KeyStore = KeyStore.getInstance("PKCS12")
    val keystore: InputStream = getClass.getClassLoader.getResourceAsStream("server.p12")

    require(keystore != null, "Keystore required!")
    ks.load(keystore, password)

    val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(ks, password)

    val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509")
    tmf.init(ks)

    val sslContext: SSLContext = SSLContext.getInstance("TLS")
    sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
    val httpsContext: HttpsConnectionContext = ConnectionContext.https(sslContext)

    val hostName = "dev.genuine.com"
    val portHttp = 8080
    val portHttps = 8443

    val route =
      scheme("http") {
        extract(_.request.uri) { uri =>
          redirect( uri.withScheme("https").withAuthority(hostName, portHttps),
            StatusCodes.MovedPermanently
          )
        }
      } ~
      pathSingleSlash {
        get {
          complete( HttpEntity( ContentTypes.`text/html(UTF-8)`,
            "Welcome to Akka-HTTP!"
          ) )
        }
      } ~
      path("hello") {
        get {
          complete( HttpEntity( ContentTypes.`text/html(UTF-8)`,
            "Hello from Akka-HTTP!"
          ) )
        }
      }

    Http().bindAndHandle(route, hostName, portHttp)
    Http().bindAndHandle(route, hostName, portHttps, connectionContext = httpsContext)

    println(s"Server online at https://${hostName}:${portHttps}/\nPress RETURN to stop...")
    StdIn.readLine()

    system.terminate()
  }
}

The top half of the main code consists of initialization routines for the Akka actor system and stream materializer (which are what Akka HTTP is built on top of), plus creation of the HTTPS connection context. The rest of the code is a standard Akka HTTP snippet with URL routing and server-port binding. A good portion of the code is borrowed from the Akka server-side HTTPS support documentation.

Within the ‘scheme(“http”)’ routing code block is the core logic for HTTPS redirection:

    // HTTPS redirection
    uri =>
      redirect( uri.withScheme("https").withAuthority(hostName, portHttps),
        StatusCodes.MovedPermanently
      )

Note that there is no need to apply `withAuthority()` if you're using the standard HTTPS port (i.e. 443).

The next step is to put in place the PKCS #12 file, 'server.p12', which contains the private key and the X.509 SSL certificate. It should be placed under ${project-root}/src/main/resources/. At the bottom of this blog post are the steps for creating the server key/certificate with the open-source OpenSSL toolkit.

Once the private key/certificate is in place, to run the server application from a Linux command prompt, simply use ‘sbt’ as below:

# Run SecureServer
cd ${project-root}
sbt "runMain SecureServer"

To test it out from a web browser, visit http://dev.genuine.com:8080/hello and you should see the URL get redirected to https://dev.genuine.com:8443/hello. The web browser will warn about the security of the site, simply because the SSL certificate is self-signed.

Generating server key and self-signed SSL certificate in PKCS #12 format

#
# Steps to generate private key and self-signed X.509 certificate in PKCS #12 format
#

## Generate private key
openssl genrsa -des3 -out server.key 2048

# --
Generating RSA private key, 2048 bit long modulus
..................................+++
.......................................................+++
e is 65537 (0x10001)
Enter pass phrase for server.key: genuine
Verifying - Enter pass phrase for server.key:
# --

## Generate CSR
openssl req -new -key server.key -out server.csr

# --
Enter pass phrase for server.key:
Country Name (2 letter code) [AU]:US
State or Province Name (full name) [Some-State]:California
Locality Name (eg, city) []:Sunnyvale
Organization Name (eg, company) [Internet Widgits Pty Ltd]:Genuine
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:dev.genuine.com
Email Address []:postmaster@genuine.com
A challenge password []:
# --

## Remove pass phrase
cp server.key server.key.orig
openssl rsa -in server.key.orig -out server.key

# --
Enter pass phrase for server.key.orig:
writing RSA key
# --

## Generate certificate
openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt

# --
Signature ok
subject=/C=US/ST=California/L=Sunnyvale/O=Genuine/CN=dev.genuine.com/emailAddress=postmaster@genuine.com
Getting Private key
# --

## Convert to PKCS #12 or PFX format
openssl pkcs12 -export -out server.p12 -inkey server.key -in server.crt 

# --
Enter Export Password:
Verifying - Enter Export Password:
# --

## Move the PKCS #12 file to the server application resources subdirectory
mv server.p12 ${project-root}/src/main/resources/