When analyzing time-series activity data, it's often useful to group the chronological activities into "target"-based sessions, where the "targets" could be products, events, web pages, etc.
In this blog post, we'll use a simplified time-series log of web page activities to look at how page-based sessions can be created.
Let’s say we have a log of chronological web page activities as shown below:
```
+----+-------------------+--------+-----------+
|user|          timestamp|    page|     action|
+----+-------------------+--------+-----------+
| 101|2018-06-01 10:30:15|    home|   redirect|
| 101|2018-06-01 10:31:00|    home| info-click|
| 101|2018-06-01 10:33:00|    home| info-click|
| 101|2018-06-01 10:33:45|products|      visit|
| 101|2018-06-01 10:35:00|products| info-click|
| 101|2018-06-01 10:50:30|products| info-click|
| 101|2018-06-01 11:40:15|about-us|      visit|
| 101|2018-06-01 11:41:00|about-us| info-click|
| 101|2018-06-01 15:00:45|    home|      visit|
| 101|2018-06-01 15:02:00|    home| info-click|
| ...|                ...|     ...|        ...|
+----+-------------------+--------+-----------+
```
And let's say we want to group the log data by web page to generate user-defined sessions in the format `userID-#`, where `#` is a monotonically increasing integer, like below:
```
+----+-------------------+--------+-----------+-------+
|user|          timestamp|    page|     action|sess_id|
+----+-------------------+--------+-----------+-------+
| 101|2018-06-01 10:30:15|    home|   redirect|  101-1|
| 101|2018-06-01 10:31:00|    home| info-click|  101-1|
| 101|2018-06-01 10:33:00|    home| info-click|  101-1|
| 101|2018-06-01 10:33:45|products|      visit|  101-2|
| 101|2018-06-01 10:35:00|products| info-click|  101-2|
| 101|2018-06-01 10:50:30|products| info-click|  101-2|
| 101|2018-06-01 11:40:15|about-us|      visit|  101-3|
| 101|2018-06-01 11:41:00|about-us| info-click|  101-3|
| 101|2018-06-01 15:00:45|    home|      visit|  101-4|
| 101|2018-06-01 15:02:00|    home| info-click|  101-4|
| ...|                ...|     ...|        ...|    ...|
+----+-------------------+--------+-----------+-------+
```
The first thing that comes to mind might be to perform a `groupBy(user, page)` or a Window `partitionBy(user, page)`. But that wouldn't work, since doing so would disregard the time gaps between separate visits to the same page, resulting in all rows with the same page being grouped together under a given user.
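To see the problem concretely, here is a plain-Python analogue (illustrative only, not the Spark API): bucketing sample rows by `(user, page)` merges the morning and afternoon `home` activities into a single group, even though they are hours apart.

```python
from collections import defaultdict

# A few sample (user, timestamp, page) rows from the log above.
rows = [
    (101, "2018-06-01 10:30:15", "home"),
    (101, "2018-06-01 10:33:45", "products"),
    (101, "2018-06-01 11:40:15", "about-us"),
    (101, "2018-06-01 15:00:45", "home"),   # hours later, same page
]

# Group by (user, page), the naive approach.
groups = defaultdict(list)
for user, ts, page in rows:
    groups[(user, page)].append(ts)

# Both "home" activities land in one bucket, despite the 4.5-hour gap:
print(groups[(101, "home")])
# -> ['2018-06-01 10:30:15', '2018-06-01 15:00:45']
```

What we actually need is a grouping that starts a new session whenever the page *changes* in time order, which is what the window-function approach below does.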
First things first, let's assemble a DataFrame with some sample web page activity data:
```scala
val df = Seq(
  (101, "2018-06-01 10:30:15", "home", "redirect"),
  (101, "2018-06-01 10:32:00", "home", "info-click"),
  (101, "2018-06-01 10:35:00", "home", "info-click"),
  (101, "2018-06-01 11:00:45", "products", "visit"),
  (101, "2018-06-01 11:12:00", "products", "info-click"),
  (101, "2018-06-01 11:25:30", "products", "info-click"),
  (101, "2018-06-01 11:38:15", "about-us", "info-click"),
  (101, "2018-06-01 11:50:00", "about-us", "info-click"),
  (101, "2018-06-01 12:01:45", "home", "visit"),
  (101, "2018-06-01 12:04:00", "home", "info-click"),
  (101, "2018-06-01 20:02:45", "home", "visit"),
  (101, "2018-06-01 20:40:00", "products", "info-click"),
  (101, "2018-06-01 20:46:30", "products", "info-click"),
  (101, "2018-06-01 20:50:15", "products", "add-to-cart"),
  (220, "2018-06-01 18:15:30", "home", "redirect"),
  (220, "2018-06-01 18:17:00", "home", "info-click"),
  (220, "2018-06-01 18:40:45", "home", "info-click"),
  (220, "2018-06-01 18:52:30", "home", "info-click"),
  (220, "2018-06-01 19:04:45", "products", "info-click"),
  (220, "2018-06-01 19:17:00", "products", "info-click"),
  (220, "2018-06-01 19:30:30", "products", "info-click"),
  (220, "2018-06-01 19:42:30", "products", "info-click"),
  (220, "2018-06-01 19:45:30", "products", "add-to-cart")
).toDF("user", "timestamp", "page", "action")
```
The solution to be presented here involves a few steps:
- Generate a new column `first_ts` which, for each user, has the value of `timestamp` in the current row if the `page` value is different from that in the previous row; otherwise `null`.
- Backfill all the `null`s in `first_ts` with the last non-null value via the Window function `last()` and store the result in a new column `sess_ts`.
- Assemble session IDs by concatenating `user` and the `dense_rank` of `sess_ts` within each `user` partition.
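Before writing the Spark version, the three steps can be sketched in plain Python (a hypothetical helper, not part of the Spark solution). Carrying the current dense rank forward as we scan each user's rows in time order plays the role of the forward-filled `sess_ts`:

```python
from itertools import groupby

def sessionize(rows):
    """rows: (user, timestamp, page) tuples. Returns each row with a
    '{user}-{rank}' session id appended, where a new session starts
    whenever the page changes within a user's time-ordered rows."""
    out = []
    # Sorting by (user, timestamp) gives per-user chronological order.
    for user, user_rows in groupby(sorted(rows), key=lambda r: r[0]):
        rank = 0          # dense rank of the current session start
        prev_page = None
        for _, ts, page in user_rows:
            if page != prev_page:   # step 1: page change => new session
                rank += 1           # step 3: next dense rank
            prev_page = page        # step 2 is implicit: rank carries forward
            out.append((user, ts, page, f"{user}-{rank}"))
    return out

rows = [
    (101, "2018-06-01 10:30:15", "home"),
    (101, "2018-06-01 10:31:00", "home"),
    (101, "2018-06-01 10:33:45", "products"),
    (101, "2018-06-01 11:40:15", "about-us"),
    (101, "2018-06-01 15:00:45", "home"),
]
result = sessionize(rows)
print([r[3] for r in result])
# -> ['101-1', '101-1', '101-2', '101-3', '101-4']
```

Note how the two separate `home` visits end up in different sessions (`101-1` and `101-4`), exactly the behavior a plain `groupBy(user, page)` cannot produce.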
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val winUserTS   = Window.partitionBy($"user").orderBy("timestamp")
val winUserSess = Window.partitionBy($"user").orderBy("sess_ts")

df.
  withColumn( "first_ts", when(
      row_number.over(winUserTS) === 1 ||
        $"page" =!= lag($"page", 1).over(winUserTS),
      $"timestamp"
    ) ).
  withColumn( "sess_ts", last($"first_ts", ignoreNulls = true).over(
      winUserTS.rowsBetween(Window.unboundedPreceding, 0)
    ) ).
  withColumn( "session_id",
      concat($"user", lit("-"), dense_rank.over(winUserSess))
    ).
  show(50)

+----+-------------------+--------+-----------+-------------------+-------------------+----------+
|user|          timestamp|    page|     action|           first_ts|            sess_ts|session_id|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
| 101|2018-06-01 10:30:15|    home|   redirect|2018-06-01 10:30:15|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:32:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 10:35:00|    home| info-click|               null|2018-06-01 10:30:15|     101-1|
| 101|2018-06-01 11:00:45|products|      visit|2018-06-01 11:00:45|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:12:00|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:25:30|products| info-click|               null|2018-06-01 11:00:45|     101-2|
| 101|2018-06-01 11:38:15|about-us| info-click|2018-06-01 11:38:15|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 11:50:00|about-us| info-click|               null|2018-06-01 11:38:15|     101-3|
| 101|2018-06-01 12:01:45|    home|      visit|2018-06-01 12:01:45|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 12:04:00|    home| info-click|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:02:45|    home|      visit|               null|2018-06-01 12:01:45|     101-4|
| 101|2018-06-01 20:40:00|products| info-click|2018-06-01 20:40:00|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:46:30|products| info-click|               null|2018-06-01 20:40:00|     101-5|
| 101|2018-06-01 20:50:15|products|add-to-cart|               null|2018-06-01 20:40:00|     101-5|
| 220|2018-06-01 18:15:30|    home|   redirect|2018-06-01 18:15:30|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:17:00|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:40:45|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 18:52:30|    home| info-click|               null|2018-06-01 18:15:30|     220-1|
| 220|2018-06-01 19:04:45|products| info-click|2018-06-01 19:04:45|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:17:00|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:30:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:42:30|products| info-click|               null|2018-06-01 19:04:45|     220-2|
| 220|2018-06-01 19:45:30|products|add-to-cart|               null|2018-06-01 19:04:45|     220-2|
+----+-------------------+--------+-----------+-------------------+-------------------+----------+
```
Note that the final output includes all the intermediate columns (i.e. `first_ts` and `sess_ts`) for demonstration purposes.
I think the 6th line should be `val winUserSess = Window.partitionBy($"user").orderBy("timestamp")` instead of `val winUserSess = Window.partitionBy($"user").orderBy("sess_ts")`.
Thanks for the comment, deeksha. We want the generated `session_id` values to correspond to the `sess_ts` values, such that rows for a given user with the same `sess_ts` have the same `session_id`. Ordering by `sess_ts` in the 2nd Window spec is specifically for the Window function `dense_rank` to generate the `session_id` as `${user}-${rank}` to fulfill that requirement, whereas ordering by `timestamp` would generate distinct ranks for the same `sess_ts`.
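The distinction can be illustrated with a small pure-Python dense-rank over sample values (a sketch of the ranking semantics, not Spark code): tied `sess_ts` values share a rank, whereas ranking over the all-distinct raw timestamps gives every row its own rank, splitting each session apart.

```python
# sess_ts values for one user: rows 1-2 share a session start, as do rows 3-4.
sess_ts = [
    "2018-06-01 10:30:15", "2018-06-01 10:30:15",
    "2018-06-01 11:00:45", "2018-06-01 11:00:45",
]
# dense_rank: the rank of each value among the sorted distinct values, from 1.
distinct = sorted(set(sess_ts))
dense_ranks = [distinct.index(v) + 1 for v in sess_ts]
print(dense_ranks)
# -> [1, 1, 2, 2]: ties share a rank, so rows in the same session
#    get the same session_id.

# Ranking over the raw timestamps instead (all distinct) gives every
# row its own rank, which would split each session into several:
timestamps = [
    "2018-06-01 10:30:15", "2018-06-01 10:32:00",
    "2018-06-01 11:00:45", "2018-06-01 11:12:00",
]
ts_ranks = [sorted(timestamps).index(v) + 1 for v in timestamps]
print(ts_ranks)
# -> [1, 2, 3, 4]
```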