ETL & Pipelining With Alpakka Kafka

With the increasing demand for big data transformations on distributed platforms, one approach is to put in place a streaming ETL system with built-in backpressure, using the Alpakka Kafka API, that allows composable pipelines to be constructed on demand. This blog post extends the previous post, which showed a skeletal version of the system.

Let’s first review the diagram shown in the previous post of what we’re aiming to build — a streaming ETL system empowered by reactive streams and Apache Kafka’s publish-subscribe machinery for durable stream data to be produced or consumed by various data processing/storage systems:

Alpakka Kafka - Streaming ETL

In this blog post, we’re going to:

  1. enhance the data warehouse consumer to programmatically track the commit offset positions,
  2. plug a data processing pipeline into an existing consumer as a stream processing operator, and
  3. add a mix of heterogeneous producers and consumers to the streaming ETL system.

Action item #1 would address the requirement of at-least-once delivery in stream consumption. #2 illustrates how to add to the streaming ETL system a custom data pipeline as a composable stream flow, and #3 showcases how data in various storage systems can participate in the real-time stream to operate (serially or in parallel) as composable sources, flows and sinks. All relevant source code is available in this GitHub repo.

Real-time streaming ETL/pipelining of property listing data

For usage demonstration, the application runs ETL/pipelining against a simplified real estate property listing data model. It should be noted that expanding it, or even swapping in a different data model altogether, should not affect how the core streaming ETL system operates.

Below are a couple of links related to library dependencies and configurations for the core application:

  • Library dependencies in build.sbt [separate tab]
  • Configurations for Akka, Kafka, PostgreSQL & Cassandra in application.conf [separate tab]

It’s also worth noting that the application can be scaled up with just configurative changes. For example, if the Kafka brokers and Cassandra database span across multiple hosts, relevant configurations like Kafka’s bootstrap.servers could be "10.1.0.1:9092,10.1.0.2:9092,10.1.0.3:9092" and contact-points for Cassandra might look like ["10.2.0.1:9042","10.2.0.2:9042"].

Next, let’s get ourselves familiarized with the property listing data definitions in PostgreSQL and Cassandra, as well as the property listing classes that model the schemas.

A Kafka producer using Alpakka Csv

Alpakka comes with a simple API for CSV file parsing with method lineScanner() that takes parameters including the delimiter character and returns a Flow[ByteString, List[ByteString], NotUsed].

Below is the relevant code in CsvPlain.scala that highlights how the CSV file gets parsed and materialized into a stream of Map[String,String] via CsvParsing and CsvToMap, followed by a transformation into a stream of PropertyListing objects.

    val source: Source[PropertyListing, NotUsed] =
      FileIO.fromPath(Paths.get(csvFilePath))
        .via(CsvParsing.lineScanner(CsvParsing.Tab))
        .viaMat(CsvToMap.toMapAsStrings())(Keep.right)
        .drop(offset).take(limit)
        .map(toClassPropertyListing(_))

    // ...

    source
      .map{ property =>
        val prodRec = new ProducerRecord[String, String](
          topic, property.propertyId.toString, property.toJson.compactPrint
        )
        println(s"[CSV] >>> Producer msg: $prodRec")
        prodRec
      }
      .runWith(Producer.plainSink(producerSettings))

Note that the drop(offset)/take(limit) line takes a segmented range of the stream source, which can be useful for testing, and can be removed if preferred.

A subsequent map wraps each of the PropertyListing objects in a ProducerRecord[K,V] with the associated topic and key/value of type String/JSON before being streamed into Kafka via Alpakka Kafka’s Producer.plainSink().

A Kafka producer using Alpakka Slick

The PostgresPlain producer, which is pretty much identical to the one described in the previous blog post, creates a Kafka producer using Alpakka Slick which allows SQL queries into a PostgreSQL database to be coded in Slick’s functional programming style.

The partial code below shows how method Slick.source() takes a streaming query and returns a stream source of PropertyListing objects.

    val source: Source[PropertyListing, NotUsed] =
      Slick
        .source(TableQuery[PropertyListings].sortBy(_.propertyId).drop(offset).take(limit).result)

    // ...

    source
      .map{ property =>
        val prodRec = new ProducerRecord[String, String](
          topic, property.propertyId.toString, property.toJson.compactPrint
        )
        println(s"[POSTGRES] >>> Producer msg: $prodRec")
        prodRec
      }
      .runWith(Producer.plainSink(producerSettings))

The high-level code logic in PostgresPlain is similar to that of the CsvPlain producer.

A Kafka consumer using Alpakka Cassandra

We created a Kafka consumer in the previous blog post using Alpakka Kafka’s Consumer.plainSource[K,V] for consuming data from a given Kafka topic into a Cassandra database.

The following partial code from CassandraPlain, a slightly refactored version of that consumer, shows how data associated with a given Kafka topic can be consumed via Alpakka Kafka’s Consumer.plainSource().

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                        cassandraSession: CassandraSession,
                                        jsonFormat: JsonFormat[PropertyListing],
                                        system: ActorSystem,
                                        ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val table = "propertydata.property_listing"
    val partitions = 10 // number of partitions

    val statementBinder: (ConsumerRecord[String, String], PreparedStatement) => BoundStatement = {
      case (msg, preparedStatement) =>
        val p = msg.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse("")
        )
    }
    val cassandraFlow: Flow[ConsumerRecord[String, String], ConsumerRecord[String, String], NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control: DrainingControl[Done] =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(topic))
        .via(cassandraFlow)
        .toMat(Sink.ignore)(DrainingControl.apply)
        .run()

    Thread.sleep(2000)
    control.drainAndShutdown()
  }

Alpakka’s CassandraFlow.create() is the stream processing operator responsible for funneling data into the Cassandra database. Note that it takes a CQL PreparedStatement along with a “statement binder” that binds the incoming class variables to the corresponding Cassandra table columns before executing the CQL.

Enhancing the Kafka consumer for ‘at-least-once’ consumption

To enable at-least-once consumption into Cassandra, instead of Consumer.plainSource[K,V] we construct the stream graph via Alpakka Kafka’s Consumer.committableSource[K,V], which offers programmatic tracking of the commit offset positions. By keeping the commit offsets as an integral part of the streaming data, failed streams can be re-run from the committed offset positions.

The main stream composition code of the enhanced consumer, CassandraCommittable.scala, is shown below.

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                        cassandraSession: CassandraSession,
                                        jsonFormat: JsonFormat[PropertyListing],
                                        system: ActorSystem,
                                        ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val committerConfig = system.settings.config.getConfig("akka.kafka.committer")
    val committerSettings = CommitterSettings(committerConfig)

    val table = "propertydata.property_listing"
    val partitions = 10 // number of partitions

    val statementBinder: (CommittableMessage[String, String], PreparedStatement) => BoundStatement = {
      case (msg, preparedStatement) =>
        val p = msg.record.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse("")
        )
    }
    val cassandraFlow: Flow[CommittableMessage[String, String], CommittableMessage[String, String], NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .via(cassandraFlow)
        .map(_.committableOffset)
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    Thread.sleep(2000)
    control.drainAndShutdown()
  }

A couple of notes:

  1. In order to be able to programmatically keep track of the commit offset positions, each of the stream elements emitted from Consumer.committableSource[K,V] is wrapped in a CommittableMessage[K,V] object, consisting of the CommittableOffset value in addition to the Kafka ConsumerRecord[K,V].
  2. Committing the offset should be done after the stream data is processed for at-least-once consumption, whereas committing prior to processing the stream data would only achieve at-most-once delivery (a contrasting sketch follows this list).
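To make the contrast concrete, below is a hedged sketch (not part of the repo) using Alpakka Kafka’s Consumer.atMostOnceSource, which commits each offset before handing the record downstream; persistToCassandra is a hypothetical stand-in for the Cassandra flow shown earlier, and the imports are the same as in the consumer listings.

  // Sketch only: offsets are committed before processing, so a crash mid-processing
  // can lose the in-flight record (at-most-once), unlike the committableSource pattern above.
  def runAtMostOnce(consumerGroup: String, topic: String)(implicit
                                                          system: ActorSystem,
                                                          ec: ExecutionContext): Future[Done] = {
    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)

    def persistToCassandra(record: ConsumerRecord[String, String]): Future[Done] =
      Future(Done)  // hypothetical placeholder for the Cassandra write

    Consumer
      .atMostOnceSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(1)(persistToCassandra)
      .runWith(Sink.ignore)
  }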

Adding a property-rating pipeline to the Alpakka Kafka consumer

Next, we add a data processing pipeline to the consumer to perform a number of ratings of the individual property listings in the stream before delivering the rated property listing data to the Cassandra database, as illustrated in the following diagram.

Alpakka Kafka - Streaming ETL w/ custom pipelines

Since the CassandraFlow.create() stream operator will be executed after the rating pipeline, the corresponding “statement binder” responsible for class-variable/table-column binding will now need to bind PropertyRating as well as the CommittableMessage[K,V], as shown in the partial code of CassandraCommittableWithRatings.scala below.

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                         cassandraSession: CassandraSession,
                                         jsonFormat: JsonFormat[PropertyListing],
                                         system: ActorSystem,
                                         ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val committerConfig = system.settings.config.getConfig("akka.kafka.committer")
    val committerSettings = CommitterSettings(committerConfig)

    val table = "propertydata.rated_property_listing"
    val partitions = 10 // number of partitions

    val statementBinder: ((PropertyRating, CommittableMessage[String, String]), PreparedStatement) => BoundStatement = {
      case ((rating, msg), preparedStatement) =>
        val p = msg.record.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse(""),
          rating.affordability.getOrElse(0), rating.comfort.getOrElse(0), rating.neighborhood.getOrElse(0), rating.schools.getOrElse(0)
        )
    }
    val cassandraFlow: Flow[(PropertyRating, CommittableMessage[String, String]), (PropertyRating, CommittableMessage[String, String]), NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country, rating_affordability, rating_comfort, rating_neighborhood, rating_schools)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .via(PropertyRating.compute())
        .via(cassandraFlow)
        .map { case (_, msg) => msg.committableOffset }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    Thread.sleep(2000)
    control.drainAndShutdown()
  }

For demonstration purposes, we create a dummy pipeline that rates individual real estate properties in areas such as affordability and neighborhood, with each rating returning a Future of a random integer between 1 and 5 after a random time delay. The rating-related fields along with the computation logic are wrapped in class PropertyRating as shown below.

case class PropertyRating(
    propertyId: Int,
    affordability: Option[Int],
    comfort: Option[Int],
    neighborhood: Option[Int],
    schools: Option[Int]
  )

object PropertyRating {
  def rand = java.util.concurrent.ThreadLocalRandom.current

  def biasedRandNum(l: Int, u: Int, biasedNums: Set[Int], biasedFactor: Int = 1): Int = {
    Vector
      .iterate(rand.nextInt(l, u+1), biasedFactor)(_ => rand.nextInt(l, u+1))
      .dropWhile(!biasedNums.contains(_))
      .headOption match {
        case Some(n) => n
        case None => rand.nextInt(l, u+1)
      }
  }

  def fakeRating()(implicit ec: ExecutionContext): Future[Int] = Future{  // Fake rating computation
    Thread.sleep(biasedRandNum(1, 9, Set(3, 4, 5)))  // Sleep 1-9 ms to simulate computation delay
    biasedRandNum(1, 5, Set(2, 3, 4))  // Rating 1-5; mostly 2-4
  }

  def compute()(implicit ec: ExecutionContext): Flow[CommittableMessage[String, String], (PropertyRating, CommittableMessage[String, String]), NotUsed] =
    Flow[CommittableMessage[String, String]].mapAsync(1){ msg =>
      val propertyId = msg.record.key().toInt  // let it crash in case of bad PK data
      ( for {
            affordability <- PropertyRating.fakeRating()
            comfort <- PropertyRating.fakeRating()
            neighborhood <- PropertyRating.fakeRating()
            schools <- PropertyRating.fakeRating()
          }
          yield new PropertyRating(propertyId, Option(affordability), Option(comfort), Option(neighborhood), Option(schools))
        )
        .map(rating => (rating, msg)).recover{ case e => throw new Exception("ERROR in computeRatingFlow()!") }
    }
}

A Kafka consumer with a custom flow & stream destination

The application also bundles a consumer that runs the property-rating pipeline followed by a custom flow, showcasing how one can compose an arbitrary side-effecting operator with a custom stream destination.

  def customBusinessLogic(key: String, value: String, rating: PropertyRating)(
    implicit ec: ExecutionContext): Future[Done] = Future {

    println(s"KEY: $key  VALUE: $value  RATING: $rating")
    // Perform custom business logic with key/value
    // and save to an external storage, etc.
    Done
  }

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                         jsonFormat: JsonFormat[PropertyListing],
                                         system: ActorSystem,
                                         ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val committerConfig = system.settings.config.getConfig("akka.kafka.committer")
    val committerSettings = CommitterSettings(committerConfig)

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .via(PropertyRating.compute())
        .mapAsync(1) { case (rating, msg) =>
          customBusinessLogic(msg.record.key, msg.record.value, rating)
            .map(_ => msg.committableOffset)
        }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    Thread.sleep(5000)
    control.drainAndShutdown()
  }

Note that mapAsync is used so that the stream transformation carried out by the custom business logic runs asynchronously.
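As a standalone, hedged sketch (not from the repo), the snippet below contrasts mapAsync with mapAsyncUnordered: both run up to a given number of futures concurrently, but only mapAsync emits results in the upstream order.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object MapAsyncDemo extends App {
  implicit val system: ActorSystem = ActorSystem("mapasync-demo")
  import system.dispatcher

  def slowDouble(i: Int): Future[Int] = Future {
    Thread.sleep((10 - i) * 20L)  // earlier elements take longer on purpose
    i * 2
  }

  Source(1 to 10)
    .mapAsync(4)(slowDouble)            // results emitted in upstream order: 2, 4, 6, ...
    // .mapAsyncUnordered(4)(slowDouble)  // results emitted as their futures complete
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}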

Running the streaming ETL/pipelining system

To run the application that comes with sample real estate property listing data on a computer, go to the GitHub repo and follow the README instructions to launch the producers and consumers on one or more command-line terminals.

Also included in the README are instructions on how to run a couple of preset queries to verify the data that gets ETL-ed into the Cassandra tables, using Alpakka Cassandra’s CassandraSource, which takes a CQL query as its argument.
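For reference, a hedged sketch of such a verification query (assuming the rated_property_listing table and column names from the consumer above) might look like the following:

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.alpakka.cassandra.scaladsl.{CassandraSession, CassandraSource}
import scala.concurrent.Future

// Read back one Cassandra partition of the rated table to spot-check the ETL-ed rows.
def verifyPartition(partitionKey: String)(implicit
                                          cassandraSession: CassandraSession,
                                          system: ActorSystem): Future[Seq[String]] =
  CassandraSource(
      s"SELECT property_id, list_price, rating_affordability FROM propertydata.rated_property_listing WHERE partition_key = '$partitionKey'")
    .map(row => s"${row.getInt("property_id")}: $$${row.getDouble("list_price")} (affordability ${row.getInt("rating_affordability")})")
    .runWith(Sink.seq)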

Further enhancements

Depending on specific business requirements, the streaming ETL system can be further enhanced in a number of areas.

  1. This streaming ETL system offers at-least-once delivery only in stream consumption. If an end-to-end guarantee is necessary, one could enhance the producers by using Producer.committableSink() or Producer.flexiFlow() instead of Producer.plainSink().
  2. For exactly-once delivery, which is a generally much more stringent requirement, one approach would be to atomically persist the in-flight data with the corresponding commit offset positions using a reliable storage system.
  3. In case tracking of Kafka’s topic-partition assignment is required, one can use Consumer.committablePartitionedSource[K,V] instead of Consumer.committableSource[K,V]. More details can be found in the tech doc.
  4. To gracefully restart a stream on failure with a configurable backoff, Akka Streams provides the method RestartSource.onFailuresWithBackoff, as illustrated in an example in this tech doc and sketched right after this list.
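Regarding #4, here is a minimal, hedged sketch of wrapping the committable consumer source with RestartSource.onFailuresWithBackoff; consumerSettings and topic are assumed to be defined as in the consumers above, and the backoff values are arbitrary:

import akka.NotUsed
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

// Sketch only: re-create the consumer source on failure with exponential backoff.
val restartSettings =
  RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)

val restartableSource: Source[CommittableMessage[String, String], NotUsed] =
  RestartSource.onFailuresWithBackoff(restartSettings) { () =>
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  }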

Streaming ETL With Alpakka Kafka

In a previous startup I cofounded, our core product was a geospatial application that provided algorithmic ratings of individual residential real estate properties for home buyers. Given that there were over 100 million residential properties nationwide, the collective data volume of all the associated attributes necessary for the data engineering work was massive.

For the initial MVP (minimum viable product) releases in which we only needed to showcase our product features in a selected metropolitan area, we used PostgreSQL as the OLTP (online transaction processing) database. Leveraging Postgres’ table partitioning feature, we had an OLTP database capable of accommodating incremental geographical expansion into multiple cities and states.

Batch ETL

The need for a big data warehouse wasn’t imminent in the beginning, though we had to make sure a data processing platform for a highly scalable data warehouse along with efficient ETL (extract/transform/load) functions would be ready on a short notice. The main objective was to make sure the OLTP database could be kept at a minimal volume while less frequently used data got “archived” off to a big data warehouse for data analytics.

With limited engineering resources available in a small startup, I kicked off an R&D project on the side to build programmatic ETL processes to periodically funnel data from PostgreSQL to a big data warehouse in a batch manner. Cassandra was chosen to be the data warehouse and was configured on an Amazon EC2 cluster. The project finished with a batch ETL solution that functionally worked as intended, although in the back of my mind a more “continuous” operational model would have been preferred.

Real-time Streaming ETL

Fast-forward to 2021, I recently took on a big data streaming project that involves ETL and building data pipelines on a distributed platform. Central to the project requirement is real-time (or more precisely, near real-time) processing of high-volume data. Another aspect of the requirement is that the streaming system has to accommodate custom data pipelines as composable components of the consumers, suggesting that a streaming ETL solution would be more suitable than a batch one. Lastly, stream consumption needs to guarantee at-least-once delivery.

Given all that, Apache Kafka promptly stood out as a top candidate to serve as the distributed streaming brokers. In particular, its capability of keeping durable data in a distributed fault-tolerant cluster allows it to serve different consumers at various instances of time and locales. Next, Akka Stream was added to the tech stack for its versatile stream-based application integration functionality as well as benefits of reactive streams.

Alpakka – a reactive stream API and DSL

Built on top of Akka Stream, Alpakka provides a comprehensive API and DSL (domain specific language) for reactive and stream-oriented programming to address the application integration needs for interoperating with a wide range of prominent systems across various computing domains. That, coupled with the underlying Akka Stream’s versatile streaming functions, makes Alpakka a powerful toolkit for what is needed.

In this blog post, we’ll assemble in Scala a producer and a consumer using the Alpakka API to perform streaming ETL from a PostgreSQL database through Kafka brokers into a Cassandra data warehouse. In a subsequent post, we’ll enhance and package up these snippets to address the requirement of at-least-once delivery in consumption and composability of data pipelines.

Streaming ETL with Alpakka Kafka, Slick, Cassandra, …

The following diagram shows the near-real-time ETL functional flow of data streaming from various kinds of data sources (e.g. a PostgreSQL database or a CSV file) to data destinations (e.g. a Cassandra data warehouse or a custom data stream outlet).

Alpakka Kafka - Streaming ETL

The Apache Kafka brokers provide a distributed publish-subscribe platform for keeping in-flight data in durable immutable logs readily available for consumption. Meanwhile, the Akka Stream based Alpakka API that comes with a DSL allows programmatic integrations to compose data pipelines as sources, sinks and flows, in addition to enabling “reactivity” by equipping the streams with non-blocking backpressure.

It should be noted that the same stream can be processed using various data sources and destinations simultaneously. For instance, data with the same schema from both the CSV file and Postgres database could be published to the same topic and consumed by a consumer group designated for the Cassandra database and another consumer group for a different data storage.
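As a hedged, standalone sketch (the second group id below is illustrative), two consumer groups subscribing to the same topic would each independently receive every published record:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object TwoGroupsDemo extends App {
  implicit val system: ActorSystem = ActorSystem()

  val base = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("127.0.0.1:9092")

  // The data warehouse consumer group gets the full stream ...
  Consumer.plainSource(base.withGroupId("datawarehouse-consumer-group"),
      Subscriptions.topics("property-listing-topic"))
    .runWith(Sink.foreach(r => println(s"[warehouse] ${r.key}")))

  // ... and so does a second, independent consumer group for another destination.
  Consumer.plainSource(base.withGroupId("another-consumer-group"),
      Subscriptions.topics("property-listing-topic"))
    .runWith(Sink.foreach(r => println(s"[other] ${r.key}")))
}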

Example: ETL of real estate property listing data

The platform will be for general-purpose ETL/pipelining. For illustration purposes in this blog post, we’re going to use it to perform streaming ETL of a simplified dataset of residential real estate property listings.

First, we create a simple class to represent a property listing.

package alpakkafka

case class PropertyListing(
    propertyId: Int,
    dataSource: Option[String],
    bathrooms: Option[Double],
    bedrooms: Option[Int],
    listPrice: Option[Double],
    livingArea: Option[Int],
    propertyType: Option[String],
    yearBuilt: Option[String],
    lastUpdated: Option[String],
    streetAddress: Option[String],
    city: Option[String],
    state: Option[String],
    zip: Option[String],
    country: Option[String]
  ) {
    def summary(): String = {
      val ba = bathrooms.getOrElse(0)
      val br = bedrooms.getOrElse(0)
      val price = listPrice.getOrElse(0)
      val area = livingArea.getOrElse(0)
      val street = streetAddress.getOrElse("")
      val cit = city.getOrElse("")
      val sta = state.getOrElse("")
      s"PropertyID: $propertyId | Price: $$$price ${br}BR/${ba}BA/${area}sqft | Address: $street, $cit, $sta"
    }
  }

Using the good old sbt as the build tool, relevant library dependencies for Akka Stream, Alpakka Kafka, Postgres/Slick and Cassandra/DataStax are included in build.sbt.

name := "alpakka-streaming-etl"

version := "0.1"

scalaVersion := "2.13.6"

scalacOptions += "-deprecation"

val akkaVersion = "2.6.16"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream-kafka" % "2.1.1",
  "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "3.0.3",
  "org.postgresql" % "postgresql" % "42.2.24",
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "3.0.3",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "3.0.3",
  "com.datastax.oss" % "java-driver-core" % "4.13.0",
  "org.apache.tinkerpop" % "tinkergraph-gremlin" % "3.5.1",
  "io.spray" %%  "spray-json" % "1.3.6",
  "ch.qos.logback" % "logback-classic" % "1.2.4" % Runtime
)

trapExit := false

Next, we put configurations for Akka Actor, Alpakka Kafka, Slick and Cassandra in application.conf under src/main/resources/:

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}

akka.actor.allow-java-serialization = on

slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost/propertydb"
      user = "pipeliner"
      password = "pa$$word"
    }
  }
}

akka.kafka.producer {
  discovery-method = akka.discovery
  service-name = ""
  resolve-timeout = 3 seconds
  parallelism = 10000
  close-timeout = 60s
  close-on-producer-stop = true
  use-dispatcher = "akka.kafka.default-dispatcher"
  eos-commit-interval = 100ms
  kafka-clients {
  }
}

akkafka.producer.with-brokers: ${akka.kafka.producer} {
  kafka-clients {
    bootstrap.servers = "127.0.0.1:9092"
  }
}

akka.kafka.consumer {
  poll-interval = 250ms  # 50ms
  poll-timeout = 50ms
  stop-timeout = 10s     # 30s
  close-timeout = 10s    # 20s
  commit-timeout = 10s   # 15s
  wakeup-timeout = 10s
  eos-draining-check-interval = 30ms
  use-dispatcher = "akka.kafka.default-dispatcher"
  kafka-clients {
    enable.auto.commit = true  # `true` for `Consumer.plainSource`
  }
}

akkafka.consumer.with-brokers: ${akka.kafka.consumer} {
  kafka-clients {
    bootstrap.servers = "127.0.0.1:9092"
  }
}

akka.kafka.committer {
  max-batch = 1000
  max-interval = 10s
  parallelism = 100
  delivery = WaitForAck
  when = OffsetFirstObserved
}

alpakka.cassandra {
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }
  session-dispatcher = "akka.actor.default-dispatcher"
  datastax-java-driver-config = "datastax-java-driver"
}

datastax-java-driver {
  basic {
    contact-points = ["127.0.0.1:9042"]
    load-balancing-policy.local-datacenter = datacenter1
  }
  advanced.reconnect-on-init = true
}

Note that the sample configuration is for running the application with all Kafka, PostgreSQL and Cassandra on a single computer. The host IPs (i.e. 127.0.0.1) should be replaced with their corresponding host IPs/names in case they’re on separate hosts. For example, relevant configurations for Kafka brokers and Cassandra database spanning across multiple hosts might look something like bootstrap.servers = "10.1.0.1:9092,10.1.0.2:9092,10.1.0.3:9092" and contact-points = ["10.2.0.1:9042","10.2.0.2:9042"].

PostgresProducerPlain – an Alpakka Kafka producer

The PostgresProducerPlain snippet below creates a Kafka producer using Alpakka Slick which allows SQL queries to be coded in Slick’s functional programming style.

package alpakkafka

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

import akka.stream.alpakka.slick.scaladsl._

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import spray.json._
import spray.json.DefaultJsonProtocol._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object PostgresProducerPlain {

  def run(topic: String,
          offset: Int = 0,
          limit: Int = Int.MaxValue)(implicit
                                     slickSession: SlickSession,
                                     jsonFormat: JsonFormat[PropertyListing],
                                     system: ActorSystem,
                                     ec: ExecutionContext): Future[Done] = {

    import slickSession.profile.api._

    class PropertyListings(tag: Tag) extends Table[PropertyListing](tag, "property_listing") {
      def propertyId = column[Int]("property_id", O.PrimaryKey)
      def dataSource = column[Option[String]]("data_source")
      def bathrooms = column[Option[Double]]("bathrooms")
      def bedrooms = column[Option[Int]]("bedrooms")
      def listPrice = column[Option[Double]]("list_price")
      def livingArea = column[Option[Int]]("living_area")
      def propertyType = column[Option[String]]("property_type")
      def yearBuilt = column[Option[String]]("year_built")
      def lastUpdated = column[Option[String]]("last_updated")
      def streetAddress = column[Option[String]]("street_address")
      def city = column[Option[String]]("city")
      def state = column[Option[String]]("state")
      def zip = column[Option[String]]("zip")
      def country = column[Option[String]]("country")
      def * =
        (propertyId, dataSource, bathrooms, bedrooms, listPrice, livingArea, propertyType, yearBuilt, lastUpdated, streetAddress, city, state, zip, country) <> (PropertyListing.tupled, PropertyListing.unapply)
    }

    val source: Source[PropertyListing, NotUsed] =
      Slick
        .source(TableQuery[PropertyListings].sortBy(_.propertyId).drop(offset).take(limit).result)

    val producerConfig = system.settings.config.getConfig("akkafka.producer.with-brokers")
    val producerSettings =
      ProducerSettings(producerConfig, new StringSerializer, new StringSerializer)

    source
      .map{ property =>
        val prodRec = new ProducerRecord[String, String](
            topic, property.propertyId.toString, property.toJson.compactPrint
          )
        println(s"[POSTGRES] >>> Producer msg: $prodRec")
        prodRec
      }
      .runWith(Producer.plainSink(producerSettings))
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val ec = system.dispatcher

    implicit val propertyListingJsonFormat: JsonFormat[PropertyListing] = jsonFormat14(PropertyListing)
    implicit val slickSession = SlickSession.forConfig("slick-postgres")

    val topic = "property-listing-topic"
    val offset: Int = if (args.length >= 1) Try(args(0).toInt).getOrElse(0) else 0
    val limit: Int = if (args.length == 2) Try(args(1).toInt).getOrElse(Int.MaxValue) else Int.MaxValue

    run(topic, offset, limit) onComplete{ _ =>
      slickSession.close()
      system.terminate()
    }
  }
}

Method Slick.source[T]() takes a streaming query and returns a Source[T, NotUsed]. In this case, T is PropertyListing. Note that Slick.source() can also take a plain SQL statement wrapped within sql"..." as its argument, if desired (in which case an implicit value of slick.jdbc.GetResult should be defined).
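As a hedged illustration of that plain-SQL alternative (assuming the column order matches the case class and that slickSession.profile.api._ is in scope, as it is inside run() above):

import slick.jdbc.GetResult

// Map a raw result row positionally onto PropertyListing (assumes this exact column order).
implicit val getPropertyListing: GetResult[PropertyListing] = GetResult { r =>
  PropertyListing(r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<, r.<<)
}

val plainSqlSource: Source[PropertyListing, NotUsed] =
  Slick.source(
    sql"""SELECT property_id, data_source, bathrooms, bedrooms, list_price, living_area,
                 property_type, year_built, last_updated, street_address, city, state, zip, country
          FROM property_listing ORDER BY property_id""".as[PropertyListing])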

A subsequent map wraps each of the property listing objects in a ProducerRecord[K,V] with topic and key/value of type String/JSON, before publishing to the Kafka topic via Alpakka Kafka’s Producer.plainSink[K,V].

To run PostgresProducerPlain, simply navigate to the project root and execute the following command from within a command line terminal:

# Usage: sbt "runMain alpakkafka.PostgresProducerPlain [offset] [limit]"

# Stream the first 20 rows from property_listing in Postgres into Kafka under topic "property-listing-topic"
sbt "runMain alpakkafka.PostgresProducerPlain 0 20"

# Stream all rows from property_listing in Postgres into Kafka under topic "property-listing-topic"
sbt "runMain alpakkafka.PostgresProducerPlain"

CassandraConsumerPlain – an Alpakka Kafka consumer

Using Alpakka Kafka, CassandraConsumerPlain shows how a basic Kafka consumer can be formulated as an Akka stream that consumes data from Kafka via Consumer.plainSource, followed by a stream processing operator, Alpakka Cassandra’s CassandraFlow, which streams the data into a Cassandra database.

package alpakkafka

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.{Done, NotUsed}

import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer

import akka.stream.alpakka.cassandra.{CassandraSessionSettings, CassandraWriteSettings}
import akka.stream.alpakka.cassandra.scaladsl.{CassandraSession, CassandraSessionRegistry}
import akka.stream.alpakka.cassandra.scaladsl.{CassandraFlow, CassandraSource}
import com.datastax.oss.driver.api.core.cql.{BoundStatement, PreparedStatement}

import spray.json._
import spray.json.DefaultJsonProtocol._

import scala.concurrent.{ExecutionContext, Future}
import java.util.concurrent.atomic.AtomicReference
import scala.util.{Failure, Success, Try}

object CassandraConsumerPlain {

  def run(consumerGroup: String,
          topic: String)(implicit
                         cassandraSession: CassandraSession,
                         jsonFormat: JsonFormat[PropertyListing],
                         system: ActorSystem,
                         ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val table = "propertydata.property_listing"
    val partitions = 10  // number of partitions

    val statementBinder: (ConsumerRecord[String, String], PreparedStatement) => BoundStatement = {
      case (msg, preparedStatement) =>
        val p = msg.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse("")
        )
    }
    val cassandraFlow: Flow[ConsumerRecord[String, String], ConsumerRecord[String, String], NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control: DrainingControl[Done] =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(topic))
        .via(cassandraFlow)
        .toMat(Sink.ignore)(DrainingControl.apply)
        .run()

    Thread.sleep(5000)
    control.drainAndShutdown()
  }

  def query(sql: String)(implicit
                         cassandraSession: CassandraSession,
                         system: ActorSystem,
                         ec: ExecutionContext): Future[Seq[(String, PropertyListing)]] = {

    CassandraSource(sql)
      .map{ r =>
        val partitionKey = r.getString("partition_key")
        val propertyListingTuple = (
            r.getInt("property_id"),
            Option(r.getString("data_source")),
            Option(r.getDouble("bathrooms")),
            Option(r.getInt("bedrooms")),
            Option(r.getDouble("list_price")),
            Option(r.getInt("living_area")),
            Option(r.getString("property_type")),
            Option(r.getString("year_built")),
            Option(r.getString("last_updated")),
            Option(r.getString("street_address")),
            Option(r.getString("city")),
            Option(r.getString("state")),
            Option(r.getString("zip")),
            Option(r.getString("country"))
          )
        val propertyListing = PropertyListing.tupled(propertyListingTuple)
        (partitionKey, propertyListing)
      }
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val ec = system.dispatcher

    implicit val propertyListingJsonFormat: JsonFormat[PropertyListing] = jsonFormat14(PropertyListing)
    implicit val cassandraSession: CassandraSession =
      CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings())

    val consumerGroup = "datawarehouse-consumer-group"
    val topic = "property-listing-topic"

    run(consumerGroup, topic) onComplete(println)

    // Query for property listings within a Cassandra partition
    val sql = s"SELECT * FROM propertydata.property_listing WHERE partition_key = '0';"
    query(sql) onComplete {
      case Success(res) => res.foreach(println)
      case Failure(e) => println(s"ERROR: $e")
    }

    Thread.sleep(5000)
    system.terminate()
  }
}

A few notes:

  • Consumer.plainSource: As a first stab at building a consumer, we use Alpakka Kafka’s Consumer.plainSource[K,V] as the stream source. To ensure the stream can be stopped in a controlled fashion, Consumer.DrainingControl is included when composing the stream graph. While it’s straightforward to use, plainSource doesn’t offer programmatic tracking of the commit offset positions and thus cannot guarantee at-least-once delivery. An enhanced version of the consumer will be constructed in a subsequent blog post.
  • Partition key: Cassandra mandates having a partition key as part of the primary key of every table for distributing data across cluster nodes. In our property listing data, we use the Postgres primary key property_id modulo the number of partitions as the partition key. It could certainly be redefined to something else (e.g. locale or type of the property) in accordance with specific business requirements.
  • CassandraSource: Method query() simply executes queries against a Cassandra database using CassandraSource, which takes a CQL query with syntax similar to standard SQL’s. It isn’t part of the consumer flow, but rather serves as a convenient tool for verifying the stream consumption result.
  • CassandraFlow: Alpakka’s CassandraFlow.create[S]() is the main processing operator responsible for streaming data into the Cassandra database. It takes a CQL PreparedStatement and a “statement binder” that binds the incoming class variables to the corresponding Cassandra columns before executing the insert/update. In this case, S is ConsumerRecord[K,V].

To run CassandraConsumerPlain, navigate to the project root and execute the following from within a command line terminal:

# Stream any unconsumed data from Kafka topic "property-listing-topic" into Cassandra
sbt "runMain alpakkafka.CassandraConsumerPlain"

Table schema in PostgreSQL & Cassandra

Obviously, the streaming ETL application is supposed to run in the presence of one or more Kafka brokers, a PostgreSQL database and a Cassandra data warehouse. For proof of concept, getting all these systems running with basic configurations on a decent computer (Linux, macOS, etc.) is a trivial exercise. The ETL application is readily scalable in that it would require only configuration changes when, say, the need arises to scale Kafka and Cassandra up to clusters of nodes in the cloud.

Below is how the table schema of property_listing can be created in PostgreSQL via psql:

CREATE ROLE pipeliner WITH createdb login ENCRYPTED PASSWORD 'pa$$word';
CREATE DATABASE propertydb WITH OWNER 'pipeliner' ENCODING 'utf8';

CREATE TABLE property_listing (
    property_id integer PRIMARY KEY,
    bathrooms numeric,
    bedrooms integer,
    list_price double precision,
    living_area integer,
    property_type text,
    year_built text,
    data_source text,
    last_updated timestamp with time zone,
    street_address character varying(250),
    city character varying(50),
    state character varying(50),
    zip character varying(10),
    country character varying(3)  
);
ALTER TABLE property_listing OWNER TO pipeliner;

To create keyspace propertydata and the corresponding table property_listing in Cassandra, one can launch cqlsh and execute the following CQL statements:

CREATE KEYSPACE propertydata
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy',  // use 'NetworkTopologyStrategy' for multi-node
   'replication_factor' : 2     // use 'datacenter1' for multi-node
  };

CREATE TABLE propertydata.property_listing (
    partition_key text,
    property_id int,
    data_source text,
    bathrooms double,
    bedrooms int,
    list_price double,
    living_area int,
    property_type text,
    year_built text,
    last_updated text,
    street_address text,
    city text,
    state text,
    zip text,
    country text,
    PRIMARY KEY ((partition_key), property_id)
);

What’s next?

So, we now have a basic streaming ETL system running Alpakka Kafka on top of a cluster of Kafka brokers to form the reactive stream “backbone” for near real-time ETL between data stores. With Alpakka Slick and Alpakka Cassandra, a relational database like PostgreSQL and a Cassandra data warehouse can be made part of the system as composable stream components.

As noted earlier, the existing Cassandra consumer does not guarantee at-least-once delivery, which is part of the requirement. In the next blog post, we’ll enhance the existing consumer to address the required delivery guarantee. We’ll also add a data processing pipeline to illustrate how to construct additional data pipelines as composable stream operators. All relevant source code along with some sample dataset will be published in a GitHub repo.

Text Mining With Akka Streams

Reactive Systems, whose core characteristics are declared in the Reactive Manifesto, have started to emerge in recent years as message-driven systems that emphasize scalability, responsiveness and resilience. It’s pretty clear from the requirements that a system can’t be simply made Reactive. Rather, it should be built from the architectural level to be Reactive.

Akka’s actor systems, which rely on asynchronous message-passing among lightweight, loosely-coupled actors, serve as a great run-time platform for building Reactive Systems on the JVM (Java Virtual Machine). I posted a few blogs along with sample code about Akka actors in the past. This time I’m going to talk about something different but closely related.

Reactive Streams

While bearing a similar name, Reactive Streams is a separate initiative that mandates its implementations to be capable of processing stream data asynchronously and at the same time automatically regulating the stream flows in a non-blocking fashion.

Akka Streams, built on top of Akka actor systems, is an implementation of Reactive Streams. Equipped with back-pressure functionality, it eliminates the need to manually buffer stream flows or custom-build a stream buffering mechanism to avoid buffer overflow problems.

Extracting n-grams from text

In text mining, n-grams are useful data in the area of NLP (natural language processing). In this blog post, I’ll illustrate extracting n-grams from a stream of text messages using Akka Streams with Scala as the programming language.

First things first, let’s create an object with methods for generating random text content:

Source code: TextMessage.scala

Some minimal effort has been made to generate random clauses of likely pronounceable fake words along with punctuation. To make it a little more flexible, the lengths of individual words and clauses are supplied as parameters.
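The linked file has the actual implementation; purely as a hedged sketch, a parameterized random-text generator could look something like this:

import scala.util.Random

object TextMessageSketch {
  private val vowels = "aeiou"
  private val consonants = "bcdfghjklmnpqrstvwxyz"
  private val rand = new Random

  // Alternate consonants and vowels to make the fake words roughly pronounceable.
  private def fakeWord(len: Int): String =
    (0 until len).map { i =>
      if (i % 2 == 0) consonants(rand.nextInt(consonants.length))
      else vowels(rand.nextInt(vowels.length))
    }.mkString

  def genRandText(minWords: Int, maxWords: Int): String = {
    val numWords = minWords + rand.nextInt(maxWords - minWords + 1)
    (1 to numWords).map(_ => fakeWord(2 + rand.nextInt(6))).mkString(" ") + "."
  }
}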

Next, create another object with text processing methods responsible for extracting n-grams from input text, with n being an input parameter. Using Scala’s sliding(size, step) iterator method with size n and step defaulting to 1, a new iterator of sliding-window views is generated to produce the wanted n-grams.

Source code: TextProcessor.scala
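The linked file has the full version; as a hedged sketch (assuming n-grams are returned one per line), the core of such an extractor boils down to sliding():

object TextProcessorSketch {
  // Split the text into lowercase words, then slide a window of size n (step 1) across them.
  def genNgrams(text: String, n: Int): String = {
    val words = text.toLowerCase.replaceAll("""[\p{Punct}]""", " ").split("\\s+").filter(_.nonEmpty)
    words.sliding(n).map(_.mkString(" ")).mkString("\n")
  }
}

// Example: genNgrams("Akka Streams handles back-pressure.", 2)
// yields "akka streams", "streams handles", ... one bigram per line.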

Now that the text processing tools are in place, we can focus on building the main streaming application in which Akka Streams plays the key role.

First, make sure we have the necessary library dependencies included in build.sbt:

Source code: build.sbt

As Akka Streams is relatively new development work, more recent Akka versions (2.4.9 or higher) should be used.

Let’s start with a simple stream for this text mining application:

Source code: NgramStream_v01.scala

As shown in the source code, constructing a simple stream like this is just defining and chaining together the text-generating source, the text-processing flow and the text-display sink as follows:

    val textSource: Source[String, NotUsed] =
      Source((1 to numMessages).map(_ =>
        TextMessage.genRandText(minWordsInText: Int, maxWordsInText: Int))
      )

    def ngramFlow(n: Int): Flow[String, String, NotUsed] =
      Flow[String].map(text => TextProcessor.genNgrams(text, n))

    val textSink = Sink.foreach[String](println)

    textSource.via(ngramFlow(nVal)).runWith(textSink).
      onComplete {
        ...
      }

Graph DSL

Akka Streams provides a Graph DSL (domain-specific language) that helps build the topology of stream flows using predefined fan-in/fan-out functions.

What Graph DSL does is somewhat similar to how Apache Storm‘s TopologyBuilder pieces together its spouts (i.e. stream sources), bolts (i.e. stream processors) and stream grouping/partitioning functions, as illustrated in a previous blog about HBase streaming.

Back-pressure

Now, let’s branch off the stream using Graph DSL to illustrate how the integral back-pressure feature is at play.

Source code: NgramStream_v02.scala

Streaming to a file should be significantly slower than streaming to the console. To make the difference more noticeable, a delay is deliberately added to streaming each line of text in the file sink.

Run the application and you will notice that the console display slows down. That’s the result of the upstream data flow being regulated to accommodate the relatively slow file I/O outlet, even though the other console outlet is able to consume faster – all of it conducted in a non-blocking fashion.

Graph DSL create() methods

To build a streaming topology using Graph DSL, you’ll need to use one of the create() methods defined within trait GraphApply, which is extended by object GraphDSL. Here are the signatures of the create() methods:

trait GraphApply {
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = {
    val builder = new GraphDSL.Builder
    val s = buildBlock(builder)
    val mod = builder.module.replaceShape(s)
    new GraphApply.GraphImpl(s, mod)
  }
  ...

  [2..#
  def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(
    buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = {
    val builder = new GraphDSL.Builder
    val curried = combineMat.curried
    val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1))
    [2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))#
    ]
    val s = buildBlock(builder)([#s1#])
    val mod = builder.module.replaceShape(s)
    new GraphApply.GraphImpl(s, mod)
  }#
  ]
}

Note that the sbt-boilerplate template language is needed to interpret the create() method being used in the application that takes multiple stream components as input parameters.

Materialized values

In Akka Streams, materializing a constructed stream is the step of actually running the stream with the necessary resources. To run the stream, the implicitly passed factory method ActorMaterializer() is required to allocate the resources for stream execution. That includes starting up the underlying Akka actors to process the stream.

Every processing stage of the stream can produce a materialized value. By default, using the via(flow) and to(sink) functions, the materialized value of the left-most stage will be preserved. As in the following example, for graph1, the materialized value of the source is preserved:

    val source: Source[String, Future[String]] = ...
    val flow: Flow[String, String, Future[Int]] = ...
    val sink: Sink[String, Future[IOResult]] = ...

    val graph1: RunnableGraph[Future[String]] = source.via(flow).to(sink)
    val graph2: RunnableGraph[Future[Int]] = source.viaMat(flow)(Keep.right).to(sink)
    val graph3: RunnableGraph[(Future[String], Future[IOResult])] = source.via(flow).toMat(sink)(Keep.both)

To allow one to selectively capture the materialized values of the specific stream components, Akka Streams provides functions viaMat(flow) and toMat(sink) along with a combiner function, Keep. As shown in the above example, for graph2, the materialized value of the flow is preserved, whereas for graph3, materialized values for both the flow and sink are preserved.

Back to our fileSink function as listed below, toMat(fileIOSink)(Keep.right) instructs Akka Streams to keep the materialized value of the fileIOSink as a Future value of type IOResult:

    def fileSink(filename: String): Sink[String, Future[IOResult]] =
      Flow[String].
        map(line => { Thread.sleep(500); ByteString(line + "\n") }).
        toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
        // sleep() delay added to illustrate back-pressure

Using Graph DSL, as seen earlier in the signature of the create() method, one can select what materialized value is to be preserved by specifying the associated stream components accordingly as the curried parameters:

    ([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)

In our case, we want the materialized value of fileSink, thus the curried parameters should look like this:

    (consoleSink, fileSink(filename))((_, file) => file)

Defining the stream graph

Akka Streams provides a number of functions for fan-out (e.g. Broadcast, Balance) and fan-in (e.g. Merge, Concat). In our example, we want a simple topology with a single text source and the same n-gram generator flow branching off to two sinks in parallel:

          val bcast = builder.add(Broadcast[String](2))
          textSource ~> ngramFlow(nVal) ~> bcast.in
          bcast.out(0) ~> console
          bcast.out(1) ~> file

Adding a message counter

Let’s further expand our n-gram extraction application to include displaying a count. A simple count-flow is created to map each message string into numeric 1, and a count-sink to sum up all these 1’s streamed to the sink. Adding them as the third flow and sink to the existing stream topology yields something similar to the following:

          val bcast = builder.add(Broadcast[String](3))
          val ngFlow = ngramFlow(nVal)
          textSource ~> bcast.in
          bcast.out(0) ~> ngFlow ~> console
          bcast.out(1) ~> ngFlow ~> file
          bcast.out(2) ~> countFlow ~> count

Source code: NgramStream_v03.scala
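For completeness, a hedged sketch of the two extra pieces referenced above (the names countFlow and count mirror the topology snippet) could be as simple as:

    // Map every message to 1, then fold the 1's into a total count of messages processed.
    val countFlow: Flow[String, Int, NotUsed] = Flow[String].map(_ => 1)
    val count: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)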

Full source code of the application is at GitHub.

Final thoughts

Having used Apache Storm, I see it as a rather different beast compared with Akka Streams. A full comparison between the two would obviously be an extensive exercise by itself, but it suffices to say that both are great platforms for streaming applications.

Perhaps one of the biggest differences between the two is that Storm provides granular message delivery options (at most / at least / exactly once, guaranteed message delivery) whereas Akka Streams by design questions the premise of reliable messaging on distributed systems. For instance, if guaranteed message delivery is a requirement, Akka Streams would probably not be the best choice.

Back-pressure has recently been added to Storm’s v1.0.x built-in feature list, so there is indeed some flavor of reactiveness in it. Aside from message delivery options, choosing between the two technologies might be a decision based more on other factors such as the engineering staff’s expertise, concurrency model preference, etc.

Outside of the turf of typical streaming systems, Akka Streams also plays a key role as the underlying platform for an emerging service stack. Viewed as the next-generation of Spray.io, Akka HTTP is built on top of Akka Streams. Designed for building HTTP-based integration layers, Akka HTTP provides versatile streaming-oriented HTTP routing and request/response transformation mechanism. Under the hood, Akka Streams’ back-pressure functionality regulates data streaming between the server and the remote client, consequently conserving memory utilization on the server.