This is yet another programming example in my Scala-on-Spark blog series. Again, while it starts with the same minuscule weather data used in previous examples, it can be viewed as an independent programming exercise.
In this example, we’re going to create a table that shows the streaks of consecutive months with non-zero precipitation.
The result should be similar to the following:
+-------+-------+--------------+------+
|station|year_mo|monthly_precip|streak|
+-------+-------+--------------+------+
|    100|2017-07|           0.5|     1|
|    100|2017-08|           2.0|     2|
|    100|2017-11|           1.5|     1|
|    100|2017-12|           3.5|     2|
|    115|2017-07|           1.0|     1|
|    115|2017-10|           1.5|     1|
|    115|2017-11|           3.0|     2|
|    115|2017-12|           4.5|     3|
+-------+-------+--------------+------+
We’ll explore using Spark’s window functions in this example. As a side note, some of the previous examples in this blog series could be solved using window functions as well. By aggregating over partitioned windows of rows, Spark’s window functions readily perform certain kinds of complex aggregations that would otherwise require repetitive nested groupings. They work much like PostgreSQL’s window functions.
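As a quick warm-up, independent of the weather data below, here is a minimal sketch of a window aggregation: a per-user running total over some transaction data. The txnDF DataFrame and its column names here are hypothetical, made up purely for illustration.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Hypothetical transaction data: (user, transaction date, amount)
val txnDF = Seq(
  ("a", java.sql.Date.valueOf("2017-01-01"), 10.0),
  ("a", java.sql.Date.valueOf("2017-01-05"), 5.0),
  ("a", java.sql.Date.valueOf("2017-01-09"), 2.5),
  ("b", java.sql.Date.valueOf("2017-01-02"), 8.0),
  ("b", java.sql.Date.valueOf("2017-01-06"), 4.0)
).toDF("user", "txn_date", "amount")

// Running total per user: sum over a window partitioned by user and
// ordered by date (the default frame spans from the start of the
// partition up to the current row)
txnDF.withColumn(
  "running_total",
  sum($"amount").over(Window.partitionBy($"user").orderBy($"txn_date"))
).show

Computing the same running total with groupBy alone would require a self-join or nested aggregation; the window version does it in a single pass over each partition.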
Now, let’s load up the same old minuscule weather data.
import java.sql.Date

// DataFrame columns:
//   Weather Station ID
//   Start Date of a half-month period
//   Temperature High (in Fahrenheit) over the period
//   Temperature Low (in Fahrenheit) over the period
//   Total Precipitation (in inches) over the period
val weatherDataDF = Seq(
  (100, Date.valueOf("2017-07-01"), 75, 59, 0.0),
  (100, Date.valueOf("2017-07-16"), 77, 59, 0.5),
  (100, Date.valueOf("2017-08-01"), 80, 63, 1.0),
  (100, Date.valueOf("2017-08-16"), 78, 62, 1.0),
  (100, Date.valueOf("2017-09-01"), 74, 59, 0.0),
  (100, Date.valueOf("2017-09-16"), 72, 57, 0.0),
  (100, Date.valueOf("2017-10-01"), 68, 54, 0.0),
  (100, Date.valueOf("2017-10-16"), 66, 54, 0.0),
  (100, Date.valueOf("2017-11-01"), 64, 50, 0.5),
  (100, Date.valueOf("2017-11-16"), 61, 48, 1.0),
  (100, Date.valueOf("2017-12-01"), 59, 46, 2.0),
  (100, Date.valueOf("2017-12-16"), 57, 45, 1.5),
  (115, Date.valueOf("2017-07-01"), 76, 57, 0.0),
  (115, Date.valueOf("2017-07-16"), 76, 56, 1.0),
  (115, Date.valueOf("2017-08-01"), 78, 57, 0.0),
  (115, Date.valueOf("2017-08-16"), 81, 57, 0.0),
  (115, Date.valueOf("2017-09-01"), 77, 54, 0.0),
  (115, Date.valueOf("2017-09-16"), 72, 50, 0.0),
  (115, Date.valueOf("2017-10-01"), 65, 45, 0.0),
  (115, Date.valueOf("2017-10-16"), 59, 40, 1.5),
  (115, Date.valueOf("2017-11-01"), 55, 37, 1.0),
  (115, Date.valueOf("2017-11-16"), 52, 35, 2.0),
  (115, Date.valueOf("2017-12-01"), 45, 30, 3.0),
  (115, Date.valueOf("2017-12-16"), 41, 28, 1.5)
).toDF("station", "start_date", "temp_high", "temp_low", "total_precip")
First, we create a DataFrame of monthly precipitation by weather station and filter it down to only the months with positive precipitation.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val positivePrecipDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(sum($"total_precip").as("monthly_precip")).
  where($"monthly_precip" > 0)

positivePrecipDF.orderBy($"station", $"mo_end").show
// +-------+----------+--------------+
// |station|    mo_end|monthly_precip|
// +-------+----------+--------------+
// |    100|2017-07-31|           0.5|
// |    100|2017-08-31|           2.0|
// |    100|2017-11-30|           1.5|
// |    100|2017-12-31|           3.5|
// |    115|2017-07-31|           1.0|
// |    115|2017-10-31|           1.5|
// |    115|2017-11-30|           3.0|
// |    115|2017-12-31|           4.5|
// +-------+----------+--------------+
Next, using a window function, we generate row numbers ordered by month over partitions by weather station. For each row, we then use a UDF to calculate a base date by counting back from the row’s month by its row number. As shown in the following table, these base dates trace each chunk of contiguous months back to a common base date.
// UDF to calculate the last day of the month which is n months prior to the input date
def baseDate = udf{ (d: java.sql.Date, n: Long) =>
  val base = d.toLocalDate.plusMonths(-n)
  java.sql.Date.valueOf(base.withDayOfMonth(base.lengthOfMonth))
}

val rankedDF = positivePrecipDF.
  withColumn("row_num", row_number.over(
    Window.partitionBy($"station").orderBy($"mo_end")
  )).
  withColumn("baseDate", baseDate($"mo_end", $"row_num"))

rankedDF.show
// +-------+----------+--------------+-------+----------+
// |station|    mo_end|monthly_precip|row_num|  baseDate|
// +-------+----------+--------------+-------+----------+
// |    115|2017-07-31|           1.0|      1|2017-06-30|
// |    115|2017-10-31|           1.5|      2|2017-08-31|
// |    115|2017-11-30|           3.0|      3|2017-08-31|
// |    115|2017-12-31|           4.5|      4|2017-08-31|
// |    100|2017-07-31|           0.5|      1|2017-06-30|
// |    100|2017-08-31|           2.0|      2|2017-06-30|
// |    100|2017-11-30|           1.5|      3|2017-08-31|
// |    100|2017-12-31|           3.5|      4|2017-08-31|
// +-------+----------+--------------+-------+----------+
Finally, we apply another row-number window function, but this time over partitions by weather station as well as base date. Since contiguous months share a common base date, the row numbers generated within each partition are exactly the wanted streaks.
val streakDF = rankedDF.select(
    $"station", $"mo_end", $"monthly_precip",
    row_number.over(
      Window.partitionBy($"station", $"baseDate").orderBy($"mo_end")
    ).as("streak")
  ).
  withColumn(
    "year_mo", concat(year($"mo_end"), lit("-"), lpad(month($"mo_end"), 2, "0"))
  ).
  select($"station", $"year_mo", $"monthly_precip", $"streak").
  orderBy($"station", $"mo_end")

streakDF.show
// +-------+-------+--------------+------+
// |station|year_mo|monthly_precip|streak|
// +-------+-------+--------------+------+
// |    100|2017-07|           0.5|     1|
// |    100|2017-08|           2.0|     2|
// |    100|2017-11|           1.5|     1|
// |    100|2017-12|           3.5|     2|
// |    115|2017-07|           1.0|     1|
// |    115|2017-10|           1.5|     1|
// |    115|2017-11|           3.0|     2|
// |    115|2017-12|           4.5|     3|
// +-------+-------+--------------+------+
Using the same logic flow, we can also generate similar streak reports for temperature highs/lows (e.g. a streak of monthly temperature highs above 75F). I’ll leave that as an exercise for the reader, with a sketch of one possible approach below.
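To make the exercise concrete, here is a minimal sketch of the monthly temperature-high variant, reusing the baseDate UDF defined above. Note that picking max as the monthly aggregate (i.e. a month qualifies if any of its half-month highs exceeds 75F) is my assumption for illustration; one could just as well require both half-month highs to exceed the threshold.

// Monthly temperature high per station, keeping only months above 75F
// (max over the half-month periods is an assumed definition of "monthly high")
val hotMonthsDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(max($"temp_high").as("monthly_high")).
  where($"monthly_high" > 75)

// Same two-pass window logic as before: trace contiguous months back to a
// common base date, then number the rows within each (station, baseDate)
// partition to get the streaks
val hotStreakDF = hotMonthsDF.
  withColumn("row_num", row_number.over(
    Window.partitionBy($"station").orderBy($"mo_end")
  )).
  withColumn("baseDate", baseDate($"mo_end", $"row_num")).
  select(
    $"station", $"mo_end", $"monthly_high",
    row_number.over(
      Window.partitionBy($"station", $"baseDate").orderBy($"mo_end")
    ).as("streak")
  ).
  orderBy($"station", $"mo_end")

With the sample data, this should yield a two-month streak (Jul-Aug) for station 100 and a three-month streak (Jul-Sep) for station 115.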