This is the final post of the 3-part blog series that centers around migrating an actor-based blockchain application from Akka classic to Akka Typed. In the previous post, we looked at the difference between the two APIs in starting actors and in tell
-ing and ask
-ing. This time, we’re going to cover the following topics:
- Scheduler
- Distributed PubSub
Scheduler
Akka classic’s Scheduler
feature provides task-scheduling methods for recurring schedule (e.g. scheduleAtFixedRate()
, which replaces the deprecated schedule()
) or one-time schedule (scheduleOnce()
), each with a few signature variants. For example, scheduleOnce()
has more than a couple of different method signatures for different use cases:
// Schedule once for a `f: => Unit` by-name parameter final def scheduleOnce(delay: FiniteDuration)(f: => Unit)(implicit executor: ExecutionContext): Cancellable // Schedule once for a `Runnable` def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable // Schedule once for sending a message to another actor final def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable
The blockchain application uses schedulers in a number of places. For instance, to ensure that the potentially time-consuming proof generating process won’t take more than a set duration, the generatePoW()
method within actor Miner
uses scheduleOnce()
to try complete a Promise
with a timeout exception at a scheduled time:
private def generatePoW(block: Block)(implicit ec: ExecutionContext, timeout: FiniteDuration): Future[Long] = { val promise = Promise[Long]() context.system.scheduler.scheduleOnce(timeout){ promise tryFailure new TimeoutException(s"$block: $timeout") } Future{ Try{ val incrementedNonce = ProofOfWork.generateProof(bytesToBase64(block.hash), defaultDifficulty, defaultNonce) promise success incrementedNonce }. recover{ case e: Exception => promise failure e } } promise.future }
Similar to its counterpart in the classic API, Akka Typed Scheduler
also provides task-scheduling methods for recurring and one-time schedules. Here’s the method signature of the typed version of scheduleOnce()
:
// Schedule once for a `Runnable` abstract def scheduleOnce(delay: FiniteDuration, runnable: Runnable)( implicit executor: ExecutionContext): Cancellable
The variant that takes a f: => Unit
by-name parameter is no longer available in Akka Typed. However, since Runnable
only has a single abstract method (SAM), run()
, the following expressions are essentially the same:
// Invoke scheduleOnce() with a Runnable object context.system.scheduler.scheduleOnce(timeout, new Runnable { override def run() = ??? }) // Invoke scheduleOnce() with a `() => Unit` by-name parameter context.system.scheduler.scheduleOnce(timeout, () => ???)
For example, the typed version of the proof-of-work generating method generatePoW()
could be coded as follows:
private def generatePoW(block: Block)(implicit ec: ExecutionContext, timeout: FiniteDuration): Future[Long] = { val promise = Promise[Long]() context.system.scheduler.scheduleOnce( timeout, () => promise tryFailure new TimeoutException(s"$block: $timeout") ) Future{ Try{ val incrementedNonce = ProofOfWork.generateProof(bytesToBase64(block.hash), defaultDifficulty, defaultNonce) promise success incrementedNonce }. recover{ case e: Exception => promise failure e } } promise.future }
Also seemingly missing is the other variant for sending a message to another actor. In fact, it has been moved into ActorContext
as a separate scheduleOnce()
method which can be invoked from within a typed actor’s context for scheduling a one-time delivery of message to another actor:
// Typed ActorContext's schedule once for sending a message to another actor abstract def scheduleOnce[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable
The above schedulers are all thread-safe. There is also this TimerScheduler
that is not thread-safe, hence should be used within the actor that owns it. An advantage of using TimerScheduler
, typically via Behaviors.withTimers()
, is that it’s bound to the lifecycle of the containing actor and will be cancelled automatically when the actor is stopped.
Distributed PubSub
A cryptocurrency typically maintains a decentralized ledger as distributed copies of a growing blockchain kept by individual nodes on the system. The blockchain application being used for this actor migration exercise achieves that by means of running a Distributed Publish Subscribe service on an Akka cluster. Named topics (in this case “new-transactions” and “new-block”) can be created and subscribers to a given topic will be sent objects submitted to the topic by the publishers.
In the Akka classic API, the mediator actor, DistributedPubSubMediator
, which is supposed to be started on each of the allocated cluster nodes is responsible for managing a registry of actor references and replicating the entries to peer actors among the nodes.
Below is how the mediator actor started from within the Blockchainer actor of a cluster node registers subscribers (in this case the Blockchainer actor itself) and takes published topical objects to be consumed by peer cluster nodes:
// Akka classic pubsub mediator val mediator: ActorRef = DistributedPubSub(context.system).mediator mediator ! Subscribe("new-transactions", self) mediator ! Subscribe("new-block", self) // ... mediator ! Publish("new-transactions", AddTransactions(trans, append)) mediator ! Publish("new-block", UpdateBlockchain(block))
The Akka Typed pubsub abandons the “universal” mediator actor in favor of “topic-specific” actors. The typed version of Distributed PubSub functionality initiated from within the Blockchainer actor of a cluster node is as follows:
// Akka Typed pubsub.Topic val topicTrans = context.spawn(Topic[Req]("new-transactions"), "pubsubNewTransactions") val topicBlock = context.spawn(Topic[Req]("new-block"), "pubsubNewBlock") topicTrans ! Topic.Subscribe(context.self) topicBlock ! Topic.Subscribe(context.self) //... topicTrans ! Topic.Publish(AddTransactions(trans, append)) topicBlock ! Topic.Publish(UpdateBlockchain(block))
Final thoughts
This concludes the mini blog series that covers the basics and selected features of Akka Typed actors. The resulting blockchain application in Akka Typed will be published on GitHub along with an overview in a separate blog.
The term “behavior” is commonly used in the standard Actor Model when describing how actors operate and mutate their internal states in accordance with the business logic. As can be seen across sample snippets in this blog series, it’s ubiquitous in the Akka Typed API, with all the Behavior
-typed methods from the Behaviors
factory object spanning the entire lifecycle of actors. In comparison with putting all actor business logic inside the receive
partial function “blackbox” in Akka classic, it does make the using of the various methods more intuitive, especially for new comers.
With some self-discipline in sticking to programming best-practices, the loosely-typed Akka classic API has an advantage of embracing complex non-blocking message processing functionality with minimal boilerplate code. The message “loop” within an actor in the form of a partial function along with the hotswapping feature via context.become
provides a simple yet robust construct for processing messages. I’m certainly going to miss its simplicity and elegance. That being said, moving towards the typed Akka API is inevitable if one plans to use Akka actors not just for a one-time project. It’s the right direction for sticking to idiomatic functional programming.