
A Brief Overview Of Scala Futures

As demand for computing performance continues to grow, contemporary applications have been increasingly exploiting the collective processing power of all available CPU cores by executing tasks in parallel. But writing asynchronous code requires methodical control over the processing flow to avoid issues such as race conditions, and can be quite challenging even for experienced programmers.

The Scala Future API provides a comprehensive set of functions for writing concurrent, asynchronous code. By design, Scala is a functional programming language with an inherent emphasis on immutability and composability that helps avoid issues like race conditions and facilitates successive transformations. In this blog post, we’re going to explore how Scala Futures benefit from those functional features.

Simulating a CPU-bound task

Let’s first prepare a couple of items for upcoming uses:

  1. a `Result` case class representing the result of a piece of work, carrying a work `id` and the time spent in milliseconds
  2. a `doWork` method which mimics executing some CPU-bound work for a random period of time and returns a `Result` object
case class Result(id: Int, millis: Long)

def doWork(id: Int): Result = {
  import java.util.concurrent.ThreadLocalRandom
  val millis = ThreadLocalRandom.current.nextLong(1000, 2000)
  val millisTMO = 1500
  val start = System.currentTimeMillis()

  while ((System.currentTimeMillis() - start) < millis) {}

  if (millis < millisTMO) {
    println("Work Result: " + Result(id, millis))
    Result(id, millis)
  } else {
    println(s"TimeOut: work id $id would take ${millis - millisTMO}ms above limit to complete!")
    throw new Throwable("TimeOut!")
  }
}

Note that the side-effecting println within `doWork` is for illustrating when each of the asynchronously launched tasks is executed.

The conventional way of running asynchronous tasks

Using the `doWork` method, the conventional way of asynchronously running a number of tasks typically involves a configurable thread pool, with a Java Executor executing the tasks as individual Runnables.

import java.util.concurrent.Executors

case class Task(id: Int) extends Runnable {
  override def run(): Unit = {
    doWork(id)
  }
}

val pool = Executors.newFixedThreadPool(4)

(1 to 4).foreach( id => pool.execute(Task(id)) )

// Work Result: Result(3, 1119)
// Work Result: Result(1, 1448)
// TimeOut: work id 4 would take 260ms above limit to complete! Exception ...
// TimeOut: work id 2 would take 471ms above limit to complete! Exception ...

Despite the (1 to 4) ordering of the task submissions, the chronological work-result printouts with shuffled work ids show that the tasks were processed in parallel. It’s also worth noting that method run() does not return a value.

Using Scala Futures

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Try, Success, Failure}

val pool = Executors.newFixedThreadPool(4)

implicit val ec = ExecutionContext.fromExecutorService(pool)

(1 to 4).foreach( id => Future{ doWork(id) }.onComplete(println) )

// Work Result: Result(1, 1042)
// Work Result: Result(4, 1128)
// Work Result: Result(3, 1399)
// TimeOut: work id 2 would take 302ms above limit to complete! Exception ...

By simply wrapping `doWork` in a Future, each task is now asynchronously executed and results are captured by the `onComplete` callback method. The callback method takes a `Try[T] => U` function and can be expanded to handle the success/failure cases accordingly:

Future{ doWork(id) }.onComplete{
  case Success(res) => println("Success: " + res)
  case Failure(err) => println("Failure: " + err)
}

We’re using the same Executor thread pool, which can be configured to optimize for a specific computing environment (e.g. the number of CPU cores). The implicit ExecutionContext is required for executing Futures and callbacks such as `onComplete`. One could also fall back to Scala’s default ExecutionContext, which is backed by a Fork/Join executor, by simply importing the following:

import scala.concurrent.ExecutionContext.Implicits.global
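Conversely, if a dedicated pool is preferred, a common approach is to size it to the number of available CPU cores. Below is a minimal sketch (the value name is made up for illustration); it can be marked implicit in place of the global import, or passed explicitly:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A dedicated pool sized to the number of available CPU cores
val cpuBoundEc: ExecutionContext =
  ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
  )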

However, Scala Futures provide a lot more than just a handy wrapper for executing non-blocking tasks with callback methods.

Immutability and composability

Scala Futures involve code running in multiple threads. By sticking to Scala’s immutable collections, defining values as immutable `val`s (as opposed to mutable `var` variables), and relying on functional transformations (as opposed to in-place mutations), one can write concurrent code that is thread-safe, avoiding problems such as race conditions.
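To make that point concrete, here’s a small sketch contrasting a racy shared `var` with an immutable, transformation-based alternative (using `Future.sequence`, covered later in this post):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative anti-pattern: concurrent mutation of a shared var can lose updates
var racyTotal = 0
(1 to 1000).foreach( _ => Future { racyTotal += 1 } )

// Thread-safe alternative: each Future yields an immutable value,
// and the values are combined via a transformation
val safeTotal: Future[Int] =
  Future.sequence( (1 to 1000).map( _ => Future(1) ) ).map( _.sum )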

But perhaps one of the most sought-after features of Scala Futures is the support of composable transformations in the functional programming way. For example, we can chain a bunch of Futures via methods like `map` and `filter`:

Future{ doWork(1) }.
  filter( _.millis < 1400 ).
  map( _ => doWork(2) )

// Work Result: Result(1,1133)
// TimeOut: work id 2 would take 333ms above limit to complete!

The above snippet asynchronously runs doWork(1), and if finished within 1400 ms, continues to run the next task doWork(2).
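Since Futures support `map`, `flatMap` and `withFilter`, the same chain can also be written as a for-comprehension, which desugars into those very calls. A roughly equivalent sketch:

for {
  res1 <- Future{ doWork(1) } if res1.millis < 1400
  res2 <- Future{ doWork(2) }
} yield res2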

Another example: let’s say we have a number of predefined methods `doChainedWork(id, res)` with id = 1, 2, 3, …, each taking a work result and deriving a new work result like below:

def doChainedWork(id: Int, res: Result): Result = {
  // Take a `Result` and derive a new `Result` from it ...
  ???  // actual derivation elided
}

And let’s say we want to successively apply `doChainedWork` in a non-blocking fashion. Given some initial `Result` (call it `res0`), we can simply wrap each call in a Future and chain them using flatMap:

Future{ doChainedWork(1, res0) }.
  flatMap( res => Future{ doChainedWork(2, res) } ).
  flatMap( res => Future{ doChainedWork(3, res) } )

Note that neither of the above simplified examples handles failures, hence the chain will break upon the first exception. Depending on the specific business logic, that might not be desirable.
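One simple way to keep such a chain from breaking is to attach a fallback at the end, for instance via `recover` (explored in the next section). A minimal sketch, assuming a sentinel `Result` makes sense for the business logic:

Future{ doChainedWork(1, res0) }.
  flatMap( res => Future{ doChainedWork(2, res) } ).
  recover{ case _: Throwable =>
    Result(-1, 0L)  // purely illustrative fallback value
  }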

Using map and recover on Futures

While the `onComplete` callback in the previous example can handle failures, its `Unit` return type hinders composability. This section addresses that very issue.

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.{Try, Success, Failure}

import scala.concurrent.ExecutionContext.Implicits.global

(1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
)

// Work Result: Result(2, 1074)
// Work Result: Result(1, 1363)
// Work Result: Result(4, 1369)
// TimeOut: work id 2 would take 370ms above limit to complete! Exception ...

// res1: scala.collection.immutable.IndexedSeq[scala.concurrent.
//   Future[Product with Serializable with scala.util.Either[Throwable,Result]]] =
//   Vector(
//     Future(Success(Right(Result(1, 1363)))),
//     Future(Success(Right(Result(2, 1074)))),
//     Future(Success(Left(java.lang.Throwable: TimeOut!))),
//     Future(Success(Right(Result(4, 1369))))
//   )

In the Future trait, methods `map` and `recover` have the following signature:

def map[S](f: (T) => S)(implicit executor: ExecutionContext): Future[S]

def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U]

When a Future results in success, method `map` applies the provided function to the result. On the other hand, if the Future results in failure with a Throwable, method `recover` applies a given partial function that matches the Throwable. Both methods return a new Future.

In the above sample code, we use `map` to create a Future of Right[Result] when doWork succeeds, and `recover` to create a Future of Left[Throwable] when doWork fails. Using Either[Throwable, Result] as the return data type, we capture successful and failed returns in a type-safe fashion, allowing any additional transformations to be composed.

In subsequent snippets, `Await.result` is used to block for up to a given duration until a combined Future completes and to return its result.

From a sequence of Futures to a Future of sequence

Oftentimes, when faced with a collection of Futures, each holding a value to be consumed, we would prefer wrapping those values within a single Future. For that purpose, the Scala Future companion object provides a useful method `Future.sequence` which converts a sequence of Futures into a Future of a sequence.

val combinedFuture = Future.sequence( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

// Work Result: Result(2, 1262)
// Work Result: Result(3, 1490)
// TimeOut: work id 1 would take 6ms above limit to complete!
// TimeOut: work id 4 would take 405ms above limit to complete!

// res1: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[
//   Product with Serializable with scala.util.Either[Throwable,Result]]] =
//   Future(Success(Vector(
//     Left(java.lang.Throwable: TimeOut!),
//     Right(Result(2, 1262)),
//     Right(Result(3, 1490)),
//     Left(java.lang.Throwable: TimeOut!)
//   )))

As shown in the example, a collection of `Future[Either[Throwable,Result]]` is transformed into a single Future of a sequence of `Either[Throwable,Result]` elements.

Or, we could use method `Future.traverse`, which is a more generalized version of `Future.sequence`. It takes a sequence of items along with a function `f: A => Future[B]` to be applied to each item, and returns a Future of the resulting sequence. The following snippet, which takes a `(1 to 4)` range and an `Int => Future[Either[Throwable,Result]]` function as input, carries out the same transformation as the above `Future.sequence` snippet.

val combinedFuture = Future.traverse(1 to 4)( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
)
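Whether assembled via `Future.sequence` or `Future.traverse`, the combined Future can then be consumed with one more transformation. Here’s a sketch that splits the successes from the failures (the value names are just for illustration):

val summaryFuture = combinedFuture.map{ results =>
  val successes = results.collect{ case Right(r) => r }
  val failures  = results.collect{ case Left(e)  => e }
  (successes, failures)
}

val (okResults, errors) = Await.result(summaryFuture, 10.seconds)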

First completed Future

If one only cares about the first completed Future out of a list of Futures launched in parallel, the Scala Future companion object provides a handy method `Future.firstCompletedOf`.

val firstCompleted = Future.firstCompletedOf( (1 to 4).map( id =>
  Future{ doWork(id) }.map( Right(_) ).recover{ case e => Left(e) }
) )

Await.result(firstCompleted, 10.seconds)

// Work Result: Result(4, 1071)
// Work Result: Result(2, 1181)
// Work Result: Result(1, 1321)
// TimeOut: work id 3 would take 152ms above limit to complete!

// res1: Product with Serializable with scala.util.Either[Throwable,Result] = Right(Result(4, 1071))

Grasping the “future”

As the code examples have demonstrated so far, a Scala Future is an abstraction which returns a value at some point in the future. A take-away point is that once a Future is “kicked off”, it’s in some sense “untouchable” and its value won’t be available until it’s completed with success or failure. In another blog post some other time, we’ll explore how one can seize a little more control over the “when” and “what” of completing a Future.

Scala On Spark – Word-pair Count

So far, the few programming examples in the SoS (Scala on Spark) blog series have all centered around DataFrames. In this blog post, I would like to give an example on Spark’s RDD (resilient distributed dataset), which is an immutable distributed collection of data that can be processed via functional transformations (e.g. map, filter, reduce).

The main difference between the RDD and DataFrame APIs is that the former provides more granular low-level functionality whereas the latter is equipped with powerful SQL-style functions to process table-form data. Note that even though a DataFrame is in table form with named columns, the underlying JVM treats each row of the data as just a generic untyped object. As a side note, Spark also supports another data abstraction called Dataset, which is a distributed collection of strongly-typed objects.
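As a quick illustration of that last point, an untyped DataFrame can be lifted into a strongly-typed Dataset via a case class (the names below are made up for illustration; `spark` refers to the SparkSession available in spark-shell):

// Illustrative only: a DataFrame of word counts viewed as a typed Dataset
case class WordCount(word: String, count: Long)

import spark.implicits._

val wordCountDS = Seq(("i", 2L), ("what", 1L), ("am", 1L)).
  toDF("word", "count").
  as[WordCount]

wordCountDS.filter(_.count > 1).show()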

Back to the RDD world. In this programming exercise, our goal is to count the number of occurrences of every distinct pair of consecutive words in a text file. In essence, for every given distinct word in a text file we’re going to count the number of occurrences of all distinct words following the word. As a trivial example, if the text is “I am what I am”, the result should be (i, am) = 2, (what, i) = 1, (am, what) = 1.
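Before bringing Spark in, the expected answer for that trivial example can be sanity-checked with plain Scala collections:

// Plain-Scala sanity check of the "I am what I am" example
"i am what i am".split(" ").
  sliding(2).
  map{ case Array(x, y) => (x, y) }.
  toList.
  groupBy(identity).
  map{ case (pair, occurrences) => pair -> occurrences.size }
// Map((i,am) -> 2, (am,what) -> 1, (what,i) -> 1)   (key ordering may vary)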

For illustration purposes, let’s assemble a small piece of text as follows and save it to a file, say in a Hadoop HDFS file system:

This is line one.
And this is line two.
Is this line three?
This is another line.
And this is yet another line!
Line one and line two are similar.
But line two and line three are not similar!
And line three and line four are not similar.
But line four and line five are similar!

Simple word count

As a warm-up exercise, let’s perform a hello-world word count, which simply reports the count of every distinct word in a text file. Using the ‘textFile()’ method of SparkContext, which serves as the entry point for a program to access resources on a Spark cluster, we load the content from the HDFS file:

// Count occurrences of distinct words
val wordCountRDD = sc.textFile("hdfs://path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  map( (_, 1) ).
  reduceByKey( _ + _ ).
  sortBy( z => (z._2, z._1), ascending = false )

Viewed as a collection of lines (delimited by line breaks), the text is first processed with ‘flatMap’, which splits each line by whitespace and punctuation into an array of words and then flattens the arrays. Note that ‘_.split()’ is just Scala shorthand for ‘line => line.split()’.
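For instance, applying that regular expression to one of the sample lines yields:

"And this is line two.".split("""[\s,.;:!?]+""")
// Array(And, this, is, line, two)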

Next, all words are lowercased (to disregard case) with the transformation ‘word => word.toLowerCase’, followed by a map transformation ‘word => (word, 1)’ for tallying. Using ‘reduceByKey’, the reduction ‘(total, count) => total + count’ (short-handed as ‘(_ + _)’) is applied per key, turning each distinct word into a tuple of (word, totalcount). The final sorting simply orders the result by count.

Since the dataset is small, we can ‘collect’ the result data to see the output:

wordCountRDD.collect.foreach{
  case (a, b) => println(f"$a%10s" + "  : " + f"$b%4s")
}

      line  :   13
       and  :    7
      this  :    5
        is  :    5
   similar  :    4
       are  :    4
       two  :    3
     three  :    3
       one  :    2
       not  :    2
      four  :    2
       but  :    2
   another  :    2
       yet  :    1
      five  :    1

On a related note, Spark’s ‘reduceByKey()’, along with a number of other ‘xxxxByKey()’ functions, is a handy tool for this kind of key-value pair transformation. Had they not been provided, one would have to do it with a little more hand-crafted work, like:

  groupBy( _._1 ).mapValues( _.map(_._2).sum )

  // OR

  foldLeft( Map[String, Int]() )( (acc, x) => 
    acc + (x._1 -> (acc.getOrElse(x._1, 0) + x._2) )
  )
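Here is how those hand-crafted alternatives would look when applied to a small in-memory collection of (word, 1) pairs (plain Scala collections, purely for illustration):

val pairs = Seq(("i", 1), ("am", 1), ("i", 1))

pairs.groupBy( _._1 ).mapValues( _.map(_._2).sum )
// Map(i -> 2, am -> 1)

pairs.foldLeft( Map[String, Int]() )( (acc, x) =>
  acc + (x._1 -> (acc.getOrElse(x._1, 0) + x._2))
)
// Map(i -> 2, am -> 1)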

Word-pair count

Now, let’s move onto the main topic of this blog post – counting distinct pairs of consecutive words:

import org.apache.spark.mllib.rdd.RDDFunctions._

// Count occurrences of distinct word pairs
val wordPairCountRDD = sc.textFile("hdfs://path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  sliding(2).
  map{ case Array(x, y) => ((x, y), 1) }.
  reduceByKey( _ + _ ).
  sortBy( z => (z._2, z._1._1, z._1._2), ascending = false )

Even though the required logic for counting word pairs is apparently more complex than that of counting individual words, the necessary transformations look only slightly different. It’s partly due to how compositions of modularized functions can make complex data transformations look seemingly simple in a functional programming language like Scala. Another key factor in this case is the availability of the powerful ‘sliding(n)’ function, which transforms a collection of elements into sliding windows each in the form of an array of size ‘n’. For example, applying sliding(2) to a sequence of words “apples”, “and”, “oranges” would result in Array(“apples”, “and”) and Array(“and”, “oranges”).
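Scala collections provide an analogous ‘sliding’ method, which makes the behavior easy to try out locally:

Array("apples", "and", "oranges").sliding(2).toList
// List(Array(apples, and), Array(and, oranges))   (as rendered by the Scala REPL)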

Scanning through the composed functions, the splitting by punctuation and the lowercasing do exactly the same thing as in the hello-world word count case. Next, ‘sliding(2)’ generates sliding windows of word pairs, each stored in an array. The subsequent ‘map’ turns each word-pair array into a key/value tuple, with the word-pair tuple as the key and 1 as the count value.

Similar to the reduction transformation in the hello-world word count case, ‘reduceByKey()’ generates the count for each word pair. The result is then sorted by count, then by the 1st word and the 2nd word of each pair. Output of the word-pair count using ‘collect’ is as follows:

wordPairCountRDD.collect.foreach{
  case ((a, b), c) => println(f"$a%10s" + "    -> " + f"$b%10s" + "  : " + f"$c%4s")
}

       and    ->       line  :    5
      this    ->         is  :    4
      line    ->        two  :    3
      line    ->      three  :    3
   similar    ->        but  :    2
       one    ->        and  :    2
       not    ->    similar  :    2
      line    ->        one  :    2
      line    ->       four  :    2
        is    ->       line  :    2
       but    ->       line  :    2
       are    ->    similar  :    2
       are    ->        not  :    2
   another    ->       line  :    2
       and    ->       this  :    2
       yet    ->    another  :    1
       two    ->         is  :    1
       two    ->        are  :    1
       two    ->        and  :    1
     three    ->       this  :    1
     three    ->        are  :    1
     three    ->        and  :    1
      this    ->       line  :    1
   similar    ->        and  :    1
      line    ->       line  :    1
      line    ->       five  :    1
      line    ->        and  :    1
        is    ->        yet  :    1
        is    ->       this  :    1
        is    ->    another  :    1
      four    ->        are  :    1
      four    ->        and  :    1
      five    ->        are  :    1

Creating a word-pair count method

The above word-pair counting snippet can be repurposed to serve as a general method for counting a specific word-pair in a text file:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.rdd.RDDFunctions._

def wordPairCount(word1: String, word2: String, filePath: String)(implicit sc: SparkContext) =
  sc.textFile(filePath).
    flatMap( _.split("""[\s,.;:!?]+""") ).
    map( _.toLowerCase ).
    sliding(2).
    collect{ case Array(`word1`, `word2`) => ((word1, word2), 1) }.
    reduceByKey( _ + _ )

It’s worth noting that the partial-function variant of ‘collect’ (not to be confused with the no-argument RDD ‘collect’ method that brings all data back to the driver) has now replaced method ‘map’ in the previous snippet. That’s because we’re now interested in counting only the specific word pair word1 and word2, thus requiring the inherent filtering capability of ‘collect’. Also note that in the ‘case’ clause the pair of words are enclosed in backticks so as to refer to the passed-in parameters, rather than binding arbitrary pattern-matching variables.
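As a quick illustration of the backtick rule in pattern matching:

val word1 = "line"

"line" match {
  case `word1` => "matched the existing value bound to word1"   // equality check against "line"
  case other   => s"bound a fresh variable: $other"
}
// res: String = matched the existing value bound to word1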

To use the word-pair count method, simply provide the pair of consecutive words and the file path as parameters, along with the SparkContext passed in as an implicit parameter. For example:

implicit val sc = SparkContext.getOrCreate

wordPairCount("line", "two", "hdfs://path/to/textfile")
// res1: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[56] at reduceByKey at :42

res1.collect
// res2: Array[((String, String), Int)] = Array(((line,two),3))

Scala On Spark – Streak

This is yet another programming example in my Scala-on-Spark blog series. Again, while it starts with the same minuscule weather data used in previous examples of the blog series, it can be viewed as an independent programming exercise.

In this example, we’re going to create a table that shows the streaks of consecutive months with non-zero precipitation.

Result should be similar to the following:

+-------+-------+--------------+------+
|station|year_mo|monthly_precip|streak|
+-------+-------+--------------+------+
|    100|2017-07|           0.5|     1|
|    100|2017-08|           2.0|     2|
|    100|2017-11|           1.5|     1|
|    100|2017-12|           3.5|     2|
|    115|2017-07|           1.0|     1|
|    115|2017-10|           1.5|     1|
|    115|2017-11|           3.0|     2|
|    115|2017-12|           4.5|     3|
+-------+-------+--------------+------+

We’ll explore using Spark’s window functions in this example. As a side note, some of the previous examples in the blog series could be resolved using window functions as well. By means of aggregating over partitioned sliding windows of data, Spark’s window functions readily perform certain kinds of complex aggregations which would otherwise require repetitive nested groupings. They are similar to how PostgreSQL’s window functions work.

Now, let’s load up the same old minuscule weather data.

import java.sql.Date

// DataFrame columns:
//   Weather Station ID
//   Start Date of a half-month period
//   Temperature High (in Fahrenheit) over the period
//   Temperature Low (in Fahrenheit) over the period
//   Total Precipitation (in inches) over the period
val weatherDataDF = Seq(
  (100, Date.valueOf("2017-07-01"), 75, 59, 0.0),
  (100, Date.valueOf("2017-07-16"), 77, 59, 0.5),
  (100, Date.valueOf("2017-08-01"), 80, 63, 1.0),
  (100, Date.valueOf("2017-08-16"), 78, 62, 1.0),
  (100, Date.valueOf("2017-09-01"), 74, 59, 0.0),
  (100, Date.valueOf("2017-09-16"), 72, 57, 0.0),
  (100, Date.valueOf("2017-10-01"), 68, 54, 0.0),
  (100, Date.valueOf("2017-10-16"), 66, 54, 0.0),
  (100, Date.valueOf("2017-11-01"), 64, 50, 0.5),
  (100, Date.valueOf("2017-11-16"), 61, 48, 1.0),
  (100, Date.valueOf("2017-12-01"), 59, 46, 2.0),
  (100, Date.valueOf("2017-12-16"), 57, 45, 1.5),
  (115, Date.valueOf("2017-07-01"), 76, 57, 0.0),
  (115, Date.valueOf("2017-07-16"), 76, 56, 1.0),
  (115, Date.valueOf("2017-08-01"), 78, 57, 0.0),
  (115, Date.valueOf("2017-08-16"), 81, 57, 0.0),
  (115, Date.valueOf("2017-09-01"), 77, 54, 0.0),
  (115, Date.valueOf("2017-09-16"), 72, 50, 0.0),
  (115, Date.valueOf("2017-10-01"), 65, 45, 0.0),
  (115, Date.valueOf("2017-10-16"), 59, 40, 1.5),
  (115, Date.valueOf("2017-11-01"), 55, 37, 1.0),
  (115, Date.valueOf("2017-11-16"), 52, 35, 2.0),
  (115, Date.valueOf("2017-12-01"), 45, 30, 3.0),
  (115, Date.valueOf("2017-12-16"), 41, 28, 1.5)
).toDF("station", "start_date", "temp_high", "temp_low", "total_precip")

First, create a DataFrame of precipitation by weather station and month and filter it to consist of only months with positive precipitation.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val positivePrecipDF = weatherDataDF.groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(sum($"total_precip").as("monthly_precip")).
  where($"monthly_precip" > 0)

positivePrecipDF.orderBy($"station", $"mo_end").show
// +-------+----------+--------------+
// |station|    mo_end|monthly_precip|
// +-------+----------+--------------+
// |    100|2017-07-31|           0.5|
// |    100|2017-08-31|           2.0|
// |    100|2017-11-30|           1.5|
// |    100|2017-12-31|           3.5|
// |    115|2017-07-31|           1.0|
// |    115|2017-10-31|           1.5|
// |    115|2017-11-30|           3.0|
// |    115|2017-12-31|           4.5|
// +-------+----------+--------------+

Next, using a window function, we capture sequences of row numbers ordered by month over partitions by weather station. For each row, we then use a UDF to calculate a base date by dating back from the row’s month by its row number (in months). As shown in the following table, these base dates help trace chunks of contiguous months back to a common base date.

// UDF to calculate the last day of the month which is n months prior to the input date
def baseDate = udf{ (d: java.sql.Date, n: Long) =>
  val base = d.toLocalDate.plusMonths(-n)
  java.sql.Date.valueOf(base.withDayOfMonth(base.lengthOfMonth))
}

val rankedDF = positivePrecipDF.withColumn("row_num", row_number.over(
  Window.partitionBy($"station").orderBy($"mo_end")
)).withColumn(
  "baseDate", baseDate($"mo_end", $"row_num")
)

rankedDF.show
// +-------+----------+--------------+-------+----------+
// |station|    mo_end|monthly_precip|row_num|  baseDate|
// +-------+----------+--------------+-------+----------+
// |    115|2017-07-31|           1.0|      1|2017-06-30|
// |    115|2017-10-31|           1.5|      2|2017-08-31|
// |    115|2017-11-30|           3.0|      3|2017-08-31|
// |    115|2017-12-31|           4.5|      4|2017-08-31|
// |    100|2017-07-31|           0.5|      1|2017-06-30|
// |    100|2017-08-31|           2.0|      2|2017-06-30|
// |    100|2017-11-30|           1.5|      3|2017-08-31|
// |    100|2017-12-31|           3.5|      4|2017-08-31|
// +-------+----------+--------------+-------+----------+

Finally, we apply another row-number window function, but this time, over partitions by weather station as well as base date. This partitioning allows contiguous common base dates to generate new row numbers as the wanted streaks.

val streakDF = rankedDF.select(
  $"station", $"mo_end", $"monthly_precip", row_number.over(
    Window.partitionBy($"station", $"baseDate").orderBy($"mo_end")
  ).as("streak")
).withColumn(
  "year_mo", concat(year($"mo_end"), lit("-"), lpad(month($"mo_end"), 2, "0"))
).select(
  $"station", $"year_mo", $"monthly_precip", $"streak"
).orderBy($"station", $"mo_end")

streakDF.show
// +-------+-------+--------------+------+
// |station|year_mo|monthly_precip|streak|
// +-------+-------+--------------+------+
// |    100|2017-07|           0.5|     1|
// |    100|2017-08|           2.0|     2|
// |    100|2017-11|           1.5|     1|
// |    100|2017-12|           3.5|     2|
// |    115|2017-07|           1.0|     1|
// |    115|2017-10|           1.5|     1|
// |    115|2017-11|           3.0|     2|
// |    115|2017-12|           4.5|     3|
// +-------+-------+--------------+------+

Using the same logic flow, we can also generate similar streak reports for temperature high/low (e.g. streaks of temperature highs above 75F). I’ll leave that as an exercise for the reader.
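For reference, one possible sketch of the temperature-high variant (months whose maximum ‘temp_high’ exceeds 75F), reusing the same row-number and base-date trick; the intermediate column and value names are made up for illustration:

val hotMonthsDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(max($"temp_high").as("monthly_high")).
  where($"monthly_high" > 75)

val hotStreakDF = hotMonthsDF.
  withColumn("row_num", row_number.over(
    Window.partitionBy($"station").orderBy($"mo_end")
  )).
  withColumn("baseDate", baseDate($"mo_end", $"row_num")).
  select(
    $"station", $"mo_end", $"monthly_high", row_number.over(
      Window.partitionBy($"station", $"baseDate").orderBy($"mo_end")
    ).as("streak")
  ).
  orderBy($"station", $"mo_end")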