Author Archives: Leo Cheung

Generic Top N Elements In Scala

Getting the top N elements from a list of elements is a common need in applications that involve data retrieval. If the list is big, it’ll be inefficient and wasteful (in terms of processing resource) to sort the entire list when one is interested in only the top few elements.

Consider a list of real numbers (i.e. Double or Float-typed) and let’s say we want to fetch the smallest N numbers from the list. A commonly used algorithm for the task is rather straight forward:

Start with the first N numbers of the list as the selected N-element sublist, then check for each of the remaining numbers of the list and if it’s smaller than the largest number in the sublist swap out in each iteration the largest number with it.

Algorithmic steps

Formulating that as programmatic steps, we have:

  1. Maintain a sorted N-element list in descending order, hence its head is the max in the list (Assuming N isn’t a big number, the cost of sorting is trivial).
  2. In each iteration, if the current element in the original list is smaller than the head element in the N-element list, replace the head element with the current element; otherwise leave the current N-element list unchanged.

Upon completing the iterations, the N-element list will consist of the smallest elements in the original list and a final sorting in ascending order will result in a sorted N-element list:

// Top N (smallest) elements from a Double-type list

def topN(n: Int, list: List[Double]): List[Double] = {
  def maxListHead(l: List[Double], e: Double): List[Double] = {
    if (e < l.head) (e :: l.tail).sortWith(_ > _) else l
  }
  list.drop(n).foldLeft( list.take(n).sortWith(_ > _) )( maxListHead ).
    sortWith(_ < _)
}

val list: List[Double] = List(3, 2, 8, 2, 9, 1, 5, 5, 9, 1, 7, 3, 4)

topN(5, list)
// res1: List[Double] = List(1.0, 1.0, 2.0, 2.0, 3.0)

Note that it would be trivial to modify the method to fetch the largest N numbers (instead of the smallest) in which case one only needs to reverse the inequality operator in ‘e < l.head’, the iterative ‘sorthWith(_ > _)’ and the final ‘sorthWith(_ < _)’.

Refactoring to eliminate sorting

Now, let’s say we’re going to use it to fetch some top N elements where N is a little bigger, like top 5,000 from a 1 million element list. Except for the inevitable final sorting of the sublist, all the other ‘sorthWith()’ operations can be replaced with something less expensive. Since all we care is to be able to conditionally swap out the largest number in the sublist, we just need the largest number to be placed at the head of the sublist and the same algorithmic flow will work fine.

The refactored topN method below replaces all ‘sortWith()’ (except for the final sorting) with ‘bigHead()’ which places the largest number of the input list at its head position:

// Top N (smallest) elements from a Double-type list (refactored)

def topN(n: Int, list: List[Double]): List[Double] = {
  def bigHead(l: List[Double]): List[Double] = list match {
    case Nil => list
    case _ =>
      l.tail.foldLeft( List(l.head) )( (acc, x) =>
        if (x >= acc.head) x :: acc else acc.head :: x :: acc.tail
    )
  }
  def maxListHead(l: List[Double], e: Double): List[Double] = {
    if (e < l.head) bigHead(e :: l.tail) else l
  }
  list.drop(n).foldLeft( bigHead(list.take(n)) )( maxListHead ).
    sortWith(_ < _)
}

Generic top N numeric elements

Next, we generalize method topN to handle any list of elements of the type implicitly associated with Numeric, with a typeclass pattern known as context bound.

// Top N (smallest) elements from a generic numeric list

def topN[T](n: Int, list: List[T])(implicit num: Numeric[T]): List[T] = {
  import num.{mkNumericOps, mkOrderingOps}
  def bigHead(l: List[T]): List[T] = list match {
    case Nil => list
    case _ =>       l.tail.foldLeft( List(l.head) )( (acc, x) =>
        if (x >= acc.head) x :: acc else acc.head :: x :: acc.tail
      )
  }
  def maxListHead(l: List[T], e: T): List[T] = {
    if (e < l.head) bigHead(e :: l.tail) else l
  }
  list.drop(n).foldLeft( bigHead(list.take(n)) )( maxListHead ).
    sortWith(_ < _)
}

val list: List[Double] = List(3, 2, 8, 2, 9, 1, 5, 5, 9, 1, 7, 3, 4)
topN[Double](5, list)
// res1: List[Double] = List(1.0, 1.0, 2.0, 2.0, 3.0)

val list: List[Int] = List(3, 2, 8, 2, 9, 1, 5, 5, 9, 1, 7, 3, 4)
topN[Int](5, list)
// res2: List[Int] = List(1, 1, 2, 2, 3)

