Monthly Archives: April 2021

Akka Typed: Scheduler, PubSub

This is the final post of the 3-part blog series that centers around migrating an actor-based blockchain application from Akka classic to Akka Typed. In the previous post, we looked at the difference between the two APIs in starting actors and in tell-ing and ask-ing. This time, we’re going to cover the following topics:

  • Scheduler
  • Distributed PubSub

Scheduler

Akka classic’s Scheduler feature provides task-scheduling methods for recurring schedule (e.g. scheduleAtFixedRate(), which replaces the deprecated schedule()) or one-time schedule (scheduleOnce()), each with a few signature variants. For example, scheduleOnce() has more than a couple of different method signatures for different use cases:

// Schedule once for a `f: => Unit` by-name parameter
final def scheduleOnce(delay: FiniteDuration)(f: => Unit)(implicit executor: ExecutionContext): Cancellable

// Schedule once for a `Runnable`
def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable

// Schedule once for sending a message to another actor
final def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable

The blockchain application uses schedulers in a number of places. For instance, to ensure that the potentially time-consuming proof generating process won’t take more than a set duration, the generatePoW() method within actor Miner uses scheduleOnce() to try complete a Promise with a timeout exception at a scheduled time:

private def generatePoW(block: Block)(implicit ec: ExecutionContext, timeout: FiniteDuration): Future[Long] = {
  val promise = Promise[Long]()
  context.system.scheduler.scheduleOnce(timeout){ promise tryFailure new TimeoutException(s"$block: $timeout") }
  Future{
    Try{
      val incrementedNonce =
        ProofOfWork.generateProof(bytesToBase64(block.hash), defaultDifficulty, defaultNonce)
      promise success incrementedNonce
    }.
    recover{
      case e: Exception => promise failure e
    }
  }
  promise.future
}

Similar to its counterpart in the classic API, Akka Typed Scheduler also provides task-scheduling methods for recurring and one-time schedules. Here’s the method signature of the typed version of scheduleOnce():

// Schedule once for a `Runnable`
abstract def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(
    implicit executor: ExecutionContext): Cancellable

The variant that takes a f: => Unit by-name parameter is no longer available in Akka Typed. However, since Runnable only has a single abstract method (SAM), run(), the following expressions are essentially the same:

// Invoke scheduleOnce() with a Runnable object
context.system.scheduler.scheduleOnce(timeout, new Runnable { override def run() = ??? })

// Invoke scheduleOnce() with a `() => Unit` by-name parameter
context.system.scheduler.scheduleOnce(timeout, () => ???)

For example, the typed version of the proof-of-work generating method generatePoW() could be coded as follows:

private def generatePoW(block: Block)(implicit ec: ExecutionContext, timeout: FiniteDuration): Future[Long] = {
  val promise = Promise[Long]()
  context.system.scheduler.scheduleOnce(
    timeout, () => promise tryFailure new TimeoutException(s"$block: $timeout")
  )
  Future{
    Try{
      val incrementedNonce =
        ProofOfWork.generateProof(bytesToBase64(block.hash), defaultDifficulty, defaultNonce)
      promise success incrementedNonce
    }.
    recover{
      case e: Exception => promise failure e
    }
  }
  promise.future
}

Also seemingly missing is the other variant for sending a message to another actor. In fact, it has been moved into ActorContext as a separate scheduleOnce() method which can be invoked from within a typed actor’s context for scheduling a one-time delivery of message to another actor:

