Scala On Spark – Cumulative Pivot Sum

In a couple of recent R&D projects, I was using Apache Spark rather extensively to address some data processing needs on Hadoop clusters. Although there is an abundance of big data processing platforms these days, it didn’t take long for me to settle on Spark. One of the main reasons is that the programming language for the R&D is Scala, which is what Spark itself is written in. In particular, Spark’s inherent support for functional programming and compositional transformations on immutable data enables high performance at scale as well as readability. The other main reasons are very much in line with some of the key factors contributing to Spark’s rising popularity.

I’m starting a mini blog series on Scala-on-Spark (SoS), with each blog post demonstrating a Scala programming example on Apache Spark. In the series, I’m going to illustrate how the functionality-rich SoS is able to resolve some non-trivial data processing problems with seemingly little effort. If nothing else, these make good brain-teasing programming exercises in Scala on Spark.

As the source data for the example, let’s consider a minuscule set of weather data stored in a DataFrame, which consists of the following columns:

  • Weather Station ID
  • Start Date of a half-month period
  • Temperature High (in Fahrenheit) over the period
  • Temperature Low (in Fahrenheit) over the period
  • Total Precipitation (in inches) over the period

Note that with a properly configured Spark cluster, the methods illustrated in the following example can be readily adapted to handle much more granular data at scale – e.g. down to sub-hourly weather data from tens of thousands of weather stations. It’s also worth mentioning that there can be other ways to solve the problems presented in the examples.

For illustration purposes, the following code snippets are executed on a Spark Shell. The first step is to generate a DataFrame with the said columns of sample data, which will be used as the source data for this example and a couple of follow-up ones.

import java.sql.Date
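// Note: in spark-shell, spark.implicits._ is pre-imported, which provides the
// toDF method used below; in a standalone application you would need to add
// `import spark.implicits._` after creating the SparkSession.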

val weatherDataDF = Seq(
  (100, Date.valueOf("2017-07-01"), 75, 59, 0.0),
  (100, Date.valueOf("2017-07-16"), 77, 59, 0.5),
  (100, Date.valueOf("2017-08-01"), 80, 63, 1.0),
  (100, Date.valueOf("2017-08-16"), 78, 62, 1.0),
  (100, Date.valueOf("2017-09-01"), 74, 59, 0.0),
  (100, Date.valueOf("2017-09-16"), 72, 57, 0.0),
  (100, Date.valueOf("2017-10-01"), 68, 54, 0.0),
  (100, Date.valueOf("2017-10-16"), 66, 54, 0.0),
  (100, Date.valueOf("2017-11-01"), 64, 50, 0.5),
  (100, Date.valueOf("2017-11-16"), 61, 48, 1.0),
  (100, Date.valueOf("2017-12-01"), 59, 46, 2.0),
  (100, Date.valueOf("2017-12-16"), 57, 45, 1.5),
  (115, Date.valueOf("2017-07-01"), 76, 57, 0.0),
  (115, Date.valueOf("2017-07-16"), 76, 56, 1.0),
  (115, Date.valueOf("2017-08-01"), 78, 57, 0.0),
  (115, Date.valueOf("2017-08-16"), 81, 57, 0.0),
  (115, Date.valueOf("2017-09-01"), 77, 54, 0.0),
  (115, Date.valueOf("2017-09-16"), 72, 50, 0.0),
  (115, Date.valueOf("2017-10-01"), 65, 45, 0.0),
  (115, Date.valueOf("2017-10-16"), 59, 40, 1.5),
  (115, Date.valueOf("2017-11-01"), 55, 37, 1.0),
  (115, Date.valueOf("2017-11-16"), 52, 35, 2.0),
  (115, Date.valueOf("2017-12-01"), 45, 30, 3.0),
  (115, Date.valueOf("2017-12-16"), 41, 28, 1.5)
).toDF("station", "start_date", "temp_high", "temp_low", "total_precip")

In this first example, the goal is to generate a table of cumulative precipitation by weather station in month-by-month columns. By ‘cumulative sum’, I mean that each month’s precipitation rolls over into the next month’s figure (i.e. a running sum). In other words, if July’s precipitation is 2 inches and August’s is 1 inch, the figure reported for August will be 3 inches. The result should look like the following table:

+-------+-------+-------+-------+-------+-------+-------+
|station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
+-------+-------+-------+-------+-------+-------+-------+
|    100|    0.5|    2.5|    2.5|    2.5|    4.0|    7.5|
|    115|    1.0|    1.0|    1.0|    2.5|    5.5|   10.0|
+-------+-------+-------+-------+-------+-------+-------+

First, we transform the original DataFrame to include an additional year-month column, then use Spark’s groupBy and pivot methods to build a grouped dataset (which we’ll reuse for several aggregations below) and agg to generate the pivot table.

import org.apache.spark.sql.functions._

val monthlyData = weatherDataDF.
  withColumn("year_mo", concat(
    year($"start_date"), lit("-"), lpad(month($"start_date"), 2, "0")
  )).
  groupBy("station").pivot("year_mo")
  
val monthlyPrecipDF = monthlyData.agg(sum($"total_precip"))

monthlyPrecipDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|    1.0|    0.0|    0.0|    1.5|    3.0|    4.5|
// |    100|    0.5|    2.0|    0.0|    0.0|    1.5|    3.5|
// +-------+-------+-------+-------+-------+-------+-------+
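
As a side note, when the set of pivot values is known up front, it can be passed explicitly to pivot, sparing Spark an extra job to discover the distinct values. A minimal sketch, with the month list (months and monthlyPrecipExplicitDF are names I’m introducing here) spelled out by hand:

// Explicit pivot values: avoids a pass over the data to find distinct months.
val months = Seq("2017-07", "2017-08", "2017-09", "2017-10", "2017-11", "2017-12")

val monthlyPrecipExplicitDF = weatherDataDF.
  withColumn("year_mo", concat(
    year($"start_date"), lit("-"), lpad(month($"start_date"), 2, "0")
  )).
  groupBy("station").pivot("year_mo", months).
  agg(sum($"total_precip"))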

Next, we assemble a list of the year-month columns and traverse it using method foldLeft, which is one of the most versatile Scala functions for custom iterative transformations. In this particular case, the accumulator transformed by foldLeft is a tuple of (DataFrame, String). Normally, transforming the DataFrame alone would suffice, but here we need to carry an additional value along to address the rolling-cumulation requirement.

The tuple’s first element, of type DataFrame and with monthlyPrecipDF as its initial value, is transformed by the binary operator function supplied as foldLeft’s second argument (i.e. (acc, c) => …). The tuple’s second element, of type String and with the first year-month as its initial value, carries the current month’s column name over to the next iteration. The end result is a (DataFrame, String) tuple successively transformed month by month, from which we keep only the DataFrame (._1).
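
To make the tuple-accumulator pattern concrete before applying it to DataFrames, here’s a toy plain-Scala sketch (not from the original problem) that builds running totals with foldLeft, carrying the previous total in the tuple’s second element:

// Running totals via foldLeft with a (result, carry) tuple accumulator.
val runningTotals = List(1, 2, 3, 4).
  foldLeft((List.empty[Int], 0)) { case ((acc, prev), x) =>
    (acc :+ (prev + x), prev + x)
  }._1
// runningTotals: List(1, 3, 6, 10)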

val yearMonths = monthlyPrecipDF.columns.filter(_ != "station")

val cumulativePrecipDF = yearMonths.drop(1).
  foldLeft((monthlyPrecipDF, yearMonths.head))( (acc, c) =>
    // acc._1: DataFrame cumulated so far; acc._2: previous month's column name
    ( acc._1.withColumn(c, col(acc._2) + col(c)), c )
)._1

cumulativePrecipDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|    1.0|    1.0|    1.0|    2.5|    5.5|   10.0|
// |    100|    0.5|    2.5|    2.5|    2.5|    4.0|    7.5|
// +-------+-------+-------+-------+-------+-------+-------+
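
For comparison, the same cumulative pivot could arguably be produced without foldLeft, by computing a running sum with a window function before pivoting. A sketch under that assumption (byStation, monthlyLongDF, cum_precip and cumulativePrecipAltDF are names I’m introducing; date_format is used here as a more compact way to derive the year-month key):

import org.apache.spark.sql.expressions.Window

// Running sum per station, ordered by month, computed before the pivot.
val byStation = Window.partitionBy("station").orderBy("year_mo")

val monthlyLongDF = weatherDataDF.
  withColumn("year_mo", date_format($"start_date", "yyyy-MM")).
  groupBy("station", "year_mo").agg(sum($"total_precip").as("precip"))

val cumulativePrecipAltDF = monthlyLongDF.
  withColumn("cum_precip", sum($"precip").over(byStation)).
  groupBy("station").pivot("year_mo").agg(first($"cum_precip"))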

Similar pivot aggregations can be applied to the temperature highs/lows as well, with method sum replaced by max/min.

val monthlyHighDF = monthlyData.agg(max($"temp_high").as("high"))

monthlyHighDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     76|     81|     77|     65|     55|     45|
// |    100|     77|     80|     74|     68|     64|     59|
// +-------+-------+-------+-------+-------+-------+-------+

val monthlyLowDF = monthlyData.agg(min($"temp_low").as("low"))

monthlyLowDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     56|     57|     50|     40|     35|     28|
// |    100|     59|     62|     57|     54|     48|     45|
// +-------+-------+-------+-------+-------+-------+-------+

Finally, we compute cumulative temperature highs/lows the same way as cumulative precipitation, replacing the column addition with an iterative max/min built from Spark’s when/otherwise methods.

val cumulativeHighDF = yearMonths.drop(1).
  foldLeft((monthlyHighDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, when(col(acc._2) > col(c), col(acc._2)).
      otherwise(col(c))), c )
)._1

cumulativeHighDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     76|     81|     81|     81|     81|     81|
// |    100|     77|     80|     80|     80|     80|     80|
// +-------+-------+-------+-------+-------+-------+-------+

val cumulativeLowDF = yearMonths.drop(1).
  foldLeft((monthlyLowDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, when(col(acc._2) < col(c), col(acc._2)).
      otherwise(col(c))), c )
)._1

cumulativeLowDF.show
// +-------+-------+-------+-------+-------+-------+-------+
// |station|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|
// +-------+-------+-------+-------+-------+-------+-------+
// |    115|     56|     56|     50|     40|     35|     28|
// |    100|     59|     59|     57|     54|     48|     45|
// +-------+-------+-------+-------+-------+-------+-------+
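
Alternatively, the when/otherwise comparison in each fold step could be expressed with Spark’s built-in greatest and least functions, which makes the intent a bit more direct. A sketch for the cumulative high (cumulativeHighAltDF is a name I’m introducing; the low case would use least instead):

// Same fold as above, with greatest() replacing the when/otherwise comparison.
val cumulativeHighAltDF = yearMonths.drop(1).
  foldLeft((monthlyHighDF, yearMonths.head))( (acc, c) =>
    ( acc._1.withColumn(c, greatest(col(acc._2), col(c))), c )
)._1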
