Akka Typed: Spawn, Tell, Ask

This is the 2nd post of the 3-part blog series about migrating an Akka classic actor-based application to one with Akka typed actors. In the 1st post, we looked at how to convert a classic actor into a typed actor.

Obviously, the goal of this mini blog series isn’t to cover all of the classic-to-typed-actors migration how-to’s, given the vast feature set Akka provides. The application being migrated is a blockchain application that mimics mining of a decentralized cryptocurrency. We’re going to cover just the key actor features used by the application, namely:

  • Starting an actor
  • Tell
  • Ask
  • Scheduler
  • Distributed PubSub

In this blog post, we’ll go over the first three bullet items.

Starting an Akka actor

In the Akka classic API, context method actorOf() starts an actor with its “properties” provided by the configuration factory Props. For example, actor Miner is started from within its parent actor as follows:

// Starting classic actor `Miner`
val miner: ActorRef = context.actorOf(Miner.props(accountKey, timeoutPoW), "miner")

It’s common to provide the actor class properties for Props by means of an actor’s companion object method especially when the actor class takes parameters.

// Classic actor `Miner` class
object Miner {
  def props(accountKey: String, timeoutPoW: Long): Props = Props(classOf[Miner], accountKey, timeoutPoW)
  // ...
}

class Miner(accountKey: String, timeoutPoW: Long) extends Actor with ActorLogging {
  // ...
}

Starting an actor in Akka Typed is performed using actor context method spawn(). The typed version of actor Miner would be started from within its parent actor like below:

// Starting typed actor `Miner`
val miner: ActorRef[Miner.Mining] = context.spawn(Miner(accountKey, timeoutPoW), "miner")

An actor’s underlying class properties can now be defined using Behavior methods, hence the Props configuration factory is no longer needed. Below is how the typed Miner can be defined using method Behaviors.setup() from within the actor’s companion object:

// Typed actor `Miner` class
object Miner {
  // ...
  def apply(accountKey: String, timeoutPoW: Long): Behavior[Mining] =
    Behaviors.setup(context =>
      new Miner(context, accountKey, timeoutPoW).messageLoop()
    )
}

class Miner private(context: ActorContext[Miner.Mining], accountKey: String, timeoutPoW: Long) {
  // ...
}

Starting top-level actors

What about in the main program before any actors have been created? In Akka classic API, one can simply invoke actorOf() from the ActorSystem to start main actors. Below is how the main program of a blockchain mining application spawn the top-level actors for mining simulations.

object Main {
  def main(args: Array[String]): Unit = {
    // Parse `args` and load configurations for the cluster and main actors
    // ...

    implicit val system = ActorSystem("blockchain", conf)
    val blockchainer = system.actorOf(
        Blockchainer.props(minerAccountKey, timeoutMining, timeoutValidation), "blockchainer"
      )
    val simulator = system.actorOf(
        Simulator.props(blockchainer, transFeedInterval, miningAvgInterval), "simulator"
      )

    // ...
  }
}

Perhaps for consistency reasons, Akka Typed makes actors always started from within an actor context using method spawn(), thus an explicit ActorContext is needed for top-level main actors. The following snippet shows how the typed version of the blockchain application delegates to the Starter actor for starting the main actors after the main program has loaded actor properties from program arguments and configuration file. The top-level user-defined actor Starter is regarded as the “user guardian”.

object Main {
  object Starter {
    def apply(accountKey: String, timeoutMining: Long, timeoutValidation: Long,
              transFeedInterval: Long, miningAvgInterval: Long, test: Boolean): Behavior[NotUsed] =
      Behaviors.setup { context =>
        Cluster(context.system)

        val blockchainer = context.spawn(Blockchainer(accountKey, timeoutMining, timeoutValidation), "blockchainer")
        val simulator = context.spawn(Simulator(blockchainer, transFeedInterval, miningAvgInterval), "simulator")

        if (test)
          simulator ! Simulator.QuickTest
        else
          simulator ! Simulator.MiningLoop

        Behaviors.receiveSignal { case (_, Terminated(_)) => Behaviors.stopped }
      }
  }