With the context bound for type ‘T’, importing the implicit mkNumericOps and mkOrderingOps methods makes arithmetic and comparison operators available for the list elements to be compared and ordered.

Generic top N objects ordered by mapped values

To further generalize topN, rather than being limited to numeric elements we enable it to take a list of generic objects and return the top N elements ordered by the individual element’s corresponding value (e.g. an order-able class field of the object). To accomplish that, we revise topN as follows:

  • Loosen the context bound from ‘Numeric’ to the more generic Ordering so that items can be ordered by non-numeric values such as strings
  • Take as an additional parameter a mapping function that tells what values corresponding to the objects are to be ordered by
// Top N elements from a list of objects ordered by object-mapped values

case class Score[T : Numeric](id: String, rank: T)

def topN[S, T : Ordering](n: Int, list: List[S], f: S => T): List[S] = {
  val orderer = implicitly[Ordering[T]]
  import orderer._
  def bigHead(l: List[S]): List[S] = list match {
    case Nil => list
    case _ =>
      l.tail.foldLeft( List(l.head) )( (acc, x) =>
        if (f(x) >= f(acc.head)) x :: acc else acc.head :: x :: acc.tail
      )
  }
  def maxListHead(l: List[S], e: S): List[S] = {
    if (f(e) < f(l.head)) bigHead((e :: l.tail)) else l
  }
  list.drop(n).foldLeft( bigHead(list.take(n)) )( maxListHead ).
    sortWith(f(_) < f(_))
}

val scores = List(
  Score("a", 3), Score("b", 6), Score("c", 8), Score("d", 1), Score("e", 11),
  Score("f", 5), Score("g", 9), Score("h", 12), Score("i", 2), Score("j", 10)
)
topN(5, scores, (s: Score[Int]) => s.rank)
// res1: List[Score[Int]] = List(Score(d,1), Score(i,2), Score(a,3), Score(f,5), Score(b,6))

Note that the type parameter ‘T : Ordering’, which signifies a context bound, is the shorthand notation for:

// Context Bound
case class Score[T](id: String, rank: T)(implicit orderer: Ordering[T])

Akka Content-based Substreaming

In a previous blog post, a simple text mining application was developed using Akka Streams. In that application, the graph-building create() DSL method was used to build the stream topology that consists of the routing logic of the various stream components.

This time, we’re going to try something a little different. Rather than using the create() graph-builder, we’ll directly use some fan-out/fan-in functions to process stream data. In particular, we’ll process a messaging stream and dynamically demultiplex the stream into substreams based on the group the messages belong to.

For illustration purpose, the messaging element in the stream source is modeled as a case class with group and content info. The substreams dynamically split by message group will go into individual file sinks, each of which is wrapped along with the file name in another case class.

    val groups = List("a", "b", "c")

    case class Message(group: String, content: String)
    case class FSink(name: String, sink: Sink[ByteString, Future[IOResult]])

Next, we create a map with message group as key and the corresponding sink (which has the actual file path) as value:

    val fileSinks = groups.map{ g =>
      val sink = FSink(
        s"file-${g}.txt",
        FileIO.toPath(Paths.get(s"/path/to/file-${g}.txt"))
      )
      (g, sink)
    }.toMap

Demultiplexing via Akka Streams groupBy()

We then use Akka Streams groupBy() to split the stream by message group into substreams:

    val messageSource: Source[Message, NotUsed] = Source(List(
      // ...
    ))

    messageSource.map(m => (m.group, m)).
      groupBy(groups.size, _._1).
      fold(("", List.empty[Message])) {
        case ((_, list), (g, m)) => (g, m :: list)
      }.
      mapAsync(parallelism = groups.size) {
        case (g: String, list: List[Message]) =>
          Source(list.reverse).map(_.content).map(ByteString(_)).
            runWith(fileSinks(g).sink)
      }.
      mergeSubstreams.
      runWith(Sink.ignore)

Note that after applying groupBy() (see method signature), the split substreams are processed in parallel for each message group using mapAsync() which transforms the stream by applying a specified function to each of the elements. Since our goal is to create the individual files, the final ‘mergeSubstreams’ is just for combining the substreams to be executed with an unused sink.