// Typed ActorContext's schedule once for sending a message to another actor
abstract def scheduleOnce[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable

The above schedulers are all thread-safe. There is also this TimerScheduler that is not thread-safe, hence should be used within the actor that owns it. An advantage of using TimerScheduler, typically via Behaviors.withTimers(), is that it’s bound to the lifecycle of the containing actor and will be cancelled automatically when the actor is stopped.

Distributed PubSub

A cryptocurrency typically maintains a decentralized ledger as distributed copies of a growing blockchain kept by individual nodes on the system. The blockchain application being used for this actor migration exercise achieves that by means of running a Distributed Publish Subscribe service on an Akka cluster. Named topics (in this case “new-transactions” and “new-block”) can be created and subscribers to a given topic will be sent objects submitted to the topic by the publishers.

In the Akka classic API, the mediator actor, DistributedPubSubMediator, which is supposed to be started on each of the allocated cluster nodes is responsible for managing a registry of actor references and replicating the entries to peer actors among the nodes.

Below is how the mediator actor started from within the Blockchainer actor of a cluster node registers subscribers (in this case the Blockchainer actor itself) and takes published topical objects to be consumed by peer cluster nodes:

// Akka classic pubsub mediator
val mediator: ActorRef = DistributedPubSub(context.system).mediator
mediator ! Subscribe("new-transactions", self)
mediator ! Subscribe("new-block", self)
// ...
mediator ! Publish("new-transactions", AddTransactions(trans, append))
mediator ! Publish("new-block", UpdateBlockchain(block))

The Akka Typed pubsub abandons the “universal” mediator actor in favor of “topic-specific” actors. The typed version of Distributed PubSub functionality initiated from within the Blockchainer actor of a cluster node is as follows:

// Akka Typed pubsub.Topic
val topicTrans = context.spawn(Topic[Req]("new-transactions"), "pubsubNewTransactions")
val topicBlock = context.spawn(Topic[Req]("new-block"), "pubsubNewBlock")
topicTrans ! Topic.Subscribe(context.self)
topicBlock ! Topic.Subscribe(context.self)
//...
topicTrans ! Topic.Publish(AddTransactions(trans, append))
topicBlock ! Topic.Publish(UpdateBlockchain(block))

Final thoughts

This concludes the mini blog series that covers the basics and selected features of Akka Typed actors. The resulting blockchain application in Akka Typed will be published on GitHub along with an overview in a separate blog.

The term “behavior” is commonly used in the standard Actor Model when describing how actors operate and mutate their internal states in accordance with the business logic. As can be seen across sample snippets in this blog series, it’s ubiquitous in the Akka Typed API, with all the Behavior-typed methods from the Behaviors factory object spanning the entire lifecycle of actors. In comparison with putting all actor business logic inside the receive partial function “blackbox” in Akka classic, it does make the using of the various methods more intuitive, especially for new comers.

With some self-discipline in sticking to programming best-practices, the loosely-typed Akka classic API has an advantage of embracing complex non-blocking message processing functionality with minimal boilerplate code. The message “loop” within an actor in the form of a partial function along with the hotswapping feature via context.become provides a simple yet robust construct for processing messages. I’m certainly going to miss its simplicity and elegance. That being said, moving towards the typed Akka API is inevitable if one plans to use Akka actors not just for a one-time project. It’s the right direction for sticking to idiomatic functional programming.

Akka Typed: Spawn, Tell, Ask

This is the 2nd post of the 3-part blog series about migrating an Akka classic actor-based application to one with Akka typed actors. In the 1st post, we looked at how to convert a classic actor into a typed actor.

Obviously, the goal of this mini blog series isn’t to cover all of the classic-to-typed-actors migration how-to’s, given the vast feature set Akka provides. The application being migrated is a blockchain application that mimics mining of a decentralized cryptocurrency. We’re going to cover just the key actor features used by the application, namely:

  • Starting an actor
  • Tell
  • Ask
  • Scheduler
  • Distributed PubSub

In this blog post, we’ll go over the first three bullet items.

Starting an Akka actor

In the Akka classic API, context method actorOf() starts an actor with its “properties” provided by the configuration factory Props. For example, actor Miner is started from within its parent actor as follows:

// Starting classic actor `Miner`
val miner: ActorRef = context.actorOf(Miner.props(accountKey, timeoutPoW), "miner")

It’s common to provide the actor class properties for Props by means of an actor’s companion object method especially when the actor class takes parameters.

// Classic actor `Miner` class
object Miner {
  def props(accountKey: String, timeoutPoW: Long): Props = Props(classOf[Miner], accountKey, timeoutPoW)
  // ...
}

class Miner(accountKey: String, timeoutPoW: Long) extends Actor with ActorLogging {
  // ...
}

Starting an actor in Akka Typed is performed using actor context method spawn(). The typed version of actor Miner would be started from within its parent actor like below:

// Starting typed actor `Miner`
val miner: ActorRef[Miner.Mining] = context.spawn(Miner(accountKey, timeoutPoW), "miner")

An actor’s underlying class properties can now be defined using Behavior methods, hence the Props configuration factory is no longer needed. Below is how the typed Miner can be defined using method Behaviors.setup() from within the actor’s companion object:

// Typed actor `Miner` class
object Miner {
  // ...
  def apply(accountKey: String, timeoutPoW: Long): Behavior[Mining] =
    Behaviors.setup(context =>
      new Miner(context, accountKey, timeoutPoW).messageLoop()
    )
}

class Miner private(context: ActorContext[Miner.Mining], accountKey: String, timeoutPoW: Long) {
  // ...
}

Starting top-level actors

What about in the main program before any actors have been created? In Akka classic API, one can simply invoke actorOf() from the ActorSystem to start main actors. Below is how the main program of a blockchain mining application spawn the top-level actors for mining simulations.

object Main {
  def main(args: Array[String]): Unit = {
    // Parse `args` and load configurations for the cluster and main actors
    // ...

    implicit val system = ActorSystem("blockchain", conf)
    val blockchainer = system.actorOf(
        Blockchainer.props(minerAccountKey, timeoutMining, timeoutValidation), "blockchainer"
      )
    val simulator = system.actorOf(
        Simulator.props(blockchainer, transFeedInterval, miningAvgInterval), "simulator"
      )

    // ...
  }
}

Perhaps for consistency reasons, Akka Typed makes actors always started from within an actor context using method spawn(), thus an explicit ActorContext is needed for top-level main actors. The following snippet shows how the typed version of the blockchain application delegates to the Starter actor for starting the main actors after the main program has loaded actor properties from program arguments and configuration file. The top-level user-defined actor Starter is regarded as the “user guardian”.

object Main {
  object Starter {
    def apply(accountKey: String, timeoutMining: Long, timeoutValidation: Long,
              transFeedInterval: Long, miningAvgInterval: Long, test: Boolean): Behavior[NotUsed] =
      Behaviors.setup { context =>
        Cluster(context.system)

        val blockchainer = context.spawn(Blockchainer(accountKey, timeoutMining, timeoutValidation), "blockchainer")
        val simulator = context.spawn(Simulator(blockchainer, transFeedInterval, miningAvgInterval), "simulator")

        if (test)
          simulator ! Simulator.QuickTest
        else
          simulator ! Simulator.MiningLoop

        Behaviors.receiveSignal { case (_, Terminated(_)) => Behaviors.stopped }
      }
  }

  def main(args: Array[String]): Unit = {
    // Parse `args` and load configurations for the cluster and main actors
    // ...

    implicit val system = ActorSystem(
      Starter(minerAccountKey, timeoutMining, timeoutValidation, transFeedInterval, miningAvgInterval, test),
      "blockchain",
      conf
    )
  }
}

The fire-and-forget “tell”

The most common communication means among actors is via method tell in a fire-and-forget fashion, which in essence implies that messages are sent with an at-most-once guarantee.

Akka classic tell and the symbolic ! variant have the following method signatures:

// Akka classic methods `tell` and `!`
final def tell(msg: Any, sender: ActorRef): Unit
abstract def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

In Akka Typed, methods tell and ! have signatures as follows:

// Akka Typed methods `tell` and `!`
abstract def tell(msg: T): Unit
def !(msg: T): Unit

Though messages being sent are now strictly typed in the new API, expressions consisting of tell in Akka classic and Akka Typed, both returning a Unit, essentially look and feel the same.

The request-response query “ask”

The other commonly used communication means among actors is the request-response query via method ask.

Below are method signatures of Akka classic ask and ?:

// Akka classic methods `ask` and `?`
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any]
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]

