In the previous blog post, we saw how one could partition a time series log of web activities into web page-based sessions. Operating on the same original dataset, we’re going to generate sessions based on a different set of rules.
Rather than being web page-based, sessions are now defined by the following rules:
- A session expires after a period of inactivity (say, `tmo1`), and
- An active session expires once it has lasted a maximum duration (say, `tmo2`), regardless of activity.
First, we assemble the original sample dataset used in the previous blog:
    val df = Seq(
      (101, "2018-06-01 10:30:15", "home", "redirect"),
      (101, "2018-06-01 10:32:00", "home", "info-click"),
      (101, "2018-06-01 10:35:00", "home", "info-click"),
      (101, "2018-06-01 11:00:45", "products", "visit"),
      (101, "2018-06-01 11:12:00", "products", "info-click"),
      (101, "2018-06-01 11:25:30", "products", "info-click"),
      (101, "2018-06-01 11:38:15", "about-us", "info-click"),
      (101, "2018-06-01 11:50:00", "about-us", "info-click"),
      (101, "2018-06-01 12:01:45", "home", "visit"),
      (101, "2018-06-01 12:04:00", "home", "info-click"),
      (101, "2018-06-01 20:02:45", "home", "visit"),
      (101, "2018-06-01 20:40:00", "products", "info-click"),
      (101, "2018-06-01 20:46:30", "products", "info-click"),
      (101, "2018-06-01 20:50:15", "products", "add-to-cart"),
      (220, "2018-06-01 18:15:30", "home", "redirect"),
      (220, "2018-06-01 18:17:00", "home", "info-click"),
      (220, "2018-06-01 18:40:45", "home", "info-click"),
      (220, "2018-06-01 18:52:30", "home", "info-click"),
      (220, "2018-06-01 19:04:45", "products", "info-click"),
      (220, "2018-06-01 19:17:00", "products", "info-click"),
      (220, "2018-06-01 19:30:30", "products", "info-click"),
      (220, "2018-06-01 19:42:30", "products", "info-click"),
      (220, "2018-06-01 19:45:30", "products", "add-to-cart")
    ).toDF("user", "timestamp", "page", "action")
Let’s set the first timeout `tmo1` to 15 minutes, and the second timeout `tmo2` to 60 minutes.
The end result should look something like below:
    +----+-------------------+-------+
    |user|          timestamp|sess_id|
    +----+-------------------+-------+
    | 101|2018-06-01 10:30:15|  101-1|
    | 101|2018-06-01 10:32:00|  101-1|
    | 101|2018-06-01 10:35:00|  101-1|
    | 101|2018-06-01 11:00:45|  101-2|  <-- timeout rule #1
    | 101|2018-06-01 11:12:00|  101-2|
    | 101|2018-06-01 11:25:30|  101-2|
    | 101|2018-06-01 11:38:15|  101-2|
    | 101|2018-06-01 11:50:00|  101-2|
    | 101|2018-06-01 12:01:45|  101-3|  <-- timeout rule #2
    | 101|2018-06-01 12:04:00|  101-3|
    | ...|                ...|    ...|
    +----+-------------------+-------+
Given the above session creation rules, it’s obvious that all programming logic is going to be centered around the timestamp alone, hence the omission of columns like `page` in the expected final result.
Generating sessions based on rule #1 is rather straightforward, since computing the timestamp difference between consecutive rows is easy with Spark’s built-in Window functions. Session creation rule #2 is harder: the start of the next session depends on where the current session ends, so session boundaries have to be identified one after another. A window frame is anchored at each row rather than at the start of the current session, hence even robust Window functions over, say, `partitionBy(user).orderBy(timestamp).rangeBetween(0, tmo2)` wouldn’t cut it.
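To make the rule #1 arithmetic concrete outside of Spark, here is a minimal plain-Scala sketch. The object `Rule1Sketch` and its methods are hypothetical names introduced only for illustration, and the input is assumed to be one user's timestamps already sorted in ascending order.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper for illustration; not part of the Spark solution.
object Rule1Sketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Seconds between each timestamp and its predecessor (0 for the first one).
  // Assumes the timestamps are already sorted in ascending order.
  def gaps(timestamps: Seq[String]): Seq[Long] = {
    val parsed = timestamps.map(ts => LocalDateTime.parse(ts, fmt))
    if (parsed.isEmpty) Seq.empty[Long]
    else 0L +: parsed.zip(parsed.drop(1)).map { case (a, b) => ChronoUnit.SECONDS.between(a, b) }
  }

  // Rule #1: a row starts a session if it is the first row or its gap reaches tmo1.
  def startsSession(timestamps: Seq[String], tmo1: Long): Seq[Boolean] =
    gaps(timestamps).zipWithIndex.map { case (gap, idx) => idx == 0 || gap >= tmo1 }
}
```

For the first four timestamps of user 101, the gaps are 0, 105, 180 and 1545 seconds, so with `tmo1` at 900 seconds only the first and fourth rows start a session. Each decision needs just the previous row, which is why a simple `lag` suffices for rule #1 but not for rule #2.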
The suggested solution uses a UDF (user-defined function) to leverage Scala’s feature-rich set of collection functions:
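    import org.apache.spark.sql.functions.udf

    // Returns a UDF that pairs each timestamp with a session ID of the form "<uid>-<n>"
    def tmoSessList(tmo: Long) = udf{ (uid: String, tsList: Seq[String], tsDiffs: Seq[Long]) =>
      def sid(n: Long) = s"$uid-$n"

      val sessList = tsDiffs.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
        // New session on a rule #1 marker (i == 0) or when the elapsed time reaches tmo
        if (i == 0 || j + i >= tmo)
          (sid(k + 1) :: ls, 0L, k + 1)
        else
          (sid(k) :: ls, j + i, k)
      }._1.reverse

      tsList zip sessList
    }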
Note that the timestamp diff list `tsDiffs` is the main input being processed for generating sessions based on the `tmo2` value (session creation rule #2). The timestamp list `tsList` is merely passed through so that each timestamp can be paired with its corresponding session ID in the output.
Also note that the accumulator for `foldLeft` in the UDF is a Tuple of `(ls, j, k)`, where:
- `ls` is the list of formatted session IDs to be returned
- `j` and `k` carry over to the next iteration the time elapsed (in seconds) since the start of the current session and the current session number, respectively; `j` is reset to 0 whenever a new session starts
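To see how `j` and `k` evolve, the fold can be traced step by step. The following is a minimal sketch that reuses the UDF's branching with `scanLeft` in place of `foldLeft` so every intermediate state is kept; `FoldTrace` is a hypothetical name, and the session ID list `ls` is left out for brevity.

```scala
// Hypothetical tracing helper for illustration; it mirrors the UDF's branching only.
object FoldTrace {
  // Returns the (j, k) state after each element of tsDiffs:
  // j = seconds elapsed in the current session, k = current session number.
  def trace(tsDiffs: Seq[Long], tmo: Long): Seq[(Long, Long)] =
    tsDiffs.scanLeft((0L, 0L)) { case ((j, k), i) =>
      if (i == 0 || j + i >= tmo) (0L, k + 1) else (j + i, k)
    }.tail // drop the initial (0, 0) seed
}
```

Feeding it the diffs `0, 675, 810, 765, 705, 705, 135` (user 101 from 11:00:45 to 12:04:00) with a timeout of 3600 gives `(0,1), (675,1), (1485,1), (2250,1), (2955,1), (0,2), (135,2)`. The sixth element would push the elapsed time to 3660 seconds, so the session number is bumped and `j` is reset.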
Now, let’s lay out the steps for carrying out the necessary transformations to generate the sessions:
- Identify sessions (with 0 = start of a session) per user based on session creation rule #1
- Group the dataset to assemble the timestamp diff list per user
- Process the timestamp diff list via the above UDF to identify sessions based on rule #2 and generate all session IDs per user
- Expand the processed dataset which consists of the timestamp paired with the corresponding session IDs
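Before turning to the Spark code, the four steps can be sketched on plain Scala collections. This is only an illustration under simplifying assumptions: `PipelineSketch` is a hypothetical name, and timestamps are taken as plain seconds rather than formatted strings.

```scala
// Hypothetical collection-based sketch of the four steps; not the Spark implementation.
object PipelineSketch {
  // Input rows are (user, seconds); output rows are (user, seconds, sessionId).
  def sessionize(rows: Seq[(Int, Long)], tmo1: Long, tmo2: Long): Seq[(Int, Long, String)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (user, userRows) =>
      // Step 2: assemble the ordered list per user
      val ts = userRows.map(_._2).sorted.toVector
      // Step 1: diff against the previous row, with 0 marking a rule #1 session start
      val diffs = ts.indices.map { idx =>
        if (idx == 0) 0L
        else { val d = ts(idx) - ts(idx - 1); if (d >= tmo1) 0L else d }
      }
      // Step 3: same fold as in the UDF, applying rule #2 via tmo2
      val ids = diffs.foldLeft((List.empty[String], 0L, 0L)) { case ((ls, j, k), i) =>
        if (i == 0 || j + i >= tmo2) (s"$user-${k + 1}" :: ls, 0L, k + 1)
        else (s"$user-$k" :: ls, j + i, k)
      }._1.reverse
      // Step 4: expand back into one row per timestamp
      ts.zip(ids).map { case (t, id) => (user, t, id) }
    }
}
```

The Spark version below follows the same shape, with the Window function covering step 1, `groupBy`/`collect_list` covering step 2, the UDF covering step 3 and `explode` covering step 4.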
Step 1:
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    import spark.implicits._

    val tmo1: Long = 15 * 60  // rule #1 inactivity timeout, in seconds
    val tmo2: Long = 60 * 60  // rule #2 maximum session duration, in seconds

    val win1 = Window.partitionBy("user").orderBy("timestamp")

    val df1 = df.
      withColumn("ts_diff",
        unix_timestamp($"timestamp") - unix_timestamp(lag($"timestamp", 1).over(win1))
      ).
      // Mark the first row per user and any gap >= tmo1 with 0 (start of a session)
      withColumn("ts_diff",
        when(row_number().over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
          otherwise($"ts_diff")
      )

    df1.show(50)

    +----+-------------------+--------+-----------+-------+
    |user|          timestamp|    page|     action|ts_diff|
    +----+-------------------+--------+-----------+-------+
    | 101|2018-06-01 10:30:15|    home|   redirect|      0|
    | 101|2018-06-01 10:32:00|    home| info-click|    105|
    | 101|2018-06-01 10:35:00|    home| info-click|    180|
    | 101|2018-06-01 11:00:45|products|      visit|      0|
    | 101|2018-06-01 11:12:00|products| info-click|    675|
    | 101|2018-06-01 11:25:30|products| info-click|    810|
    | 101|2018-06-01 11:38:15|about-us| info-click|    765|
    | 101|2018-06-01 11:50:00|about-us| info-click|    705|
    | 101|2018-06-01 12:01:45|    home|      visit|    705|
    | 101|2018-06-01 12:04:00|    home| info-click|    135|
    | 101|2018-06-01 20:02:45|    home|      visit|      0|
    | 101|2018-06-01 20:40:00|products| info-click|      0|
    | 101|2018-06-01 20:46:30|products| info-click|    390|
    | 101|2018-06-01 20:50:15|products|add-to-cart|    225|
    | 220|2018-06-01 18:15:30|    home|   redirect|      0|
    | 220|2018-06-01 18:17:00|    home| info-click|     90|
    | 220|2018-06-01 18:40:45|    home| info-click|      0|
    | 220|2018-06-01 18:52:30|    home| info-click|    705|
    | 220|2018-06-01 19:04:45|products| info-click|    735|
    | 220|2018-06-01 19:17:00|products| info-click|    735|
    | 220|2018-06-01 19:30:30|products| info-click|    810|
    | 220|2018-06-01 19:42:30|products| info-click|    720|
    | 220|2018-06-01 19:45:30|products|add-to-cart|    180|
    +----+-------------------+--------+-----------+-------+
Steps 2-4:
    // Assumes collect_list keeps each user's rows in timestamp order;
    // Spark does not formally guarantee element order after a groupBy
    val df2 = df1.
      groupBy("user").agg(
        collect_list($"timestamp").as("ts_list"),
        collect_list($"ts_diff").as("ts_diffs")
      ).
      withColumn("tmo_sess_id",
        explode(tmoSessList(tmo2)($"user", $"ts_list", $"ts_diffs"))
      ).
      select($"user", $"tmo_sess_id._1".as("timestamp"), $"tmo_sess_id._2".as("sess_id"))

    df2.show(50)

    +----+-------------------+-------+
    |user|          timestamp|sess_id|
    +----+-------------------+-------+
    | 101|2018-06-01 10:30:15|  101-1|  User 101
    | 101|2018-06-01 10:32:00|  101-1|
    | 101|2018-06-01 10:35:00|  101-1|
    | 101|2018-06-01 11:00:45|  101-2|  <-- timeout rule #1
    | 101|2018-06-01 11:12:00|  101-2|
    | 101|2018-06-01 11:25:30|  101-2|
    | 101|2018-06-01 11:38:15|  101-2|
    | 101|2018-06-01 11:50:00|  101-2|
    | 101|2018-06-01 12:01:45|  101-3|  <-- timeout rule #2
    | 101|2018-06-01 12:04:00|  101-3|
    | 101|2018-06-01 20:02:45|  101-4|  <-- timeout rule #1
    | 101|2018-06-01 20:40:00|  101-5|  <-- timeout rule #1
    | 101|2018-06-01 20:46:30|  101-5|
    | 101|2018-06-01 20:50:15|  101-5|
    | 220|2018-06-01 18:15:30|  220-1|  User 220
    | 220|2018-06-01 18:17:00|  220-1|
    | 220|2018-06-01 18:40:45|  220-2|  <-- timeout rule #1
    | 220|2018-06-01 18:52:30|  220-2|
    | 220|2018-06-01 19:04:45|  220-2|
    | 220|2018-06-01 19:17:00|  220-2|
    | 220|2018-06-01 19:30:30|  220-2|
    | 220|2018-06-01 19:42:30|  220-3|  <-- timeout rule #2
    | 220|2018-06-01 19:45:30|  220-3|
    +----+-------------------+-------+
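As a quick sanity check on a result like the one above, the two rules can be verified per session in plain Scala. This is a minimal sketch: `SessionCheck` is a hypothetical name, the rows are assumed to have been collected into `(timestamp, sess_id)` pairs, and the check only covers necessary conditions (it does not prove that sessions are as long as they could be).

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical checker for illustration; rows are (timestamp, sess_id) pairs.
object SessionCheck {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  private def secs(a: String, b: String): Long =
    ChronoUnit.SECONDS.between(LocalDateTime.parse(a, fmt), LocalDateTime.parse(b, fmt))

  // True if, within every session, consecutive gaps stay under tmo1 (rule #1)
  // and the span from first to last timestamp stays under tmo2 (rule #2).
  def holds(rows: Seq[(String, String)], tmo1: Long, tmo2: Long): Boolean =
    rows.groupBy(_._2).values.forall { sess =>
      val ts = sess.map(_._1).sorted // this timestamp format sorts correctly as text
      val gapsOk = ts.zip(ts.drop(1)).forall { case (a, b) => secs(a, b) < tmo1 }
      val spanOk = secs(ts.head, ts.last) < tmo2
      gapsOk && spanOk
    }
}
```

For instance, the rows of sessions `101-2` and `101-3` pass with timeouts of 900 and 3600 seconds, whereas relabeling all seven of those rows as `101-2` fails, because the span from 11:00:45 to 12:04:00 is 3795 seconds.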