Tag Archives: time series

Spark – Custom Timeout Sessions

In the previous blog post, we saw how one could partition a time series log of web activities into web page-based sessions. Operating on the same original dataset, we’re going to generate sessions based on a different set of rules.

Rather than web page-based, sessions are defined with the following rules:

  1. A session expires after inactivity of a timeout period (say `tmo1`), and,
  2. An active session expires after a timeout period (say `tmo2`).

First, we assemble the original sample dataset used in the previous blog:

val df = Seq(
  (101, "2018-06-01 10:30:15", "home", "redirect"),
  (101, "2018-06-01 10:32:00", "home", "info-click"),
  (101, "2018-06-01 10:35:00", "home", "info-click"),
  (101, "2018-06-01 11:00:45", "products", "visit"),
  (101, "2018-06-01 11:12:00", "products", "info-click"),
  (101, "2018-06-01 11:25:30", "products", "info-click"),
  (101, "2018-06-01 11:38:15", "about-us", "info-click"),
  (101, "2018-06-01 11:50:00", "about-us", "info-click"),
  (101, "2018-06-01 12:01:45", "home", "visit"),
  (101, "2018-06-01 12:04:00", "home", "info-click"),
  (101, "2018-06-01 20:02:45", "home", "visit"),
  (101, "2018-06-01 20:40:00", "products", "info-click"),
  (101, "2018-06-01 20:46:30", "products", "info-click"),
  (101, "2018-06-01 20:50:15", "products", "add-to-cart"),
  (220, "2018-06-01 18:15:30", "home", "redirect"),
  (220, "2018-06-01 18:17:00", "home", "info-click"),
  (220, "2018-06-01 18:40:45", "home", "info-click"),
  (220, "2018-06-01 18:52:30", "home", "info-click"),
  (220, "2018-06-01 19:04:45", "products", "info-click"),
  (220, "2018-06-01 19:17:00", "products", "info-click"),
  (220, "2018-06-01 19:30:30", "products", "info-click"),
  (220, "2018-06-01 19:42:30", "products", "info-click"),
  (220, "2018-06-01 19:45:30", "products", "add-to-cart")
).toDF("user", "timestamp", "page", "action")

Let’s set the first timeout `tmo1` to 15 minutes, and the second timeout `tmo2` to 60 minutes.

The end result should look something like below:

+----+-------------------+-------+
|user|          timestamp|sess_id|
+----+-------------------+-------+
| 101|2018-06-01 10:30:15|  101-1|
| 101|2018-06-01 10:32:00|  101-1|
| 101|2018-06-01 10:35:00|  101-1|
| 101|2018-06-01 11:00:45|  101-2|  <-- timeout rule #1
| 101|2018-06-01 11:12:00|  101-2|
| 101|2018-06-01 11:25:30|  101-2|
| 101|2018-06-01 11:38:15|  101-2|
| 101|2018-06-01 11:50:00|  101-2|
| 101|2018-06-01 12:01:45|  101-3|  <-- timeout rule #2
| 101|2018-06-01 12:04:00|  101-3|
| ...|           ...     |    ...|
+----+-------------------+-------+

Given the above session creation rules, it’s obvious that all programming logic is going to be centered around the timestamp alone, hence the omission of columns like `page` in the expected final result.

Generating sessions based on rule #1 is rather straight forward as computing the timestamp difference between consecutive rows is easy with Spark built-in Window functions. As for session creation rule #2, it requires dynamically identifying the start of the next session that depends on where the current session ends. Hence, even robust Window functions over, say, `partitionBy(user).orderBy(timestamp).rangeBetween(0, tmo2)` wouldn’t cut it.

The solution to be suggested involves using a UDF (user-defined fucntion) to leverage Scala’s feature-rich set of functions:

def tmoSessList(tmo: Long) = udf{ (uid: String, tsList: Seq[String], tsDiffs: Seq[Long]) =>
  def sid(n: Long) = s"$uid-$n"

  val sessList = tsDiffs.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
    if (i == 0 || j + i >= tmo)
      (sid(k + 1) :: ls, 0L, k + 1)
    else
      (sid(k) :: ls, j + i, k)
  }._1.reverse

  tsList zip sessList
}