  def main(args: Array[String]): Unit = {
    // Parse `args` and load configurations for the cluster and main actors
    // ...

    implicit val system = ActorSystem(
      Starter(minerAccountKey, timeoutMining, timeoutValidation, transFeedInterval, miningAvgInterval, test),
      "blockchain",
      conf
    )
  }
}

The fire-and-forget “tell”

The most common communication means among actors is via method tell in a fire-and-forget fashion, which in essence implies that messages are sent with an at-most-once guarantee.

Akka classic tell and the symbolic ! variant have the following method signatures:

// Akka classic methods `tell` and `!`
final def tell(msg: Any, sender: ActorRef): Unit
abstract def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

In Akka Typed, methods tell and ! have signatures as follows:

// Akka Typed methods `tell` and `!`
abstract def tell(msg: T): Unit
def !(msg: T): Unit

Though messages being sent are now strictly typed in the new API, expressions consisting of tell in Akka classic and Akka Typed, both returning a Unit, essentially look and feel the same.

The request-response query “ask”

The other commonly used communication means among actors is the request-response query via method ask.

Below are method signatures of Akka classic ask and ?:

// Akka classic methods `ask` and `?`
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any]
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]

Here’s how the Blockchainer actor uses the classic ask to request for a new mined block from actor Miner and handle the Future query result:

(miner ? Miner.Mine(blockPrev, trans))(tmoMining).mapTo[Block] onComplete{
  case Success(block) =>
    mediator ! Publish("new-block", UpdateBlockchain(block))
    miner ! Miner.DoneMining
  case Failure(e) =>
    log.error(s"[Req.Mining] ${this}: ERROR: $e")
    e match {
      case _: BusyException => self ! AddTransactions(trans, append = false)
      case _ => miner ! Miner.DoneMining
    }
}

Note that the tmoMining timeout value is being explicit passed in as a method argument and mapTo[Block] is necessary for mapping the returned Future[Any] to the proper type.

In addition, there are also a few general-purpose AskSupport methods allowing a query between two actors both specified as parameters. Here’s the method signature of one of the ask variants:

// Akka classic sender-explicit `ask`
def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any]

As for Akka Typed, a context method ask is provided with the following signature:

// Akka Typed context method `ask`
abstract def ask[Req, Res](target: RecipientRef[Req], createRequest: (ActorRef[Res]) => Req)(
    mapResponse: (Try[Res]) => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

Below is how the typed version of actor Blockchainer would use context ask to query typed actor Miner:

implicit val tmoMining: Timeout = Timeout(timeoutMining.millis)
context.ask(miner, ref => Miner.Mine(blockPrev, trans, ref)) {
  case Success(r) =>
    r match {
      case MiningResult(block) =>
        topicBlock ! Topic.Publish(UpdateBlockchain(block))
        miner ! Miner.DoneMining
        MiningResult(block)
      case _ =>
        OtherException(s"Unknown mining result $r")
    }
  case Failure(e) =>
    context.log.error(s"[Req.Mining] ${this}: ERROR: $e")
    e match {
      case _: BusyException =>
        context.self ! AddTransactions(trans, append = false)
        BusyException(e.getMessage)
      case _ =>
        miner ! Miner.DoneMining
        OtherException(e.getMessage)
    }
}

Note that context ask doesn’t return the query result as a Future to be handled by a callback such as onComplete. Rather, it expects one to handle the query response by providing a Try[Res] => T function.

Akka Typed also provides AskPattern methods that return Futures with below method signatures:

// Akka Typed Future-returning methods `ask` and `?`
def ask[Res](replyTo: (ActorRef[Res]) => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]
def ?[Res](replyTo: (ActorRef[Res]) => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]

That’s all for this post. In the next blog, we’ll wrap up this mini blog series with the remaining topics (i.e. scheduler and distributed pubsub).

Leave a Reply

Your email address will not be published. Required fields are marked *