
Dynamic IoT Streams With Akka GRPC

Having covered a basic use case for gRPC streaming of IoT device states in the previous blog post, we’re now moving on to a slightly more complex scenario that involves dynamically broadcasting response streams from the server to any participating clients.

Rather than each gRPC client receiving responses only to its own request stream, the IoT state-update responses are now broadcast to all participating clients. Any given client can therefore work with state updates for devices in properties whose requests were originally submitted by other clients.

Akka gRPC streaming

Recall that we were leveraging Akka gRPC to:

  1. generate service interfaces from the Protobuf service schema, which we implement as Scala classes built from Akka Streams components and which are turned into HttpRequest => Future[HttpResponse] routes on an Akka HTTP server with HTTP/2 support
  2. generate gRPC client stubs that implement the same service interfaces with the Akka Streams API and are used to invoke the remote services

Akka gRPC Streaming

Dynamically broadcasting IoT streams

Just like in the previous use case, our IoT system of sensor devices consists of a gRPC server simulating algorithmic changes to device states as response streams to the requesting gRPC clients. Clients may join or leave at any time.

In our new use case, the server broadcasts the response streams for requests from all clients, and each client receives all stream elements emitted from the moment it joins. It’s similar in some ways to a dynamic pub/sub channel in which the gRPC clients subscribe to a service-topic published by the gRPC server.

How do we create such a channel? As shown in one of the GreeterService examples available at Lightbend developers guide, we could create one by coupling a MergeHub with a BroadcastHub. For more details, another blog post of mine covers the very topic.

A new Protobuf service

A majority of the source code described in the previous blog post remains unchanged, as we’re only adding a new RPC service along with its implementation.

First, we declare the new RPC service broadcastIotUpdate() in iotstream.proto under src/main/protobuf/.

service IotStreamService {
    rpc sendIotUpdate(stream StatesUpdateRequest) returns (stream StatesUpdateResponse) {}
    rpc broadcastIotUpdate(stream StatesUpdateRequest) returns (stream StatesUpdateResponse) {}
}

We then add the implementation of broadcastIotUpdate() in class IotStreamServiceImpl.

val (inHub: Sink[StatesUpdateRequest, NotUsed], outHub: Source[StatesUpdateResponse, NotUsed]) =
  MergeHub.source[StatesUpdateRequest]
    .via(updateIotFlow)
    .toMat(BroadcastHub.sink[StatesUpdateResponse])(Keep.both)
    .run()

val dynamicPubSubFlow: Flow[StatesUpdateRequest, StatesUpdateResponse, NotUsed] =
  Flow.fromSinkAndSource(inHub, outHub)

...

override def broadcastIotUpdate(requests: Source[StatesUpdateRequest, NotUsed]): Source[StatesUpdateResponse, NotUsed] =
  requests.via(dynamicPubSubFlow).backpressureTimeout(backpressureTMO)

As shown in the above snippet, we “sandwich” updateIotFlow (which simulates algorithmic IoT state updates) between a MergeHub and a BroadcastHub to materialize a tuple of sink and source, then turn them into dynamicPubSubFlow via Akka Streams’ Flow.fromSinkAndSource(). The resulting flow serves as a dynamic pub/sub channel, funneling incoming request streams from the gRPC clients through the update logic and broadcasting the response streams with updated states to all participating clients.
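To see the hub pairing in isolation, here is a minimal sketch that swaps the IoT messages for plain strings. The names HubSketch, channel and upperCase are made up for illustration, the buffer sizes are arbitrary, and the upper-casing flow merely stands in for updateIotFlow; it is not the code from the repo.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Sink, Source}

import scala.concurrent.duration._

object HubSketch {

  // Stand-in for updateIotFlow: any Flow can sit between the two hubs.
  val upperCase: Flow[String, String, NotUsed] = Flow[String].map(_.toUpperCase)

  // Materializes the hubs once; the returned Sink and Source can each be
  // attached to any number of producers and consumers afterwards.
  def channel()(implicit system: ActorSystem): (Sink[String, NotUsed], Source[String, NotUsed]) =
    MergeHub.source[String](16)                          // per-producer buffer size
      .via(upperCase)
      .toMat(BroadcastHub.sink[String](256))(Keep.both)  // hub buffer size
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("hub-sketch")
    val (inHub, outHub) = channel()

    // Two consumers, playing the role of two broadcast-on clients.
    outHub.runForeach(s => println(s"subscriber-1 got $s"))
    outHub.runForeach(s => println(s"subscriber-2 got $s"))

    // Two producers, playing the role of two client request streams.
    Source(List("a", "b")).runWith(inHub)
    Source(List("c")).runWith(inHub)

    // The hubs do not complete on their own, so shut down after a short delay.
    system.scheduler.scheduleOnce(2.seconds) { system.terminate(); () }(system.dispatcher)
  }
}
```

Elements from both producers are merged into a single stream before being fanned out, so the relative order across producers is not fixed. A subscriber that attaches late will generally miss elements that were already delivered, which matches the "since its participation" behaviour described earlier.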

As for class IotStreamClient, we add a new command line argument, broadcastYN (1=Yes, 0=No), to the main() function’s argument list to indicate whether broadcast of response streams is wanted. The client application will call the specific RPC service in accordance with the value of broadcastYN.

val responseStream: Source[StatesUpdateResponse, NotUsed] = {
  if (broadcastYN == 0)
    client.sendIotUpdate(requestStream)
  else
    client.broadcastIotUpdate(requestStream)
}
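For illustration, the argument handling could be sketched as below. The argument order (client ID, broadcastYN, then a start and end property ID) is inferred from the sample commands in the output section; the names ClientArgs, Config, propIdStart and propIdEnd are hypothetical and not taken from the repo.

```scala
object ClientArgs {

  // Hypothetical holder for the parsed command line arguments.
  final case class Config(clientId: String, broadcast: Boolean, propIdStart: Int, propIdEnd: Int)

  // Parses e.g. Array("client1", "1", "1000", "1019"); returns Left with a message on bad input.
  def parse(args: Array[String]): Either[String, Config] = args match {
    case Array(clientId, yn, start, end) =>
      for {
        broadcast <- yn match {
          case "1"   => Right(true)
          case "0"   => Right(false)
          case other => Left(s"broadcastYN must be 0 or 1, got '$other'")
        }
        from <- start.toIntOption.toRight(s"invalid start property ID '$start'")
        to   <- end.toIntOption.toRight(s"invalid end property ID '$end'")
        _    <- Either.cond(from <= to, (), "start property ID must not exceed end property ID")
      } yield Config(clientId, broadcast, from, to)
    case _ =>
      Left("usage: IotStreamClient <clientId> <broadcastYN> <propIdStart> <propIdEnd>")
  }
}
```

Turning broadcastYN into a Boolean up front rejects values other than 0 and 1, rather than silently treating any non-zero value as "broadcast on".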

The following diagram highlights the gRPC server/clients components, and the rest of the IoT system that manages the individual remote sensor devices. Note that the IoT Manager sub-system isn’t part of the application we’re focusing on. The sub-system could be designed on top of gRPC as well, or Akka Actors (i.e. similar to the Actor-based IotManager), or any other suitable tech stack.

IoT Streaming with Akka gRPC

Full source code for the gRPC client/server components is available at this GitHub repo.

Final thoughts

It should be noted that this is a simplified use case, primarily for demonstrating how device state updates from the gRPC server can be dynamically broadcast to the requesting clients. To strengthen the use case, we could maintain a key-value cache in Redis, with unique device IDs as keys subject to a pre-set TTL (time-to-live), to prevent multiple gRPC clients from processing the same device simultaneously.
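The TTL idea can be sketched without Redis. The class below is a hypothetical in-memory stand-in (the names DeviceClaims and tryClaim are invented here) in which a device ID is "claimed" by one client until the claim expires; with Redis the same effect would typically come from a set-if-absent write with an expiry.

```scala
import java.util.concurrent.ConcurrentHashMap

// In-memory stand-in for a TTL-based key-value cache keyed by device ID.
// The clock is injectable so that expiry can be exercised deterministically.
final class DeviceClaims(ttlMillis: Long, now: () => Long = () => System.currentTimeMillis()) {

  private final case class Claim(clientId: String, expiresAt: Long)

  private val claims = new ConcurrentHashMap[String, Claim]()

  // Returns true if clientId holds the claim on deviceId after this call.
  // A claim is granted when none exists, when the old one has expired,
  // or when the same client renews its own claim.
  def tryClaim(deviceId: String, clientId: String): Boolean = {
    val t = now()
    val current = claims.compute(deviceId, (_, existing) =>
      if (existing == null || existing.expiresAt <= t || existing.clientId == clientId)
        Claim(clientId, t + ttlMillis)
      else
        existing
    )
    current.clientId == clientId
  }
}
```

A client would call tryClaim before submitting a request for a device and skip the device when the call returns false.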

In addition, we might also log to persistent storage the who-what-and-when (i.e. client ID / states / timestamp) of each device state update or, if warranted by the business requirements, put in place a distributed commit log system with Apache Kafka. Such enhancements would enable the Akka Streams-based gRPC tech stack to provide a robust streaming mechanism comparable to solutions such as Akka Actors with distributed pub/sub and a persistence journal on clusters.

Sample output

Below is sample output from the gRPC server and 3 gRPC clients. As the output shows, one can mix and match clients with different broadcast options: the clients with broadcast on share among themselves all response streams the server produces for requests from any of them, whereas each client with broadcast off receives only the responses to its own requests.

Terminal #1: gRPC Server

% ./sbt "runMain akkagrpc.IotStreamServer"
[info] ...
[info] done compiling
[info] running (fork) akkagrpc.IotStreamServer 
[info] [2023-10-31 11:38:03,227] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamServer-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] [Server] gRPC server bound to 127.0.0.1:8080

Terminal #2: gRPC Client1 — broadcast ON (broadcastYN = 1)

% ./sbt "runMain akkagrpc.IotStreamClient client1 1 1000 1019"
[info] welcome to sbt 1.9.6 (Oracle Corporation Java 11.0.19)
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading settings for project akka-grpc-iot-stream-build from plugins.sbt ...
[info] loading project definition from /Users/leo/intellij/akka-grpc-iot-stream/project
[info] loading settings for project akka-grpc-iot-stream from build.sbt ...
[info] set current project to akka-grpc-iot-stream (in build file:/Users/leo/intellij/akka-grpc-iot-stream/)
[info] running (fork) akkagrpc.IotStreamClient client1 1 1000 1019
[info] [2023-10-31 11:53:39,910] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamClient-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] Performing streaming requests from client1 ...
[info] [client1] REQUEST: 1000 5e468f SecurityAlarm | State: 0, Setting: 1
[info] [client1] REQUEST: 1000 70a7bf Lamp | State: 0, Setting: 2
[info] [client1] REQUEST: 1001 a6d07c Lamp | State: 1, Setting: 1
[info] [client1] REQUEST: 1001 b224cf SecurityAlarm | State: 0, Setting: 4
[info] [client1] REQUEST: 1001 df00a1 SecurityAlarm | State: 1, Setting: 4
[info] [client1] REQUEST: 1002 5d9abe Thermostat | State: 1, Setting: 69
[info] [client1] REQUEST: 1002 283a39 Thermostat | State: 1, Setting: 60
[info] [client1] REQUEST: 1002 70b354 SecurityAlarm | State: 0, Setting: 5
[info] [client1] REQUEST: 1002 c12ac7 Thermostat | State: 0, Setting: 66
[info] [client1] REQUEST: 1003 2587a9 Lamp | State: 0, Setting: 2
[info] [client1] REQUEST: 1004 f5732a Thermostat | State: 1, Setting: 62
[info] [client1] REQUEST: 1004 c7aaf1 Lamp | State: 0, Setting: 1
[info] [client1] RESPONSE: [requester: client1] 1000 5e468f SecurityAlarm | State: 0, Setting: 5
[info] [client1] RESPONSE: [requester: client1] 1000 70a7bf Lamp | State: 0, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1001 a6d07c Lamp | State: 0, Setting: 1
[info] [client1] RESPONSE: [requester: client1] 1001 b224cf SecurityAlarm | State: 0, Setting: 4
[info] [client1] RESPONSE: [requester: client1] 1001 df00a1 SecurityAlarm | State: 1, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1002 5d9abe Thermostat | State: 1, Setting: 67
[info] [client1] RESPONSE: [requester: client1] 1002 283a39 Thermostat | State: 0, Setting: 60
[info] [client1] RESPONSE: [requester: client1] 1002 70b354 SecurityAlarm | State: 0, Setting: 4
[info] [client1] RESPONSE: [requester: client1] 1002 c12ac7 Thermostat | State: 2, Setting: 64
[info] [client1] RESPONSE: [requester: client1] 1003 2587a9 Lamp | State: 0, Setting: 1
[info] [client1] RESPONSE: [requester: client1] 1004 f5732a Thermostat | State: 0, Setting: 63
[info] [client1] REQUEST: 1004 72a5f5 SecurityAlarm | State: 0, Setting: 2
[info] [client1] RESPONSE: [requester: client1] 1004 c7aaf1 Lamp | State: 1, Setting: 2
[info] [client1] REQUEST: 1005 f859d3 Thermostat | State: 2, Setting: 70
[info] [client1] RESPONSE: [requester: client1] 1004 72a5f5 SecurityAlarm | State: 0, Setting: 3
[info] [client1] REQUEST: 1006 4a7da5 SecurityAlarm | State: 0, Setting: 2
[info] [client1] RESPONSE: [requester: client1] 1005 f859d3 Thermostat | State: 0, Setting: 70
[info] [client1] REQUEST: 1006 17ac1a Lamp | State: 0, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1006 4a7da5 SecurityAlarm | State: 0, Setting: 2
[info] [client1] REQUEST: 1006 58653a Lamp | State: 0, Setting: 2
[info] [client1] RESPONSE: [requester: client1] 1006 17ac1a Lamp | State: 0, Setting: 1
[info] [client1] REQUEST: 1006 0bfa66 SecurityAlarm | State: 1, Setting: 5
[info] [client1] RESPONSE: [requester: client1] 1006 58653a Lamp | State: 0, Setting: 2
[info] [client1] REQUEST: 1007 6caf32 SecurityAlarm | State: 0, Setting: 2
[info] [client1] RESPONSE: [requester: client1] 1006 0bfa66 SecurityAlarm | State: 0, Setting: 2
[info] [client1] REQUEST: 1007 2b67a7 SecurityAlarm | State: 0, Setting: 1
[info] [client1] RESPONSE: [requester: client1] 1007 6caf32 SecurityAlarm | State: 0, Setting: 4
[info] [client1] RESPONSE: [requester: client3] 1040 f237b5 Lamp | State: 1, Setting: 3
[info] [client1] RESPONSE: [requester: client3] 1040 cbeb82 SecurityAlarm | State: 1, Setting: 2
[info] [client1] RESPONSE: [requester: client3] 1040 742dcd Thermostat | State: 1, Setting: 70
[info] [client1] RESPONSE: [requester: client3] 1041 337d19 Lamp | State: 1, Setting: 2
[info] [client1] RESPONSE: [requester: client3] 1041 6d19d6 Thermostat | State: 1, Setting: 64
[info] [client1] RESPONSE: [requester: client3] 1041 144297 Thermostat | State: 0, Setting: 69
[info] [client1] RESPONSE: [requester: client3] 1041 663271 SecurityAlarm | State: 1, Setting: 2
[info] [client1] RESPONSE: [requester: client3] 1042 83807a Lamp | State: 1, Setting: 1
[info] [client1] RESPONSE: [requester: client3] 1042 5ce343 Thermostat | State: 2, Setting: 60
[info] [client1] RESPONSE: [requester: client3] 1042 7ab082 Lamp | State: 1, Setting: 1
[info] [client1] RESPONSE: [requester: client3] 1042 eae803 Thermostat | State: 2, Setting: 65
[info] [client1] REQUEST: 1008 f0c753 Thermostat | State: 2, Setting: 67
[info] [client1] RESPONSE: [requester: client1] 1007 2b67a7 SecurityAlarm | State: 0, Setting: 1
[info] [client1] RESPONSE: [requester: client3] 1043 88ee61 Lamp | State: 0, Setting: 2
[info] [client1] REQUEST: 1008 32ccb2 Lamp | State: 0, Setting: 2
[info] [client1] RESPONSE: [requester: client1] 1008 f0c753 Thermostat | State: 2, Setting: 65
[info] [client1] RESPONSE: [requester: client3] 1043 440b5b Lamp | State: 1, Setting: 3
. . .
. . .
[info] [client1] REQUEST: 1019 3f122b Thermostat | State: 0, Setting: 63
[info] [client1] RESPONSE: [requester: client1] 1019 6f15e0 Lamp | State: 1, Setting: 1
[info] [client1] RESPONSE: [requester: client3] 1055 6a6fde Lamp | State: 0, Setting: 2
[info] [client1] REQUEST: 1019 b8ce7b SecurityAlarm | State: 1, Setting: 5
[info] [client1] RESPONSE: [requester: client1] 1019 3f122b Thermostat | State: 1, Setting: 64
[info] [client1] RESPONSE: [requester: client3] 1056 60eab7 Thermostat | State: 2, Setting: 70
[info] [client1] REQUEST: 1019 93678e SecurityAlarm | State: 0, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1019 b8ce7b SecurityAlarm | State: 1, Setting: 5
[info] [client1] RESPONSE: [requester: client3] 1056 ae65ad Thermostat | State: 0, Setting: 66
[info] [client1] RESPONSE: [requester: client1] 1019 93678e SecurityAlarm | State: 1, Setting: 4
[info] [client1] RESPONSE: [requester: client3] 1056 cb5d9f Lamp | State: 0, Setting: 1
[info] [client1] RESPONSE: [requester: client3] 1057 c4abf5 Thermostat | State: 0, Setting: 73
[info] [client1] RESPONSE: [requester: client3] 1058 013a30 SecurityAlarm | State: 1, Setting: 5
[info] [client1] RESPONSE: [requester: client3] 1058 681c43 Lamp | State: 1, Setting: 2
[info] [client1] RESPONSE: [requester: client3] 1058 0f1673 Thermostat | State: 1, Setting: 65
[info] [client1] RESPONSE: [requester: client3] 1058 0c97dd Lamp | State: 1, Setting: 3
[info] [client1] RESPONSE: [requester: client3] 1059 e9834d Lamp | State: 1, Setting: 2
[info] [client1] RESPONSE: [requester: client3] 1059 93166b Thermostat | State: 1, Setting: 73

Terminal #3: gRPC Client2 — broadcast OFF (broadcastYN = 0)

% ./sbt "runMain akkagrpc.IotStreamClient client2 0 1020 1039"
[info] welcome to sbt 1.9.6 (Oracle Corporation Java 11.0.19)
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading settings for project akka-grpc-iot-stream-build from plugins.sbt ...
[info] loading project definition from /Users/leo/intellij/akka-grpc-iot-stream/project
[info] loading settings for project akka-grpc-iot-stream from build.sbt ...
[info] set current project to akka-grpc-iot-stream (in build file:/Users/leo/intellij/akka-grpc-iot-stream/)
[info] running (fork) akkagrpc.IotStreamClient client2 0 1020 1039
[info] [2023-10-31 11:53:40,601] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamClient-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] Performing streaming requests from client2 ...
[info] [client2] REQUEST: 1020 be1091 Lamp | State: 1, Setting: 3
[info] [client2] REQUEST: 1021 9e6b12 SecurityAlarm | State: 0, Setting: 3
[info] [client2] REQUEST: 1021 cf913c Thermostat | State: 1, Setting: 69
[info] [client2] REQUEST: 1022 4549d2 Lamp | State: 0, Setting: 1
[info] [client2] REQUEST: 1022 08e429 Thermostat | State: 2, Setting: 60
[info] [client2] REQUEST: 1023 aa1eea Lamp | State: 1, Setting: 3
[info] [client2] REQUEST: 1023 af1da0 Thermostat | State: 0, Setting: 70
[info] [client2] REQUEST: 1023 9e7439 Thermostat | State: 0, Setting: 64
[info] [client2] REQUEST: 1023 b21319 SecurityAlarm | State: 0, Setting: 4
[info] [client2] REQUEST: 1024 6ce4c5 SecurityAlarm | State: 1, Setting: 1
[info] [client2] REQUEST: 1024 827653 Thermostat | State: 0, Setting: 64
[info] [client2] REQUEST: 1024 ae359e SecurityAlarm | State: 0, Setting: 1
[info] [client2] RESPONSE: [requester: client2] 1020 be1091 Lamp | State: 0, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1021 9e6b12 SecurityAlarm | State: 0, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1021 cf913c Thermostat | State: 1, Setting: 67
[info] [client2] RESPONSE: [requester: client2] 1022 4549d2 Lamp | State: 0, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1022 08e429 Thermostat | State: 2, Setting: 61
[info] [client2] RESPONSE: [requester: client2] 1023 aa1eea Lamp | State: 1, Setting: 1
[info] [client2] RESPONSE: [requester: client2] 1023 af1da0 Thermostat | State: 2, Setting: 72
[info] [client2] RESPONSE: [requester: client2] 1023 9e7439 Thermostat | State: 2, Setting: 65
[info] [client2] RESPONSE: [requester: client2] 1023 b21319 SecurityAlarm | State: 0, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1024 6ce4c5 SecurityAlarm | State: 1, Setting: 5
[info] [client2] RESPONSE: [requester: client2] 1024 827653 Thermostat | State: 0, Setting: 64
[info] [client2] REQUEST: 1024 9b4378 Thermostat | State: 1, Setting: 64
[info] [client2] RESPONSE: [requester: client2] 1024 ae359e SecurityAlarm | State: 0, Setting: 1
[info] [client2] REQUEST: 1025 66f837 Lamp | State: 0, Setting: 1
[info] [client2] RESPONSE: [requester: client2] 1024 9b4378 Thermostat | State: 1, Setting: 66
[info] [client2] REQUEST: 1026 b0ac08 SecurityAlarm | State: 1, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1025 66f837 Lamp | State: 0, Setting: 1
[info] [client2] REQUEST: 1027 249cb9 SecurityAlarm | State: 0, Setting: 5
[info] [client2] RESPONSE: [requester: client2] 1026 b0ac08 SecurityAlarm | State: 0, Setting: 4
[info] [client2] REQUEST: 1027 d205d3 Lamp | State: 1, Setting: 2
[info] [client2] RESPONSE: [requester: client2] 1027 249cb9 SecurityAlarm | State: 0, Setting: 1
[info] [client2] REQUEST: 1027 6783fc Lamp | State: 0, Setting: 1
. . .
. . .
[info] [client2] REQUEST: 1038 af2956 Lamp | State: 0, Setting: 1
[info] [client2] RESPONSE: [requester: client2] 1037 ed954a Lamp | State: 0, Setting: 2
[info] [client2] REQUEST: 1038 656d9f Thermostat | State: 1, Setting: 63
[info] [client2] RESPONSE: [requester: client2] 1038 af2956 Lamp | State: 1, Setting: 3
[info] [client2] REQUEST: 1039 582904 Lamp | State: 1, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1038 656d9f Thermostat | State: 0, Setting: 62
[info] [client2] REQUEST: 1039 5daf4a SecurityAlarm | State: 0, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1039 582904 Lamp | State: 1, Setting: 1
[info] [client2] REQUEST: 1039 e31886 Thermostat | State: 1, Setting: 75
[info] [client2] RESPONSE: [requester: client2] 1039 5daf4a SecurityAlarm | State: 1, Setting: 5
[info] [client2] RESPONSE: [requester: client2] 1039 e31886 Thermostat | State: 0, Setting: 75
[info] [client2] Done IoT states streaming.

Terminal #4: gRPC Client3 — broadcast ON (broadcastYN = 1)

% ./sbt "runMain akkagrpc.IotStreamClient client3 1 1040 1059"
[info] welcome to sbt 1.9.6 (Oracle Corporation Java 11.0.19)
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading settings for project akka-grpc-iot-stream-build from plugins.sbt ...
[info] loading project definition from /Users/leo/intellij/akka-grpc-iot-stream/project
[info] loading settings for project akka-grpc-iot-stream from build.sbt ...
[info] set current project to akka-grpc-iot-stream (in build file:/Users/leo/intellij/akka-grpc-iot-stream/)
[info] running (fork) akkagrpc.IotStreamClient client3 1 1040 1059
[info] [2023-10-31 11:53:40,842] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamClient-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] Performing streaming requests from client3 ...
[info] [client3] REQUEST: 1040 f237b5 Lamp | State: 0, Setting: 3
[info] [client3] REQUEST: 1040 cbeb82 SecurityAlarm | State: 0, Setting: 2
[info] [client3] REQUEST: 1040 742dcd Thermostat | State: 1, Setting: 72
[info] [client3] REQUEST: 1041 337d19 Lamp | State: 1, Setting: 2
[info] [client3] REQUEST: 1041 6d19d6 Thermostat | State: 1, Setting: 65
[info] [client3] REQUEST: 1041 144297 Thermostat | State: 2, Setting: 68
[info] [client3] REQUEST: 1041 663271 SecurityAlarm | State: 1, Setting: 3
[info] [client3] REQUEST: 1042 83807a Lamp | State: 1, Setting: 2
[info] [client3] REQUEST: 1042 5ce343 Thermostat | State: 0, Setting: 60
[info] [client3] REQUEST: 1042 7ab082 Lamp | State: 1, Setting: 1
[info] [client3] REQUEST: 1042 eae803 Thermostat | State: 0, Setting: 65
[info] [client3] REQUEST: 1043 88ee61 Lamp | State: 1, Setting: 3
[info] [client3] RESPONSE: [requester: client3] 1040 f237b5 Lamp | State: 1, Setting: 3
[info] [client3] RESPONSE: [requester: client3] 1040 cbeb82 SecurityAlarm | State: 1, Setting: 2
[info] [client3] RESPONSE: [requester: client3] 1040 742dcd Thermostat | State: 1, Setting: 70
[info] [client3] RESPONSE: [requester: client3] 1041 337d19 Lamp | State: 1, Setting: 2
[info] [client3] RESPONSE: [requester: client3] 1041 6d19d6 Thermostat | State: 1, Setting: 64
[info] [client3] RESPONSE: [requester: client3] 1041 144297 Thermostat | State: 0, Setting: 69
[info] [client3] RESPONSE: [requester: client3] 1041 663271 SecurityAlarm | State: 1, Setting: 2
[info] [client3] RESPONSE: [requester: client3] 1042 83807a Lamp | State: 1, Setting: 1
[info] [client3] RESPONSE: [requester: client3] 1042 5ce343 Thermostat | State: 2, Setting: 60
[info] [client3] RESPONSE: [requester: client3] 1042 7ab082 Lamp | State: 1, Setting: 1
[info] [client3] RESPONSE: [requester: client3] 1042 eae803 Thermostat | State: 2, Setting: 65
[info] [client3] RESPONSE: [requester: client1] 1007 2b67a7 SecurityAlarm | State: 0, Setting: 1
[info] [client3] REQUEST: 1043 440b5b Lamp | State: 0, Setting: 1
[info] [client3] RESPONSE: [requester: client3] 1043 88ee61 Lamp | State: 0, Setting: 2
[info] [client3] RESPONSE: [requester: client1] 1008 f0c753 Thermostat | State: 2, Setting: 65
[info] [client3] REQUEST: 1043 99c6e0 SecurityAlarm | State: 1, Setting: 1
[info] [client3] RESPONSE: [requester: client3] 1043 440b5b Lamp | State: 1, Setting: 3
[info] [client3] RESPONSE: [requester: client1] 1008 32ccb2 Lamp | State: 0, Setting: 2
[info] [client3] REQUEST: 1044 de863b Lamp | State: 1, Setting: 2
[info] [client3] RESPONSE: [requester: client3] 1043 99c6e0 SecurityAlarm | State: 1, Setting: 5
[info] [client3] RESPONSE: [requester: client1] 1009 edf984 SecurityAlarm | State: 1, Setting: 5
[info] [client3] REQUEST: 1044 04dff8 Lamp | State: 0, Setting: 1
[info] [client3] RESPONSE: [requester: client3] 1044 de863b Lamp | State: 1, Setting: 2
[info] [client3] RESPONSE: [requester: client1] 1009 274bca Thermostat | State: 2, Setting: 68
[info] [client3] REQUEST: 1044 86ed37 SecurityAlarm | State: 1, Setting: 1
[info] [client3] RESPONSE: [requester: client3] 1044 04dff8 Lamp | State: 0, Setting: 1
[info] [client3] RESPONSE: [requester: client1] 1009 50e02f Thermostat | State: 1, Setting: 66
[info] [client3] REQUEST: 1044 fd5984 SecurityAlarm | State: 0, Setting: 3
[info] [client3] RESPONSE: [requester: client3] 1044 86ed37 SecurityAlarm | State: 1, Setting: 5
[info] [client3] RESPONSE: [requester: client1] 1009 c6160b SecurityAlarm | State: 1, Setting: 5
[info] [client3] REQUEST: 1045 a6b326 Thermostat | State: 0, Setting: 60
[info] [client3] RESPONSE: [requester: client3] 1044 fd5984 SecurityAlarm | State: 1, Setting: 1
[info] [client3] RESPONSE: [requester: client1] 1010 eb624d SecurityAlarm | State: 0, Setting: 4
. . .
. . .
[info] [client3] REQUEST: 1057 c4abf5 Thermostat | State: 2, Setting: 73
[info] [client3] RESPONSE: [requester: client3] 1056 cb5d9f Lamp | State: 0, Setting: 1
[info] [client3] REQUEST: 1058 013a30 SecurityAlarm | State: 1, Setting: 5
[info] [client3] RESPONSE: [requester: client3] 1057 c4abf5 Thermostat | State: 0, Setting: 73
[info] [client3] REQUEST: 1058 681c43 Lamp | State: 1, Setting: 3
[info] [client3] RESPONSE: [requester: client3] 1058 013a30 SecurityAlarm | State: 1, Setting: 5
[info] [client3] REQUEST: 1058 0f1673 Thermostat | State: 1, Setting: 64
[info] [client3] RESPONSE: [requester: client3] 1058 681c43 Lamp | State: 1, Setting: 2
[info] [client3] REQUEST: 1058 0c97dd Lamp | State: 1, Setting: 3
[info] [client3] RESPONSE: [requester: client3] 1058 0f1673 Thermostat | State: 1, Setting: 65
[info] [client3] REQUEST: 1059 e9834d Lamp | State: 0, Setting: 2
[info] [client3] RESPONSE: [requester: client3] 1058 0c97dd Lamp | State: 1, Setting: 3
[info] [client3] REQUEST: 1059 93166b Thermostat | State: 0, Setting: 71
[info] [client3] RESPONSE: [requester: client3] 1059 e9834d Lamp | State: 1, Setting: 2
[info] [client3] RESPONSE: [requester: client3] 1059 93166b Thermostat | State: 1, Setting: 73

Akka GRPC for IoT Streams

Born out of Google and open-sourced in 2015, gRPC is an RPC framework which has been increasingly talked about over the past few years. In case you’re curious about what the g in gRPC stands for, it turns out the term is a peculiar recursive acronym of gRPC Remote Procedure Calls.

gRPC uses Protocol Buffers for serialization and as its IDL (Interface Definition Language). It also relies on HTTP/2 as its transport. While HTTP/2 has been growing in demand, its adoption rate is still rather slow. As of this writing, only slightly over 1/3 of websites support HTTP/2. That inevitably slows down gRPC’s adoption. Nevertheless, it’s hard to ignore the goodies it offers. In particular, gRPC is known for its strength in building systems that call for a microservices design with fast inter-service calls and decoupled interfaces across polyglot services. A comprehensive list of its benefits is available in this Akka gRPC tech doc section.

Akka gRPC

Operating on top of Akka Streams and Akka HTTP, Akka gRPC provides support for building streaming gRPC servers and clients.

On the server side, Akka gRPC generates service interfaces (as Scala traits) based on the individual services defined in the Protobuf schema. The server-side programming logic can then be crafted as specific service implementations. From the Protobuf service definitions, Akka gRPC also generates a number of service handlers that take the service implementation as an input parameter and return an HttpRequest => Future[HttpResponse] route for Akka HTTP.

As for the client side, a client program would use the service stubs generated by Akka gRPC (through implementing the service interfaces) to invoke the remote services. The following diagram highlights what a streaming gRPC server and clients might look like.

Akka gRPC Streaming

IoT systems of sensor devices

In many use cases, an IoT (Internet of Things) system of sensor devices consists of a large number of devices running on some LAN/WiFi or wireless personal area network (WPAN). A previous blog post of mine on running IoT sensor devices using Akka’s distributed pub/sub in Scala illustrates how an Akka Actor-based cluster fits into managing large-scale interactive IoT devices.

This time, rather than centering the IoT system around an actor model, we’re going to implement the system using Akka gRPC in Scala, leveraging the robust Akka Streams API applied in accordance with the Protobuf services definition and running on an HTTP/2 compliant server.

A simple use case

We’ll start with a simple use case. Let’s say we have an optimization algorithm running on a server that analyzes current operational states of a given IoT sensor device (e.g. a thermostat) and returns revised states.

On the client side, there can be many clients, each handling the sensor devices for a specific group of real estate properties. Each client application would submit to the server the current operational states and settings of the devices, as a stream of state-update requests.

The server would then run the optimization algorithm based on each device’s current state and setting and the property it’s in, returning an object with the revised state and setting as an element of the response stream. In this use case, the response stream will be received by the same client that fired off the request stream.

Each device has the following attributes:

  • deviceType:
    • 0 = Thermostat
    • 1 = Lamp
    • 2 = SecurityAlarm
  • opState:
    • devType 0 => 0 | 1 | 2 (OFF | HEAT | COOL)
    • devType 1 => 0 | 1 (OFF | ON)
    • devType 2 => 0 | 1 (OFF | ON)
  • setting:
    • devType 0 => 60 – 75
    • devType 1 => 1 – 3
    • devType 2 => 1 – 5
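The attribute ranges listed above can be captured in a small lookup table. This is only a sketch for illustration; the DeviceRules object and its isValid method are hypothetical names and are not part of the original code.

```scala
object DeviceRules {

  // deviceType -> (valid opState values, valid setting values), both inclusive
  private val rules: Map[Int, (Range, Range)] = Map(
    0 -> (0 to 2, 60 to 75), // Thermostat: OFF | HEAT | COOL, setting 60-75
    1 -> (0 to 1, 1 to 3),   // Lamp: OFF | ON, setting 1-3
    2 -> (0 to 1, 1 to 5)    // SecurityAlarm: OFF | ON, setting 1-5
  )

  // True only for a known device type whose state and setting fall within range.
  def isValid(deviceType: Int, opState: Int, setting: Int): Boolean =
    rules.get(deviceType).exists { case (opStates, settings) =>
      opStates.contains(opState) && settings.contains(setting)
    }
}
```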

A slightly more complex use case that involves dynamically broadcasting response streams from the server to participating clients will be discussed in a subsequent blog post.

Library dependencies

Using sbt as the build tool, build.sbt would look like this:

name := "akka-grpc-iot-stream"

version := "1.0"

scalaVersion := "2.13.4"

lazy val akkaVersion = "2.8.5"
lazy val akkaHttpVersion = "10.5.2"
lazy val akkaGrpcVersion = "2.3.4"

enablePlugins(AkkaGrpcPlugin)

fork := true

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-http2-support" % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-discovery" % akkaVersion,
  "com.typesafe.akka" %% "akka-pki" % akkaVersion,

  "ch.qos.logback" % "logback-classic" % "1.2.3"
)

Note that AkkaGrpcPlugin is the plug-in that carries out all the gRPC generator functions with akka-http2-support ensuring the necessary HTTP/2 support for gRPC.
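For enablePlugins(AkkaGrpcPlugin) to resolve, the plug-in itself has to be declared in project/plugins.sbt. A likely declaration is sketched below; the version is an assumption, chosen to match the akkaGrpcVersion value in build.sbt, and may differ from what the repo actually uses.

```scala
// project/plugins.sbt
// Version assumed to match akkaGrpcVersion in build.sbt.
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.3.4")
```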

The IotDevice class

First, we come up with the IotDevice class that represents our IoT sensor devices. For illustration purposes, we add a withRandomStates() method within the companion object for creating an IotDevice object initialized with random states.

object DeviceType extends Enumeration {
  type DeviceType = Value
  val Thermostat: Value = Value(0)
  val Lamp: Value = Value(1)
  val SecurityAlarm: Value = Value(2)
}

case class IotDevice( deviceId: String,
                      deviceType: Int,
                      propertyId: Int,
                      timestamp: Long,
                      opState: Int,
                      setting: Int )

object IotDevice {

  def withRandomStates(propertyId: Int): IotDevice = {
    val devType = randomInt(0, 3)  // 0 -> Thermostat | 1 -> Lamp | 2 -> SecurityAlarm
    val (opState: Int, setting: Int) = devType match {
      case 0 => (randomInt(0, 3), randomInt(60, 76))  // 0|1|2 (OFF|HEAT|COOL), 60-75
      case 1 => (randomInt(0, 2), randomInt(1, 4))  // 0|1 (OFF|ON), 1-3
      case 2 => (randomInt(0, 2), randomInt(1, 6))  // 0|1 (OFF|ON), 1-5
    }
    IotDevice(
      randomId(),
      devType,
      propertyId,
      System.currentTimeMillis(),
      opState,
      setting
    )
  }
}
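withRandomStates() relies on two helpers, randomInt() and randomId(), that are not defined in the snippet above. A plausible home for them is a small utility object like the one below; the object name Util is an assumption, while the method bodies mirror the helpers that appear in the service implementation later in this post.

```scala
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

object Util {

  // Random Int in the half-open range [a, b): the upper bound is exclusive,
  // which is why randomInt(60, 76) yields settings from 60 to 75.
  def randomInt(a: Int, b: Int): Int = ThreadLocalRandom.current().nextInt(a, b)

  // Short pseudo-unique ID made of the first 6 characters of a random UUID.
  def randomId(): String = UUID.randomUUID().toString.slice(0, 6)
}
```

With an import of Util._ in scope, the companion object compiles as written. Note that 6 hexadecimal characters give only about 16.7 million distinct IDs, which is fine for a demo but would be too small a space for a large fleet of devices.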

Protobuf schema

Next, we define our request/response messages and RPC services in file src/main/protobuf/iotstream.proto:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "akkagrpc";
option java_outer_classname = "IotStreamProto";

service IotStreamService {
    rpc sendIotUpdate(stream StatesUpdateRequest) returns (stream StatesUpdateResponse) {}
}

message StatesUpdateRequest {
    string id = 1;
    string client_id = 2;
    int32 property_id = 3;
    string device_id = 4;
    int32 device_type = 5;
    int64 timestamp = 6;
    int32 op_state = 7;
    int32 setting = 8;
}

message StatesUpdateResponse {
    string id = 1;
    string client_id = 2;
    int32 property_id = 3;
    string device_id = 4;
    int32 device_type = 5;
    int64 timestamp = 6;
    int32 op_state_new = 7;
    int32 setting_new = 8;
}

As shown in the Protobuf schema, messages StatesUpdateRequest and StatesUpdateResponse define the elements of the request and response streams for the gRPC application, whereas service IotStreamService reveals the signature of the RPC service, leaving its business logic to be implemented in Scala code.
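On the Scala side, the snake_case Protobuf fields surface as camelCase fields of a case class with default values. The sketch below uses hand-written stand-ins for IotDevice and for the generated StatesUpdateRequest so that it is self-contained; the RequestMapping object and its toRequest method are hypothetical, and the real generated class carries additional members.

```scala
// Stand-in for the IotDevice case class shown earlier.
final case class IotDevice(deviceId: String, deviceType: Int, propertyId: Int,
                           timestamp: Long, opState: Int, setting: Int)

// Simplified stand-in for the generated message class:
// client_id becomes clientId, op_state becomes opState, and so on.
final case class StatesUpdateRequest(id: String = "", clientId: String = "", propertyId: Int = 0,
                                     deviceId: String = "", deviceType: Int = 0, timestamp: Long = 0L,
                                     opState: Int = 0, setting: Int = 0)

object RequestMapping {

  // Builds one request-stream element from a device's current states.
  def toRequest(requestId: String, clientId: String, device: IotDevice): StatesUpdateRequest =
    StatesUpdateRequest(
      id = requestId,
      clientId = clientId,
      propertyId = device.propertyId,
      deviceId = device.deviceId,
      deviceType = device.deviceType,
      timestamp = device.timestamp,
      opState = device.opState,
      setting = device.setting
    )
}
```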

Classes generated by Akka gRPC

The AkkaGrpcPlugin automatically generates Scala source code equivalent to the defined Protobuf messages and to-be-implemented RPC services as Scala traits and classes, along with Akka Stream flows and Akka HTTP routes. The generated classes are placed under target/scala-<scalaVersion>/akka-grpc/main/akkagrpc/:

  • IotstreamProto.scala
  • IotStreamService.scala
  • IotStreamServiceHandler.scala
  • IotStreamServiceClient.scala
  • StatesUpdateRequest.scala
  • StatesUpdateResponse.scala

Service implementation

Now that the generated service interfaces and handlers are in place, we’re ready to create our specific service implementation as class IotStreamServiceImpl:

package akkagrpc

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.duration._
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

class IotStreamServiceImpl(system: ActorSystem[_]) extends IotStreamService {

  private implicit val sys: ActorSystem[_] = system

  val backpressureTMO: FiniteDuration = 3.seconds

  val updateIotFlow: Flow[StatesUpdateRequest, StatesUpdateResponse, NotUsed] =
    Flow[StatesUpdateRequest]
      .map { case StatesUpdateRequest(id, clientId, propId, devId, devType, ts, opState, setting, _) =>
        val (opStateNew, settingNew) =
          updateDeviceStates(propId, devId, devType, opState, setting)
        StatesUpdateResponse(
          randomId(),
          clientId,
          propId,
          devId,
          devType,
          System.currentTimeMillis(),
          opStateNew,
          settingNew)
      }

  // Implements the RPC declared in the generated IotStreamService trait
  override def sendIotUpdate(requests: Source[StatesUpdateRequest, NotUsed]): Source[StatesUpdateResponse, NotUsed] =
    requests.via(updateIotFlow).backpressureTimeout(backpressureTMO)

  def updateDeviceStates(propId: Int, devId: String, devType: Int, opState: Int, setting: Int): (Int, Int) = {
    // Random device states update simulating algorithmic adjustment in accordance with device and
    // property specific factors (temperature, lighting, etc)
    devType match {
      case 0 =>
        val opStateNew =
          if (opState == 0) randomInt(0, 3) else {
            if (opState == 1) randomInt(0, 2) else (2 + randomInt(0, 2)) % 3
          }
        val settingTemp = setting + randomInt(-2, 3)
        val settingNew = if (settingTemp < 60) 60 else if (settingTemp > 75) 75 else settingTemp
        (opStateNew, settingNew)
      case 1 =>
        (randomInt(0, 2), randomInt(1, 4))
      case 2 =>
        (randomInt(0, 2), randomInt(1, 6))
    }
  }

  def randomInt(a: Int, b: Int): Int = ThreadLocalRandom.current().nextInt(a, b)

  def randomId(): String = UUID.randomUUID().toString.slice(0, 6)  // UUID's first 6 chars
}

Encapsulating the device states update simulation logic, the Akka Stream flow updateIotFlow creates the stream to be carried out by the sendIotUpdate() RPC, with all the plumbing handled by Akka gRPC behind the scenes.
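To get a feel for the values the simulation can produce, the `case 0` branch of updateDeviceStates() can be exercised on its own. Below is a minimal standalone sketch, assuming device type 0 is the thermostat (as the 60-75 setting range in the sample output suggests). The names ThermostatSim, clamp and updateThermostat are made up for illustration and aren't part of the project.

```scala
import java.util.concurrent.ThreadLocalRandom

object ThermostatSim {
  // Same helper as in the service: a random Int in [a, b)
  def randomInt(a: Int, b: Int): Int = ThreadLocalRandom.current().nextInt(a, b)

  // Keeps a value within the inclusive range [lo, hi]
  def clamp(v: Int, lo: Int, hi: Int): Int = math.max(lo, math.min(hi, v))

  // Mirrors the `case 0` branch of updateDeviceStates()
  def updateThermostat(opState: Int, setting: Int): (Int, Int) = {
    val opStateNew =
      if (opState == 0) randomInt(0, 3)       // from 0: any of 0, 1, 2
      else if (opState == 1) randomInt(0, 2)  // from 1: 0 or 1
      else (2 + randomInt(0, 2)) % 3          // from 2: 2 or 0
    val settingNew = clamp(setting + randomInt(-2, 3), 60, 75)
    (opStateNew, settingNew)
  }

  def main(args: Array[String]): Unit = {
    val results = (1 to 1000).map(_ => updateThermostat(opState = 2, setting = 75))
    println(s"States reached from state 2: ${results.map(_._1).toSet}")
    println(s"Setting range from 75: ${results.map(_._2).min} to ${results.map(_._2).max}")
  }
}
```

Since each update moves the setting by at most 2 in either direction and then clamps it, a thermostat that starts within 60-75 never leaves that range.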

TLS-enabled HTTP/2 server

For a skeletal HTTP/2 compliant server, we create class IotStreamServer by borrowing part of the GreeterService sample code available at Lightbend developers guide.

package akkagrpc

import java.security.{KeyStore, SecureRandom}
import java.security.cert.{Certificate, CertificateFactory}

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.pki.pem.{DERPrivateKeyLoader, PEMDecoder}
import com.typesafe.config.ConfigFactory
import javax.net.ssl.{KeyManagerFactory, SSLContext}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Failure}
import scala.io.Source

object IotStreamServer {

  def main(args: Array[String]): Unit = {
    // important to enable HTTP/2 in ActorSystem's config
    val conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
      .withFallback(ConfigFactory.defaultApplication())
    val system = ActorSystem[Nothing](Behaviors.empty, "IotStreamServer", conf)
    new IotStreamServer(system).run()
  }
}

class IotStreamServer(system: ActorSystem[_]) {

  def run(): Future[Http.ServerBinding] = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext = system.executionContext

    val service: HttpRequest => Future[HttpResponse] =
      IotStreamServiceHandler(new IotStreamServiceImpl(system))

    val boundServer: Future[Http.ServerBinding] = Http(system)
      .newServerAt(interface = "127.0.0.1", port = 8080)
      .enableHttps(serverHttpContext)
      .bind(service)
      .map(_.addToCoordinatedShutdown(hardTerminationDeadline = 10.seconds))

    boundServer.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        Console.out.println(s"[Server] gRPC server bound to ${address.getHostString}:${address.getPort}")
      case Failure(ex) =>
        Console.err.println(s"[Server] ERROR: Failed to bind gRPC endpoint, terminating system: $ex")
        system.terminate()
    }

    boundServer
  }

  private def serverHttpContext: HttpsConnectionContext = {
    val privateKey =
      DERPrivateKeyLoader.load(PEMDecoder.decode(readPrivateKeyPem()))
    val fact = CertificateFactory.getInstance("X.509")
    val cer = fact.generateCertificate(
      classOf[IotStreamServer].getResourceAsStream("/certs/server1.pem")
    )
    val ks = KeyStore.getInstance("PKCS12")
    ks.load(null)
    ks.setKeyEntry(
      "private",
      privateKey,
      new Array[Char](0),
      Array[Certificate](cer)
    )
    val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(ks, null)
    val context = SSLContext.getInstance("TLS")
    context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom)
    ConnectionContext.httpsServer(context)
  }

  private def readPrivateKeyPem(): String =
    Source.fromResource("certs/server1.key").mkString
}

For development purposes, a self-signed PKCS#12 certificate is often enough for a TLS-enabled HTTP server. gRPC clients have more stringent requirements: they demand an HTTP server certificate along with a valid Certificate Authority (CA) certificate that signed the server cert. Rather than going through the process of creating our own CA, for demonstration purposes we use the CA certificate that comes with the GreeterService code. The host, port number and CA certificate file path are configured for the gRPC client within application.conf, which has content like the following:

akka.grpc.client {
  "akkagrpc.IotStreamService" {
    host = 127.0.0.1
    port = 8080
    override-authority = foo.test.google.fr
    trusted = /certs/ca.pem
  }
}

The client application

As shown in the source code of IotStreamClient, the client app passes a stream of states update requests from a group of real estate properties as parameters to method sendIotUpdate() in the gRPC stub IotStreamServiceClient generated by Akka gRPC.

package akkagrpc

import akka.{Done, NotUsed}
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.grpc.GrpcClientSettings
import akka.stream.scaladsl.Source
import akka.stream.ThrottleMode.Shaping

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._

import Util._

// akkagrpc.IotStreamClient clientId propIdStart propIdEnd
//   e.g. akkagrpc.IotStreamClient client1 1000 1049
object IotStreamClient {

  def getDevicesByProperty(propId: Int): Iterator[IotDevice] =
    (1 to randomInt(1, 5)).map { _ =>  // 1-4 devices per property
      IotDevice.withRandomStates(propId)
    }.iterator

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem[_] = ActorSystem(Behaviors.empty, "IotStreamClient")
    implicit val ec: ExecutionContext = sys.executionContext

    val client = IotStreamServiceClient(GrpcClientSettings.fromConfig("akkagrpc.IotStreamService"))

    val (clientId: String, propIdStart: Int, propIdEnd: Int) =
      if (args.length == 3) {
        Try((args(0), args(1).toInt, args(2).toInt)) match {
          case Success((cid, pid1, pid2)) =>
            (cid, pid1, pid2)
          case _ =>
            Console.err.println("[Main] ERROR: Arguments required: clientId, propIdStart & propIdEnd  e.g. client1 1000 1049")
            scala.sys.exit(1)  // fully qualified: `sys` alone refers to the implicit ActorSystem above
        }
      }
      else
        ("client1", 1000, 1029)  // Default clientId & property id range (inclusive)

    val devices: Iterator[IotDevice] =
      (propIdStart to propIdEnd).flatMap(getDevicesByProperty).iterator

    Console.out.println(s"Performing streaming requests from $clientId ...")
    grpcStreaming(clientId)

    def grpcStreaming(clientId: String): Unit = {
      val requestStream: Source[StatesUpdateRequest, NotUsed] =
        Source
          .fromIterator(() => devices)
          .map { case IotDevice(devId, devType, propId, ts, opState, setting) =>
            Console.out.println(s"[$clientId] REQUEST: $propId $devId ${DeviceType(devType)} | State: $opState, Setting: $setting")
            StatesUpdateRequest(randomId(), clientId, propId, devId, devType, ts, opState, setting)
          }
          .throttle(1, 100.millis, 10, Shaping)  // For illustration only

      val responseStream: Source[StatesUpdateResponse, NotUsed] = client.sendIotUpdate(requestStream)

      val done: Future[Done] =
        responseStream.runForeach {
          case StatesUpdateResponse(id, clntId, propId, devId, devType, ts, opState, setting, _) =>
            Console.out.println(s"[$clientId] RESPONSE: [requester: $clntId] $propId $devId ${DeviceType(devType)} | State: $opState, Setting: $setting")
        }

      done.onComplete {
        case Success(_) =>
          Console.out.println(s"[$clientId] Done IoT states streaming.")
        case Failure(e) =>
          Console.err.println(s"[$clientId] ERROR: $e")
      }

      Thread.sleep(2000)
    }
  }
}

Running the applications

To run the server application, open a command line terminal and run as follows:

# On terminal #1
cd <project-root>
sbt "runMain akkagrpc.IotStreamServer"

For the client application, open a terminal for each client to run a specific range of IDs of real estate properties:

# On terminal #2
cd <project-root>
sbt "runMain akkagrpc.IotStreamClient client1 1000 1019"

# On terminal #3
cd <project-root>
sbt "runMain akkagrpc.IotStreamClient client2 1020 1039"

Below is sample output from the gRPC server and a couple of clients.

Terminal #1: gRPC Server

% ./sbt "runMain akkagrpc.IotStreamServer"
[info] compiling ...
[info] done compiling
[info] running (fork) akkagrpc.IotStreamServer 
[info] [2023-10-23 11:36:53,173] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamServer-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] [Server] gRPC server bound to 127.0.0.1:8080

Terminal #2: gRPC Client1

% ./sbt "runMain akkagrpc.IotStreamClient client1 1000 1019"
[info] welcome to sbt 1.9.6 (Oracle Corporation Java 11.0.19)
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading settings for project akka-grpc-iot-stream-build from plugins.sbt ...
[info] loading project definition from /Users/leo/intellij/akka-grpc-iot-stream/project
[info] loading settings for project akka-grpc-iot-stream from build.sbt ...
[info] set current project to akka-grpc-iot-stream (in build file:/Users/leo/intellij/akka-grpc-iot-stream/)
[info] running (fork) akkagrpc.IotStreamClient client1 1000 1019
[info] [2023-10-18 11:37:19,401] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamClient-akka.actor.default-dispatcher-4] [] - Slf4jLogger started
[info] Performing streaming requests from client1 ...
[info] [client1] REQUEST: 1000 76a2cb Thermostat | State: 1, Setting: 65
[info] [client1] REQUEST: 1001 6588b0 SecurityAlarm | State: 0, Setting: 3
[info] [client1] REQUEST: 1001 2f9150 Lamp | State: 0, Setting: 3
[info] [client1] REQUEST: 1001 a33bd1 SecurityAlarm | State: 0, Setting: 2
[info] [client1] REQUEST: 1001 eabc4b SecurityAlarm | State: 1, Setting: 4
[info] [client1] REQUEST: 1002 c57009 Thermostat | State: 1, Setting: 70
[info] [client1] REQUEST: 1003 4a038c Lamp | State: 0, Setting: 2
[info] [client1] REQUEST: 1003 06bc8b Thermostat | State: 1, Setting: 65
[info] [client1] REQUEST: 1004 ad9432 Lamp | State: 1, Setting: 3
[info] [client1] REQUEST: 1004 356ef0 Lamp | State: 1, Setting: 1
[info] [client1] REQUEST: 1004 f12061 Thermostat | State: 2, Setting: 61
[info] [client1] REQUEST: 1004 983c20 Lamp | State: 0, Setting: 1
[info] [client1] REQUEST: 1005 087963 SecurityAlarm | State: 1, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1000 76a2cb Thermostat | State: 1, Setting: 63
[info] [client1] RESPONSE: [requester: client1] 1001 6588b0 SecurityAlarm | State: 1, Setting: 1
[info] [client1] RESPONSE: [requester: client1] 1001 2f9150 Lamp | State: 0, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1001 a33bd1 SecurityAlarm | State: 1, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1001 eabc4b SecurityAlarm | State: 0, Setting: 1
. . .
. . .
[info] [client1] REQUEST: 1019 799659 Lamp | State: 1, Setting: 3
[info] [client1] RESPONSE: [requester: client1] 1019 194f79 Thermostat | State: 2, Setting: 63
[info] [client1] REQUEST: 1019 cbdf82 Thermostat | State: 0, Setting: 66
[info] [client1] RESPONSE: [requester: client1] 1019 799659 Lamp | State: 1, Setting: 3
[info] [client1] REQUEST: 1019 76307e SecurityAlarm | State: 1, Setting: 1
[info] [client1] RESPONSE: [requester: client1] 1019 cbdf82 Thermostat | State: 0, Setting: 64
[info] [client1] RESPONSE: [requester: client1] 1019 76307e SecurityAlarm | State: 0, Setting: 4
[info] [client1] Done IoT states streaming.

Terminal #3: gRPC Client2

% ./sbt "runMain akkagrpc.IotStreamClient client2 1020 1039"
[info] welcome to sbt 1.9.6 (Oracle Corporation Java 11.0.19)
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading settings for project akka-grpc-iot-stream-build from plugins.sbt ...
[info] loading project definition from /Users/leo/intellij/akka-grpc-iot-stream/project
[info] loading settings for project akka-grpc-iot-stream from build.sbt ...
[info] set current project to akka-grpc-iot-stream (in build file:/Users/leo/intellij/akka-grpc-iot-stream/)
[info] running (fork) akkagrpc.IotStreamClient client2 1020 1039
[info] [2023-10-18 11:37:19,401] [INFO] [akka.event.slf4j.Slf4jLogger] [IotStreamClient-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] Performing streaming requests from client2 ...
[info] [client2] REQUEST: 1020 2cb5a8 Thermostat | State: 0, Setting: 71
[info] [client2] REQUEST: 1020 1a6b0e Lamp | State: 1, Setting: 3
[info] [client2] REQUEST: 1020 d1bd73 Thermostat | State: 2, Setting: 60
[info] [client2] REQUEST: 1020 5d9f65 Thermostat | State: 0, Setting: 69
[info] [client2] REQUEST: 1021 711d17 Lamp | State: 1, Setting: 2
[info] [client2] REQUEST: 1021 443245 Thermostat | State: 2, Setting: 73
[info] [client2] REQUEST: 1022 16fff9 Lamp | State: 0, Setting: 2
[info] [client2] REQUEST: 1022 687826 Lamp | State: 0, Setting: 3
[info] [client2] REQUEST: 1022 2cae5f SecurityAlarm | State: 0, Setting: 1
[info] [client2] REQUEST: 1022 c9e8f9 Thermostat | State: 1, Setting: 69
[info] [client2] REQUEST: 1023 6984f2 Lamp | State: 1, Setting: 2
[info] [client2] REQUEST: 1024 a85495 Thermostat | State: 2, Setting: 72
[info] [client2] RESPONSE: [requester: client2] 1020 2cb5a8 Thermostat | State: 0, Setting: 71
[info] [client2] RESPONSE: [requester: client2] 1020 1a6b0e Lamp | State: 0, Setting: 1
[info] [client2] RESPONSE: [requester: client2] 1020 d1bd73 Thermostat | State: 0, Setting: 61
[info] [client2] RESPONSE: [requester: client2] 1020 5d9f65 Thermostat | State: 2, Setting: 67
[info] [client2] RESPONSE: [requester: client2] 1021 711d17 Lamp | State: 0, Setting: 1
. . .
. . .
[info] [client2] REQUEST: 1038 a03d22 Thermostat | State: 2, Setting: 68
[info] [client2] RESPONSE: [requester: client2] 1038 c02238 Lamp | State: 1, Setting: 1
[info] [client2] REQUEST: 1038 9e6d4b Thermostat | State: 0, Setting: 72
[info] [client2] RESPONSE: [requester: client2] 1038 a03d22 Thermostat | State: 0, Setting: 67
[info] [client2] REQUEST: 1039 3d7e6d SecurityAlarm | State: 1, Setting: 3
[info] [client2] RESPONSE: [requester: client2] 1038 9e6d4b Thermostat | State: 2, Setting: 74
[info] [client2] RESPONSE: [requester: client2] 1039 3d7e6d SecurityAlarm | State: 1, Setting: 2
[info] [client2] Done IoT states streaming.

What’s next?

In the next blog post, we’ll go over a use case in which the states update request streams from the various clients will be processed by a gRPC dynamic pub/sub service with the response streams broadcast to be consumed by all the participating clients. Source code that includes both use cases will be published in a GitHub repo.

Scala IoT Systems With Akka Actors II

Back in 2016, I built an Internet-of-Things (IoT) prototype system leveraging the “minimalist” design principle of the Actor model to simulate low-cost, low-powered IoT devices. A simplified version of the prototype was published in a previous blog post. The stripped-down application was written in Scala along with the Akka Actors run-time library, which is arguably the predominant Actor model implementation at present. Message Queuing Telemetry Transport (MQTT) was used as the publish-subscribe messaging protocol for the simulated IoT devices. For simplicity, a single actor was used to simulate requests from a bunch of IoT devices.

In this blog post, I would like to share a version closer to the design of the full prototype system. With the same tech stack used in the previous application, it’s an expanded version (hence, II) that uses loosely-coupled lightweight actors to simulate individual IoT devices, each of which maintains its own internal state and handles bidirectional communications via non-blocking message passing. Using a distributed workers system adapted from a Lightbend template along with a persistence journal, the end product is an IoT system equipped with a scalable fault-tolerant data processing system.

Main components

Below is a diagram and a summary of the revised Scala application which consists of 3 main components:

IoT with MQTT and Akka Actor Systems v.2

1. IoT

  • An IotManager actor which:
    • instantiates a specified number of devices upon start-up
    • subscribes to a MQTT pub-sub topic for the work requests
    • sends received work requests via ClusterClient to the master cluster
    • notifies Device actors upon receiving failure messages from Master actor
    • forwards work results to the corresponding devices upon receiving them from ResultProcessor
  • Device actors each of which:
    • simulates a thermostat, lamp, or security alarm with random initial state and setting
    • maintains and updates internal state and setting upon receiving work results from IotManager
    • generates work requests and publishes them to the MQTT pub-sub topic
    • re-publishes requests upon receiving failure messages from IotManager
  • A MQTT pub-sub broker and a MQTT client for communicating with the broker
  • A configuration helper object, MqttConfig, consisting of:
    • MQTT pub-sub topic
    • URL for the MQTT broker
    • serialization methods to convert objects to byte arrays, and vice versa

2. Master Cluster

  • A fault-tolerant decentralized cluster which:
    • manages a singleton actor instance among the cluster nodes (with a specified role)
    • delegates ClusterClientReceptionist on every node to answer external connection requests
    • provides fail-over of the singleton actor to the next-oldest node in the cluster
  • A Master singleton actor which:
    • registers Workers and distributes work to available Workers
    • acknowledges work request reception with IotManager
    • publishes work results from Workers to ‘work-results’ topic via Akka distributed pub-sub
    • maintains work states using persistence journal
  • A ResultProcessor actor in the master cluster which:
    • gets instantiated upon starting up the IoT system (more on this below)
    • consumes work results by subscribing to the ‘work-results’ topic
    • sends work results received from Master to IotManager

3. Workers

  • An actor system of Workers each of which:
    • communicates via ClusterClient with the master cluster
    • registers with and pulls work from the Master actor
    • reports work status to the Master actor
    • instantiates a WorkProcessor actor to perform the actual work
  • WorkProcessor actors each of which:
    • processes the work requests from its parent Worker
    • generates work results and sends them back to Worker
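The MqttConfig helper listed under the IoT component converts objects to byte arrays and back, since MQTT payloads are plain bytes. Here's a minimal sketch of such a round trip using only the JDK's data streams. The DeviceReading class and the ReadingCodec object are hypothetical stand-ins, and the field-by-field encoding is my own assumption for illustration rather than the scheme used in the application.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical payload, standing in for whatever the devices actually publish
final case class DeviceReading(deviceId: String, state: Int, setting: Int)

object ReadingCodec {
  // Writes the fields in a fixed order into a byte array
  def toBytes(r: DeviceReading): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeUTF(r.deviceId)
    out.writeInt(r.state)
    out.writeInt(r.setting)
    out.flush()
    buffer.toByteArray
  }

  // Reads the fields back in the same order they were written
  def fromBytes(bytes: Array[Byte]): DeviceReading = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    DeviceReading(in.readUTF(), in.readInt(), in.readInt())
  }

  def main(args: Array[String]): Unit = {
    val reading = DeviceReading("thermostat-1015", 0, 70)
    val restored = fromBytes(toBytes(reading))
    println(restored == reading)  // true
  }
}
```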

Master-worker system with a ‘pull’ model

While significant changes have been made to the IoT actor system, the setup for the Master/Worker actor systems and MQTT pub-sub messaging remains largely unchanged from the previous version:

  • As separate independent actor systems, both the IoT and Worker systems communicate with the Master cluster via ClusterClient.
  • Using a ‘pull’ model which generally performs better at scale, the Worker actors register with the Master cluster and pull work when available.
  • Paho-Akka is used as the MQTT pub-sub messaging client.
  • A helper object, MqttConfig, encapsulates a MQTT pub-sub topic and broker information along with serialization methods to handle MQTT messaging using a test Mosquitto broker.
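The ‘pull’ model mentioned above can be illustrated without any actors. In the sketch below, work sits in a queue until a registered, idle worker asks for it, so the master never pushes work onto a busy worker. PullMaster and its methods are hypothetical simplifications of the Master's bookkeeping, not code from the application.

```scala
import scala.collection.immutable.Queue

// Hypothetical, simplified stand-in for the Master's bookkeeping
final case class PullMaster(pending: Queue[String], idleWorkers: Set[String]) {
  // A worker announces itself and becomes eligible to pull work
  def register(workerId: String): PullMaster = copy(idleWorkers = idleWorkers + workerId)

  // New work is queued rather than pushed to any worker
  def submit(workId: String): PullMaster = copy(pending = pending.enqueue(workId))

  // A worker asks for work; it gets some only if it is idle and work is pending
  def pull(workerId: String): (Option[String], PullMaster) =
    if (idleWorkers.contains(workerId) && pending.nonEmpty) {
      val (work, rest) = pending.dequeue
      (Some(work), copy(pending = rest, idleWorkers = idleWorkers - workerId))
    } else (None, this)

  // A worker reports completion and becomes idle again
  def done(workerId: String): PullMaster = copy(idleWorkers = idleWorkers + workerId)
}

object PullDemo {
  def main(args: Array[String]): Unit = {
    val m0 = PullMaster(Queue.empty, Set.empty).register("w1").submit("work-a").submit("work-b")
    val (first, m1) = m0.pull("w1")
    val (second, m2) = m1.pull("w1")
    val (third, _) = m2.done("w1").pull("w1")
    println(first)   // Some(work-a)
    println(second)  // None, since w1 is still busy with work-a
    println(third)   // Some(work-b)
  }
}
```

Because a worker only receives work when it asks for it, adding more workers increases throughput without the master having to track each worker's capacity.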

What’s new?

Now, let’s look at the major changes in the revised application:

First of all, Lightbend’s Activator has been retired and sbt is being used instead.

For persisting actor state, a Redis data store is used as the persistence journal. In the previous version, the shared LevelDB journal was coupled with the first seed node, which became a single point of failure. With the Redis persistence journal decoupled from any specific cluster node, fault tolerance steps up a notch.

As mentioned earlier in the post, one of the key changes to the previous application is the use of actors representing individual IoT devices, each with its own state and the capability of communicating with entities designated for interfacing with external actor systems. Actors, lightweight and loosely-coupled by design, serve as an excellent vehicle for modeling individual IoT devices. In addition, non-blocking message passing among actors provides an efficient and economical means for communication and logic control of the device state.

The IotManager actor is responsible for creating and managing a specified number of Device actors. Upon startup, the IoT manager instantiates individual Device actors of random device type (thermostat, lamp or security alarm). These devices are maintained in an internal registry regularly updated by the IoT manager.

Each of the Device actors starts up with a random state and setting. For instance, a thermostat device may start with an ON state and a temperature setting of 68F whereas a lamp device might have an initial state of OFF and brightness setting of 2. Once instantiated, a Device actor will maintain its internal operational state and setting from then on and will report and update the state and setting per request.

Work and WorkResult

In this application, a Work object represents a request sent by a specific Device actor and carries the Device’s Id and its current state and setting data. A WorkResult object, on the other hand, represents the returned response, which carries the new state and setting for the Device actor to apply.
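As a rough illustration of that request/response pairing, here's a sketch in plain Scala. The field names and the DeviceState class are my own assumptions for the example and may well differ from the classes in the actual source code. The values are borrowed from the thermostat-1015 entries in the sample console log further down.

```scala
// Hypothetical shapes of the two messages; field names are assumed
final case class Work(workId: String, deviceId: String, currState: Int, currSetting: Int)
final case class WorkResult(workId: String, deviceId: String, nextState: Int, nextSetting: Int)

// Hypothetical snapshot of what a Device actor keeps internally
final case class DeviceState(deviceId: String, state: Int, setting: Int) {
  // Packages the current state and setting into a work request
  def toWork(workId: String): Work = Work(workId, deviceId, state, setting)

  // Applies a result only if it is addressed to this device
  def applyResult(r: WorkResult): DeviceState =
    if (r.deviceId == deviceId) copy(state = r.nextState, setting = r.nextSetting) else this
}

object WorkDemo {
  def main(args: Array[String]): Unit = {
    val device = DeviceState("thermostat-1015", state = 0, setting = 70)
    val work = device.toWork("work-1")
    val result = WorkResult(work.workId, work.deviceId, nextState = 2, nextSetting = 68)
    println(device.applyResult(result))  // DeviceState(thermostat-1015,2,68)
  }
}
```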

Responsible for processing the WorkResult generated by the Worker actors, the ResultProcessor actor simulates the processing of work results – in this case it simply sends the work result via the actorSelection method back to the original Device actor through IotManager. Interacting with only the Master cluster system as a cluster client, the Worker actors have no knowledge of the ResultProcessor actor. ResultProcessor receives the work result by subscribing to the Akka distributed pub-sub topic to which the Master publishes.

While a participant of the Master cluster actor system, the ResultProcessor actor gets instantiated when the IoT actor system starts up. The decoupling of ResultProcessor instantiation from the Master cluster ensures that no excessive ResultProcessor instances get started when multiple Master cluster nodes start up.

Test running the application

Complete source code of the application is available at GitHub.

To run the application on a single JVM, just git-clone the repo, run the following commands at a command line terminal and observe the console output:

# Start a Redis server accessible to the master cluster to serve as the persistence journal:
$ nohup redis-server /path/to/conf &

# Launch the master cluster with 2 seed nodes, IoT actor system and Worker actor system:
$ cd {project-root}
$ bin/sbt "runMain akkaiot.Main [NumOfDevices]"

The optional NumOfDevices parameter defaults to 20.

To run the application on separate JVMs, git-clone the repo to a local disk, open up separate command line terminals and launch the different components on separate terminals:

# Start a Redis server accessible to the master cluster to serve as the persistence journal:
$ nohup redis-server /path/to/conf &

$ cd {project-root}
# Launch the master cluster seed node with persistence journal:
$ bin/sbt "runMain akkaiot.Main 2551"

# Launch additional master cluster seed node:
$ bin/sbt "runMain akkaiot.Main 2552"

# Launch the IoT node:
$ bin/sbt "runMain akkaiot.Main 3001 [NumOfDevices]"

# Launch a Worker node:
$ bin/sbt "runMain akkaiot.Main 0"

# Launch additional Worker node:
$ bin/sbt "runMain akkaiot.Main 0"

Sample console log

Below is filtered console log output tracing the evolving state and setting of a thermostat device:

#
### Console log filtered for device thermostat-1015
#
[info] [INFO] [07/12/2017 14:38:44.707] [IotSystem-akka.actor.default-dispatcher-21] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 started
[info] [INFO] [07/12/2017 14:38:49.726] [IotSystem-akka.actor.default-dispatcher-29] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 0 created work (Id: 69d82872-a9f4-492e-8ae4-28229b797994) 
[info] [INFO] [07/12/2017 14:38:49.726] [IotSystem-akka.actor.default-dispatcher-29] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:38:49.915] [IotSystem-akka.actor.default-dispatcher-3] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 0 | Setting 70
[info] [INFO] [07/12/2017 14:38:50.261] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.265] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.488] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-2] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 0 | Setting 70
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Switch to COOL | LOWER temperature by -2
[info] [INFO] [07/12/2017 14:38:50.489] [WorkerSystem-akka.actor.default-dispatcher-4] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id 69d82872-a9f4-492e-8ae4-28229b797994
[info] [INFO] [07/12/2017 14:38:50.493] [ClusterSystem-akka.actor.default-dispatcher-27] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:50.493] [ClusterSystem-akka.actor.default-dispatcher-27] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-33] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id 69d82872-a9f4-492e-8ae4-28229b797994.
[info] [INFO] [07/12/2017 14:38:50.495] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 2 and setting 68.
[info] [INFO] [07/12/2017 14:38:55.275] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 2 created work (Id: df9622cd-6b80-42e9-be1b-f0a92d002d75) 
[info] [INFO] [07/12/2017 14:38:55.275] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:38:55.578] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:55.580] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.583] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.586] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 2 | Setting 68
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Keep state COOL | LOWER temperature by -2
[info] [INFO] [07/12/2017 14:38:55.587] [WorkerSystem-akka.actor.default-dispatcher-3] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id df9622cd-6b80-42e9-be1b-f0a92d002d75
[info] [INFO] [07/12/2017 14:38:55.590] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:38:55.590] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-18] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id df9622cd-6b80-42e9-be1b-f0a92d002d75.
[info] [INFO] [07/12/2017 14:38:55.591] [IotSystem-akka.actor.default-dispatcher-16] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 2 and setting 66.
[info] [INFO] [07/12/2017 14:39:01.596] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 with state 2 created work (Id: c57d21af-3957-43ba-a995-f4f558900fa3) 
[info] [INFO] [07/12/2017 14:39:01.597] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Publishing MQTT Topic akka-iot-mqtt-topic: Device thermostat-1015
[info] [INFO] [07/12/2017 14:39:01.752] [IotSystem-akka.actor.default-dispatcher-17] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Agent -> Received MQTT message: thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:39:01.753] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.755] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Work for thermostat-1015 accepted | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.757] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/master/singleton] Cluster Master -> Delegating work for thermostat-1015 to Worker 3ec3b314-1c35-4e57-92b3-41c9edb86fbc | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Received work request from thermostat-1015 | State 2 | Setting 66
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker/work-processor] Work Processor -> thermostat-1015: Switch to OFF | RAISE temperature by 2
[info] [INFO] [07/12/2017 14:39:01.758] [WorkerSystem-akka.actor.default-dispatcher-18] [akka.tcp://WorkerSystem@127.0.0.1:59399/user/worker] Worker -> Processed work: thermostat-1015 | Work Id c57d21af-3957-43ba-a995-f4f558900fa3
[info] [INFO] [07/12/2017 14:39:01.760] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Got work result: thermostat-1015 | State 0 | Setting 68
[info] [INFO] [07/12/2017 14:39:01.761] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.tcp://ClusterSystem@127.0.0.1:59407/user/result-processor] Result Processor -> Sent work result for thermostat-1015 to IoT Manager
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-18] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager] IoT Manager -> Work result forwarded to thermostat-1015 
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> thermostat-1015 received work result with work Id c57d21af-3957-43ba-a995-f4f558900fa3.
[info] [INFO] [07/12/2017 14:39:01.761] [IotSystem-akka.actor.default-dispatcher-15] [akka.tcp://IotSystem@127.0.0.1:3001/user/iot-manager/thermostat-1015] Device -> Updated thermostat-1015 with state 0 and setting 68.
....
....

The following annotated console log shows the fault tolerance of the master cluster: the 2nd node takes over once it detects that the 1st node has crashed.

#
### Annotated console log from the 2nd cluster node
#
<<<
<<<--- 2nd cluster node (with port# 2552) starts shortly after 1st node (with port# 2551) has started.
<<<
Leos-MBP:akka-iot-mqtt-v2 leo$ bin/sbt "runMain akkaiot.Main 2552"
[info] Loading project definition from /Users/leo/apps/scala/akka-iot-mqtt-v2/project
[info] Set current project to akka-iot-mqtt (in build file:/Users/leo/apps/scala/akka-iot-mqtt-v2/)
[info] Running akkaiot.Main 2552
[info] [INFO] [07/12/2017 14:24:16.491] [main] [akka.remote.Remoting] Starting remoting
[info] [INFO] [07/12/2017 14:24:16.661] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[info] [INFO] [07/12/2017 14:24:16.662] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2552]
[info] [INFO] [07/12/2017 14:24:16.675] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[info] [INFO] [07/12/2017 14:24:16.790] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [INFO] [07/12/2017 14:24:16.790] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
[info] [INFO] [07/12/2017 14:24:16.794] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[info] [INFO] [07/12/2017 14:24:16.797] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[info] [WARN] [07/12/2017 14:24:16.822] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon/downingProvider] Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
<<<
<<<--- Gets 'welcome' acknowledgement from 1st cluster node.
<<<
[info] [INFO] [07/12/2017 14:24:17.234] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:24:17.697] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] ClusterSingletonManager state change [Start -> Younger]
<<<
<<<--- 1st cluster node crashes after running for about a minute!
<<<
[info] [WARN] [07/12/2017 14:25:17.329] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
[info] [INFO] [07/12/2017 14:25:17.336] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter#1930129898] to Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0/endpointWriter#1930129898] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:17.828] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:18.257] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.ClusterHeartbeatSender$Heartbeat] from Actor[akka://ClusterSystem/system/cluster/core/daemon/heartbeatSender#-975290267] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:18.826] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
....
....
[info] [WARN] [07/12/2017 14:25:20.828] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://ClusterSystem@127.0.0.1:2551, status = Up)]. Node roles [backend]
[info] [INFO] [07/12/2017 14:25:21.258] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/deadLetters] Message [akka.cluster.ClusterHeartbeatSender$Heartbeat] from Actor[akka://ClusterSystem/system/cluster/core/daemon/heartbeatSender#-975290267] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [INFO] [07/12/2017 14:25:21.827] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/deadLetters] Message [akka.cluster.pubsub.DistributedPubSubMediator$Internal$Status] from Actor[akka://ClusterSystem/system/distributedPubSubMediator#-2106783060] to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [WARN] [07/12/2017 14:25:23.260] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with java.net.ConnectException: Connection refused: /127.0.0.1:2551
[info] [WARN] [07/12/2017 14:25:23.262] [ClusterSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[info] [WARN] [07/12/2017 14:25:28.829] [New I/O boss #3] [NettyTransport(akka://ClusterSystem)] Remote connection to null failed with java.net.ConnectException: Connection refused: /127.0.0.1:2551
[info] [WARN] [07/12/2017 14:25:28.830] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://ClusterSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://ClusterSystem@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://ClusterSystem@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:25:30.845] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551]. Don't use auto-down feature of Akka Cluster in production. See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
[info] [INFO] [07/12/2017 14:25:30.846] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Marking unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551] as [Down]
<<<
<<<--- 1st cluster node (with port# 2551) is marked as 'down'.
<<<
[info] [INFO] [07/12/2017 14:25:31.830] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] [INFO] [07/12/2017 14:25:31.832] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Previous oldest removed [akka.tcp://ClusterSystem@127.0.0.1:2551]
<<<
<<<--- 2nd cluster node (with port# 2552) replaces 1st node to start a new master singleton actor.
<<<
[info] [INFO] [07/12/2017 14:25:31.833] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Younger observed OldestChanged: [None -> myself]
[info] [WARN] [07/12/2017 14:25:31.833] [ClusterSystem-akka.remote.default-remote-dispatcher-15] [akka.remote.Remoting] Association to [akka.tcp://ClusterSystem@127.0.0.1:2551] having UID [1664659180] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[info] [INFO] [07/12/2017 14:25:31.877] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] Singleton manager starting singleton actor [akka://ClusterSystem/user/master/singleton]
[info] [INFO] [07/12/2017 14:25:31.878] [ClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master] ClusterSingletonManager state change [Younger -> Oldest]
[info] [INFO] [07/12/2017 14:25:31.994] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$a] Connect to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.015] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$a] Connected to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.093] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$b] Connect to localhost/127.0.0.1:6379
[info] [INFO] [07/12/2017 14:25:32.095] [ClusterSystem-rediscala.rediscala-client-worker-dispatcher-28] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/RedisClient-$b] Connected to localhost/127.0.0.1:6379
<<<
<<<--- The new master singleton actor starts replaying events stored in the persistence journal.
<<<
[info] [INFO] [07/12/2017 14:25:32.485] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.486] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.486] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
....
....
[info] [INFO] [07/12/2017 14:25:32.532] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkStarted
[info] [INFO] [07/12/2017 14:25:32.532] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkCompleted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkAccepted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkStarted
[info] [INFO] [07/12/2017 14:25:32.533] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Replayed event: WorkCompleted
<<<
<<<--- The new master singleton actor starts resuming live operation
<<<
[info] [INFO] [07/12/2017 14:25:32.863] [ClusterSystem-akka.actor.default-dispatcher-40] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1013 : Work Id f2a1e777-1e35-4b79-9acb-87477358adc1
[info] [WARN] [SECURITY][07/12/2017 14:25:32.874] [ClusterSystem-akka.persistence.dispatchers.default-plugin-dispatcher-26] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [akkaiot.WorkQueue$WorkAccepted] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[info] [WARN] [SECURITY][07/12/2017 14:25:32.908] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [akkaiot.Master$Ack] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
[info] [INFO] [07/12/2017 14:25:33.694] [ClusterSystem-akka.actor.default-dispatcher-40] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1001 : Work Id cfe1ecb3-683d-46f4-ab7e-d8c66fa88755
[info] [INFO] [07/12/2017 14:25:33.851] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1002 : Work Id 469f7148-a6f8-43e6-aa2e-764ac911c251
[info] [INFO] [07/12/2017 14:25:34.014] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1015 : Work Id 92cb01ea-bb90-482c-b20b-accd4770f21a
[info] [INFO] [07/12/2017 14:25:34.172] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1005 : Work Id 73ccc2c1-ceb7-4923-b705-8fe82c666bc5
[info] [INFO] [07/12/2017 14:25:34.614] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1012 : Work Id 711b212c-c53d-4f41-a9fa-e8f5ad44ad55
[info] [INFO] [07/12/2017 14:25:35.040] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1009 : Work Id 435264de-5891-4ee9-a7f8-352f7c0abc11
[info] [INFO] [07/12/2017 14:25:35.533] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1008 : Work Id 2873a66e-b04f-40d0-be49-0500449f54f4
[info] [INFO] [07/12/2017 14:25:35.692] [ClusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1020 : Work Id eefc57d2-8571-4820-8a6e-a7b083fdbaca
[info] [INFO] [07/12/2017 14:25:35.849] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for lamp-1007 : Work Id bdcb4f22-4ca9-4b76-9f43-68a1168637cf
[info] [INFO] [07/12/2017 14:25:36.009] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for security-alarm-1006 : Work Id 01999269-3608-4877-85ef-6c75f39f7db9
[info] [INFO] [07/12/2017 14:25:37.024] [ClusterSystem-akka.actor.default-dispatcher-37] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/master/singleton] Cluster Master -> Accepted work for thermostat-1004 : Work Id ccbc9edb-7a60-4f65-8121-261928f01ff2
....
....

Scaling for production

The Actor model is well suited for building scalable distributed systems. While the application has an underlying architecture that emphasizes scalability, it would require further effort in the following areas to be production-ready:

  • IotManager uses the ‘ask’ method for message receipt confirmation via a Future returned by the Master. If business logic allows, using the fire-and-forget ‘tell’ method will be significantly more efficient, especially at scale.
  • The MQTT broker used in the application is a test broker provided by Mosquitto. A production version of the broker should be installed, preferably local to the IoT system. MQTT brokers from other vendors, such as HiveMQ and RabbitMQ, are also available.
  • As displayed in the console log when running the application, Akka’s default Java serializer isn’t known for its efficiency. Other serializers such as Kryo or Protocol Buffers should be considered.
  • The Redis data store for actor state persistence should be configured for a production environment.
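
To make the first point above concrete, here is a minimal sketch contrasting ‘ask’ with ‘tell’, assuming the classic Akka actor API is on the classpath. The Master actor and the Work and Ack messages are simplified, hypothetical stand-ins rather than the application's actual classes.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskVsTellDemo {
  // Hypothetical, simplified messages (not the application's real protocol)
  final case class Work(id: String)
  final case class Ack(id: String)

  // Stand-in for a master that acknowledges every piece of work
  class Master extends Actor {
    def receive: Receive = {
      case Work(id) => sender() ! Ack(id)
    }
  }

  def run(): String = {
    val system = ActorSystem("ask-vs-tell-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val master = system.actorOf(Props(new Master), "master")
    try {
      // tell: fire-and-forget. No Future, no timeout, no temporary actor.
      // Sent from outside an actor, so the Ack ends up in dead letters.
      master ! Work("w-1")

      // ask: allocates a Future backed by a short-lived internal actor
      // and requires a timeout to be in scope.
      val ack = Await.result((master ? Work("w-2")).mapTo[Ack], 3.seconds)
      ack.id
    } finally system.terminate()
  }
}
```

With ‘tell’, any confirmation would have to arrive as an ordinary message handled in the sender's receive block, which is usually the cheaper design when the caller doesn't need a Future.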

Further code changes to be considered

A couple of changes to the current application might be worth considering:

Device types are currently represented as strings, and code logic for device type-specific states and settings is repeated during instantiation of devices and processing of work requests. Such logic could be encapsulated within classes defined for individual device types. The payload would probably be larger as a consequence, but it might be worth it for better code maintainability, especially if there are many device types.
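
As a rough sketch of what such encapsulation might look like, the following uses a sealed trait with one case object per device type. The type names mirror the device names seen in the console log; the state codes for the thermostat follow the log (0 for OFF, 2 for COOL), while everything else, including the HEAT code and the lamp and alarm states, is assumed for illustration.

```scala
object DeviceTypes {
  // Each device type carries its own name and state table,
  // so the logic no longer hangs off string comparisons.
  sealed trait DeviceType {
    def name: String
    def states: Map[Int, String]
    def describe(state: Int): String = states.getOrElse(state, "UNKNOWN")
  }

  case object Thermostat extends DeviceType {
    val name = "thermostat"
    // 0 and 2 match the console log; 1 -> HEAT is an assumption
    val states = Map(0 -> "OFF", 1 -> "HEAT", 2 -> "COOL")
  }

  case object Lamp extends DeviceType {
    val name = "lamp"
    val states = Map(0 -> "OFF", 1 -> "ON") // assumed
  }

  case object SecurityAlarm extends DeviceType {
    val name = "security-alarm"
    val states = Map(0 -> "OFF", 1 -> "ON") // assumed
  }

  val all: List[DeviceType] = List(Thermostat, Lamp, SecurityAlarm)

  // Bridge from the existing string representation
  def fromName(name: String): Option[DeviceType] = all.find(_.name == name)
}
```

Because the trait is sealed, the compiler can warn about non-exhaustive pattern matches when a new device type is added.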

Another change to be considered is that Work and WorkResult could be generalized into a single class. Conversely, they could be further differentiated in accordance with specific business needs. A slightly more extensive change would be to retire ResultProcessor altogether and let Worker actors process WorkResult as well.

State mutation in Akka Actors

In this application, a few actors maintain mutable internal states using private variables (private var):

  • Master
  • IotManager
  • Device

Since an actor, by design, processes one message at a time and is never accessed by multiple threads concurrently, it’s generally safe enough to use ‘private var’ to store changed states. But if one prefers state transitioning (as opposed to updating), Akka Actors provides a method to hot-swap an actor’s behavior, and thereby its internal state.

Hot-swapping an actor’s state

Below is a sample snippet that illustrates how hot-swapping mimics a state machine without having to use any mutable variable for maintaining the actor state:

// An Actor that mutates internal state by hot-swapping
object Worker {
  def props(workerId: String, clusterClient: ActorRef) = // ...

  // ...
}

class Worker(workerId: String, clusterClient: ActorRef) extends Actor {
  import Worker._

  override def preStart(): Unit = // Initialize worker
  override def postStop(): Unit = // Terminate worker

  def sendToMaster(msg: Any): Unit = {
    clusterClient ! SendToAll("/user/master/singleton", msg)
  }

  // ...

  def receive = idle

  def idle: Receive = {
    case WorkIsReady =>
      // Tell Master it is free
      sendToMaster(WorkerIsFree(workerId))

    case work: Work =>
      // Send work to a work processing actor
      workProcessor ! work
      context.become(working)
  }

  def working: Receive = {
    case WorkProcessed(workResult) =>
      // Tell Master work is done
      sendToMaster(WorkIsDone(workerId, workResult))
      context.become(idle)

    case _: Work =>
      // Tell Master it is busy
      sendToMaster(WorkerIsBusy(workerId))
  }
}

Simplified for illustration, the above snippet depicts a Worker actor that pulls work from the Master cluster. The context.become method allows the actor to switch its internal state at run-time like a state machine. As shown in the simplified code, it takes an ‘Actor.Receive’ (which is a partial function) that implements a new message handler. Under the hood, Akka manages the hot-swapping via a stack. As a side note, according to the relevant source code, the stack for hot-swapping actor behavior is, ironically, a mutable ‘private var’ of List[Actor.Receive].
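
The stack behavior mentioned above can be observed directly: calling context.become with discardOld = false pushes the new handler on top of the current one, and context.unbecome pops it again. Below is a minimal sketch assuming the classic Akka actor API; the Toggle actor and its messages are made up for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object BehaviorStackDemo {
  // Hypothetical messages
  case object Pause
  case object Resume
  case object WhoAreYou

  class Toggle extends Actor {
    def receive: Receive = active

    def active: Receive = {
      // Push `paused` on top of `active` instead of replacing it
      case Pause     => context.become(paused, discardOld = false)
      case WhoAreYou => sender() ! "active"
    }

    def paused: Receive = {
      // Pop `paused`, which restores `active`
      case Resume    => context.unbecome()
      case WhoAreYou => sender() ! "paused"
    }
  }

  def run(): List[String] = {
    val system = ActorSystem("behavior-stack-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val toggle = system.actorOf(Props(new Toggle), "toggle")
    def state(): String =
      Await.result((toggle ? WhoAreYou).mapTo[String], 3.seconds)
    try {
      val before = state()
      toggle ! Pause
      val during = state()
      toggle ! Resume
      val after = state()
      List(before, during, after)
    } finally system.terminate()
  }
}
```

With the default discardOld = true, as in the Worker snippet, the top of the stack is replaced rather than pushed, so the stack doesn't grow as the actor flips between idle and working.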

Recursive transformation of immutable parameter

Another functional approach to mutating actor state is via recursive transformation of an immutable parameter. As an example, we can avoid using a mutable ‘private var registry’ as shown in the following ActorManager actor and use ‘context.become’ to recursively transform a registry as an immutable parameter passed to the updateState method:

// Approach #1: Store map entries in a mutable registry
class ActorManager extends Actor {
  private var registry = Map.empty[String, ActorRef]
  def receive = {
    case Add(id, ref) =>
      registry += id -> ref
    // Other cases
  }
}
// Approach #2: Store map entries in a recursively transformed immutable registry
class ActorManager extends Actor {
  def receive = updateState(Map.empty[String, ActorRef])
  def updateState(registry: Map[String, ActorRef]): Receive = {
    case Add(id, ref) =>
      context.become(updateState(registry + (id -> ref)))
    // Other cases
  }
}
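
For completeness, here is a self-contained sketch of the second approach that can be run end to end, assuming the classic Akka actor API. The shape of the Add message is inferred from the snippet above, and the Count query and the RegistryDemo wrapper are hypothetical additions used only to observe the registry from outside.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object RegistryDemo {
  final case class Add(id: String, ref: ActorRef) // assumed shape
  case object Count                               // hypothetical query

  class ActorManager extends Actor {
    def receive: Receive = updateState(Map.empty[String, ActorRef])

    // The registry lives only as a parameter; every Add installs a new
    // handler closed over an extended, still immutable, Map.
    def updateState(registry: Map[String, ActorRef]): Receive = {
      case Add(id, ref) =>
        context.become(updateState(registry + (id -> ref)))
      case Count =>
        sender() ! registry.size
    }
  }

  def run(): Int = {
    val system = ActorSystem("registry-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val manager = system.actorOf(Props(new ActorManager), "manager")
    try {
      // deadLetters serves as a placeholder ActorRef for the demo
      manager ! Add("device-a", system.deadLetters)
      manager ! Add("device-b", system.deadLetters)
      Await.result((manager ? Count).mapTo[Int], 3.seconds)
    } finally system.terminate()
  }
}
```

Calling RegistryDemo.run() returns the number of registered entries, without a single ‘var’ in the actor itself.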