Scala On Spark – Streak

This is yet another programming example in my Scala-on-Spark blog series. Again, while it starts with the same minuscule weather data used in previous examples of the blog series, it can be viewed as an independent programming exercise.

In this example, we’re going to create a table that shows the streaks of consecutive months with non-zero precipitation.

The result should be similar to the following:

We’ll explore using Spark’s window functions in this example. As a side note, some of the previous examples in the blog series could be resolved using window functions as well. By means of aggregating over partitioned sliding windows of data, Spark’s window functions readily perform certain kinds of complex aggregations which would otherwise require repetitive nested groupings. They are similar to how PostgreSQL’s window functions work.
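To make the idea of aggregating over a partitioned, ordered window concrete, here is a minimal sketch in plain Scala collections rather than Spark, so it runs without a cluster. It emulates a running total per weather station; the `WindowSketch` object, the `Row` case class, and the sample values used with it are hypothetical.

```scala
object WindowSketch {
  // Hypothetical row shape: station ID, year-month as "yyyy-MM", precipitation.
  final case class Row(station: String, yearMonth: String, precip: Double)

  // Emulates a running sum of precip, partitioned by station and
  // ordered by yearMonth within each partition.
  def runningTotal(rows: Seq[Row]): Seq[(Row, Double)] =
    rows.groupBy(_.station).toSeq.sortBy(_._1).flatMap { case (_, partition) =>
      val ordered = partition.sortBy(_.yearMonth)
      // scanLeft yields the seed plus one total per row; drop the seed.
      val totals = ordered.scanLeft(0.0)((acc, r) => acc + r.precip).tail
      ordered.zip(totals)
    }
}
```

Each row keeps its identity and gains an aggregate computed over its own partition, which is what distinguishes a window function from a plain groupBy.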

Now, let’s load up the same old minuscule weather data.

First, create a DataFrame of precipitation by weather station and month, and filter it to keep only the months with positive precipitation.
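The shape of this step can be sketched in plain Scala collections (not Spark), assuming half-month readings as the input. The `MonthlyPrecip` object and `Reading` case class are hypothetical names, and `java.time.YearMonth` stands in for the month column.

```scala
import java.time.{LocalDate, YearMonth}

object MonthlyPrecip {
  // Hypothetical reading: station ID, start date of the period, precipitation.
  final case class Reading(station: String, start: LocalDate, precip: Double)

  // Sum precipitation per (station, month) and keep only positive months.
  def positiveMonths(readings: Seq[Reading]): Seq[(String, YearMonth, Double)] =
    readings
      .groupBy(r => (r.station, YearMonth.from(r.start)))
      .toSeq
      .map { case ((station, ym), rs) => (station, ym, rs.map(_.precip).sum) }
      .filter { case (_, _, total) => total > 0 }
      .sortBy { case (station, ym, _) => (station, ym.getYear, ym.getMonthValue) }
}
```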

Next, using a window function, we capture sequences of row numbers ordered by month over partitions by weather station. For each row, we then use a UDF to calculate the base date by going back from the row’s month by as many months as its row number. Within a run of consecutive months, the month and the row number both advance by one, so every row in the run lands on the same base date. As shown in the following table, these base dates help trace chunks of contiguous months back to their common base dates.
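A small sketch of the base-date calculation for a single station, written in plain Scala rather than as a Spark window function plus UDF. The `BaseDates` object and its method are hypothetical, and `java.time.YearMonth` is assumed as the month type.

```scala
import java.time.YearMonth

object BaseDates {
  // Input: the months with positive precipitation for ONE station.
  // Output: (month, row number, base date), where the base date is the
  // month shifted back by its 1-based row number.
  def withBaseDate(months: Seq[YearMonth]): Seq[(YearMonth, Int, YearMonth)] =
    months
      .sortBy(ym => (ym.getYear, ym.getMonthValue))
      .zipWithIndex
      .map { case (ym, i) =>
        val rowNum = i + 1
        (ym, rowNum, ym.minusMonths(rowNum.toLong))
      }
}
```

With July, August, September, November and December as input, the first three months share June as their base date, while the last two share July, because the gap in October shifts the offset by one.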

Finally, we apply another row-number window function, but this time over partitions by weather station as well as base date. Because contiguous months share a base date, the new row numbers within each partition are the streaks we want.
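The second numbering pass can be sketched as follows, again in plain Scala instead of Spark and assuming each row already carries its base date. The `StreakNumbers` object and `Based` case class are hypothetical.

```scala
import java.time.YearMonth

object StreakNumbers {
  // Hypothetical row: station, month with positive precipitation, base date.
  final case class Based(station: String, month: YearMonth, base: YearMonth)

  // Number the rows within each (station, base date) partition, ordered by
  // month. The resulting number is the running length of the streak.
  def streaks(rows: Seq[Based]): Seq[(String, YearMonth, Int)] =
    rows
      .groupBy(r => (r.station, r.base))
      .values
      .flatMap { partition =>
        partition
          .sortBy(r => (r.month.getYear, r.month.getMonthValue))
          .zipWithIndex
          .map { case (r, i) => (r.station, r.month, i + 1) }
      }
      .toSeq
      .sortBy { case (station, month, _) => (station, month.getYear, month.getMonthValue) }
}
```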

Using the same logic flow, we can also generate similar streak reports for temperature high/low (e.g. streaks of temperature highs above 75F). I’ll leave that as an exercise for the reader.

Scala On Spark – Sum Over Periods

This is another programming example in my Scala-on-Spark blog series. While it uses the same minuscule weather data created in the first example of the blog series, it can be viewed as an independent programming exercise.

In this example, we want a table of total precipitation over custom past periods by weather station. The specific periods in this example are the previous month, the previous 3 months, and all previous months. We have data from July through December, and let’s say it’s now January, so the previous month is December.

The result should be like this:

User-defined functions (UDFs) will be used in this example. Spark’s UDFs supplement its API by allowing the vast library of Scala (or any of the other supported languages) functions to be used. That said, a method from Spark’s API should be preferred over a UDF with the same functionality, as the former would likely perform better, partly because the query optimizer cannot look inside a UDF.

First, let’s load up the said weather data.

We first create a DataFrame of precipitation by weather station and month, each with the number of months that lag the current month.
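As a rough illustration of the months-lagged value, here is a plain Scala sketch, not the Spark code. The `MonthsLagged` object is hypothetical, and the current month is passed in as a parameter rather than read from the clock, so the result is reproducible.

```scala
import java.time.YearMonth
import java.time.temporal.ChronoUnit

object MonthsLagged {
  // For each month, count how many whole months it lags the current month.
  def monthsLagged(current: YearMonth, months: Seq[YearMonth]): Seq[(YearMonth, Int)] =
    months.map(ym => (ym, ChronoUnit.MONTHS.between(ym, current).toInt))
}
```

With January as the current month, December lags by 1 and July by 6.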

Next, we combine the list of months-lagged with monthly precipitation by means of a UDF to create a map column. To do that, we use Scala’s zip method within the UDF to create a list of tuples from the two input lists and convert the resulting list into a map.
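The body of such a UDF might look like the following plain Scala function. The name `toLagMap` is hypothetical, and in Spark the function would still need to be wrapped as a UDF.

```scala
object LagMap {
  // Pair each months-lagged value with the precipitation at the same
  // position, then turn the pairs into a lookup map.
  def toLagMap(lags: Seq[Int], precips: Seq[Double]): Map[Int, Double] =
    lags.zip(precips).toMap
}
```

Note that `zip` silently truncates to the shorter of the two lists, so the two input lists are assumed to be of equal length and in the same order.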

Note that the map content might look different depending on when it is generated, as the months-lagged is relative to the current month when the application is run.

Using another UDF to sum precipitation counting backward from the previous months based on the number of months lagged, we create the result DataFrame.

Again, note that the months-lagged is relative to the current month when the application is executed, hence the months-lagged parameters for the aggMapValues UDF should be adjusted accordingly.
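The summing step could be sketched like this in plain Scala. Only the name `aggMapValues` comes from the text above; its signature, the `PeriodSums` wrapper, and the sample map are assumptions made for illustration.

```scala
object PeriodSums {
  // Assumed signature: sum the map values for the given months-lagged keys.
  // Keys missing from the map contribute nothing to the total.
  def aggMapValues(lagToPrecip: Map[Int, Double], lags: Seq[Int]): Double =
    lags.map(lag => lagToPrecip.getOrElse(lag, 0.0)).sum
}
```

With data from July through December and January as the current month, the three periods would correspond to the lags `Seq(1)`, `1 to 3` and `1 to 6`.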

We can use a similar approach to come up with a table for temperature high/low over the custom periods. Below are the steps for creating the result table for temperature high.

I’ll leave creating the temperature low result table as a programming exercise for the reader. Note that rather than calculating temperature high and low separately, one could aggregate both of them together in some of the steps with little code change. For those who are up for a slightly more challenging exercise, both temperature high and low data can in fact be transformed together at every step.

Scala On Spark – Cumulative Pivot Sum

In a couple of recent R&D projects, I was using Apache Spark rather extensively to address some data processing needs on Hadoop clusters. Although there is an abundance of big data processing platforms these days, it didn’t take long for me to settle on Spark. One of the main reasons is that the programming language for the R&D is Scala, which is what Spark itself is written in. In particular, Spark’s inherent support for functional programming and compositional transformations on immutable data enables high performance at scale as well as readability. Other main reasons are very much in line with some of the key factors contributing to Spark’s rising popularity.

I’m starting a mini blog series on Scala-on-Spark (SoS), with each blog post demonstrating a Scala programming example on Apache Spark. In the blog series, I’m going to illustrate how the functionality-rich SoS is able to resolve some non-trivial data processing problems with seemingly little effort. If nothing else, they are good brain-teasing programming exercises in Scala on Spark.

As the source data for the example, let’s consider a minuscule set of weather data stored in a DataFrame, which consists of the following columns:

  • Weather Station ID
  • Start Date of a half-month period
  • Temperature High (in Fahrenheit) over the period
  • Temperature Low (in Fahrenheit) over the period
  • Total Precipitation (in inches) over the period

Note that with a properly configured Spark cluster, the methods illustrated in the following example can be readily adapted to handle much more granular data at scale – e.g. down to sub-hourly weather data from tens of thousands of weather stations. It’s also worth mentioning that there can be other ways to solve the problems presented in the examples.

For illustration purposes, the following code snippets are executed in a Spark shell. The first thing is to generate a DataFrame with the said columns of sample data, which will be used as source data for this example and a couple of the following ones.

In this first example, the goal is to generate a table of cumulative precipitation by weather station in month-by-month columns. ‘Cumulative sum’ means that the monthly precipitation is accumulated from one month over to the next (i.e. a running sum). In other words, if July’s precipitation is 2 inches and August’s is 1 inch, the figure for August will be 3 inches. The result should look like the following table:

First, we transform the original DataFrame to include an additional year-month column, followed by using Spark’s groupBy, pivot and agg methods to generate the pivot table.
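What the pivot produces can be sketched in plain Scala collections, as a stand-in for the Spark groupBy/pivot/agg chain rather than a reproduction of it. The `PivotSketch` object is hypothetical. One deliberate difference is labeled in the code: a missing station/month cell is filled with 0.0 here, which is a simplification.

```scala
object PivotSketch {
  // Input rows: (station, year-month as "yyyy-MM", precipitation).
  // Output: the sorted year-month "columns", plus one wide row per station
  // whose values line up with those columns.
  def pivot(rows: Seq[(String, String, Double)]): (Seq[String], Map[String, Seq[Double]]) = {
    val months = rows.map(_._2).distinct.sorted
    val table = rows.groupBy(_._1).map { case (station, part) =>
      val byMonth = part.groupBy(_._2).map { case (ym, rs) => ym -> rs.map(_._3).sum }
      // Assumption: fill absent cells with 0.0 to keep the row rectangular.
      station -> months.map(m => byMonth.getOrElse(m, 0.0))
    }
    (months, table)
  }
}
```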

Next, we assemble a list of the year-month columns and traverse the list using the method foldLeft, which is one of the most versatile Scala functions for custom iterative transformations. In this particular case, the data to be transformed by foldLeft is a tuple of (DataFrame, Double). Normally, transforming the DataFrame alone should suffice, but in this case we need an additional value to address the rolling cumulation requirement.

The tuple’s first, DataFrame-type element, with monthlyPrecipDF as its initial value, will be transformed using the binary operator function specified as foldLeft’s second argument (i.e. (acc, c) => …). As for the tuple’s second, Double-type element, with the first year-month as its initial value, it carries the current month’s value over to the next iteration. The end result is a (DataFrame, Double) tuple successively transformed month by month.
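The tuple-accumulator pattern can be tried out on a single row of monthly values in plain Scala. This is a sketch of the pattern only, with a `Vector[Double]` standing in for the DataFrame; the `CumulativeFold` object is hypothetical.

```scala
object CumulativeFold {
  // Fold over the months after the first, carrying a tuple of
  // (values produced so far, running total from the previous month).
  def cumulative(monthly: Seq[Double]): Seq[Double] =
    monthly match {
      case Seq() => Seq.empty
      case head +: tail =>
        tail.foldLeft((Vector(head), head)) { case ((out, prev), cur) =>
          val total = prev + cur
          (out :+ total, total) // second element is handed to the next step
        }._1
    }
}
```

For monthly values of 2.0 and 1.0 inches, the cumulative figures are 2.0 and 3.0, matching the July/August example given earlier.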

Similar pivot aggregations can be applied to temperature highs/lows as well, with the method sum replaced with the method max/min.

Finally, we compute cumulative temperature high/low like cumulative precipitation, by replacing method sum with iterative max/min using Spark’s when-otherwise method.
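The iterative max/min can be sketched in plain Scala, with an if/else expression playing the role that Spark’s when-otherwise plays on columns. The `CumulativeExtremes` object and its method names are hypothetical.

```scala
object CumulativeExtremes {
  // Walk the months in order, keeping whichever of (previous result,
  // current value) the supplied rule picks.
  private def cumulate(values: Seq[Double])(pick: (Double, Double) => Double): Seq[Double] =
    values match {
      case Seq() => Seq.empty
      case head +: tail => tail.scanLeft(head)(pick)
    }

  // Running maximum: keep the current value only when it exceeds the previous.
  def cumulativeMax(highs: Seq[Double]): Seq[Double] =
    cumulate(highs)((prev, cur) => if (cur > prev) cur else prev)

  // Running minimum: keep the current value only when it is below the previous.
  def cumulativeMin(lows: Seq[Double]): Seq[Double] =
    cumulate(lows)((prev, cur) => if (cur < prev) cur else prev)
}
```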