Putting all the above pieces altogether:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import akka.util.ByteString
    import akka.{NotUsed, Done}
    import akka.stream.IOResult
    import scala.concurrent.Future
    import java.nio.file.Paths

    implicit val system = ActorSystem("sys")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val groups = List("a", "b", "c")

    case class Message(group: String, content: String)
    case class FSink(name: String, sink: Sink[ByteString, Future[IOResult]])

    val messageSource: Source[Message, NotUsed] = Source(List(
      Message("a", "Got an apple.\n"),
      Message("a", "A couple of avocados.\n"),
      Message("a", "Here's a dozen of apricots.\n"),
      Message("b", "Brought a banana.\n"),
      Message("b", "A lot of black-eyed peas.\n"),
      Message("b", "Here's some broccoli.\n"),
      Message("c", "Lots of cherries.\n"),
      Message("c", "A big pile of chilli peppers over here.\n")
    ))

    val fileSinks = groups.map{ g =>
      val sink = FSink(
        s"file-${g}.txt",
        FileIO.toPath(Paths.get(s"/path/to/file-${g}.txt"))
      )
      (g, sink)
    }.toMap

    messageSource.map(m => (m.group, m)).
      groupBy(groups.size, _._1).
      fold(("", List.empty[Message])) {
        case ((_, list), (g, m)) => (g, m :: list)
      }.
      mapAsync(parallelism = groups.size) {
        case (g: String, list: List[Message]) =>
          Source(list.reverse).map(_.content).map(ByteString(_)).
            runWith(fileSinks(g).sink)
      }.
      mergeSubstreams.
      runWith(Sink.ignore)

Merging stream sources with flatMapConcat

Conversely, given the split files, we can merge them back into a single stream using flatMapConcat. The word ‘merge’ is being used loosely here, as flatMapConcat (see method signature) actually consumes the individual sources one after another by flattening the source stream elements using concatenation.

    val files = groups.map(g => s"/path/to/file-${g}.txt")

    val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))

    val sink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(
      Paths.get("/path/to/file-merged.txt")
    )

    source.runWith(sink)

In case the files are large and processing them in measured chunks for memory reservation is preferred:

    val chunking = Framing.delimiter(
      ByteString("\n"), maximumFrameLength = 10240, allowTruncation = true
    )

    source.via(chunking).runWith(sink)

Line-feed (i.e. “\n”) is being used as the delimiter for each frame here, but it can be set to anything else.

HTTPS Redirection With Akka HTTP

Akka HTTP is a HTTP-based toolkit built on top of Akka Stream. Rather than a framework for rapid web server development, it’s principally designed as a suite of tools for building custom integration layers to wire potentially complex business logic with a REST/HTTP interface. Perhaps for that reason, one might be surprised that there isn’t any example code for something as common as running a HTTPS-by-default web server.

Almost every major website operates using the HTTPS protocol by default for security purpose these days. Under the protocol, the required SSL certificate and the bidirectional encryption of the communications between the web server and client does ensure the authenticity of the website as well as avoid man-in-the-middle attack. It might be an over-kill for, say, an information-only website, but the ‘lock’ icon indicating a valid SSL certificate on the web browser address bar certainly makes site visitors feel more secure.

In this blog post, I’ll assemble a snippet using Akka HTTP to illustrate how to set up a skeletal web server which redirects all plain-HTTP requests to the HTTPS listener. For testing purpose in a development environment, I’ll also include steps of creating a self-signed SSL certificate. Note that such self-signed certificate should only be used for internal testing purpose.

HTTP and HTTPS cannot serve on the same port

Intuitively, one would consider binding both HTTP and HTTPS services to the same port on which all requests are processed by a HTTPS handler. Unfortunately, HTTPS uses SSL/TLS protocol which for security reason can’t be simply downgraded to HTTP upon detecting unencrypted requests. A straight-forward solution would be to bind the HTTP and HTTPS services to separate ports and redirect all requests coming into the HTTP port to the HTTPS port.

First let’s create ‘build.sbt’ with necessary library dependencies under the project root subdirectory:

name := "akka-http-secureserver"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.20",
  "com.typesafe.akka" %% "akka-stream" % "2.4.20",
  "com.typesafe.akka" %% "akka-http" % "10.0.11"
)

Next, create the main application in, say, ${project-root}/src/main/SecureServer.scala:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model.{HttpEntity, ContentTypes, StatusCodes}
import akka.http.scaladsl.{Http, ConnectionContext, HttpsConnectionContext}
import akka.http.scaladsl.server.Directives._
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import java.security.{SecureRandom, KeyStore}
import javax.net.ssl.{SSLContext, KeyManagerFactory, TrustManagerFactory}
import java.io.InputStream
import scala.io.StdIn

