This is the 2nd post of the 3-part blog series about migrating an Akka classic actor-based application to one with Akka typed actors. In the 1st post, we looked at how to convert a classic actor into a typed actor.
Obviously, the goal of this mini blog series isn’t to cover all of the classic-to-typed-actors migration how-to’s, given the vast feature set Akka provides. The application being migrated is a blockchain application that mimics mining of a decentralized cryptocurrency. We’re going to cover just the key actor features used by the application, namely:
- Starting an actor
- Tell
- Ask
- Scheduler
- Distributed PubSub
In this blog post, we’ll go over the first three bullet items.
Starting an Akka actor
In the Akka classic API, the context method actorOf() starts an actor with its “properties” provided by the configuration factory Props. For example, actor Miner is started from within its parent actor as follows:
    // Starting classic actor `Miner`
    val miner: ActorRef = context.actorOf(Miner.props(accountKey, timeoutPoW), "miner")
It’s common to provide the actor class properties for Props by means of a method in the actor’s companion object, especially when the actor class takes parameters.
    // Classic actor `Miner` class
    object Miner {
      def props(accountKey: String, timeoutPoW: Long): Props =
        Props(classOf[Miner], accountKey, timeoutPoW)
      // ...
    }

    class Miner(accountKey: String, timeoutPoW: Long) extends Actor with ActorLogging {
      // ...
    }
Starting an actor in Akka Typed is performed using the actor context method spawn(). The typed version of actor Miner would be started from within its parent actor as follows:
    // Starting typed actor `Miner`
    val miner: ActorRef[Miner.Mining] = context.spawn(Miner(accountKey, timeoutPoW), "miner")
An actor’s underlying class properties can now be defined using Behavior methods, hence the Props configuration factory is no longer needed. Below is how the typed Miner can be defined using method Behaviors.setup() from within the actor’s companion object:
    // Typed actor `Miner` class
    object Miner {
      // ...
      def apply(accountKey: String, timeoutPoW: Long): Behavior[Mining] =
        Behaviors.setup(context =>
          new Miner(context, accountKey, timeoutPoW).messageLoop()
        )
    }

    class Miner private(context: ActorContext[Miner.Mining], accountKey: String, timeoutPoW: Long) {
      // ...
    }
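Since the body of the typed Miner is elided above, here is a minimal, self-contained sketch of the same companion-object-plus-class pattern. The Counter actor, its Add and Get messages and its messageLoop(total) parameter are hypothetical stand-ins, not part of the blockchain application; the sketch only assumes the akka-actor-typed library (Akka 2.6) is on the classpath.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}

// Hypothetical actor standing in for `Miner`; the protocol below is illustrative only
object Counter {
  sealed trait Command
  final case class Add(n: Int) extends Command
  final case class Get(replyTo: ActorRef[Int]) extends Command

  // Plays the role that `props()` played in the classic API:
  // constructor arguments are captured when the behavior is built
  def apply(start: Int): Behavior[Command] =
    Behaviors.setup(context => new Counter(context).messageLoop(start))
}

class Counter private (context: ActorContext[Counter.Command]) {
  import Counter._

  // State is carried by returning the next behavior instead of mutating a field
  def messageLoop(total: Int): Behavior[Command] =
    Behaviors.receiveMessage {
      case Add(n) =>
        context.log.debug(s"adding $n to $total")
        messageLoop(total + n)
      case Get(replyTo) =>
        replyTo ! total
        Behaviors.same
    }
}
```

A parent actor would start it with context.spawn(Counter(0), "counter"), mirroring the spawn() call shown earlier for Miner.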
Starting top-level actors
What about in the main program before any actors have been created? In the Akka classic API, one can simply invoke actorOf() from the ActorSystem to start main actors. Below is how the main program of the blockchain mining application spawns the top-level actors for mining simulations.
    object Main {
      def main(args: Array[String]): Unit = {
        // Parse `args` and load configurations for the cluster and main actors
        // ...
        implicit val system = ActorSystem("blockchain", conf)

        val blockchainer = system.actorOf(
          Blockchainer.props(minerAccountKey, timeoutMining, timeoutValidation), "blockchainer"
        )
        val simulator = system.actorOf(
          Simulator.props(blockchainer, transFeedInterval, miningAvgInterval), "simulator"
        )
        // ...
      }
    }
Perhaps for consistency reasons, Akka Typed always starts actors from within an actor context using method spawn(), thus an explicit ActorContext is needed for top-level main actors. The following snippet shows how the typed version of the blockchain application delegates to the Starter actor for starting the main actors after the main program has loaded actor properties from program arguments and the configuration file. The top-level user-defined actor Starter is regarded as the “user guardian”.
    object Main {
      object Starter {
        def apply(accountKey: String, timeoutMining: Long, timeoutValidation: Long,
                  transFeedInterval: Long, miningAvgInterval: Long, test: Boolean): Behavior[NotUsed] =
          Behaviors.setup { context =>
            Cluster(context.system)

            val blockchainer = context.spawn(Blockchainer(accountKey, timeoutMining, timeoutValidation), "blockchainer")
            val simulator = context.spawn(Simulator(blockchainer, transFeedInterval, miningAvgInterval), "simulator")

            if (test)
              simulator ! Simulator.QuickTest
            else
              simulator ! Simulator.MiningLoop

            Behaviors.receiveSignal { case (_, Terminated(_)) => Behaviors.stopped }
          }
      }

      def main(args: Array[String]): Unit = {
        // Parse `args` and load configurations for the cluster and main actors
        // ...
        implicit val system = ActorSystem(
          Starter(minerAccountKey, timeoutMining, timeoutValidation, transFeedInterval, miningAvgInterval, test),
          "blockchain",
          conf
        )
      }
    }
The fire-and-forget “tell”
The most common means of communication among actors is method tell, used in a fire-and-forget fashion, which in essence implies that messages are sent with an at-most-once guarantee.
Akka classic tell and the symbolic ! variant have the following method signatures:
    // Akka classic methods `tell` and `!`
    final def tell(msg: Any, sender: ActorRef): Unit
    abstract def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
In Akka Typed, methods tell and ! have signatures as follows:
    // Akka Typed methods `tell` and `!`
    abstract def tell(msg: T): Unit
    def !(msg: T): Unit
Though messages being sent are now strictly typed in the new API, expressions consisting of tell in Akka classic and Akka Typed, both returning a Unit, essentially look and feel the same.
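To make the difference in typing concrete, here is a small sketch of a typed tell, assuming akka-actor-typed 2.6 on the classpath. The Greeter actor and its Greet message are hypothetical and not part of the blockchain application.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor used only to illustrate typed `tell`
object Greeter {
  sealed trait Command
  final case class Greet(name: String) extends Command

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Greet(name) =>
          context.log.info(s"Hello, $name")
          Behaviors.same
      }
    }
}

object TellDemo {
  def main(args: Array[String]): Unit = {
    // An ActorSystem[T] is itself an ActorRef[T] pointing at the user guardian
    val system: ActorSystem[Greeter.Command] = ActorSystem(Greeter(), "tell-demo")

    system ! Greeter.Greet("miner")          // symbolic form
    system.tell(Greeter.Greet("validator"))  // equivalent method form
    // system ! "hello"                      // rejected at compile time: a String is not a Greeter.Command

    // Fire-and-forget: nothing here confirms that the messages were processed
    system.terminate()
  }
}
```

With the classic API, the commented-out line would compile and the stray String would only surface at runtime as an unhandled message.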
The request-response query “ask”
The other commonly used means of communication among actors is the request-response query via method ask.
Below are method signatures of Akka classic ask and ?:
    // Akka classic methods `ask` and `?`
    def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any]
    def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any]
Here’s how the Blockchainer actor uses the classic ask to request a newly mined block from actor Miner and handle the Future query result:
    (miner ? Miner.Mine(blockPrev, trans))(tmoMining).mapTo[Block] onComplete {
      case Success(block) =>
        mediator ! Publish("new-block", UpdateBlockchain(block))
        miner ! Miner.DoneMining
      case Failure(e) =>
        log.error(s"[Req.Mining] ${this}: ERROR: $e")
        e match {
          case _: BusyException => self ! AddTransactions(trans, append = false)
          case _ => miner ! Miner.DoneMining
        }
    }
Note that the tmoMining timeout value is explicitly passed in as a method argument and mapTo[Block] is necessary for mapping the returned Future[Any] to the proper type.
In addition, there are also a few general-purpose AskSupport methods allowing a query between two actors, both specified as parameters. Here’s the method signature of one of the ask variants:
    // Akka classic sender-explicit `ask`
    def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any]
As for Akka Typed, a context method ask is provided with the following signature:
    // Akka Typed context method `ask`
    abstract def ask[Req, Res](target: RecipientRef[Req], createRequest: (ActorRef[Res]) => Req)(
        mapResponse: (Try[Res]) => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
Below is how the typed version of actor Blockchainer would use context ask to query typed actor Miner:
    implicit val tmoMining: Timeout = Timeout(timeoutMining.millis)

    context.ask(miner, ref => Miner.Mine(blockPrev, trans, ref)) {
      case Success(r) =>
        r match {
          case MiningResult(block) =>
            topicBlock ! Topic.Publish(UpdateBlockchain(block))
            miner ! Miner.DoneMining
            MiningResult(block)
          case _ =>
            OtherException(s"Unknown mining result $r")
        }
      case Failure(e) =>
        context.log.error(s"[Req.Mining] ${this}: ERROR: $e")
        e match {
          case _: BusyException =>
            context.self ! AddTransactions(trans, append = false)
            BusyException(e.getMessage)
          case _ =>
            miner ! Miner.DoneMining
            OtherException(e.getMessage)
        }
    }
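Note that context ask doesn’t return the query result as a Future to be handled by a callback such as onComplete. Rather, it expects one to handle the query response by providing a Try[Res] => T function, whose result (a message of the asking actor’s own type T) is delivered back to that actor like any other message.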
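The round trip through the asking actor’s own mailbox can be sketched in isolation as follows, assuming akka-actor-typed 2.6. The Responder and Requester actors and all of their messages are hypothetical; the lambda parameter is annotated with its type here so that Res does not depend on type inference.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout

// Hypothetical actor that answers every query with the upper-cased text
object Responder {
  final case class Query(text: String, replyTo: ActorRef[Answer])
  final case class Answer(text: String)

  def apply(): Behavior[Query] =
    Behaviors.receiveMessage { case Query(text, replyTo) =>
      replyTo ! Answer(text.toUpperCase)
      Behaviors.same
    }
}

// Hypothetical actor that queries `Responder` via context `ask`
object Requester {
  sealed trait Command
  final case class Ask(text: String) extends Command
  // Responses are adapted into the requester's own protocol
  private final case class Answered(text: String) extends Command
  private final case class AskFailed(reason: String) extends Command

  def apply(responder: ActorRef[Responder.Query]): Behavior[Command] =
    Behaviors.setup { context =>
      implicit val timeout: Timeout = 3.seconds

      Behaviors.receiveMessage {
        case Ask(text) =>
          // `mapResponse` (the second argument list) turns Try[Answer] into a Command
          context.ask(responder, (ref: ActorRef[Responder.Answer]) => Responder.Query(text, ref)) {
            case Success(Responder.Answer(reply)) => Answered(reply)
            case Failure(e)                       => AskFailed(e.getMessage)
          }
          Behaviors.same
        case Answered(text) =>
          // Handled on the actor's own message loop, so actor state can be touched safely
          context.log.info(s"answer: $text")
          Behaviors.same
        case AskFailed(reason) =>
          context.log.error(s"ask failed: $reason")
          Behaviors.same
      }
    }
}
```

Because the adapted response arrives as an ordinary message, the handlers for Answered and AskFailed run inside the actor, unlike an onComplete callback, which runs on whichever thread completes the Future.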
Akka Typed also provides AskPattern methods that return Futures, with the following method signatures:
    // Akka Typed Future-returning methods `ask` and `?`
    def ask[Res](replyTo: (ActorRef[Res]) => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]
    def ?[Res](replyTo: (ActorRef[Res]) => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]
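These Future-returning variants are mainly useful when querying an actor from outside the actor system, for example from a main program. Below is a minimal sketch, assuming akka-actor-typed 2.6; the Echo actor and its Ping and Pong messages are hypothetical, and blocking with Await is done only to keep the demo short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._ // adds `ask`/`?` and an implicit Scheduler
import akka.util.Timeout

// Hypothetical actor that echoes the text it receives
object Echo {
  final case class Ping(text: String, replyTo: ActorRef[Pong])
  final case class Pong(text: String)

  def apply(): Behavior[Ping] =
    Behaviors.receiveMessage { case Ping(text, replyTo) =>
      replyTo ! Pong(text)
      Behaviors.same
    }
}

object AskPatternDemo {
  def main(args: Array[String]): Unit = {
    // The implicit system lets AskPattern derive the Scheduler that `ask` requires
    implicit val system: ActorSystem[Echo.Ping] = ActorSystem(Echo(), "ask-pattern-demo")
    implicit val timeout: Timeout = 3.seconds

    // The result is a Future[Echo.Pong]; no `mapTo` is needed as with the classic `ask`
    val reply: Future[Echo.Pong] = system.ask[Echo.Pong](ref => Echo.Ping("hello", ref))

    println(Await.result(reply, timeout.duration))
    system.terminate()
  }
}
```

Inside an actor, the context ask shown earlier is usually the better fit, since it avoids touching actor state from a Future callback.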
That’s all for this post. In the next post, we’ll wrap up this mini blog series with the remaining topics (i.e. scheduler and distributed pubsub).