
HTTPS Redirection With Akka HTTP

Akka HTTP is an HTTP toolkit built on top of Akka Streams. Rather than a framework for rapid web server development, it’s principally designed as a suite of tools for building custom integration layers that wire potentially complex business logic to a REST/HTTP interface. Perhaps for that reason, one might be surprised that there isn’t any example code for something as common as running an HTTPS-by-default web server.

These days, almost every major website serves over HTTPS by default. Under the protocol, the required SSL certificate and the bidirectional encryption of traffic between the web server and the client ensure the authenticity of the website and guard against man-in-the-middle attacks. It might be overkill for, say, an information-only website, but the ‘lock’ icon indicating a valid SSL certificate in the web browser’s address bar certainly makes site visitors feel more secure.

In this blog post, I’ll assemble a snippet using Akka HTTP to illustrate how to set up a skeletal web server that redirects all plain-HTTP requests to the HTTPS listener. For testing in a development environment, I’ll also include steps for creating a self-signed SSL certificate. Note that such a self-signed certificate should only be used for internal testing.

HTTP and HTTPS cannot serve on the same port

Intuitively, one might consider binding both the HTTP and HTTPS services to the same port, on which all requests are processed by an HTTPS handler. Unfortunately, HTTPS uses the SSL/TLS protocol, which for security reasons can’t simply be downgraded to HTTP upon detecting unencrypted requests. A straightforward solution is to bind the HTTP and HTTPS services to separate ports and redirect all requests coming into the HTTP port to the HTTPS port.

First, let’s create ‘build.sbt’ with the necessary library dependencies under the project root directory:

name := "akka-http-secureserver"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.20",
  "com.typesafe.akka" %% "akka-stream" % "2.4.20",
  "com.typesafe.akka" %% "akka-http" % "10.0.11"
)

Next, create the main application in, say, ${project-root}/src/main/scala/SecureServer.scala:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model.{HttpEntity, ContentTypes, StatusCodes}
import akka.http.scaladsl.{Http, ConnectionContext, HttpsConnectionContext}
import akka.http.scaladsl.server.Directives._
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import java.security.{SecureRandom, KeyStore}
import javax.net.ssl.{SSLContext, KeyManagerFactory, TrustManagerFactory}
import java.io.InputStream
import scala.io.StdIn

object SecureServer {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    val password: Array[Char] = "mypassword".toCharArray  // Unsafe to provide password here!

    val ks: KeyStore = KeyStore.getInstance("PKCS12")
    val keystore: InputStream = getClass.getClassLoader.getResourceAsStream("server.p12")

    require(keystore != null, "Keystore required!")
    ks.load(keystore, password)

    val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(ks, password)

    val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509")
    tmf.init(ks)

    val sslContext: SSLContext = SSLContext.getInstance("TLS")
    sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom)
    val httpsContext: HttpsConnectionContext = ConnectionContext.https(sslContext)

    val hostName = "dev.genuine.com"
    val portHttp = 8080
    val portHttps = 8443

    val route =
      scheme("http") {
        extract(_.request.uri) { uri =>
          redirect( uri.withScheme("https").withAuthority(hostName, portHttps),
            StatusCodes.MovedPermanently
          )
        }
      } ~
      pathSingleSlash {
        get {
          complete( HttpEntity( ContentTypes.`text/html(UTF-8)`,
            "Welcome to Akka-HTTP!"
          ) )
        }
      } ~
      path("hello") {
        get {
          complete( HttpEntity( ContentTypes.`text/html(UTF-8)`,
            "Hello from Akka-HTTP!"
          ) )
        }
      }

    Http().bindAndHandle(route, hostName, portHttp)
    Http().bindAndHandle(route, hostName, portHttps, connectionContext = httpsContext)

    println(s"Server online at https://${hostName}:${portHttps}/\nPress RETURN to stop...")
    StdIn.readLine()

    system.terminate()
  }
}

The top half of the main code consists of initialization routines for the Akka actor system and stream materializer (which Akka HTTP is built on top of), plus the creation of the HTTPS connection context. The rest is a standard Akka HTTP snippet with URL routing and server port binding. A good portion of the code is borrowed from the Akka documentation on server-side HTTPS support.

Within the ‘scheme(“http”)’ routing code block is the core logic for HTTPS redirection:

    // HTTPS redirection
    uri =>
      redirect( uri.withScheme("https").withAuthority(hostName, portHttps),
        StatusCodes.MovedPermanently
      )

Note that there is no need to apply ‘withAuthority()’ if you’re serving on the standard ports (80 for HTTP and 443 for HTTPS), since switching the scheme alone will then direct the browser to the default HTTPS port.
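For instance, here’s a minimal sketch of the redirect route under that assumption (HTTP on port 80, HTTPS on port 443), reusing the directives already imported above:

// Redirect sketch for standard ports: switching the scheme alone suffices,
// since the browser defaults to port 443 for HTTPS
val routeStandardPorts =
  scheme("http") {
    extract(_.request.uri) { uri =>
      redirect( uri.withScheme("https"), StatusCodes.MovedPermanently )
    }
  }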

The next step is to put in place the PKCS #12 file, ‘server.p12’, which bundles the private key and the X.509 SSL certificate. It should be placed under ${project-root}/src/main/resources/. At the bottom of this blog post are steps for creating the server key/certificate using the open-source OpenSSL toolkit.

Once the private key/certificate is in place, run the server application from a Linux command prompt with ‘sbt’ as below:

# Run SecureServer
cd ${project-root}
sbt "runMain SecureServer"

To test it out from a web browser, visit http://dev.genuine.com:8080/hello and you should see the URL get redirected to https://dev.genuine.com:8443/hello. The web browser will warn about the security of the site; that’s simply because the SSL certificate is self-signed.
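You can also verify the redirect programmatically. Below is a sketch using Akka HTTP’s client-side ‘singleRequest’ (run from within the same application after the port bindings, or from any client with its own actor system and materializer); the expected values in the comments assume the host/port settings above:

// Issue a plain-HTTP request and inspect the redirect response
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.Location

Http().singleRequest(HttpRequest(uri = s"http://${hostName}:${portHttp}/hello")).foreach { resp =>
  println(resp.status)                       // expected: 301 Moved Permanently
  println(resp.header[Location].map(_.uri))  // expected: Some(https://dev.genuine.com:8443/hello)
}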

Generating server key and self-signed SSL certificate in PKCS #12 format

#
# Steps to generate private key and self-signed X.509 certificate in PKCS #12 format
#

## Generate private key
openssl genrsa -des3 -out server.key 2048

# --
Generating RSA private key, 2048 bit long modulus
..................................+++
.......................................................+++
e is 65537 (0x10001)
Enter pass phrase for server.key: genuine
Verifying - Enter pass phrase for server.key:
# --

## Generate CSR
openssl req -new -key server.key -out server.csr

# --
Enter pass phrase for server.key:
Country Name (2 letter code) [AU]:US
State or Province Name (full name) [Some-State]:California
Locality Name (eg, city) []:Sunnyvale
Organization Name (eg, company) [Internet Widgits Pty Ltd]:Genuine
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:dev.genuine.com
Email Address []:postmaster@genuine.com
A challenge password []:
# --

## Remove pass phrase
cp server.key server.key.orig
openssl rsa -in server.key.orig -out server.key

# --
Enter pass phrase for server.key.orig:
writing RSA key
# --

## Generate certificate
openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt

# --
Signature ok
subject=/C=US/ST=California/L=Sunnyvale/O=Genuine/CN=dev.genuine.com/emailAddress=postmaster@genuine.com
Getting Private key
# --

## Convert to PKCS #12 or PFX format
openssl pkcs12 -export -out server.p12 -inkey server.key -in server.crt 

# --
Enter Export Password:
Verifying - Enter Export Password:
# --

## Move the PKCS #12 file to the server application resources subdirectory
mv server.p12 ${project-root}/src/main/resources/
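As an optional sanity check, the resulting keystore can be loaded with the JDK’s KeyStore API to confirm the key/certificate entry is readable. Below is a sketch; substitute the export password you chose above:

// Load server.p12 and list its entries to verify the keystore
import java.io.FileInputStream
import java.security.KeyStore
import scala.collection.JavaConverters._

val ks = KeyStore.getInstance("PKCS12")
ks.load(new FileInputStream("server.p12"), "mypassword".toCharArray)  // your export password here
ks.aliases.asScala.foreach( alias => println(s"$alias: isKeyEntry=${ks.isKeyEntry(alias)}") )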

Scala On Spark – Word-pair Count

So far, the few programming examples in the SoS (Scala on Spark) blog series have all centered around DataFrames. In this blog post, I would like to give an example of Spark’s RDD (resilient distributed dataset), which is an immutable distributed collection of data that can be processed via functional transformations (e.g. map, filter, reduce).

The main difference between the RDD and DataFrame APIs is that the former provides more granular, low-level functionality whereas the latter is equipped with powerful SQL-style functions for processing table-form data. Note that even though a DataFrame is in table form with named columns, the underlying JVM treats each row of the data as a generic untyped object. As a side note, Spark also supports another data abstraction called Dataset, which is a distributed collection of strongly typed objects.
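As a quick illustration of the two views, here’s a sketch assuming a Spark shell where both ‘sc’ and the ‘spark’ SparkSession are available:

// The same data as a low-level RDD and as a DataFrame with named columns
val rdd = sc.parallelize( Seq(("line", 13), ("and", 7)) )     // RDD[(String, Int)]: functional transformations
val df  = spark.createDataFrame(rdd).toDF("word", "count")    // DataFrame: SQL-style operations
df.filter("count > 10").show()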

Back to the RDD world. In this programming exercise, our goal is to count the number of occurrences of every distinct pair of consecutive words in a text file. In other words, for every distinct word in the text we’re going to count the occurrences of each distinct word that immediately follows it. As a trivial example, if the text is “I am what I am”, the result should be (i, am) = 2, (what, i) = 1, (am, what) = 1.
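The same idea can be sketched on plain Scala collections (no Spark involved) to make the expected result concrete:

// Word-pair counting on an ordinary Scala collection, for illustration only
val words = "I am what I am".toLowerCase.split("""\s+""").toList
val counts = words.sliding(2).toList.
  map{ case List(a, b) => (a, b) }.
  groupBy( identity ).
  mapValues( _.size )
// counts: Map((i,am) -> 2, (am,what) -> 1, (what,i) -> 1)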

For illustration purpose, let’s assemble a small piece of text as follows and save it in a file, say in a Hadoop HDFS file system:

This is line one.
And this is line two.
Is this line three?
This is another line.
And this is yet another line!
Line one and line two are similar.
But line two and line three are not similar!
And line three and line four are not similar.
But line four and line five are similar!

Simple word count

As a warm-up exercise, let’s perform a hello-world word count, which simply reports the count of every distinct word in the text file. Using the ‘textFile()’ method of SparkContext, which serves as the entry point for a program to access resources on a Spark cluster, we load the content from the HDFS file:

// Count occurrences of distinct words
val wordCountRDD = sc.textFile("hdfs://path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  map( (_, 1) ).
  reduceByKey( _ + _ ).
  sortBy( z => (z._2, z._1), ascending = false )

Viewed as a collection of lines (delimited by line breaks), the text is first run through ‘flatMap’ to split each line by punctuation and whitespace into an array of words and flatten the resulting arrays. Note that ‘_.split()’ is just Scala shorthand for ‘line => line.split()’.

Next, all words are lowercased (to disregard case) with the transformation ‘word => word.toLowerCase’, followed by the map transformation ‘word => (word, 1)’ for tallying. Using ‘reduceByKey’, the per-key reduction ‘(total, count) => total + count’ (shorthanded as ‘_ + _’) turns each distinct word into a tuple of (word, totalCount). The final sorting simply orders the result by count.

Since the dataset is small, we can ‘collect’ the result data to see the output:

wordCountRDD.collect.foreach{
  case (a, b) => println(f"$a%10s" + "  : " + f"$b%4s")
}

      line  :   13
       and  :    7
      this  :    5
        is  :    5
   similar  :    4
       are  :    4
       two  :    3
     three  :    3
       one  :    2
       not  :    2
      four  :    2
       but  :    2
   another  :    2
       yet  :    1
      five  :    1

On a related note, Spark’s ‘reduceByKey()’, along with a few other ‘xxxByKey()’ functions, is a handy tool for this kind of key-value pair transformation. Had it not been provided, one would have to do a little more hand-crafting, for example:

  groupBy( _._1 ).mapValues( _.map(_._2).sum )

  // OR

  foldLeft( Map[String, Int]() )( (acc, x) => 
    acc + (x._1 -> (acc.getOrElse(x._1, 0) + x._2) )
  )

Word-pair count

Now, let’s move onto the main topic of this blog post – counting distinct pairs of consecutive words:

import org.apache.spark.mllib.rdd.RDDFunctions._

// Count occurrences of distinct word pairs
val wordPairCountRDD = sc.textFile("hdfs://path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  sliding(2).
  map{ case Array(x, y) => ((x, y), 1) }.
  reduceByKey( _ + _ ).
  sortBy( z => (z._2, z._1._1, z._1._2), ascending = false )

Even though the required logic for counting word pairs is apparently more complex than that of counting individual words, the necessary transformations look only slightly different. That’s partly because compositions of modularized functions can make complex data transformations look deceptively simple in a functional language like Scala. Another key factor here is the availability of the powerful ‘sliding(n)’ function, which transforms a collection of elements into sliding windows, each in the form of an array of size ‘n’. For example, applying sliding(2) to a sequence of words “apples”, “and”, “oranges” would result in Array(“apples”, “and”) and Array(“and”, “oranges”).
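The same windowing behavior can be tried out on an ordinary Scala collection, where ‘sliding’ comes from the standard library and yields Lists rather than Arrays:

// sliding(2) over a plain Scala sequence, for illustration
Seq("apples", "and", "oranges").sliding(2).toList
// res: List(List(apples, and), List(and, oranges))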

Scanning through the composed functions: the split on punctuation and the lowercasing do exactly the same thing as in the hello-world word count. Next, ‘sliding(2)’ generates sliding windows of word pairs, each stored in an array. The subsequent ‘map’ turns each word-pair array into a key/value tuple, with the word-pair tuple as the key and 1 as the count value.

Similar to the reduction in the hello-world word count, ‘reduceByKey()’ generates the count for each word pair. The result is then sorted by count, then by the first and second words of the pair. Output of the word-pair count using ‘collect’ is as follows:

wordPairCountRDD.collect.foreach{
  case ((a, b), c) => println(f"$a%10s" + "    -> " + f"$b%10s" + "  : " + f"$c%4s")
}

       and    ->       line  :    5
      this    ->         is  :    4
      line    ->        two  :    3
      line    ->      three  :    3
   similar    ->        but  :    2
       one    ->        and  :    2
       not    ->    similar  :    2
      line    ->        one  :    2
      line    ->       four  :    2
        is    ->       line  :    2
       but    ->       line  :    2
       are    ->    similar  :    2
       are    ->        not  :    2
   another    ->       line  :    2
       and    ->       this  :    2
       yet    ->    another  :    1
       two    ->         is  :    1
       two    ->        are  :    1
       two    ->        and  :    1
     three    ->       this  :    1
     three    ->        are  :    1
     three    ->        and  :    1
      this    ->       line  :    1
   similar    ->        and  :    1
      line    ->       line  :    1
      line    ->       five  :    1
      line    ->        and  :    1
        is    ->        yet  :    1
        is    ->       this  :    1
        is    ->    another  :    1
      four    ->        are  :    1
      four    ->        and  :    1
      five    ->        are  :    1

Creating a word-pair count method

The above word-pair counting snippet can be repurposed to serve as a general method for counting a specific word-pair in a text file:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.rdd.RDDFunctions._

def wordPairCount(word1: String, word2: String, filePath: String)(implicit sc: SparkContext) =
  sc.textFile(filePath).
    flatMap( _.split("""[\s,.;:!?]+""") ).
    map( _.toLowerCase ).
    sliding(2).
    collect{ case Array(`word1`, `word2`) => ((word1, word2), 1) }.
    reduceByKey( _ + _ )

It’s worth noting that the ‘collect’ method taking a partial function (analogous to Scala’s collect on ordinary collections, and not to be confused with the parameterless RDD ‘collect’ action that pulls results back to the driver) has now replaced ‘map’ from the previous snippet. That’s because we’re now interested in counting only the specific pair of word1 and word2, so we need the inherent filtering provided by ‘collect’. Also note that in the ‘case’ statement the pair of words is enclosed in backticks so the patterns refer to the passed-in words rather than binding arbitrary pattern-matching variables.
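As a side note on the backtick syntax, here’s a tiny sketch contrasting matching against an existing value with binding a new pattern variable:

// Backticks match against an existing value; a lowercase name binds a new variable
val word1 = "line"
Seq("line", "two").map {
  case `word1` => s"matched the existing value '$word1'"
  case other   => s"bound a fresh variable: $other"
}
// res: List(matched the existing value 'line', bound a fresh variable: two)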

To use the word-pair count method, simply provide the pair of consecutive words and the file path as parameters, along with the SparkContext passed in as an implicit parameter. For example:

implicit val sc = SparkContext.getOrCreate

wordPairCount("line", "two", "hdfs://path/to/textfile")
// res1: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[56] at reduceByKey at <console>:42

res1.collect
// res2: Array[((String, String), Int)] = Array(((line,two),3))