This is another programming example in my Scala-on-Spark blog series. While it uses the same minuscule weather data created in the first example of the blog series, it can be viewed as an independent programming exercise.
In this example, we want a table of total precipitation over custom past periods by weather station. The specific periods in this example are the previous month, the previous 3 months, and all previous months. We have data from July through December, and let’s say it’s now January, hence the previous month is December.
The result should be like this:
+-------+---------------+---------------+---------------+
|station|precip_prev_1mo|precip_prev_3mo|precip_prev_all|
+-------+---------------+---------------+---------------+
|    100|            3.5|            5.0|            7.5|
|    115|            4.5|            9.0|           10.0|
+-------+---------------+---------------+---------------+
User-defined functions (UDFs) will be used in this example. UDFs supplement Spark’s API by making the vast library of Scala functions (or those of any other supported language) available to DataFrame processing. That said, a method from Spark’s own API should be picked over a UDF with the same functionality, as the former generally performs better: Spark’s optimizer treats a UDF as a black box that it cannot optimize around.
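To make that trade-off concrete, here is a minimal sketch contrasting the two approaches (the DataFrame df and its string column name are hypothetical, not part of this example’s data):

import org.apache.spark.sql.functions._

// A UDF wrapping a plain Scala function; the optimizer treats it as a black box
val toUpperUDF = udf( (s: String) => s.toUpperCase )
df.select(toUpperUDF($"name"))

// The equivalent built-in method; the optimizer understands it and can plan around it
df.select(upper($"name"))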
First, let’s load the weather data.
import java.sql.Date
// import spark.implicits._  // needed for toDF; already in scope in spark-shell

// DataFrame columns:
//   Weather Station ID
//   Start Date of a half-month period
//   Temperature High (in Fahrenheit) over the period
//   Temperature Low (in Fahrenheit) over the period
//   Total Precipitation (in inches) over the period
val weatherDataDF = Seq(
  (100, Date.valueOf("2017-07-01"), 75, 59, 0.0),
  (100, Date.valueOf("2017-07-16"), 77, 59, 0.5),
  (100, Date.valueOf("2017-08-01"), 80, 63, 1.0),
  (100, Date.valueOf("2017-08-16"), 78, 62, 1.0),
  (100, Date.valueOf("2017-09-01"), 74, 59, 0.0),
  (100, Date.valueOf("2017-09-16"), 72, 57, 0.0),
  (100, Date.valueOf("2017-10-01"), 68, 54, 0.0),
  (100, Date.valueOf("2017-10-16"), 66, 54, 0.0),
  (100, Date.valueOf("2017-11-01"), 64, 50, 0.5),
  (100, Date.valueOf("2017-11-16"), 61, 48, 1.0),
  (100, Date.valueOf("2017-12-01"), 59, 46, 2.0),
  (100, Date.valueOf("2017-12-16"), 57, 45, 1.5),
  (115, Date.valueOf("2017-07-01"), 76, 57, 0.0),
  (115, Date.valueOf("2017-07-16"), 76, 56, 1.0),
  (115, Date.valueOf("2017-08-01"), 78, 57, 0.0),
  (115, Date.valueOf("2017-08-16"), 81, 57, 0.0),
  (115, Date.valueOf("2017-09-01"), 77, 54, 0.0),
  (115, Date.valueOf("2017-09-16"), 72, 50, 0.0),
  (115, Date.valueOf("2017-10-01"), 65, 45, 0.0),
  (115, Date.valueOf("2017-10-16"), 59, 40, 1.5),
  (115, Date.valueOf("2017-11-01"), 55, 37, 1.0),
  (115, Date.valueOf("2017-11-16"), 52, 35, 2.0),
  (115, Date.valueOf("2017-12-01"), 45, 30, 3.0),
  (115, Date.valueOf("2017-12-16"), 41, 28, 1.5)
).toDF("station", "start_date", "temp_high", "temp_low", "total_precip")
We start by creating a DataFrame of monthly precipitation by weather station, along with the number of months each month lags behind the current month.
import org.apache.spark.sql.functions._

val monthlyPrecipDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(sum($"total_precip").as("monthly_precip")).
  withColumn("mo_lag", months_between(last_day(current_date()), $"mo_end")).
  orderBy($"station", $"mo_end")

monthlyPrecipDF.show
// +-------+----------+--------------+------+
// |station|    mo_end|monthly_precip|mo_lag|
// +-------+----------+--------------+------+
// |    100|2017-07-31|           0.5|   6.0|
// |    100|2017-08-31|           2.0|   5.0|
// |    100|2017-09-30|           0.0|   4.0|
// |    100|2017-10-31|           0.0|   3.0|
// |    100|2017-11-30|           1.5|   2.0|
// |    100|2017-12-31|           3.5|   1.0|
// |    115|2017-07-31|           1.0|   6.0|
// |    115|2017-08-31|           0.0|   5.0|
// |    115|2017-09-30|           0.0|   4.0|
// |    115|2017-10-31|           1.5|   3.0|
// |    115|2017-11-30|           3.0|   2.0|
// |    115|2017-12-31|           4.5|   1.0|
// +-------+----------+--------------+------+
Next, we combine the list of month lags and the list of monthly precipitation into a map column by means of a UDF. Within the UDF, Scala’s zip method pairs the two input lists into a list of tuples, which is then converted into a map.
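To see the zip-and-convert step in isolation, here is the same logic in plain Scala with made-up sample values:

// zip pairs the two lists element-wise into a list of tuples;
// toMap then turns the tuples into key -> value entries
val lags    = Seq(1.0, 2.0, 3.0)
val precips = Seq(3.5, 1.5, 0.0)
(lags zip precips).toMap  // Map(1.0 -> 3.5, 2.0 -> 1.5, 3.0 -> 0.0)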
// UDF to combine 2 array-type columns into a map
def arraysToMap = udf(
  (a: Seq[Double], b: Seq[Double]) => (a zip b).toMap
)

val precipMapDF = monthlyPrecipDF.groupBy("station").agg(
    collect_list($"mo_lag").as("mo_lag_list"),
    collect_list($"monthly_precip").as("precip_list")
  ).select(
    $"station", arraysToMap($"mo_lag_list", $"precip_list").as("mo_precip_map")
  )

precipMapDF.show(truncate=false)
// +-------+---------------------------------------------------------------------------+
// |station|mo_precip_map                                                              |
// +-------+---------------------------------------------------------------------------+
// |115    |Map(5.0 -> 0.0, 1.0 -> 4.5, 6.0 -> 1.0, 2.0 -> 3.0, 3.0 -> 1.5, 4.0 -> 0.0)|
// |100    |Map(5.0 -> 2.0, 1.0 -> 3.5, 6.0 -> 0.5, 2.0 -> 1.5, 3.0 -> 0.0, 4.0 -> 0.0)|
// +-------+---------------------------------------------------------------------------+
Note that the map content might look different depending on when it is generated, as the month lags are relative to the current month at the time the application is run.
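If reproducible month lags are desired regardless of the run date, one could pin an “as-of” date in place of current_date; a minimal sketch, where the 2018-01-15 date is an arbitrary choice matching this dataset:

// Fix the reference date so mo_lag no longer depends on when the job runs
val asOfDate = lit(Date.valueOf("2018-01-15"))

val monthlyPrecipFixedDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(sum($"total_precip").as("monthly_precip")).
  withColumn("mo_lag", months_between(last_day(asOfDate), $"mo_end"))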
Next, we create the result DataFrame using another UDF, this one summing precipitation over the requested number of month lags, counting backward from the previous month (with 0 meaning all previous months).
// UDF to aggregate (sum) map values for keys less than or equal to x (0 for all)
def aggMapValues = udf(
  (m: Map[Double, Double], x: Double) =>
    if (x > 0) m.map{ case (k, v) => if (k <= x) v else 0.0 }.sum
    else m.values.sum
)

val customPrecipDF = precipMapDF.
  withColumn("precip_prev_1mo", aggMapValues($"mo_precip_map", lit(1))).
  withColumn("precip_prev_3mo", aggMapValues($"mo_precip_map", lit(3))).
  withColumn("precip_prev_all", aggMapValues($"mo_precip_map", lit(0))).
  select($"station", $"precip_prev_1mo", $"precip_prev_3mo", $"precip_prev_all")

customPrecipDF.show
// +-------+---------------+---------------+---------------+
// |station|precip_prev_1mo|precip_prev_3mo|precip_prev_all|
// +-------+---------------+---------------+---------------+
// |    115|            4.5|            9.0|           10.0|
// |    100|            3.5|            5.0|            7.5|
// +-------+---------------+---------------+---------------+
Again, note that the month lags are relative to the current month when the application is executed, hence the month-lag arguments passed to the aggMapValues UDF should be adjusted accordingly.
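Incidentally, in line with the earlier advice to prefer Spark’s own API over UDFs, these particular totals can also be computed straight from monthlyPrecipDF with built-in conditional aggregation, skipping the map column and both UDFs; a sketch for comparison:

// Conditionally sum monthly precipitation by month-lag threshold
val customPrecipDF2 = monthlyPrecipDF.groupBy("station").agg(
  sum(when($"mo_lag" <= 1, $"monthly_precip").otherwise(0)).as("precip_prev_1mo"),
  sum(when($"mo_lag" <= 3, $"monthly_precip").otherwise(0)).as("precip_prev_3mo"),
  sum($"monthly_precip").as("precip_prev_all")
)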
We can apply the same map-and-UDF approach to produce a table of temperature highs/lows over the custom periods. Below are the steps for creating the result table for the temperature high.
val monthlyHighDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(max($"temp_high").as("monthly_high")).
  withColumn("mo_lag", months_between(last_day(current_date()), $"mo_end"))

monthlyHighDF.orderBy($"station", $"mo_end").show
// +-------+----------+------------+------+
// |station|    mo_end|monthly_high|mo_lag|
// +-------+----------+------------+------+
// |    100|2017-07-31|          77|   6.0|
// |    100|2017-08-31|          80|   5.0|
// |    100|2017-09-30|          74|   4.0|
// |    100|2017-10-31|          68|   3.0|
// |    100|2017-11-30|          64|   2.0|
// |    100|2017-12-31|          59|   1.0|
// |    115|2017-07-31|          76|   6.0|
// |    115|2017-08-31|          81|   5.0|
// |    115|2017-09-30|          77|   4.0|
// |    115|2017-10-31|          65|   3.0|
// |    115|2017-11-30|          55|   2.0|
// |    115|2017-12-31|          45|   1.0|
// +-------+----------+------------+------+

import org.apache.spark.sql.types.DoubleType

val tempHighMapDF = monthlyHighDF.groupBy("station").agg(
    collect_list($"mo_lag").as("mo_lag_list"),
    collect_list($"monthly_high".cast(DoubleType)).as("temp_high_list")
  ).select(
    $"station", arraysToMap($"mo_lag_list", $"temp_high_list").as("mo_high_map")
  )

tempHighMapDF.show(truncate=false)

// UDF redefined to take the max of map values for keys less than or equal
// to x (0 for all); filtering out the other keys, rather than substituting
// 0 for their values, keeps the result correct even for sub-zero readings
def aggMapValues = udf(
  (m: Map[Double, Double], x: Double) =>
    if (x > 0) m.filter{ case (k, _) => k <= x }.values.max
    else m.values.max
)

val customTempHighDF = tempHighMapDF.
  withColumn("high_prev_1mo", aggMapValues($"mo_high_map", lit(1))).
  withColumn("high_prev_3mo", aggMapValues($"mo_high_map", lit(3))).
  withColumn("high_prev_all", aggMapValues($"mo_high_map", lit(0))).
  select($"station", $"high_prev_1mo", $"high_prev_3mo", $"high_prev_all")

customTempHighDF.show
// +-------+-------------+-------------+-------------+
// |station|high_prev_1mo|high_prev_3mo|high_prev_all|
// +-------+-------------+-------------+-------------+
// |    115|         45.0|         65.0|         81.0|
// |    100|         59.0|         68.0|         80.0|
// +-------+-------------+-------------+-------------+
I’ll leave creating the temperature-low result table as a programming exercise for the reader. Note that rather than calculating the temperature high and low separately, one could aggregate both together in some of the steps with little code change. For those up for a slightly more challenging exercise, the temperature high and low can in fact be carried together through every step of the way; a starting hint for the first step is sketched below.
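As that hint, the first aggregation step of the combined version might look something like this sketch (min for the lows mirroring max for the highs; the column names are my own):

// Aggregate monthly temperature high and low together in a single pass
val monthlyHighLowDF = weatherDataDF.
  groupBy($"station", last_day($"start_date").as("mo_end")).
  agg(
    max($"temp_high").as("monthly_high"),
    min($"temp_low").as("monthly_low")
  ).
  withColumn("mo_lag", months_between(last_day(current_date()), $"mo_end"))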