Spark – Time Series Sessions

When analyzing time series activity data, it’s often useful to group the chronological activities into “target”-based sessions. These “targets” could be products, events, web pages, etc.

In this blog post, we’ll use a simplified time series log of web page activities to look at how page-based sessions can be created.

Let’s say we have a log of chronological web page activities as shown below:

+----+-------------------+--------+-----------+
|user|          timestamp|    page|     action|
+----+-------------------+--------+-----------+
| 101|2018-06-01 10:30:15|    home|   redirect|
| 101|2018-06-01 10:31:00|    home| info-click|
| 101|2018-06-01 10:33:00|    home| info-click|
| 101|2018-06-01 10:33:45|products|      visit|
| 101|2018-06-01 10:35:00|products| info-click|
| 101|2018-06-01 10:50:30|products| info-click|
| 101|2018-06-01 11:40:15|about-us|      visit|
| 101|2018-06-01 11:41:00|about-us| info-click|
| 101|2018-06-01 15:00:45|    home|      visit|
| 101|2018-06-01 15:02:00|    home| info-click|
| ...|           ...     |     ...|        ...|
+----+-------------------+--------+-----------+

And let’s say we want to group the log data by web page into per-user sessions with IDs of the format `userID-#`, where # is a monotonically increasing integer, like below:

+----+-------------------+--------+-----------+-------+
|user|          timestamp|    page|     action|sess_id|
+----+-------------------+--------+-----------+-------+
| 101|2018-06-01 10:30:15|    home|   redirect|  101-1|
| 101|2018-06-01 10:31:00|    home| info-click|  101-1|
| 101|2018-06-01 10:33:00|    home| info-click|  101-1|
| 101|2018-06-01 10:33:45|products|      visit|  101-2|
| 101|2018-06-01 10:35:00|products| info-click|  101-2|
| 101|2018-06-01 10:50:30|products| info-click|  101-2|
| 101|2018-06-01 11:40:15|about-us|      visit|  101-3|
| 101|2018-06-01 11:41:00|about-us| info-click|  101-3|
| 101|2018-06-01 15:00:45|    home|      visit|  101-4|
| 101|2018-06-01 15:02:00|    home| info-click|  101-4|
| ...|           ...     |     ...|        ...|    ...|
+----+-------------------+--------+-----------+-------+

The first idea that pops up in one’s mind might be to perform a `groupBy(user, page)` or a Window `partitionBy(user, page)`. But that wouldn’t work, since doing so would disregard the time gaps between visits to the same page, lumping all rows with the same page together under a given user regardless of when they happened.

First things first, let’s assemble a DataFrame with some sample web page activity data:

val df = Seq(
  (101, "2018-06-01 10:30:15", "home", "redirect"),
  (101, "2018-06-01 10:32:00", "home", "info-click"),
  (101, "2018-06-01 10:35:00", "home", "info-click"),
  (101, "2018-06-01 11:00:45", "products", "visit"),
  (101, "2018-06-01 11:12:00", "products", "info-click"),
  (101, "2018-06-01 11:25:30", "products", "info-click"),
  (101, "2018-06-01 11:38:15", "about-us", "info-click"),
  (101, "2018-06-01 11:50:00", "about-us", "info-click"),
  (101, "2018-06-01 12:01:45", "home", "visit"),
  (101, "2018-06-01 12:04:00", "home", "info-click"),
  (101, "2018-06-01 20:02:45", "home", "visit"),
  (101, "2018-06-01 20:40:00", "products", "info-click"),
  (101, "2018-06-01 20:46:30", "products", "info-click"),
  (101, "2018-06-01 20:50:15", "products", "add-to-cart"),
  (220, "2018-06-01 18:15:30", "home", "redirect"),
  (220, "2018-06-01 18:17:00", "home", "info-click"),
  (220, "2018-06-01 18:40:45", "home", "info-click"),
  (220, "2018-06-01 18:52:30", "home", "info-click"),
  (220, "2018-06-01 19:04:45", "products", "info-click"),
  (220, "2018-06-01 19:17:00", "products", "info-click"),
  (220, "2018-06-01 19:30:30", "products", "info-click"),
  (220, "2018-06-01 19:42:30", "products", "info-click"),
  (220, "2018-06-01 19:45:30", "products", "add-to-cart")
).toDF("user", "timestamp", "page", "action")
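
To make the earlier point about `groupBy(user, page)` concrete, here is a quick, illustration-only check against this sample data: grouping purely by user and page lumps together visits to the same page that happened hours apart.

// Naive grouping (illustration only): user 101's separate morning, noon and evening
// visits to "home" all collapse into a single (101, home) group, because the grouping
// key carries no notion of time gaps between visits
df.groupBy("user", "page").count().orderBy("user", "page").show()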

The solution to be presented here involves a few steps:

  1. Generate a new column `first_ts` which, for each user, holds the value of `timestamp` in the current row if that row is the user’s first row or its `page` value differs from that of the previous row; otherwise `null`.
  2. Fill every `null` in `first_ts` with the most recent non-null value, using Window function `last()` with `ignoreNulls`, and store the result in the new column `sess_ts`.
  3. Assemble session IDs by concatenating `user` and the `dense_rank` of `sess_ts` within each `user` partition.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val winUserTS = Window.partitionBy($"user").orderBy("timestamp")
val winUserSess = Window.partitionBy($"user").orderBy("sess_ts")

df.
  withColumn( "first_ts", when( row_number.over(winUserTS) === 1 ||
      $"page" =!= lag($"page", 1).over(winUserTS), $"timestamp"
    ) ).
  withColumn( "sess_ts", last($"first_ts", ignoreNulls = true).over(
      winUserTS.rowsBetween(Window.unboundedPreceding, 0)
    ) ).
  withColumn( "session_id", concat($"user", lit("-"), dense_rank.over(winUserSess)) ).
  show(50)
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
|user|          timestamp|    page|     action|           first_ts|            sess_ts|session_id|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
| 101|2018-06-01 10:30:15|    home|   redirect|2018-06-01 10:30:15|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:32:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:35:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 11:00:45|products|      visit|2018-06-01 11:00:45|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:12:00|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:25:30|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:38:15|about-us| info-click|2018-06-01 11:38:15|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 11:50:00|about-us| info-click|               null|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 12:01:45|    home|      visit|2018-06-01 12:01:45|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 12:04:00|    home| info-click|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:02:45|    home|      visit|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:40:00|products| info-click|2018-06-01 20:40:00|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:46:30|products| info-click|               null|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:50:15|products|add-to-cart|               null|2018-06-01 20:40:00|     101-5|
| 220|2018-06-01 18:15:30|    home|   redirect|2018-06-01 18:15:30|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:17:00|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:40:45|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:52:30|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 19:04:45|products| info-click|2018-06-01 19:04:45|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:17:00|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:30:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:42:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:45:30|products|add-to-cart|               null|2018-06-01 19:04:45|     220-2|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+

Note that the final output includes the intermediate columns (i.e. `first_ts` and `sess_ts`) for demonstration purposes, and that the code names the session column `session_id` rather than `sess_id` as in the earlier illustration.
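
With the session IDs in place, downstream per-session analysis becomes a straightforward aggregation. Below is a small follow-up sketch, assuming the full result above has been saved to a DataFrame named `sessionized` (a hypothetical name used only for this example):

// Hypothetical follow-up: number of events plus start/end time of each session
val sessionSummary = sessionized.
  groupBy("user", "session_id").
  agg(
    count("*").as("num_events"),
    min("timestamp").as("session_start"),
    max("timestamp").as("session_end")
  ).
  orderBy("user", "session_id")

sessionSummary.show(false)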

Spark – Interpolating Time Series Data

Like many software professionals, I often resort to Stack Overflow (through search engines) when looking for clues to solve programming problems at hand. Besides looking for programming solution ideas, when I find some free time I would also visit the site to help answer questions posted there. Occasionally, some of the more interesting question topics would trigger me to expand them into blog posts. In fact, a good chunk of the content in my Scala-on-Spark (SoS) mini blog series was from selected Stack Overflow questions I’ve answered in the past. So is the topic being tackled in this blog post.

Suppose we have time-series data with time gaps among the chronological timestamps like below:

+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:38:00| 120.0|
|  2|2019-01-16 12:00:00| 200.0|
|  2|2019-01-16 12:03:00| 210.0|
|  2|2019-01-16 12:04:00| 220.0|
|  2|2019-01-16 12:08:00| 230.0|
+---+-------------------+------+

Our goal is to expand column `timestamp` into per-minute timestamps and column `amount` into linearly interpolated values like below:

+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:31:00| 102.0|
|  1|2019-01-16 08:32:00| 104.0|
|  1|2019-01-16 08:33:00| 106.0|
|  1|2019-01-16 08:34:00| 108.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:36:00|113.33|
|  1|2019-01-16 08:37:00|116.67|
|  1|2019-01-16 08:38:00| 120.0|
|...|           ...     |   ...|
+---+-------------------+------+

There are different ways to solve interpolation problems. Since timestamps can be represented as `Long` values (i.e. Unix time), it might make sense to consider using method spark.range to create a time series of contiguous timestamps and left-join it with the dataset at hand. The catch, though, is that the method applies to the entire dataset (as opposed to per-group) and requires the `start` and `end` of the timestamp range as parameters, which might not be known in advance.
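
For completeness, here is a rough sketch of that spark.range idea, using the sample DataFrame `df` assembled further below and assuming the overall start/end of the range are somehow known up front (the variable names here are made up for illustration, and both assumptions are exactly the catch):

import spark.implicits._

// Hypothetical, known-in-advance bounds of the timestamp range, in epoch seconds
val startEpoch = java.sql.Timestamp.valueOf("2019-01-16 08:30:00").getTime / 1000
val endEpoch   = java.sql.Timestamp.valueOf("2019-01-16 08:38:00").getTime / 1000

// One row per minute across the whole range...
val perMinute = spark.range(startEpoch, endEpoch + 1, 60).
  select($"id".cast("timestamp").as("timestamp"))

// ...left-joined with the sparse data of a single group; the nulls in `amount`
// would still need to be interpolated afterwards
val joined = perMinute.join(
  df.filter($"id" === 1).withColumn("timestamp", $"timestamp".cast("timestamp")),
  Seq("timestamp"), "left"
)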

A more flexible approach would be to use a UDF (user-defined function) for custom data manipulation, though at the expense of potential performance degradation (since built-in Spark APIs that leverage Spark’s optimization engine generally scale better than UDFs). For more details about native functions versus UDFs in Spark, check out this blog post.

Nevertheless, the solution being proposed here involves a UDF which, for each row, takes the `timestamp` and `amount` values of both the current row and the previous row as parameters, and returns a list of interpolated `(timestamp, amount)` tuples. Using the java.time API, the previous and current String-type timestamps are converted into a LocalDateTime range to be linearly interpolated.

def tsInterpolate(tsPattern: String) = udf{
  (ts1: String, ts2: String, amt1: Double, amt2: Double) =>
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter

    val timeFormat = DateTimeFormatter.ofPattern(tsPattern)

    // Per-minute timestamps covering (ts1, ts2]; if the two are identical (i.e. the
    // first row of a partition), keep just the single timestamp
    val perMinuteTS = if (ts1 == ts2) Vector(ts1) else {
      val ldt1 = LocalDateTime.parse(ts1, timeFormat)
      val ldt2 = LocalDateTime.parse(ts2, timeFormat)
      Iterator.iterate(ldt1.plusMinutes(1))(_.plusMinutes(1)).
        takeWhile(! _.isAfter(ldt2)).
        map(_.format(timeFormat)).
        toVector
    }

    // Linearly interpolated amounts, one per generated timestamp, ending at amt2
    val perMinuteAmt = for {
      i <- 1 to perMinuteTS.size
    } yield amt1 + ((amt2 - amt1) * i / perMinuteTS.size)

    perMinuteTS zip perMinuteAmt
}

Note that `Iterator.iterate(init)(next).takeWhile(condition)` in the UDF is just a functional version of the conventional `while-loop`.
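
As a quick, self-contained illustration of that equivalence (the timestamps below are just sample values):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val ldt1 = LocalDateTime.parse("2019-01-16 08:30:00", fmt)
val ldt2 = LocalDateTime.parse("2019-01-16 08:33:00", fmt)

// Functional style, as used in the UDF
val viaIterate = Iterator.iterate(ldt1.plusMinutes(1))(_.plusMinutes(1)).
  takeWhile(!_.isAfter(ldt2)).map(_.format(fmt)).toVector

// Equivalent imperative while-loop
val buf = scala.collection.mutable.ArrayBuffer[String]()
var cur = ldt1.plusMinutes(1)
while (!cur.isAfter(ldt2)) {
  buf += cur.format(fmt)
  cur = cur.plusMinutes(1)
}

// Both yield Vector(2019-01-16 08:31:00, 2019-01-16 08:32:00, 2019-01-16 08:33:00)
assert(viaIterate == buf.toVector)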

With the UDF in place, we supply it the timestamp pattern along with the previous/current timestamp pair and the previous/current amount pair to produce a list of interpolated timestamp-amount pairs for each row. The resulting list column is then flattened using Spark’s built-in `explode` function.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  (1, "2019-01-16 08:30:00", 100.0),
  (1, "2019-01-16 08:35:00", 110.0),
  (1, "2019-01-16 08:38:00", 120.0),
  (2, "2019-01-16 12:00:00", 200.0),
  (2, "2019-01-16 12:03:00", 210.0),
  (2, "2019-01-16 12:04:00", 220.0),
  (2, "2019-01-16 12:08:00", 230.0)
).toDF("id", "timestamp", "amount")

val tsPattern = "yyyy-MM-dd HH:mm:ss"
val win = Window.partitionBy($"id").orderBy($"timestamp")

df.
  withColumn("timestampPrev", when(row_number.over(win) === 1, $"timestamp").
    otherwise(lag($"timestamp", 1).over(win))
  ).
  withColumn("amountPrev", when(row_number.over(win) === 1, $"amount").
    otherwise(lag($"amount", 1).over(win))
  ).
  withColumn("interpolatedList",
    tsInterpolate(tsPattern)($"timestampPrev", $"timestamp", $"amountPrev", $"amount")
  ).
  withColumn("interpolated", explode($"interpolatedList")).
  select(
    $"id", $"interpolated._1".as("timestamp"), round($"interpolated._2", 2).as("amount")
  ).
  show
+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:31:00| 102.0|
|  1|2019-01-16 08:32:00| 104.0|
|  1|2019-01-16 08:33:00| 106.0|
|  1|2019-01-16 08:34:00| 108.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:36:00|113.33|
|  1|2019-01-16 08:37:00|116.67|
|  1|2019-01-16 08:38:00| 120.0|
|  2|2019-01-16 12:00:00| 200.0|
|  2|2019-01-16 12:01:00|203.33|
|  2|2019-01-16 12:02:00|206.67|
|  2|2019-01-16 12:03:00| 210.0|
|  2|2019-01-16 12:04:00| 220.0|
|  2|2019-01-16 12:05:00| 222.5|
|  2|2019-01-16 12:06:00| 225.0|
|  2|2019-01-16 12:07:00| 227.5|
|  2|2019-01-16 12:08:00| 230.0|
+---+-------------------+------+
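
As a quick sanity check of the interpolation arithmetic, consider the 12:04:00 to 12:08:00 gap of id 2 in the output above: four one-minute steps, each adding (230.0 - 220.0) / 4 = 2.5 to the previous amount.

// Reproducing the interpolated amounts for the 12:04 -> 12:08 gap of id 2
(1 to 4).map(i => 220.0 + (230.0 - 220.0) * i / 4)
// Vector(222.5, 225.0, 227.5, 230.0) -- matching the 12:05 through 12:08 rows above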

Scala On Spark – Word-pair Count

So far, the few programming examples in the SoS (Scala on Spark) blog series have all centered around DataFrames. In this blog post, I would like to give an example of Spark’s RDD (resilient distributed dataset), which is an immutable distributed collection of data that can be processed via functional transformations (e.g. map, filter, reduce).

The main difference between the RDD and DataFrame APIs is that the former provides more granular low-level functionality whereas the latter is equipped with powerful SQL-style functions to process table-form data. Note that even though a DataFrame is in table form with named columns, the underlying JVM treats each row of the data as a generic untyped object. As a side note, Spark also supports another data abstraction called Dataset, which is a distributed collection of strongly-typed objects.
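
As a quick aside, here is a minimal sketch of what that strongly-typed Dataset abstraction looks like, assuming a spark-shell style SparkSession named spark (the case class and values below are made up for illustration):

import spark.implicits._

// A hypothetical strongly-typed record
case class WordFreq(word: String, count: Int)

val wordFreqDS = Seq(WordFreq("line", 13), WordFreq("and", 7), WordFreq("yet", 1)).toDS()

// Field access is checked at compile time, unlike untyped Row access in a DataFrame
wordFreqDS.filter(_.count >= 5).show()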

Back to the RDD world. In this programming exercise, our goal is to count the number of occurrences of every distinct pair of consecutive words in a text file. In essence, for every distinct word in the text we’re going to count how many times each distinct word immediately follows it. As a trivial example, if the text is “I am what I am”, the result should be (i, am) = 2, (what, i) = 1, (am, what) = 1.

For illustration purposes, let’s assemble a small piece of text as follows and save it in a file, say on a Hadoop HDFS file system:

This is line one.
And this is line two.
Is this line three?
This is another line.
And this is yet another line!
Line one and line two are similar.
But line two and line three are not similar!
And line three and line four are not similar.
But line four and line five are similar!

Simple word count

As a warm-up exercise, let’s perform a hello-world word count, which simply reports the count of every distinct word in a text file. Using the ‘textFile()’ method of SparkContext (the entry point through which a program accesses resources on a Spark cluster), we load the content from the HDFS file:

// Count occurrences of distinct words
val wordCountRDD = sc.textFile("hdfs://path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  map( (_, 1) ).
  reduceByKey( _ + _ ).
  sortBy( z => (z._2, z._1), ascending = false )

Viewed as a collection of lines (delimited by line breaks), the text is first run through ‘flatMap’ to split each line by whitespace and punctuation into an array of words and then flatten the arrays. Note that ‘_.split()’ is just Scala shorthand for ‘line => line.split()’.

Next, all words are lowercased (to disregard case) with the transformation ‘word => word.toLowerCase’, followed by the map transformation ‘word => (word, 1)’ for tallying. ‘reduceByKey’ then applies the reduction ‘(total, count) => total + count’ (short-handed as ‘(_ + _)’) per key, turning every distinct word into a (word, totalCount) tuple. The final sort simply orders the result by count.
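
To see what those per-line transformations actually produce, here is a quick trace of one line from the sample text using plain Scala (no Spark required):

// Split on whitespace/punctuation, then lowercase
"And this is yet another line!".split("""[\s,.;:!?]+""").map(_.toLowerCase)
// Array(and, this, is, yet, another, line)

// ...which map( (_, 1) ) then turns into
// (and,1), (this,1), (is,1), (yet,1), (another,1), (line,1)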

Since the dataset is small, we can ‘collect’ the result data to see the output:

wordCountRDD.collect.foreach{
  case (a, b) => println(f"$a%10s" + "  : " + f"$b%4s")
}

      line  :   13
       and  :    7
      this  :    5
        is  :    5
   similar  :    4
       are  :    4
       two  :    3
     three  :    3
       one  :    2
       not  :    2
      four  :    2
       but  :    2
   another  :    2
       yet  :    1
      five  :    1

On a related note, Spark’s ‘reduceByKey()’, along with a couple of other ‘xxxxByKey()’ functions, is a handy tool for this kind of key-value pair transformation. Had such functions not been provided, one would have to do the aggregation with a little more hand-crafted work, for example (operating on a collection of (word, 1) pairs):

  groupBy( _._1 ).mapValues( _.map(_._2).sum )

  // OR, on a plain (non-distributed) Scala collection, by folding into a Map
  foldLeft( Map[String, Int]() )( (acc, x) =>
    acc + (x._1 -> (acc.getOrElse(x._1, 0) + x._2))
  )

Word-pair count

Now, let’s move onto the main topic of this blog post – counting distinct pairs of consecutive words:

import org.apache.spark.mllib.rdd.RDDFunctions._

// Count occurrences of distinct word pairs
val wordPairCountRDD = sc.textFile("hdfs://path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  sliding(2).
  map{ case Array(x, y) => ((x, y), 1) }.
  reduceByKey( _ + _ ).
  sortBy( z => (z._2, z._1._1, z._1._2), ascending = false )

Even though the required logic for counting word pairs is apparently more complex than that of counting individual words, the necessary transformations look only slightly different. It’s partly due to how compositions of modularized functions can make complex data transformations look seemingly simple in a functional programming language like Scala. Another key factor in this case is the availability of the powerful ‘sliding(n)’ function, which transforms a collection of elements into sliding windows each in the form of an array of size ‘n’. For example, applying sliding(2) to a sequence of words “apples”, “and”, “oranges” would result in Array(“apples”, “and”) and Array(“and”, “oranges”).
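
The same sliding method exists on ordinary Scala collections, so its behavior is easy to try out locally (sub-lists rather than arrays in this case):

List("apples", "and", "oranges").sliding(2).toList
// List(List(apples, and), List(and, oranges))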

Scanning through the composed functions, the split by punctuation and the lowercasing do exactly the same things as in the hello-world word count case. Next, ‘sliding(2)’ generates sliding windows of word pairs, each stored in an array. The subsequent ‘map’ turns each word-pair array into a key/value tuple, with the word-pair tuple as the key and 1 as the count value.

Similar to the reduction transformation in the hello-world word count case, ‘reduceByKey()’ generates the count for each word pair. The result is then sorted (in descending order) by count, then by the first and second words of each pair. The output of the word-pair count via ‘collect’ is as follows:

wordPairCountRDD.collect.foreach{
  case ((a, b), c) => println(f"$a%10s" + "    -> " + f"$b%10s" + "  : " + f"$c%4s")
}

       and    ->       line  :    5
      this    ->         is  :    4
      line    ->        two  :    3
      line    ->      three  :    3
   similar    ->        but  :    2
       one    ->        and  :    2
       not    ->    similar  :    2
      line    ->        one  :    2
      line    ->       four  :    2
        is    ->       line  :    2
       but    ->       line  :    2
       are    ->    similar  :    2
       are    ->        not  :    2
   another    ->       line  :    2
       and    ->       this  :    2
       yet    ->    another  :    1
       two    ->         is  :    1
       two    ->        are  :    1
       two    ->        and  :    1
     three    ->       this  :    1
     three    ->        are  :    1
     three    ->        and  :    1
      this    ->       line  :    1
   similar    ->        and  :    1
      line    ->       line  :    1
      line    ->       five  :    1
      line    ->        and  :    1
        is    ->        yet  :    1
        is    ->       this  :    1
        is    ->    another  :    1
      four    ->        are  :    1
      four    ->        and  :    1
      five    ->        are  :    1

Creating a word-pair count method

The above word-pair counting snippet can be repurposed as a general method for counting the occurrences of a specific word pair in a text file:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.rdd.RDDFunctions._

def wordPairCount(word1: String, word2: String, filePath: String)(implicit sc: SparkContext) =
  sc.textFile(filePath).
    flatMap( _.split("""[\s,.;:!?]+""") ).
    map( _.toLowerCase ).
    sliding(2).
    collect{ case Array(`word1`, `word2`) => ((word1, word2), 1) }.
    reduceByKey( _ + _ )

It’s worth noting that the ‘collect’ method used here (the RDD ‘collect(PartialFunction)’ overload, modeled after the ‘collect’ method on Scala collections and not to be confused with the parameterless RDD ‘collect()’ action) has replaced method ‘map’ from the previous snippet. That’s because we’re now interested in counting only the specific word pair word1 and word2, which calls for the inherent filtering functionality of ‘collect’. Also note that in the ‘case’ pattern the pair of words is enclosed in backticks so that they refer to the passed-in parameters, rather than being bound as arbitrary pattern-matching variables.
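
Here is a quick plain-Scala illustration of both points, the backtick pattern and collect’s built-in filtering, independent of Spark:

val word1 = "line"

Seq(Array("line", "two"), Array("this", "is"), Array("line", "three")).collect {
  // `word1` (backticked) must equal the value of word1; w2 binds to anything
  case Array(`word1`, w2) => (word1, w2)
}
// List((line,two), (line,three))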

To use the word-pair count method, simply provide the pair of consecutive words and the file path as parameters, along with the SparkContext passed in as an implicit parameter. For example:

implicit val sc = SparkContext.getOrCreate

wordPairCount("line", "two", "hdfs://path/to/textfile")
// res1: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[56] at reduceByKey at <console>:42

res1.collect
// res2: Array[((String, String), Int)] = Array(((line,two),3))