Spark – Time Series Sessions

When analyzing time series activity data, it’s often useful to group the chronological activities into “target”-based sessions. These “targets” could be products, events, web pages, etc.

In this blog post, we’ll use a simplified time series log of web page activities to look at how page-based sessions can be created.

Let’s say we have a log of chronological web page activities as shown below:

+----+-------------------+--------+-----------+
|user|          timestamp|    page|     action|
+----+-------------------+--------+-----------+
| 101|2018-06-01 10:30:15|    home|   redirect|
| 101|2018-06-01 10:31:00|    home| info-click|
| 101|2018-06-01 10:33:00|    home| info-click|
| 101|2018-06-01 10:33:45|products|      visit|
| 101|2018-06-01 10:35:00|products| info-click|
| 101|2018-06-01 10:50:30|products| info-click|
| 101|2018-06-01 11:40:15|about-us|      visit|
| 101|2018-06-01 11:41:00|about-us| info-click|
| 101|2018-06-01 15:00:45|    home|      visit|
| 101|2018-06-01 15:02:00|    home| info-click|
| ...|           ...     |     ...|        ...|
+----+-------------------+--------+-----------+

And let’s say we want to group the log data by user and web page to generate session IDs in the format `userID-#`, where `#` is a monotonically increasing integer within each user, like below:

+----+-------------------+--------+-----------+----------+
|user|          timestamp|    page|     action|session_id|
+----+-------------------+--------+-----------+----------+
| 101|2018-06-01 10:30:15|    home|   redirect|     101-1|
| 101|2018-06-01 10:31:00|    home| info-click|     101-1|
| 101|2018-06-01 10:33:00|    home| info-click|     101-1|
| 101|2018-06-01 10:33:45|products|      visit|     101-2|
| 101|2018-06-01 10:35:00|products| info-click|     101-2|
| 101|2018-06-01 10:50:30|products| info-click|     101-2|
| 101|2018-06-01 11:40:15|about-us|      visit|     101-3|
| 101|2018-06-01 11:41:00|about-us| info-click|     101-3|
| 101|2018-06-01 15:00:45|    home|      visit|     101-4|
| 101|2018-06-01 15:02:00|    home| info-click|     101-4|
| ...|           ...     |     ...|        ...|       ...|
+----+-------------------+--------+-----------+----------+

A `groupBy(user, page)` or a Window `partitionBy(user, page)` might be the first approach that comes to mind. But that wouldn’t work: grouping by (user, page) ignores when the activities happened, so all rows for a given page would be lumped together under a given user, and a later return to the same page would be merged with the earlier visit instead of starting a new session.
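To make the issue concrete, here is a minimal sketch (for illustration only, not part of the actual solution); `winUserPage` is a name introduced just for this example:

import org.apache.spark.sql.expressions.Window
import spark.implicits._

// The naive window: partitioning by both user and page puts every row for a
// given (user, page) pair into a single partition, no matter how far apart in
// time the activities are.
val winUserPage = Window.partitionBy($"user", $"page").orderBy("timestamp")

// A ranking function such as dense_rank over winUserPage would rank rows
// within each page rather than numbering page visits chronologically, so a
// user's morning and afternoon visits to `home` would fall into one group.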

First things first, let’s assemble a DataFrame with some sample web page activity data:

import spark.implicits._  // needed for toDF on a local Seq

val df = Seq(
  (101, "2018-06-01 10:30:15", "home", "redirect"),
  (101, "2018-06-01 10:32:00", "home", "info-click"),
  (101, "2018-06-01 10:35:00", "home", "info-click"),
  (101, "2018-06-01 11:00:45", "products", "visit"),
  (101, "2018-06-01 11:12:00", "products", "info-click"),
  (101, "2018-06-01 11:25:30", "products", "info-click"),
  (101, "2018-06-01 11:38:15", "about-us", "info-click"),
  (101, "2018-06-01 11:50:00", "about-us", "info-click"),
  (101, "2018-06-01 12:01:45", "home", "visit"),
  (101, "2018-06-01 12:04:00", "home", "info-click"),
  (101, "2018-06-01 20:02:45", "home", "visit"),
  (101, "2018-06-01 20:40:00", "products", "info-click"),
  (101, "2018-06-01 20:46:30", "products", "info-click"),
  (101, "2018-06-01 20:50:15", "products", "add-to-cart"),
  (220, "2018-06-01 18:15:30", "home", "redirect"),
  (220, "2018-06-01 18:17:00", "home", "info-click"),
  (220, "2018-06-01 18:40:45", "home", "info-click"),
  (220, "2018-06-01 18:52:30", "home", "info-click"),
  (220, "2018-06-01 19:04:45", "products", "info-click"),
  (220, "2018-06-01 19:17:00", "products", "info-click"),
  (220, "2018-06-01 19:30:30", "products", "info-click"),
  (220, "2018-06-01 19:42:30", "products", "info-click"),
  (220, "2018-06-01 19:45:30", "products", "add-to-cart")
).toDF("user", "timestamp", "page", "action")
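A side note, and an assumption on my part rather than part of the original walkthrough: `timestamp` above is a plain string column. Its `yyyy-MM-dd HH:mm:ss` format happens to sort correctly as a string, which is all the window ordering below relies on, but it can be cast to a proper timestamp type if preferred; `dfTs` is a hypothetical name used only here:

import org.apache.spark.sql.functions.to_timestamp

// Optional: convert the string column to TimestampType (the default parse
// format matches "yyyy-MM-dd HH:mm:ss").
val dfTs = df.withColumn("timestamp", to_timestamp($"timestamp"))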

The solution to be presented here involves a few steps:

  1. Generate a new column `first_ts` which, for each user, holds the current row’s `timestamp` if the row is the user’s first activity or its `page` differs from the previous row’s `page`; otherwise `null`.
  2. Forward-fill the `null`s in `first_ts` with the latest non-null value via the Window function `last()` with `ignoreNulls`, storing the result in a new column `sess_ts`.
  3. Assemble session IDs by concatenating `user` and the `dense_rank` of `sess_ts` within each `user` partition.

Putting the steps into code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val winUserTS = Window.partitionBy($"user").orderBy("timestamp")
val winUserSess = Window.partitionBy($"user").orderBy("sess_ts")

df.
  // Step 1: keep the timestamp only where a new page "run" starts for the user
  withColumn( "first_ts", when( row_number().over(winUserTS) === 1 ||
      $"page" =!= lag($"page", 1).over(winUserTS), $"timestamp"
    ) ).
  // Step 2: forward-fill the nulls with the last non-null first_ts
  withColumn( "sess_ts", last($"first_ts", ignoreNulls = true).over(
      winUserTS.rowsBetween(Window.unboundedPreceding, 0)
    ) ).
  // Step 3: session ID = user + "-" + dense_rank of sess_ts within the user
  withColumn( "session_id", concat($"user", lit("-"), dense_rank().over(winUserSess)) ).
  show(50)

+----+-------------------+--------+-----------+-------------------+-------------------+----------+
|user|          timestamp|    page|     action|           first_ts|            sess_ts|session_id|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
| 101|2018-06-01 10:30:15|    home|   redirect|2018-06-01 10:30:15|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:32:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:35:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 11:00:45|products|      visit|2018-06-01 11:00:45|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:12:00|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:25:30|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:38:15|about-us| info-click|2018-06-01 11:38:15|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 11:50:00|about-us| info-click|               null|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 12:01:45|    home|      visit|2018-06-01 12:01:45|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 12:04:00|    home| info-click|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:02:45|    home|      visit|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:40:00|products| info-click|2018-06-01 20:40:00|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:46:30|products| info-click|               null|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:50:15|products|add-to-cart|               null|2018-06-01 20:40:00|     101-5|
| 220|2018-06-01 18:15:30|    home|   redirect|2018-06-01 18:15:30|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:17:00|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:40:45|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:52:30|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 19:04:45|products| info-click|2018-06-01 19:04:45|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:17:00|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:30:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:42:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:45:30|products|add-to-cart|               null|2018-06-01 19:04:45|     220-2|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+

Note that the final output includes the intermediate columns (i.e. `first_ts` and `sess_ts`) for demonstration purposes; in practice they would be dropped from the final result.
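As a follow-up sketch (not from the original post), the helper columns can simply be dropped, and the sessionized data can then be summarized per session. This assumes the transformation chain above is assigned to a DataFrame; `dfSessionized`, `sessions`, and `sessionSummary` are names introduced here purely for illustration:

// Assuming the chained withColumn transformations above were assigned to a
// DataFrame named `dfSessionized` instead of ending in show(50).
val sessions = dfSessionized.drop("first_ts", "sess_ts")

// One possible per-session summary: activity count plus first/last timestamps.
val sessionSummary = sessions.
  groupBy($"user", $"session_id").
  agg(
    count(lit(1)).as("num_activities"),
    min($"timestamp").as("session_start"),
    max($"timestamp").as("session_end")
  ).
  orderBy($"user", $"session_id")

sessionSummary.show()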