Note that the timestamp diff list `tsDiffs` is the main input being processed for generating sessions based on the `tmo2` value (session create rule #2). The timestamp list `tsList` is being “passed thru” merely to be included in the output with each timestamp paired with the corresponding session ID.

Also note that the accumulator for `foldLeft` in the UDF is a Tuple of `(ls, j, k)`, where:

  • `ls` is the list of formatted session IDs to be returned
  • `j` and `k` are for carrying over the conditionally changing timestamp value and session id number, respectively, to the next iteration

Now, let’s lay out the steps for carrying out the necessary transformations to generate the sessions:

  1. Identify sessions (with 0 = start of a session) per user based on session creation rule #1
  2. Group the dataset to assemble the timestamp diff list per user
  3. Process the timestamp diff list via the above UDF to identify sessions based on rule #2 and generate all session IDs per user
  4. Expand the processed dataset which consists of the timestamp paired with the corresponding session IDs

Step 1:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val tmo1: Long = 15 * 60
val tmo2: Long = 60 * 60

val win1 = Window.partitionBy("user").orderBy("timestamp")

val df1 = df.
  withColumn("ts_diff",
    unix_timestamp($"timestamp") - unix_timestamp(lag($"timestamp", 1).over(win1))
  ).
  withColumn("ts_diff", when(row_number.over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
    otherwise($"ts_diff")
  )

df1.show(50)
+----+-------------------+--------+-----------+-------+
|user|          timestamp|    page|     action|ts_diff|
+----+-------------------+--------+-----------+-------+
| 101|2018-06-01 10:30:15|    home|   redirect|      0|
| 101|2018-06-01 10:32:00|    home| info-click|    105|
| 101|2018-06-01 10:35:00|    home| info-click|    180|
| 101|2018-06-01 11:00:45|products|      visit|      0|
| 101|2018-06-01 11:12:00|products| info-click|    675|
| 101|2018-06-01 11:25:30|products| info-click|    810|
| 101|2018-06-01 11:38:15|about-us| info-click|    765|
| 101|2018-06-01 11:50:00|about-us| info-click|    705|
| 101|2018-06-01 12:01:45|    home|      visit|    705|
| 101|2018-06-01 12:04:00|    home| info-click|    135|
| 101|2018-06-01 20:02:45|    home|      visit|      0|
| 101|2018-06-01 20:40:00|products| info-click|      0|
| 101|2018-06-01 20:46:30|products| info-click|    390|
| 101|2018-06-01 20:50:15|products|add-to-cart|    225|
| 220|2018-06-01 18:15:30|    home|   redirect|      0|
| 220|2018-06-01 18:17:00|    home| info-click|     90|
| 220|2018-06-01 18:40:45|    home| info-click|      0|
| 220|2018-06-01 18:52:30|    home| info-click|    705|
| 220|2018-06-01 19:04:45|products| info-click|    735|
| 220|2018-06-01 19:17:00|products| info-click|    735|
| 220|2018-06-01 19:30:30|products| info-click|    810|
| 220|2018-06-01 19:42:30|products| info-click|    720|
| 220|2018-06-01 19:45:30|products|add-to-cart|    180|
+----+-------------------+--------+-----------+-------+

Steps 2-4:

val df2 = df1.
  groupBy("user").agg(
    collect_list($"timestamp").as("ts_list"), collect_list($"ts_diff").as("ts_diffs")
  ).
  withColumn("tmo_sess_id",
    explode(tmoSessList(tmo2)($"user", $"ts_list", $"ts_diffs"))
  ).
  select($"user", $"tmo_sess_id._1".as("timestamp"), $"tmo_sess_id._2".as("sess_id"))

df2.show(50)
+----+-------------------+-------+
|user|          timestamp|sess_id|
+----+-------------------+-------+
| 101|2018-06-01 10:30:15|  101-1|  User 101
| 101|2018-06-01 10:32:00|  101-1|
| 101|2018-06-01 10:35:00|  101-1|
| 101|2018-06-01 11:00:45|  101-2|  <-- timeout rule #1
| 101|2018-06-01 11:12:00|  101-2|
| 101|2018-06-01 11:25:30|  101-2|
| 101|2018-06-01 11:38:15|  101-2|
| 101|2018-06-01 11:50:00|  101-2|
| 101|2018-06-01 12:01:45|  101-3|  <-- timeout rule #2
| 101|2018-06-01 12:04:00|  101-3|
| 101|2018-06-01 20:02:45|  101-4|  <-- timeout rule #1
| 101|2018-06-01 20:40:00|  101-5|  <-- timeout rule #1
| 101|2018-06-01 20:46:30|  101-5|
| 101|2018-06-01 20:50:15|  101-5|
| 220|2018-06-01 18:15:30|  220-1|  User 220
| 220|2018-06-01 18:17:00|  220-1|
| 220|2018-06-01 18:40:45|  220-2|  <-- timeout rule #1
| 220|2018-06-01 18:52:30|  220-2|
| 220|2018-06-01 19:04:45|  220-2|
| 220|2018-06-01 19:17:00|  220-2|
| 220|2018-06-01 19:30:30|  220-2|
| 220|2018-06-01 19:42:30|  220-3|  <-- timeout rule #2
| 220|2018-06-01 19:45:30|  220-3|
+----+-------------------+-------+

Spark – Time Series Sessions

When analyzing time series activity data, it’s often useful to group the chronological activities into “target”-based sessions. These “targets” could be products, events, web pages, etc.

Using a simplified time series log of web page activities, we’ll look at how web page-based sessions can be created in this blog post.

Let’s say we have a log of chronological web page activities as shown below:

+----+-------------------+--------+-----------+
|user|          timestamp|    page|     action|
+----+-------------------+--------+-----------+
| 101|2018-06-01 10:30:15|    home|   redirect|
| 101|2018-06-01 10:31:00|    home| info-click|
| 101|2018-06-01 10:33:00|    home| info-click|
| 101|2018-06-01 10:33:45|products|      visit|
| 101|2018-06-01 10:35:00|products| info-click|
| 101|2018-06-01 10:50:30|products| info-click|
| 101|2018-06-01 11:40:15|about-us|      visit|
| 101|2018-06-01 11:41:00|about-us| info-click|
| 101|2018-06-01 15:00:45|    home|      visit|
| 101|2018-06-01 15:02:00|    home| info-click|
| ...|           ...     |     ...|        ...|
+----+-------------------+--------+-----------+

And let’s say we want to group the log data by web page to generate user-defined sessions with format `userID-#`, where # is a monotonically increasing integer, like below:

+----+-------------------+--------+-----------+-------+
|user|          timestamp|    page|     action|sess_id|
+----+-------------------+--------+-----------+-------+
| 101|2018-06-01 10:30:15|    home|   redirect|  101-1|
| 101|2018-06-01 10:31:00|    home| info-click|  101-1|
| 101|2018-06-01 10:33:00|    home| info-click|  101-1|
| 101|2018-06-01 10:33:45|products|      visit|  101-2|
| 101|2018-06-01 10:35:00|products| info-click|  101-2|
| 101|2018-06-01 10:50:30|products| info-click|  101-2|
| 101|2018-06-01 11:40:15|about-us|      visit|  101-3|
| 101|2018-06-01 11:41:00|about-us| info-click|  101-3|
| 101|2018-06-01 15:00:45|    home|      visit|  101-4|
| 101|2018-06-01 15:02:00|    home| info-click|  101-4|
| ...|           ...     |     ...|        ...|    ...|
+----+-------------------+--------+-----------+-------+

The first thing that pops up in one’s mind might be to perform a `groupBy(user, page)` or a Window `partitionBy(user, page)`. But that wouldn’t work since doing so would disregard time gaps between the same page, resulting in all rows with the same page grouped together under a given user.

First thing first, let’s assemble a DataFrame with some sample web page activity data:

val df = Seq(
  (101, "2018-06-01 10:30:15", "home", "redirect"),
  (101, "2018-06-01 10:32:00", "home", "info-click"),
  (101, "2018-06-01 10:35:00", "home", "info-click"),
  (101, "2018-06-01 11:00:45", "products", "visit"),
  (101, "2018-06-01 11:12:00", "products", "info-click"),
  (101, "2018-06-01 11:25:30", "products", "info-click"),
  (101, "2018-06-01 11:38:15", "about-us", "info-click"),
  (101, "2018-06-01 11:50:00", "about-us", "info-click"),
  (101, "2018-06-01 12:01:45", "home", "visit"),
  (101, "2018-06-01 12:04:00", "home", "info-click"),
  (101, "2018-06-01 20:02:45", "home", "visit"),
  (101, "2018-06-01 20:40:00", "products", "info-click"),
  (101, "2018-06-01 20:46:30", "products", "info-click"),
  (101, "2018-06-01 20:50:15", "products", "add-to-cart"),
  (220, "2018-06-01 18:15:30", "home", "redirect"),
  (220, "2018-06-01 18:17:00", "home", "info-click"),
  (220, "2018-06-01 18:40:45", "home", "info-click"),
  (220, "2018-06-01 18:52:30", "home", "info-click"),
  (220, "2018-06-01 19:04:45", "products", "info-click"),
  (220, "2018-06-01 19:17:00", "products", "info-click"),
  (220, "2018-06-01 19:30:30", "products", "info-click"),
  (220, "2018-06-01 19:42:30", "products", "info-click"),
  (220, "2018-06-01 19:45:30", "products", "add-to-cart")
).toDF("user", "timestamp", "page", "action")

The solution to be presented here involves a few steps:

  1. Generate a new column `first_ts` which, for each user, has the value of `timestamp` in the current row if the `page` value is different from that in the previous row; otherwise `null`.
  2. Backfill all the `null`s in `first_ts` with the `last` non-null value via Window function `last()` and store in the new column `sess_ts`.
  3. Assemble session IDs by concatenating `user` and the `dense_rank` of `sess_ts` within each `user` partition.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val winUserTS = Window.partitionBy($"user").orderBy("timestamp")
val winUserSess = Window.partitionBy($"user").orderBy("sess_ts")

df.
  withColumn( "first_ts", when( row_number.over(winUserTS) === 1 ||
      $"page" =!= lag($"page", 1).over(winUserTS), $"timestamp"
    ) ).
  withColumn( "sess_ts", last($"first_ts", ignoreNulls = true).over(
      winUserTS.rowsBetween(Window.unboundedPreceding, 0)
    ) ).
  withColumn( "session_id", concat($"user", lit("-"), dense_rank.over(winUserSess)) ).
  show(50)
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
|user|          timestamp|    page|     action|           first_ts|            sess_ts|session_id|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
| 101|2018-06-01 10:30:15|    home|   redirect|2018-06-01 10:30:15|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:32:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:35:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 11:00:45|products|      visit|2018-06-01 11:00:45|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:12:00|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:25:30|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:38:15|about-us| info-click|2018-06-01 11:38:15|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 11:50:00|about-us| info-click|               null|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 12:01:45|    home|      visit|2018-06-01 12:01:45|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 12:04:00|    home| info-click|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:02:45|    home|      visit|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:40:00|products| info-click|2018-06-01 20:40:00|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:46:30|products| info-click|               null|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:50:15|products|add-to-cart|               null|2018-06-01 20:40:00|     101-5|
| 220|2018-06-01 18:15:30|    home|   redirect|2018-06-01 18:15:30|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:17:00|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:40:45|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:52:30|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 19:04:45|products| info-click|2018-06-01 19:04:45|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:17:00|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:30:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:42:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:45:30|products|add-to-cart|               null|2018-06-01 19:04:45|     220-2|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+

Note that the final output includes all the intermediate columns (i.e. `first_ts` and `sess_ts`) for demonstration purpose.

Spark – Interpolating Time Series Data

Like many software professionals, I often resort to Stack Overflow (through search engines) when looking for clues to solve programming problems at hand. Besides looking for programming solution ideas, when I find some free time I would also visit the site to help answer questions posted there. Occasionally, some of the more interesting question topics would trigger me to expand them into blog posts. In fact, a good chunk of the content in my Scala-on-Spark (SoS) mini blog series was from selected Stack Overflow questions I’ve answered in the past. So is the topic being tackled in this blog post.

Suppose we have time-series data with time gaps among the chronological timestamps like below:

+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:38:00| 120.0|
|  2|2019-01-16 12:00:00| 200.0|
|  2|2019-01-16 12:03:00| 210.0|
|  2|2019-01-16 12:04:00| 220.0|
|  2|2019-01-16 12:08:00| 230.0|
+---+-------------------+------+

Our goal is to expand column `timestamp` into per-minute timestamps and column `amount` into linearly interpolated values like below:

+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:31:00| 102.0|
|  1|2019-01-16 08:32:00| 104.0|
|  1|2019-01-16 08:33:00| 106.0|
|  1|2019-01-16 08:34:00| 108.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:36:00|113.33|
|  1|2019-01-16 08:37:00|116.67|
|  1|2019-01-16 08:38:00| 120.0|
|...|           ...     |   ...|
+---+-------------------+------+

There are different ways to solve interpolation problems. Since timestamps can be represented as `Long` values (i.e. Unix time), it might make sense to consider using method spark.range to create a time series of contiguous timestamps and left-join with the dataset at hand. The catch, though, is that the method applies to the entire dataset (as opposed to per-group) and requires the `start` and `end` of the timestamp range as its parameters that might not be known in advance.

A more flexible approach would be to use a UDF (user-defined function) for custom data manipulation, though at the expense of potential performance degradation (since built-in Spark APIs that leverage Spark’s optimization engine generally scale better than UDFs). For more details about native functions versus UDFs in Spark, check out this blog post.

Nevertheless, the solution being proposed here involves using a UDF which, for each row, takes values of `timestamp` and `amount` in both the current row and previous row as parameters, and returns a list of interpolated `(timestamp, amount)` Tuples. Using the java.time API, the previous and current String-type timestamps will be converted into a LocalDateTime range to be linearly interpolated.

def tsInterpolate(tsPattern: String) = udf{

  (ts1: String, ts2: String, amt1: Double, amt2: Double) =>
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter

    val timeFormat = DateTimeFormatter.ofPattern(tsPattern)

    val perMinuteTS = if (ts1 == ts2) Vector(ts1) else {
      val ldt1 = LocalDateTime.parse(ts1, timeFormat)
      val ldt2 = LocalDateTime.parse(ts2, timeFormat)
      Iterator.iterate(ldt1.plusMinutes(1))(_.plusMinutes(1)).
        takeWhile(! _.isAfter(ldt2)).
        map(_.format(timeFormat)).
        toVector
    }

    val perMinuteAmt = for {
      i <- 1 to perMinuteTS.size
    } yield amt1 + ((amt2 - amt1) * i / perMinuteTS.size)

    perMinuteTS zip perMinuteAmt
}

Note that `Iterator.iterate(init)(next).takeWhile(condition)` in the UDF is just a functional version of the conventional `while-loop`.

With the UDF in place, we provide the function the timestamp pattern along with the previous/current timestamp pair and previous/current amount pair to produce a list of interpolated `timestamp-amount` pairs. The output will then be `flattened` using Spark's built-in `explode` function.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  (1, "2019-01-16 08:30:00", 100.0),
  (1, "2019-01-16 08:35:00", 110.0),
  (1, "2019-01-16 08:38:00", 120.0),
  (2, "2019-01-16 12:00:00", 200.0),
  (2, "2019-01-16 12:03:00", 210.0),
  (2, "2019-01-16 12:04:00", 220.0),
  (2, "2019-01-16 12:08:00", 230.0)
).toDF("id", "timestamp", "amount")

val tsPattern = "yyyy-MM-dd HH:mm:ss"
val win = Window.partitionBy($"id").orderBy($"timestamp")

df.
  withColumn("timestampPrev", when(row_number.over(win) === 1, $"timestamp").
    otherwise(lag($"timestamp", 1).over(win))
  ).
  withColumn("amountPrev", when(row_number.over(win) === 1, $"amount").
    otherwise(lag($"amount", 1).over(win))
  ).
  withColumn("interpolatedList",
    tsInterpolate(tsPattern)($"timestampPrev", $"timestamp", $"amountPrev", $"amount")
  ).
  withColumn("interpolated", explode($"interpolatedList")).
  select(
    $"id", $"interpolated._1".as("timestamp"), round($"interpolated._2", 2).as("amount")
  ).
  show
+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:31:00| 102.0|
|  1|2019-01-16 08:32:00| 104.0|
|  1|2019-01-16 08:33:00| 106.0|
|  1|2019-01-16 08:34:00| 108.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:36:00|113.33|
|  1|2019-01-16 08:37:00|116.67|
|  1|2019-01-16 08:38:00| 120.0|
|  2|2019-01-16 12:00:00| 200.0|
|  2|2019-01-16 12:01:00|203.33|
|  2|2019-01-16 12:02:00|206.67|
|  2|2019-01-16 12:03:00| 210.0|
|  2|2019-01-16 12:04:00| 220.0|
|  2|2019-01-16 12:05:00| 222.5|
|  2|2019-01-16 12:06:00| 225.0|
|  2|2019-01-16 12:07:00| 227.5|
|  2|2019-01-16 12:08:00| 230.0|
+---+-------------------+------+