Here’s how the Blockchainer actor uses the classic ask to request for a new mined block from actor Miner and handle the Future query result:

(miner ? Miner.Mine(blockPrev, trans))(tmoMining).mapTo[Block] onComplete{
  case Success(block) =>
    mediator ! Publish("new-block", UpdateBlockchain(block))
    miner ! Miner.DoneMining
  case Failure(e) =>
    log.error(s"[Req.Mining] ${this}: ERROR: $e")
    e match {
      case _: BusyException => self ! AddTransactions(trans, append = false)
      case _ => miner ! Miner.DoneMining
    }
}

Note that the tmoMining timeout value is being explicit passed in as a method argument and mapTo[Block] is necessary for mapping the returned Future[Any] to the proper type.

In addition, there are also a few general-purpose AskSupport methods allowing a query between two actors both specified as parameters. Here’s the method signature of one of the ask variants:

// Akka classic sender-explicit `ask`
def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any]

As for Akka Typed, a context method ask is provided with the following signature:

// Akka Typed context method `ask`
abstract def ask[Req, Res](target: RecipientRef[Req], createRequest: (ActorRef[Res]) => Req)(
    mapResponse: (Try[Res]) => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

Below is how the typed version of actor Blockchainer would use context ask to query typed actor Miner:

implicit val tmoMining: Timeout = Timeout(timeoutMining.millis)
context.ask(miner, ref => Miner.Mine(blockPrev, trans, ref)) {
  case Success(r) =>
    r match {
      case MiningResult(block) =>
        topicBlock ! Topic.Publish(UpdateBlockchain(block))
        miner ! Miner.DoneMining
        MiningResult(block)
      case _ =>
        OtherException(s"Unknown mining result $r")
    }
  case Failure(e) =>
    context.log.error(s"[Req.Mining] ${this}: ERROR: $e")
    e match {
      case _: BusyException =>
        context.self ! AddTransactions(trans, append = false)
        BusyException(e.getMessage)
      case _ =>
        miner ! Miner.DoneMining
        OtherException(e.getMessage)
    }
}

Note that context ask doesn’t return the query result as a Future to be handled by a callback such as onComplete. Rather, it expects one to handle the query response by providing a Try[Res] => T function.

Akka Typed also provides AskPattern methods that return Futures with below method signatures:

// Akka Typed Future-returning methods `ask` and `?`
def ask[Res](replyTo: (ActorRef[Res]) => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]
def ?[Res](replyTo: (ActorRef[Res]) => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]

That’s all for this post. In the next blog, we’ll wrap up this mini blog series with the remaining topics (i.e. scheduler and distributed pubsub).