Like many software professionals, I often resort to Stack Overflow (via search engines) when looking for clues to solve the programming problem at hand. Besides looking for solution ideas, when I find some free time I also visit the site to help answer questions posted there. Occasionally, a more interesting question topic will prompt me to expand it into a blog post. In fact, a good chunk of the content in my Scala-on-Spark (SoS) mini blog series came from selected Stack Overflow questions I've answered in the past, and so does the topic tackled in this post.
Suppose we have time-series data with gaps between the chronologically ordered timestamps, like below:
```
+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:38:00| 120.0|
|  2|2019-01-16 12:00:00| 200.0|
|  2|2019-01-16 12:03:00| 210.0|
|  2|2019-01-16 12:04:00| 220.0|
|  2|2019-01-16 12:08:00| 230.0|
+---+-------------------+------+
```
Our goal is to expand column `timestamp` into per-minute timestamps and column `amount` into linearly interpolated values like below:
```
+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:31:00| 102.0|
|  1|2019-01-16 08:32:00| 104.0|
|  1|2019-01-16 08:33:00| 106.0|
|  1|2019-01-16 08:34:00| 108.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:36:00|113.33|
|  1|2019-01-16 08:37:00|116.67|
|  1|2019-01-16 08:38:00| 120.0|
|...|                ...|   ...|
+---+-------------------+------+
```
There are different ways to solve interpolation problems. Since timestamps can be represented as `Long` values (i.e. Unix time), it might make sense to consider using the `spark.range` method to create a time series of contiguous per-minute timestamps and left-join it with the dataset at hand, as sketched below. The catch, though, is that the method applies to the entire dataset (as opposed to per-group) and requires the `start` and `end` of the timestamp range as parameters, which might not be known in advance.
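For illustration, here's a minimal sketch of what that `spark.range` approach could look like (my own sketch, not from the original answer). It reuses the sample DataFrame `df` built later in this post, derives the range bounds from the data itself, and the names `startSec`, `endSec` and `timeline` are purely illustrative:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Derive the overall range bounds (epoch seconds) from the data itself.
val (startSec, endSec) = df.
  select(min(unix_timestamp($"timestamp")), max(unix_timestamp($"timestamp"))).
  as[(Long, Long)].
  head

// Build one contiguous per-minute timeline (step = 60 seconds) covering the
// whole dataset, converting the Long epoch seconds back into string timestamps.
val timeline = spark.range(startSec, endSec + 60, 60).
  select(from_unixtime($"id").as("timestamp"))

// Left-join the original data onto the timeline: rows falling in the gaps come
// back with null `id`/`amount` and would still need interpolation, and the
// single timeline spans all groups at once, which is the per-group limitation
// mentioned above.
timeline.join(df, Seq("timestamp"), "left").orderBy($"timestamp").show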
A more flexible approach would be to use a UDF (user-defined function) for custom data manipulation, though at the expense of potential performance degradation (since built-in Spark APIs that leverage Spark’s optimization engine generally scale better than UDFs). For more details about native functions versus UDFs in Spark, check out this blog post.
Nevertheless, the solution proposed here uses a UDF which, for each row, takes the `timestamp` and `amount` values of both the current and the previous row as parameters, and returns a list of interpolated `(timestamp, amount)` tuples. Using the `java.time` API, the previous and current String-type timestamps are converted into a range of `LocalDateTime` values over which the amounts are linearly interpolated.
```scala
import org.apache.spark.sql.functions.udf

def tsInterpolate(tsPattern: String) = udf{ (ts1: String, ts2: String, amt1: Double, amt2: Double) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val timeFormat = DateTimeFormatter.ofPattern(tsPattern)

  // Per-minute timestamps from ts1 (exclusive) through ts2 (inclusive); if the
  // two timestamps are identical (i.e. a group's first row), keep just the one.
  val perMinuteTS = if (ts1 == ts2) Vector(ts1) else {
    val ldt1 = LocalDateTime.parse(ts1, timeFormat)
    val ldt2 = LocalDateTime.parse(ts2, timeFormat)
    Iterator.iterate(ldt1.plusMinutes(1))(_.plusMinutes(1)).
      takeWhile(! _.isAfter(ldt2)).
      map(_.format(timeFormat)).
      toVector
  }

  // Linearly interpolated amounts, one per generated timestamp.
  val perMinuteAmt = for { i <- 1 to perMinuteTS.size } yield
    amt1 + ((amt2 - amt1) * i / perMinuteTS.size)

  perMinuteTS zip perMinuteAmt
}
```
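To make the interpolation formula concrete, here's a small worked example (my own, not from the original post) for the 08:35 → 08:38 gap of `id` 1, where three per-minute timestamps get generated:

```scala
// The 08:35 -> 08:38 gap of id 1: perMinuteTS holds 08:36, 08:37 and 08:38,
// so each step adds one third of the amount difference (120.0 - 110.0).
val (amt1, amt2, steps) = (110.0, 120.0, 3)
val interpolated = (1 to steps).map(i => amt1 + (amt2 - amt1) * i / steps)
// roughly Vector(113.33, 116.67, 120.0) before rounding
```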
Note that `Iterator.iterate(init)(next).takeWhile(condition)` in the UDF is simply a functional equivalent of a conventional while loop.
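As a quick standalone illustration (plain Scala, no Spark involved), this is what the iterator produces for the same 08:35 to 08:38 gap; the ISO-8601 timestamp literals are used here only to keep the example short:

```scala
import java.time.LocalDateTime

val start = LocalDateTime.parse("2019-01-16T08:35:00")
val end   = LocalDateTime.parse("2019-01-16T08:38:00")

// Begin one minute after `start` and keep adding a minute while not past `end`.
Iterator.iterate(start.plusMinutes(1))(_.plusMinutes(1)).
  takeWhile(!_.isAfter(end)).
  toVector
// => Vector(2019-01-16T08:36, 2019-01-16T08:37, 2019-01-16T08:38)
```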
With the UDF in place, we feed it the timestamp pattern along with the previous/current timestamp pair and the previous/current amount pair to produce, for each row, a list of interpolated timestamp-amount pairs. The previous-row values are obtained with the `lag` window function, with each group's first row falling back to its own values. The resulting list is then flattened into individual rows using Spark's built-in `explode` function.
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  (1, "2019-01-16 08:30:00", 100.0),
  (1, "2019-01-16 08:35:00", 110.0),
  (1, "2019-01-16 08:38:00", 120.0),
  (2, "2019-01-16 12:00:00", 200.0),
  (2, "2019-01-16 12:03:00", 210.0),
  (2, "2019-01-16 12:04:00", 220.0),
  (2, "2019-01-16 12:08:00", 230.0)
).toDF("id", "timestamp", "amount")

val tsPattern = "yyyy-MM-dd HH:mm:ss"

val win = Window.partitionBy($"id").orderBy($"timestamp")

df.
  withColumn("timestampPrev",
    when(row_number.over(win) === 1, $"timestamp").
      otherwise(lag($"timestamp", 1).over(win))
  ).
  withColumn("amountPrev",
    when(row_number.over(win) === 1, $"amount").
      otherwise(lag($"amount", 1).over(win))
  ).
  withColumn("interpolatedList",
    tsInterpolate(tsPattern)($"timestampPrev", $"timestamp", $"amountPrev", $"amount")
  ).
  withColumn("interpolated", explode($"interpolatedList")).
  select(
    $"id",
    $"interpolated._1".as("timestamp"),
    round($"interpolated._2", 2).as("amount")
  ).
  show
```

```
+---+-------------------+------+
| id|          timestamp|amount|
+---+-------------------+------+
|  1|2019-01-16 08:30:00| 100.0|
|  1|2019-01-16 08:31:00| 102.0|
|  1|2019-01-16 08:32:00| 104.0|
|  1|2019-01-16 08:33:00| 106.0|
|  1|2019-01-16 08:34:00| 108.0|
|  1|2019-01-16 08:35:00| 110.0|
|  1|2019-01-16 08:36:00|113.33|
|  1|2019-01-16 08:37:00|116.67|
|  1|2019-01-16 08:38:00| 120.0|
|  2|2019-01-16 12:00:00| 200.0|
|  2|2019-01-16 12:01:00|203.33|
|  2|2019-01-16 12:02:00|206.67|
|  2|2019-01-16 12:03:00| 210.0|
|  2|2019-01-16 12:04:00| 220.0|
|  2|2019-01-16 12:05:00| 222.5|
|  2|2019-01-16 12:06:00| 225.0|
|  2|2019-01-16 12:07:00| 227.5|
|  2|2019-01-16 12:08:00| 230.0|
+---+-------------------+------+
```