Monthly Archives: March 2021

From Akka Untyped To Typed Actors

This is part 1 of a 3-part blog series about how to migrate an Akka classic actor-based application to one with Akka typed actors. In this post, we’ll focus on how a typical classic actor can be replaced with a typed actor.

Akka’s move from its loosely-typed classic actor API to Akka Typed has been met with mixed feelings in the Akka community. On one hand, people are happy with the Actor toolkit being “morphed” into a suitably typed API. On the other hand, the general expectation is that it isn’t going to be a straightforward find-and-replace change.

Actors interact by means of non-blocking message passing. Each actor maintains a “mailbox” and messages addressed to it get processed in the order of receipt. Akka classic actors are loosely typed mainly because their message processing logic relies on the implementation of the abstract method receive, which has the following signature:

// Akka Actor.Receive
type Receive = PartialFunction[Any, Unit]
def receive: Receive  // abstract in trait Actor; every actor must implement it

It takes an input of type Any (i.e. allowing messages of any type) and isn’t obligated to return anything. In addition, as a partial function it allows, by design, non-exhaustive matching against message types.
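To make the loose typing concrete, here is a small sketch in plain Scala (no Akka dependency) that uses the same PartialFunction[Any, Unit] shape as Receive. The object name, the handler and the `handled` buffer are hypothetical and exist only for illustration.

```scala
// Plain Scala sketch: the same shape as Akka's Actor.Receive, without Akka.
object ReceiveDemo {
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical record of what the handler has processed.
  val handled = scala.collection.mutable.ListBuffer.empty[String]

  // Only String and Int are matched; everything else is silently undefined.
  val receive: Receive = {
    case s: String => handled += s"string:$s"
    case n: Int    => handled += s"int:$n"
  }

  def main(args: Array[String]): Unit = {
    receive("hello") // compiles: any value is accepted
    receive(42)

    // A Double is accepted by the compiler too, but no case matches it.
    println(receive.isDefinedAt(3.14)) // false
    println(handled.toList)            // List(string:hello, int:42)
  }
}
```

Nothing in the signature tells a caller which messages are actually handled; that knowledge lives only in the pattern match.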

Actor “behaviors”

In Akka Typed, processing logic of the messages received by an actor is defined in methods that return Behavior[T] of a given message type T. A factory object Behaviors provides a number of predefined Behavior[T]s (e.g. Behaviors.same, Behaviors.stopped) and general methods (e.g. Behaviors.receiveMessage()) for user-defined message processing logic.

Akka’s official style guide proposes two different flavors of the typed API: functional vs object-oriented. I would highly recommend reading through examples and the pros and cons of the two styles presented there. In brief, the functional approach evolves an actor’s state by successively passing the state as a parameter to the processing method, whereas the alternative embraces object-oriented principles and permits the use of mutable class variables for state maintenance.
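As a rough illustration of the functional flavor, the sketch below keeps a counter’s state in a method parameter and returns a new behavior for each state change. It assumes Akka 2.6 with the akka-actor-typed module on the classpath; the Counter protocol is hypothetical and unrelated to the mining application.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical protocol, used only to illustrate the functional style.
object Counter {
  sealed trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Int]) extends Command

  // The state `count` is a parameter; no mutable variable is involved.
  def apply(count: Int = 0): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Increment =>
        apply(count + 1) // "update" the state by returning a new behavior
      case GetValue(replyTo) =>
        replyTo ! count
        Behaviors.same   // state unchanged, keep the current behavior
    }
}
```

Each message handler returns the behavior to use for the next message, which is how state changes are expressed without mutation.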

An Akka classic actor for blockchain mining

Let’s look at a blockchain mining Scala snippet used in an Actor-based cryptocurrency system as an example. The original code written in Akka classic actors is like this:

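// Akka classic actor `Miner`
import akka.actor.{Actor, ActorLogging, Props, Status}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
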
object Miner {
  def props(accountKey: String, timeoutPoW: Long): Props = Props(classOf[Miner], accountKey, timeoutPoW)

  sealed trait Mining
  case class Mine(blockPrev: Block, trans: Transactions) extends Mining
  case object DoneMining extends Mining
}

class Miner(accountKey: String, timeoutPoW: Long) extends Actor with ActorLogging {
  import Miner._

  implicit val ec: ExecutionContext = context.dispatcher
  implicit val timeout: FiniteDuration = timeoutPoW.millis

  override def receive: Receive = idle

  def idle: Receive = {
    case Mine(blockPrev, trans) =>
      context.become(busy)

      val recipient = sender()
      val newBlock = generateNewBlock(blockPrev, trans)

      generatePoW(newBlock).map{ newNonce =>
          recipient ! newBlock.copy(nonce = newNonce)
        }.
        recover{ case e: Exception =>
          recipient ! Status.Failure(e)
        }

    case _ =>
      // Do nothing
  }

  def busy: Receive = {
    case Mine(b, t) =>
      log.error(s"[Mining] Miner.Mine($b, $t) received but $this is busy!")
      sender() ! Status.Failure(new Blockchainer.BusyException(s"$this is busy!"))

    case DoneMining =>
      context.become(idle)
      log.info(s"[Mining] Miner.DoneMining received.")
  }

  private def generateNewBlock(blockPrev: Block, trans: Transactions): LinkedBlock = ???

  private def generatePoW(block: Block)(implicit ec: ExecutionContext, timeout: FiniteDuration): Future[Long] = ???
}

It should be noted that Miner is an actor that serves to return a mined block (of type Blockchainer.Req) upon receiving an ask query from another actor (Blockchainer). Within actor Miner, method generatePoW() asynchronously produces a Future[Long] whose value then gets embedded in a block to be sent back to the querying actor.

ADT for actor message type

Before composing the typed actor version of Miner, let’s first look at what message type we would allow it to receive, since the actor reference of a typed actor is strictly typed.

// Akka typed actor `Miner`
object Miner {
  sealed trait Mining
  case class Mine(blockPrev: Block, trans: Transactions, replyTo: ActorRef[Blockchainer.Req]) extends Mining
  case object DoneMining extends Mining
  // ...
}

Similar to how message types are commonly set up in an Akka classic actor, they’re also generally defined as an ADT (algebraic data type) in Akka Typed. Since the actor reference is now typed, the ADT plays an additional role – its base trait becomes the type parameter of the actor. By defining the actor reference of Miner as ActorRef[Miner.Mining], the actor will only be able to take messages of type Miner.Mining or its subtypes.

Because of the type constraint on a given Akka typed actor, the good old untyped sender() is no longer supported. To ensure an actor is able to reply to the sender with the proper message type, it’s common to see messages sent across typed actors explicitly carrying the sender’s reference. Case class Mine in the Akka typed actor has a replyTo of type ActorRef[Blockchainer.Req] because message Mine is sent via an ask query by actor Blockchainer, which expects a reply of a suitable message type (in this case, Blockchainer.Req) from actor Miner.
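The replyTo convention can be sketched with a pair of stand-in actors. This assumes Akka 2.6 with akka-actor-typed on the classpath; Requester, Worker, Compute and Result are hypothetical names that merely mirror the roles of Blockchainer, Miner, Mine and Blockchainer.Req.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the message type the requester accepts.
object Requester {
  sealed trait Req
  final case class Result(value: Long) extends Req
}

// Hypothetical stand-in for the actor that does the work.
object Worker {
  sealed trait Command
  final case class Compute(n: Long, replyTo: ActorRef[Requester.Req]) extends Command

  def apply(): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Compute(n, replyTo) =>
        replyTo ! Requester.Result(n * 2) // reply via the carried reference
        Behaviors.same
    }
}

object ReplyToDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem[Worker.Command] = ActorSystem(Worker(), "worker")
    implicit val timeout: Timeout = 3.seconds

    // `ask` creates a temporary ActorRef[Requester.Req] and hands it over as `replyTo`.
    val reply: Future[Requester.Req] =
      system.ask[Requester.Req](ref => Worker.Compute(21, ref))
    println(Await.result(reply, 3.seconds)) // Result(42)

    // system ! "hello" // would not compile: String is not a Worker.Command
    system.terminate()
  }
}
```

The commented-out line is the point of the exercise: a message outside the actor’s ADT is rejected at compile time rather than ignored at runtime.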

Functional or object-oriented style?

For our Miner actor, the functional approach would be to define all behaviors as methods (e.g. via Behaviors.receiveMessage) within the object Miner alone. The standard object-oriented alternative would be to add the companion Miner class that extends AbstractBehavior. For this blockchain mining application, I’m going to pick the object-oriented approach, partly out of personal preference for the more structured companion class-object model.

Extending AbstractBehavior would require implementation of abstract method onMessage() which has the following signature:

// AbstractBehavior.onMessage()
def onMessage(msg: T): Behavior[T]  // abstract in AbstractBehavior[T]

The typed Miner actor would look like this:

// Akka typed actor `Miner`
object Miner {
  // ...
  def apply(accountKey: String, timeoutPoW: Long): Behavior[Mining] =
    Behaviors.setup(context =>
      new Miner(context, accountKey, timeoutPoW)
    )
}

class Miner private(context: ActorContext[Miner.Mining], accountKey: String, timeoutPoW: Long)
    extends AbstractBehavior[Miner.Mining](context) {
  import Miner._
  override def onMessage(msg: Mining): Behavior[Mining] = msg match {
    case Mine(blockPrev, trans, replyTo) => ???
    case DoneMining => ???
  }
  // ...
}
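For a complete, if trivial, instance of the AbstractBehavior shape, here is a minimal sketch of a counter that keeps its state in a mutable class variable. It assumes Akka 2.6 with akka-actor-typed on the classpath; OoCounter and its messages are hypothetical and not part of the mining application.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}

// Hypothetical protocol, used only to illustrate the object-oriented style.
object OoCounter {
  sealed trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Int]) extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command](context => new OoCounter(context))
}

class OoCounter private (context: ActorContext[OoCounter.Command])
    extends AbstractBehavior[OoCounter.Command](context) {
  import OoCounter._

  // State lives in a mutable field, touched only while processing a message.
  private var count = 0

  override def onMessage(msg: Command): Behavior[Command] = msg match {
    case Increment =>
      count += 1
      this // keep using this same behavior instance
    case GetValue(replyTo) =>
      replyTo ! count
      this
  }
}
```

Returning `this` from onMessage() plays the role that Behaviors.same plays in the functional style.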

Mimicking context.become in Akka Typed

The “hotswapping” feature within the message loop of an Akka classic actor, context.become, is useful for state switching upon receipt of designated messages. Surprisingly, I haven’t been able to find any concrete examples of how the good old context.become should be done in Akka Typed. Mimicking the feature using the functional approach seems pretty straightforward, but since I’m taking the object-oriented approach it’s not immediately clear to me how onMessage() fits into the “behavioral” switching scheme.

Method onMessage(msg: T) takes a received message and processes it in accordance with user-defined logic. The problem lies in the msg: T argument of the method. As soon as the actor is up, the first message(s) received are passed to onMessage(), so when a context.become-style switch to another Behavior method is needed, the received message would have to be delegated to that method. However, the Behavior factory methods are designed to take a user-defined T => Behavior[T] function (or partial function), so the initial msg: T must be handled within onMessage() itself. This results in duplicated message processing logic among the Behavior methods.

Rather than taking the standard object-oriented approach, our Akka Typed Miner actor is defined with a companion class but without extending AbstractBehavior, thus leaving onMessage() out of the picture. A default message handler, messageLoop(), within class Miner is called upon instantiation by the companion object’s apply() to kick off a “behavior switching” loop. Behavior method idle() gets called, executes its business logic before conditionally relaying to another Behavior method busy() which, in turn, does its work and conditionally relays back to idle().

object Miner {
  // ...
  def apply(accountKey: String, timeoutPoW: Long): Behavior[Mining] =
    Behaviors.setup(context =>
      new Miner(context, accountKey, timeoutPoW).messageLoop()
      // ^^^ Instantiate class `Miner` and run `messageLoop()`
    )
}

class Miner private(context: ActorContext[Miner.Mining], accountKey: String, timeoutPoW: Long) {
  import Miner._
  private def messageLoop(): Behavior[Mining] = idle()  // <<< Switch to the `idle()` behavior
  private def idle(): Behavior[Mining] = ???  // <<< Conditionally relay to `busy()`
  private def busy(): Behavior[Mining] = ???  // <<< Conditionally relay back to `idle()`
  // ...    
}
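The same skeleton can be exercised end to end with a toy two-state actor. The sketch below assumes Akka 2.6 with akka-actor-typed on the classpath; Switcher, Start and Done are hypothetical names standing in for Miner, Mine and DoneMining.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}

// Hypothetical two-state actor standing in for `Miner`.
object Switcher {
  sealed trait Command
  final case class Start(replyTo: ActorRef[String]) extends Command
  case object Done extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command](context => new Switcher(context).messageLoop())
}

class Switcher private (context: ActorContext[Switcher.Command]) {
  import Switcher._

  // Entry point: the actor starts out idle.
  private def messageLoop(): Behavior[Command] = idle()

  private def idle(): Behavior[Command] = Behaviors.receiveMessage {
    case Start(replyTo) =>
      replyTo ! "started"
      busy()           // switch to the `busy()` behavior
    case Done =>
      Behaviors.same   // nothing to finish while idle
  }

  private def busy(): Behavior[Command] = Behaviors.receiveMessage {
    case Start(replyTo) =>
      context.log.warn("Start received while busy")
      replyTo ! "busy"
      Behaviors.same   // stay busy
    case Done =>
      idle()           // switch back to the `idle()` behavior
  }
}
```

Returning busy() or idle() from a handler is what takes the place of context.become here, while Behaviors.same keeps the current state.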

The blockchain mining actor in Akka Typed

Putting everything together, here’s what the blockchain mining actor in Akka Typed is like:

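// Akka typed actor `Miner`
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
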
object Miner {
  sealed trait Mining
  case class Mine(blockPrev: Block, trans: Transactions, replyTo: ActorRef[Blockchainer.Req]) extends Mining
  case object DoneMining extends Mining

  def apply(accountKey: String, timeoutPoW: Long): Behavior[Mining] =
    Behaviors.setup(context =>
      new Miner(context, accountKey, timeoutPoW).messageLoop()
    )
}

class Miner private(context: ActorContext[Miner.Mining], accountKey: String, timeoutPoW: Long) {
  import Miner._

  implicit val ec: ExecutionContext = context.executionContext
  implicit val timeout: FiniteDuration = timeoutPoW.millis

  private def messageLoop(): Behavior[Mining] = idle()  // <-- Switch to the `idle()` behavior

  private def idle(): Behavior[Mining] = Behaviors.receiveMessage{
    case Mine(blockPrev, trans, replyTo) =>
      val newBlock = generateNewBlock(blockPrev, trans)
      generatePoW(newBlock).map(newNonce =>
          replyTo ! Blockchainer.MiningResult(newBlock.copy(nonce = newNonce))
        ).
        // Send the failure back to the requester instead of discarding it
        recover{ case e: Exception => replyTo ! Blockchainer.OtherException(s"$e") }
      busy()  // <-- Switch to the `busy()` behavior

    case DoneMining =>
      Behaviors.same
  }

  private def busy(): Behavior[Mining] = Behaviors.receiveMessage{
    case Mine(blockPrev, trans, replyTo) =>
      context.log.error(s"[Mining] Miner.Mine($blockPrev, $trans) received but $this is busy!")
      replyTo ! Blockchainer.BusyException(s"$this is busy!")
      Behaviors.same

    case DoneMining =>
      context.log.info(s"[Mining] Miner.DoneMining received.")
      idle()  // <-- Switch back to the `idle()` behavior
  }

  private def generateNewBlock(blockPrev: Block, trans: Transactions): LinkedBlock = ???

  private def generatePoW(block: Block)(implicit ec: ExecutionContext, timeout: FiniteDuration): Future[Long] = ???
}