object SecureServer {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    val password: Array[Char] = "mypassword".toCharArray  // Unsafe to provide password here!

    val ks: KeyStore = KeyStore.getInstance("PKCS12")
    val keystore: InputStream = getClass.getClassLoader.getResourceAsStream("server.p12")

    require(keystore != null, "Keystore required!")
    ks.load(keystore, password)

    val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(ks, password)

    val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509")
    tmf.init(ks)

    val sslContext: SSLContext = SSLContext.getInstance("TLS")
    sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
    val httpsContext: HttpsConnectionContext = ConnectionContext.https(sslContext)

    val hostName = "dev.genuine.com"
    val portHttp = 8080
    val portHttps = 8443

    val route =
      scheme("http") {
        extract(_.request.uri) { uri =>
          redirect( uri.withScheme("https").withAuthority(hostName, portHttps),
            StatusCodes.MovedPermanently
          )
        }
      } ~
      pathSingleSlash {
        get {
          complete( HttpEntity( ContentTypes.`text/html(UTF-8)`,
            "Welcome to Akka-HTTP!"
          ) )
        }
      } ~
      path("hello") {
        get {
          complete( HttpEntity( ContentTypes.`text/html(UTF-8)`,
            "Hello from Akka-HTTP!"
          ) )
        }
      }

    Http().bindAndHandle(route, hostName, portHttp)
    Http().bindAndHandle(route, hostName, portHttps, connectionContext = httpsContext)

    println(s"Server online at https://${hostName}:${portHttps}/\nPress RETURN to stop...")
    StdIn.readLine()

    system.terminate()
  }
}

The top half of the main code are initialization routines for the Akka actor system, stream materializer (which are what Akka HTTP is built on top of) and creating HTTPS connection context. The rest of the code is a standard Akka HTTP snippet with URL routing and server port binding. A good portion of the code is borrowed from this Akka server-side HTTPS support link.

Within the ‘scheme(“http”)’ routing code block is the core logic for HTTPS redirection:

    // HTTPS redirection
    uri =>
      redirect( uri.withScheme("https").withAuthority(hostName, portHttps),
        StatusCodes.MovedPermanently
      )

Note that there is no need for applying ‘withAuthority()’ if you’re using standard HTTPS port (i.e. 443).

Next step would be to put in place the PKCS #12 formatted file, ‘server.p12’, which consists of the PKCS private key and X.509 SSL certificate. It should be placed under ${project-root}/src/main/resources/. At the bottom of this blog post are steps for creating the server key/certificate using open-source library, OpenSSL.

Once the private key/certificate is in place, to run the server application from a Linux command prompt, simply use ‘sbt’ as below:

# Run SecureServer
cd ${project-root}
sbt "runMain SecureServer"

To test it out from a web browser, visit http://dev.genuine.com:8080/hello and you should see the URL get redirected to https://dev.genuine.com:8443/hello. The web browser will warn about security of the site and that’s just because the SSL certificate is a self-signed one.

Generating server key and self-signed SSL certificate in PKCS #12 format

#
# Steps to generate private key and self-signed X.509 certificate in PKCS #12 format
#

## Generate private key
openssl genrsa -des3 -out server.key 2048

# --
Generating RSA private key, 2048 bit long modulus
..................................+++
.......................................................+++
e is 65537 (0x10001)
Enter pass phrase for server.key: genuine
Verifying - Enter pass phrase for server.key:
# --

## Generate CSR
openssl req -new -key server.key -out server.csr

# --
Enter pass phrase for server.key:
Country Name (2 letter code) [AU]:US
State or Province Name (full name) [Some-State]:California
Locality Name (eg, city) []:Sunnyvale
Organization Name (eg, company) [Internet Widgits Pty Ltd]:Genuine
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:dev.genuine.com
Email Address []:postmaster@genuine.com
A challenge password []:
# --

## Remove pass phrase
cp server.key server.key.orig
openssl rsa -in server.key.orig -out server.key

# --
Enter pass phrase for server.key.orig:
writing RSA key
# --

## Generate certificate
openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt

# --
Signature ok
subject=/C=US/ST=California/L=Sunnyvale/O=Genuine/CN=dev.genuine.com/emailAddress=postmaster@genuine.com
Getting Private key
# --

## Convert to PKCS #12 or PFX format
openssl pkcs12 -export -out server.p12 -inkey server.key -in server.crt 

# --
Enter Export Password:
Verifying - Enter Export Password:
# --

## Move the PKCS #12 file to the server application resources subdirectory
mv server.p12 ${project-root}/src/main/resources/