
Scala Distributed Systems With Akka

A recent R&D project prompted me to look into a programming platform for a distributed system. Storm coupled with Kafka popped up as a candidate, but as streaming wasn’t part of the system requirement, I decided to try out an actor-based system. Between Java and Scala, I had in mind Scala as the programming language primarily for its good mix of object-oriented and functional programming styles, as mentioned in a previous post about Scala-Play.

Naturally, Scala-Akka became a prime candidate. During the technology evaluation phase, I came across a couple of sample applications from Lightbend (formerly Typesafe) that I think are great tutorials for getting into Scala + Akka. Certain techniques for performance and scalability illustrated in the more comprehensive applications are particularly useful. Although the Play framework serves as a great application development platform, it’s of less interest from a product functionality perspective.

Akka Actor Systems

Akka is an open-source actor library/toolkit targeted for building scalable concurrent and distributed applications in Scala or Java. At the core of Akka are lightweight computational primitives called actors, each of which encapsulates state and behavior, and communicates via asynchronous immutable message passing.

It’s important to note that keeping messages immutable and communication non-blocking are among the fundamental best practices the Actor model is designed around. In other words, they aren’t enforced by the model itself, hence it’s the developer’s responsibility to embrace them. The underlying shared-nothing principle of actors, with message passing as the only means of interaction, makes for an appealing concurrency model, as opposed to managing shared memory with locks in conventional thread-based concurrency.
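As a concrete illustration of the immutability convention (the message names below are made up, not from any particular application), messages are typically modeled as immutable case classes grouped under a sealed trait:

```scala
// Messages as immutable case classes under a sealed trait.
// Akka doesn't enforce immutability; the developer upholds it by convention.
sealed trait AccountCommand
case class Deposit(amount: BigDecimal) extends AccountCommand
case object GetBalance extends AccountCommand

val d = Deposit(BigDecimal(10))
// "Modifying" a message means deriving a new one; the original is untouched.
val d2 = d.copy(amount = BigDecimal(20))
```

A receiving actor can then pattern-match on AccountCommand, and sealing the trait lets the compiler warn when a message type is left unhandled.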

Sample app #1: Pi approximation

To quickly grasp how to use Akka Actors to solve computational problems, it might be worth taking a look at a sample application for approximating the value of pi (i.e. π). Although the sample application consists of deprecated code, I still find it a nice example for understanding how to carve out the computational components as actors and coordinate partial-result passing via messages among the actors.

Given the dated code, one might want to just skim through the source code of the application while skipping the syntactic details. It shows how easy it is to formulate a distributed computation scheme by making the computation workers (Worker), aggregator (Master) and output listener (Listener) as actors, each playing different roles. A couple of notes:

  1. In general, “tell” (i.e. fire-and-forget) is preferred to “ask” when sending messages, for performance reasons. That works well in this application since it’s an approximation task, hence the rare failure of a worker isn’t the end of the world.
  2. Instead of having all actors defined in a single source file as in this example, actors are often defined separately in a slightly more complex application. It’s common practice in Scala to define an actor’s message protocol in its companion object. For instance, the Worker actor would look something like the following:
  import akka.actor.Actor

  object Worker {
    sealed trait PiApprox
    case class Work(start: Int, nrOfElements: Int) extends PiApprox
    case class Result(value: Double) extends PiApprox
  }

  class Worker extends Actor {
    import Worker._

    def calculatePiFor(start: Int, nrOfElements: Int): Double = {
      ...
    }
    def receive = {
      case Work(start, nrOfElements) =>
        sender ! Result(calculatePiFor(start, nrOfElements))
    }
  }

Sample app #2: Reactive maps

Lightbend provides a functionality-rich sample application, reactive maps, that illustrates a number of features centered around an actor system, including:

  • GPS using HTML5 geolocation
  • Bot simulation on geospatial maps
  • Play’s WebSocket handling with actors
  • Actor dependency injection
  • Akka’s peer-to-peer cluster
  • Distributed publish/subscribe in cluster
  • Akka persistence and journal
  • Akka cluster sharding
  • Reactive application deployment

Like most of their sample application templates, reactive-maps comes with a tutorial that walks through the application. What I like about this one is that it starts with a barebones working version and proceeds to enhance it with more robust features. In the second half of the walk-thru, a new feature for tracking a user’s travel distance is built from scratch and then rewritten to address scalability issues, by means of an improved design of the associated actor as well as the use of Akka persistence/journal and cluster sharding.

Due to the rather wide range of features involved in the application, it might take some effort to go over the whole walk-thru. Nevertheless, I think it’s a worthy exercise to pick up some neat techniques in building a real-world application using Scala and Akka.

Deprecated Akka persistence interface

The source code seems to be rather up to date, although the deprecated Akka persistence interface EventsourcedProcessor does generate some compiler warnings. To fix it, use the trait PersistentActor instead and override the now-abstract persistenceId method. The relevant code after the fix should be as follows:

/app/backend/UserMetaData.scala:

...
// import akka.persistence.EventsourcedProcessor
import akka.persistence.PersistentActor
...
// class UserMetaData extends EventsourcedProcessor {
class UserMetaData extends PersistentActor {
  ...
  override def persistenceId = self.path.toStringWithoutAddress
  override def receiveRecover: Receive = {
    ...
  }
  ...

Issue with dependency injection and cluster sharding

There is a bug, surfacing at application startup, that arises from the binding for UserMetaData actors in the Play module Actors.scala, which is responsible for initializing actors for the web frontend. Dependency injection is used in the module to bind actors of the backend role that need to be created from the backend. The cluster-sharded UserMetaData actors now need to be created with the ClusterSharding extension, hence requiring a special binding. This new binding causes an exception as follows:

Unexpected exception CreationException: Unable to create injector, see the following errors:

Error in custom provider, java.lang.IllegalArgumentException:
  Shard type [UserMetaData] must be started first while locating actors.UserMetaDataProvider
  at actors.Actors.configure(Actors.scala:26)
  (via modules: com.google.inject.util.Modules$OverrideModule -> actors.Actors)
  while locating akka.actor.ActorRef annotated with @com.google.inject.name.Named(value=userMetaData)
Caused by: java.lang.IllegalArgumentException: Shard type [UserMetaData] must be started first
	at akka.contrib.pattern.ClusterSharding.shardRegion(ClusterSharding.scala:337)
	at actors.UserMetaDataProvider.get$lzycompute(Actors.scala:36)
	at actors.UserMetaDataProvider.get(Actors.scala:36)
	at actors.UserMetaDataProvider.get(Actors.scala:35)
	at com.google.inject.internal.ProviderInternalFactory.provision(ProviderInternalFactory.java:81)
        ...

It can be fixed by moving the ClusterSharding related code from class BackendActors into class UserMetaDataProvider, as follows:

/app/actors/Actors.scala:

...
class UserMetaDataProvider @Inject() (system: ActorSystem) extends Provider[ActorRef] {
  lazy val get = {
    if (Cluster(system).selfRoles.exists(r => r.startsWith("backend"))) {
      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = Some(UserMetaData.props),
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
    }
    else {
      ClusterSharding(system).start(
        typeName = UserMetaData.shardName,
        entryProps = None,
        idExtractor = UserMetaData.idExtractor,
        shardResolver = UserMetaData.shardResolver)
    }
    ClusterSharding(system).shardRegion(UserMetaData.shardName)
  }
}
...

Akka persistence journal dependency issue

Near the end of the example walk-thru is a custom journal setup using the key-value store LevelDB. However, the setup fails at run-time with errors as follows:

[ERROR] [application-akka.actor.default-dispatcher-2] [ActorSystem(application)] Uncaught fatal error from thread
  [application-akka.actor.default-dispatcher-2] shutting down ActorSystem [application]
java.lang.NoSuchMethodError: com.google.common.io.Closeables.closeQuietly(Ljava/io/Closeable;)V
	at org.iq80.leveldb.impl.MMapLogWriter.close(MMapLogWriter.java:83)
	at org.iq80.leveldb.impl.VersionSet.initializeIfNeeded(VersionSet.java:111)
	at org.iq80.leveldb.impl.VersionSet.(VersionSet.java:91)
	at org.iq80.leveldb.impl.DbImpl.(DbImpl.java:167)
	at org.iq80.leveldb.impl.Iq80DBFactory.open(Iq80DBFactory.java:59)
	at akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:112)
	at akka.persistence.journal.leveldb.SharedLeveldbStore.preStart(LeveldbStore.scala:127)
	...
[ERROR] [application-akka.actor.default-dispatcher-4] [akka://application/user/store] null
java.lang.NullPointerException
	at akka.persistence.journal.leveldb.LeveldbStore$class.postStop(LeveldbStore.scala:117)
	at akka.persistence.journal.leveldb.SharedLeveldbStore.postStop(LeveldbStore.scala:127)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
	at akka.persistence.journal.leveldb.SharedLeveldbStore.aroundPostStop(LeveldbStore.scala:127)
        ...

Some relevant bug reports in the Akka community suggest that it’s a problem with LevelDB’s dependency on a dated version of Google Guava. Since the default journal seems to work fine and the custom setup isn’t production-grade anyway, I’m going to skip it. Source code with the above changes can be found at GitHub. In a production environment, one would probably want to use Redis, PostgreSQL, HBase, etc., for the persistence journal.

Below is a screenshot of the final version of the reactive-maps application.

Scala Akka Reactive Maps

Final thoughts

Despite the described glitches, Lightbend’s reactive-maps application is a well-thought-out tutorial, methodically stepping through the thought process from design to implementation, along with helpful remarks related to real-world performance and scalability. Even though the sample application is primarily for illustration, it’s no trivial hello-world and a lot of demonstrated techniques could be borrowed or repurposed in a production-grade actor system.

As mentioned earlier in the post, I think Akka actor with its shared-nothing principle and non-blocking message-passing communication is a great alternative to the thread-based shared-memory concurrency model in which deadlocks and expensive context switching could be dreadful to deal with. However, in building a well-designed actor-based application, it does require proper architectural work and discipline for best-practices to componentize tasks into lightweight actors which interact among themselves by means of immutable message passing.

On scalability, there are powerful features like actor cluster sharding and distributed publish/subscribe that allow one to build actor systems that scale horizontally. And last but not least, Scala’s deep roots in both object-oriented and functional programming make it an effective tool for the coding task.

Database CRUD In Scala-Play

In a recent startup venture, I adopted Node.js and Python, both dynamic-typing programming platforms, in the core technology stack of a data-centric web application. While I like both platforms for what they’re inherently good at, I still have some biased preference towards static-typing languages. Scala was in my considered list of platforms before I eventually settled on Node.js. I wasn’t going to just forget about what I liked about Scala though.

Coming from a Math background, I have high regard for the benefit of functional programming. To me, Java has always been a great object-oriented programming (OOP) language for general-purpose application development. Although functional programming (FP) has been added since Java 8, the feature isn’t really an integral part of the language core. For that reason, I’m not going to start my pursuit of a static-typing programming platform that embodies OOP and FP with the Java platform.

Scala, Play and Reactive programming

Scala’s static typing, plus the blending of object-oriented and functional programming in its language design, makes it an attractive programming language. The Play MVC framework, which comes with handy features like REST support and asynchronous I/O for building web applications in Scala, has also picked up some momentum over the past few years. So, for general-purpose web-based application development, the Scala-Play combo sounds appealing for what I’m looking for.

Typesafe (being renamed to Lightbend as we speak) is a company founded by the authors of Scala and Akka. Advocating the Reactive programming paradigm, it provides a development tool, Activator, along with application templates to help streamline the development process, with emphasis on scalability, performance, resilience and non-blocking message-driven communication. The browser-based tool and reusable templates help make adopting the framework easier.

Data access layer: Anorm vs Squeryl vs Slick

There are a few libraries/APIs for the data access layer in the Scala-Play ecosystem. Anorm provides functionality for parsing and transforming the results of embedded plain SQL queries. Squeryl is an ORM (object-relational mapping) that supports composition of queries and explicit data retrieval strategies. Slick is an FRM (functional-relational mapping) that lets you handle data access operations much as if you were working with Scala collections. After some quick review, I decided to try out Anorm.

As always, the best way to get familiar with a programming platform is to write code. My hello-world application is a web application that handles database CRUD (create/read/update/delete) operations along with query pagination. Typesafe’s templates come in handy and help tremendously in providing the base code structure, with versions of the data access layer built on each of the three libraries. One template already contains sample code for basic database operations and query pagination, which prompted me to bypass building the very basic stuff and instead focus on enhancements for expandability and maintainability.

Creating a project from Typesafe’s templates is straightforward. To run the Reactive development platform, simply launch its web server at the command line under the project root with the following command (or just double-click the activator executable from within a file browser):

./activator ui

Activator’s UI will be launched in a web browser. You can compile and run the application via the UI and check out the launched application at http://localhost:9000/.

Looking under the hood

Based on the few templates I’ve downloaded, below is what a typical project root in the file system would look like:

path-to-project-root/
    build.sbt
    app/
        controllers/
        models/
        views/
    conf/
        application.conf
        routes
        evolutions/
    logs/
    project/
        build.properties
        plugins.sbt
    public/
    test/

Like many other web-based MVC frameworks, much of the file system structure (app/, logs/, public/, test/) is pretty self-explanatory. The *.sbt files contain project-specific build scripts and package dependencies. Routing is configured within file conf/routes. The conf/evolutions/ subdirectory is for tracking database evolution scripts.

Despite my limited experience in Scala, the overall code included in the template is pretty easy to grasp. It’s self-contained and equipped with sample data, scripts for the data schema, Scala modules for models, views and controllers, and even the jQuery and Bootstrap libraries. After getting a good understanding of the skeletal code in the framework, I came up with a list of high-level changes to be made:

  1. Expand the schema with multiple relational entities
  2. Apply Anorm’s parsing, filtering and query pagination to multiple views
  3. Modularize controller code into multiple controllers
  4. Add a navigation bar with some simple Bootstrap UI enhancement

All of the above are pretty straightforward. A simple 3-table relational data model (song –N:1– musician –N:1– country) is created. SQL scripts for populating the tables are placed under conf/evolutions/default/*.sql to make use of Play’s database evolution scripting mechanism. Forms are created for creating and editing songs and musicians. Filtering and query pagination are applied to the song and musician lists. Multiple controllers are created for modularity and maintainability.

SQL statements in Anorm

Plain SQL statements can be directly embedded into Play’s model. For instance, the update() function in app/models/Song.scala for updating the song table is as simple as follows:

    def update(id: Long, song: Song) = {
        DB.withConnection { implicit connection =>
            SQL(
                """
                    update song
                    set name = {name}, released = {released}, musician_id = {musician_id}
                    where id = {id}
                """
            ).on(
                'id -> id,
                'name -> song.name,
                'released -> song.released,
                'musician_id -> song.musicianId
            ).executeUpdate()
        }
    }

Anorm SqlParser

Anorm appears to be a lean-and-mean library that allows developers to directly embed SQL statements into the application, while providing flexible methods for parsing and transforming query results. One useful feature is its parser combinators (in the style of Scala’s combinator parsing), which let you chain parsers to perform a sequence of parses over arbitrary text. For example, the following snippet in app/models/Song.scala shows the use of the sequential parser combinator (~) to parse the result set of a query on table “song”:

object Song {
    val simple = {
        get[Option[Long]]("song.id") ~
        get[String]("song.name") ~
        get[Option[Date]]("song.released") ~
        get[Option[Long]]("song.musician_id") map {
            case id ~ name ~ released ~ musicianId => Song(id, name, released, musicianId)
        }
    }
    ...
}

Both get[T] and the parser combinator (~) are methods defined within SqlParser as part of Anorm’s API:

def get[T](columnName: String)(implicit extractor: anorm.Column[T]): RowParser[T] = RowParser {
    ...
}

object RowParser {
    def apply[A](f: Row => SqlResult[A]): RowParser[A] = new RowParser[A] {
        def apply(row: Row): SqlResult[A] = f(row)
    }
}

trait RowParser[+A] extends (Row) => SqlResult[A] {
    parent =>
    ...
    def ~[B](p: RowParser[B]): RowParser[A ~ B] = RowParser(row => parent(row).flatMap(a => p(row).map(new ~(a, _))))
    ...
}
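To demystify how get[T] and the ~ combinator compose, here is a stripped-down, self-contained model of the same idea. This is my own simplification over a column-name-to-value map, not the real Anorm API; the names Row, RowParser, get and Song mirror the excerpts above for readability only:

```scala
// Simplified model of Anorm-style row parsing (illustrative, not the real API):
// a parser either extracts a value from a row or reports the missing column.
type Row = Map[String, Any]

// The ~ case class pairs two parse results, enabling `case a ~ b` patterns.
case class ~[+A, +B](a: A, b: B)

case class RowParser[+A](run: Row => Either[String, A]) {
  // Sequential combinator: run both parsers on the same row, pair the results.
  def ~[B](p: RowParser[B]): RowParser[A ~ B] =
    RowParser(row => for (a <- run(row); b <- p.run(row)) yield new ~(a, b))
  def map[B](f: A => B): RowParser[B] = RowParser(row => run(row).map(f))
}

def get[T](col: String): RowParser[T] =
  RowParser(row => row.get(col).map(_.asInstanceOf[T]).toRight(s"missing $col"))

// Same shape as Song.simple above:
case class Song(id: Long, name: String)
val simple = (get[Long]("song.id") ~ get[String]("song.name")).map {
  case id ~ name => Song(id, name)
}
```

Running `simple` over a row map such as `Map("song.id" -> 1L, "song.name" -> "Yesterday")` yields `Right(Song(1, "Yesterday"))`, and a row missing a column yields a `Left` describing the failure, which is the essence of what Anorm’s RowParser chain does over JDBC rows.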

Query pagination

Pagination is done in an elegant fashion by means of a helper case class:

case class Page[A](items: Seq[A], page: Int, offset: Long, total: Long) {
    lazy val prev = Option(page - 1).filter(_ >= 0)
    lazy val next = Option(page + 1).filter(_ => (offset + items.size) < total)
}
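The prev/next arithmetic is easy to sanity-check. Repeating the case class verbatim so the snippet is self-contained, and exercising it on a made-up result set of 5 rows paged 2 at a time:

```scala
case class Page[A](items: Seq[A], page: Int, offset: Long, total: Long) {
  lazy val prev = Option(page - 1).filter(_ >= 0)
  lazy val next = Option(page + 1).filter(_ => (offset + items.size) < total)
}

val first = Page(Seq("a", "b"), page = 0, offset = 0, total = 5)
val last  = Page(Seq("e"), page = 2, offset = 4, total = 5)
// first.prev == None,    first.next == Some(1)
// last.prev  == Some(1), last.next  == None
```

The first page has no previous page, the last page has no next page, and everything in between links both ways, which is exactly what the pagination links in the views need.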

The list() function in app/models/Song.scala is then defined with type Page[(Song, Option[Musician])]:

    def list(page: Int = 0, pageSize: Int = 10, orderBy: Int = 1, filter: String = "%"): Page[(Song, Option[Musician])] = {
        val offset = pageSize * page

        DB.withConnection { implicit connection =>
            val songs = SQL(
                """
                    select * from song 
                    left join musician on song.musician_id = musician.id
                    where song.name like {filter}
                    order by {orderBy} nulls last
                    limit {pageSize} offset {offset}
                """
            ).on(
                'pageSize -> pageSize, 
                'offset -> offset,
                'filter -> filter,
                'orderBy -> orderBy
            ).as(Song.withMusician *)

            val totalRows = SQL(
                """
                    select count(*) from song 
                    left join musician on song.musician_id = musician.id
                    where song.name like {filter}
                """
            ).on(
                'filter -> filter
            ).as(scalar[Long].single)

            Page(songs, page, offset, totalRows)
        }
    }

And the song list view, app/views/listSongs.scala.html, displays the song page information passed as currentPage, of type Page[(Song, Option[Musician])].

Passing the request header as an implicit argument

A navigation bar is added to the main HTML template, app/views/main.scala.html. To highlight the active <li> items on the nav-bar, Bootstrap’s active class is used. But since the HTML pages are all rendered on the server side in this application, the request header needs to be passed from the controllers to the associated views (forms, list pages, etc.), which in turn pass the header to the main HTML template where the nav-bar is defined. In Scala-Play, this can be done effectively by passing the request header as the last implicit argument (which minimizes breaking any existing code). For instance, the argument list within app/views/editSong.scala.html will be as follows:

@(id: Long, songForm: Form[Song], musicians : Seq[(String, String)])(implicit messages: Messages, request: RequestHeader)

The request header passed to the main html template will then be consumed as highlighted below:

@(content: Html)(implicit messages: Messages, request: RequestHeader)
<!DOCTYPE html>
<html>
    <head>
        ...
    </head>
    <body>
        <header>
            ...
            <nav class="navbar navbar-default">
                <div class="container-fluid">
                <ul class="nav navbar-nav">
                    <li class="@{"active".when(request.path == routes.SongController.list(0, 2, "").toString())}"><a href="@routes.SongController.list(0, 2, "")">Songs</a></li>
                    <li class="@{"active".when(request.path == routes.MusicianController.list(0, 2, "").toString())}"><a href="@routes.MusicianController.list(0, 2, "")">Musicians</a></li>
                </ul>
                </div>
            </nav>
        </header>        
        ...
    </body>
</html>

Complete source code of the Scala-Play project is at GitHub.

My thoughts on Scala

Scala runs on the JVM (Java virtual machine) and supports standard Java libraries. Based on some quick review of Scala I did a couple of years ago: if you want to quickly pick up the language’s basics, consider Scala School; for a more comprehensive introduction, try Programming in Scala.

Unless you’re already familiar with a functional programming language like Haskell or Erlang, Scala isn’t the easiest language to pick up, and reading Scala code takes some getting used to. What really makes it hard to learn, though, is the apparent inconsistency in certain aspects of the language. For instance, Scala seems to support semicolon inference in an inconsistent fashion. Here’s an example:

val files = Array(new java.io.File("/tmp/a.txt"), new java.io.File("/tmp/b.z"), new java.io.File("/tmp/c.txt"))

def lines(file: java.io.File) = scala.io.Source.fromFile(file).getLines().toList

def grep(pattern: String, fileExt: String) =
    for (
        file <- files if file.getName.endsWith(fileExt)
        line <- lines(file)
        trimmed = line.trim if trimmed.matches(pattern)
    ) println(file + ": " + trimmed)

The above snippet fails to compile because the compiler won’t infer semicolons within for(…). Switching to for{…} fixes the problem, as semicolons are inferred inside {} blocks.
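To make the fix concrete, here is the same for-comprehension shape with braces, using an in-memory stand-in for the files (name/lines pairs) so the snippet runs as-is:

```scala
// In-memory stand-in for the /tmp files above, so this runs without I/O.
val fakeFiles = List(
  "a.txt" -> List(" foo ", "bar"),
  "b.z"   -> List("foo"),
  "c.txt" -> List("foo"))

// Braced for-block: semicolons are inferred, so multiple generators,
// definitions and guards can each sit on their own line.
def grep(pattern: String, fileExt: String): List[String] =
  for {
    (name, lines) <- fakeFiles if name.endsWith(fileExt)
    line <- lines
    trimmed = line.trim if trimmed.matches(pattern)
  } yield s"$name: $trimmed"

grep("foo", ".txt")  // List(a.txt: foo, c.txt: foo)
```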

There is also some inconsistency in whether type inference is supported within an argument list versus across curried argument lists, as illustrated in this blog post by Paul Chiusano.
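A quick way to see that curried-list behavior, in a minimal example of my own:

```scala
// Across curried argument lists, A is fixed by the first list, so the
// lambda's parameter type in the second list is inferred:
def findFirst[A](xs: List[A])(p: A => Boolean): Option[A] = xs.find(p)

val r = findFirst(List(1, 2, 3))(x => x > 1)  // Some(2): x inferred as Int

// With a single argument list, the Scala 2 compiler of the era rejects the
// equivalent call with "missing parameter type":
// def findFirst2[A](xs: List[A], p: A => Boolean): Option[A] = xs.find(p)
// findFirst2(List(1, 2, 3), x => x > 1)
```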

One must also make room in memory for loads of symbols with various meanings, as Scala loves smileys. A few examples below:

<:      Upper type bound
>:      Lower type bound
<%      View bound
^^      Transformation parser combinator
: =>    By-name argument
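The first two of those, for example, in a runnable sketch (the class names are made up for illustration):

```scala
class Animal { def sound = "..." }
class Dog extends Animal { override def sound = "woof" }

// <:  upper type bound: A must be a subtype of Animal
def hear[A <: Animal](a: A): String = a.sound

// >:  lower type bound: B may widen to any supertype of the list's element type
def prepend[A, B >: A](x: B, xs: List[A]): List[B] = x :: xs

val heard = hear(new Dog)                                // "woof"
val animals: List[Animal] = prepend(new Animal, List(new Dog))
```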

Conclusion

Nevertheless, I like Scala as an OOP + FP platform by design. I also like how the Play framework, along with Typesafe’s Activator, provides a Reactive application development platform and streamlines the development process for contemporary web-based applications.

Many other programming language platforms (Java, Ruby, etc) out there support OOP + FP to various extents, but Scala is still one of the few static-typing platforms providing a solid hybrid of OOP and FP. In addition, running on a JVM and supporting Java libraries are really a big plus for Scala.

Having various choices of libraries/APIs in the data access layer allows engineers to pick what’s best for their needs. If you want an ORM, go for Squeryl; functional all the way, use Slick; embedded plain SQL with versatile parsing/filtering functions, Anorm it is.

Then, there is Akka’s actor-based concurrency model, which has superseded the now-deprecated actors in Scala’s standard library. Actors are computational primitives that encapsulate state and behavior, and communicate via asynchronous message passing. The simple yet robust Actor model, equipped with Scala’s OOP + FP programming semantics, makes a powerful tool for building scalable distributed systems.