Monthly Archives: August 2021

A Rate-limiter In Akka Stream

Rate-limiting is a common measure for preventing the resources of a given computing service (e.g. an API service) from being swamped by excessive requests. There are various strategies for achieving rate-limiting, but fundamentally it’s about how to limit the frequency of requests from any given source within a set time window. While a rate-limiter can be implemented in many different ways, it’s, by nature, something well-positioned to be crafted as a stream operator.

Wouldn’t “throttle()” suffice?

Akka Stream’s versatile stream processing functions make it an appealing option for implementing rate-limiters. It provides stream operators like throttle(), which implements a token bucket model for industry-standard rate-limiting. However, directly applying the function to the incoming request elements would mechanically throttle every request, thus “penalizing” requests from all sources even when the excessive requests come from, say, just a single source.

We need a slightly more sophisticated rate-limiting solution for the computing service to efficiently serve “behaving” callers while not being swamped by “misbehaving” ones.

Rate-limiting calls to an API service

Let’s say we have an API service that we would like to equip with rate-limiting. Incoming requests will be coming through as elements of an input stream. Each incoming request will consist of source-identifying and API-call information, represented as a simple case class instance with apiKey being the unique key/id for an API user and apiParam the submitted parameter for the API call:

case class Request[A](apiKey: String, apiParam: A)

A simplistic API call function that takes the apiKey and apiParam and returns a Future may look something like this:

def apiCall[A, B](key: String, param: A)(implicit ec: ExecutionContext): Future[B] = ???

For illustration purposes, we’ll trivialize it to return a String-type Future:

def apiCall[A](key: String, param: A)(implicit ec: ExecutionContext): Future[String] =
  Future{s"apiResult($key, $param)"}

Next, we define the following main attributes for the rate-limiter:

val timeWindow = 2.seconds
val maxReqs = 10       // Max overall requests within the timeWindow
val maxReqsPerKey = 3  // Max requests per apiKey within the timeWindow

Strategy #1: Discard excessive API calls from any sources

We’ll look into two different filtering strategies that rate-limit calls to our API service. One approach is to limit API calls within the predefined timeWindow from any given apiKey to not more than the maxReqsPerKey value. In other words, incoming requests with a given apiKey beyond the maxReqsPerKey limit will be discarded. We can express such filtering logic as a Flow like the one below:

// Rate-limiting flow that discards excessive API calls from any sources
def keepToLimitPerKey[A](): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] = Flow[Seq[Request[A]]].
  map{ g =>
    g.foldLeft((List.empty[Request[A]], Map.empty[String, Int])){ case ((acc, m), req) =>
      val count = m.getOrElse(req.apiKey, 0) + 1
      if (count <= maxReqsPerKey) (req :: acc, m + (req.apiKey -> count))
      else (acc, m + (req.apiKey -> count))
    }._1.toSeq.reverse
  }

The filtering Flow takes a sequence of requests and returns a filtered sequence. By iterating through the input sequence with foldLeft while keeping track of the request count per apiKey with a Map, it keeps only up to the first maxReqsPerKey requests for any given apiKey.
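To make the per-key cap concrete, here is a minimal sketch of the same foldLeft logic applied to a plain collection, outside of any Flow. The helper name keepToLimit is hypothetical and only for illustration, and the Request case class simply mirrors the one defined earlier so that the snippet stands on its own.

```scala
// Plain-collections version of the per-key cap (illustration only, no Akka).
// `keepToLimit` is a hypothetical helper name, not part of the article's code.
final case class Request[A](apiKey: String, apiParam: A)

def keepToLimit[A](group: Seq[Request[A]], maxReqsPerKey: Int): Seq[Request[A]] =
  group.foldLeft((Vector.empty[Request[A]], Map.empty[String, Int])) {
    case ((kept, counts), req) =>
      // Count this request against its key, then keep it only while under the cap
      val count = counts.getOrElse(req.apiKey, 0) + 1
      val nextKept = if (count <= maxReqsPerKey) kept :+ req else kept
      (nextKept, counts.updated(req.apiKey, count))
  }._1

// One group of ten requests in which key k-5 appears four times
val group = Vector(5, 1, 1, 4, 5, 5, 5, 6, 2, 2).zipWithIndex.map {
  case (key, i) => Request(s"k-$key", i + 1)
}

val keptPositions = keepToLimit(group, maxReqsPerKey = 3).map(_.apiParam)
println(keptPositions)  // Vector(1, 2, 3, 4, 5, 6, 8, 9, 10)
```

Only the request at position 7, the fourth one carrying k-5, is discarded; every other request in the group passes through in its original order.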

Strategy #2: Drop all API calls from any “offending” sources

An alternative strategy is that for any given apiKey, all API calls with the key will be dropped if the count exceeds the maxReqsPerKey value within the timeWindow. Here’s the corresponding filtering Flow:

// Rate-limiting flow that drops all API calls from any offending sources
def dropAllReqsByKey[A](): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] = Flow[Seq[Request[A]]].
  map{ g =>
    val offendingKeys = g.groupMapReduce(_.apiKey)(_ => 1)(_ + _).
      collect{ case (key, cnt) if cnt > maxReqsPerKey => key }.toSeq
    g.filterNot(req => offendingKeys.contains(req.apiKey))
  }

As shown in the self-explanatory code, this alternative filtering Flow simply identifies which apiKeys originate the count-violating requests per timeWindow and filters out all of their requests.
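As a quick sanity check, the same logic can be tried on a plain collection. This is only a sketch: the helper name dropOffenders is hypothetical, the Request case class mirrors the one defined earlier, and the offending keys are collected into a Set here purely for convenient lookup.

```scala
// Plain-collections version of the "drop all requests from offenders" rule
// (illustration only, no Akka). `dropOffenders` is a hypothetical helper name.
final case class Request[A](apiKey: String, apiParam: A)

def dropOffenders[A](group: Seq[Request[A]], maxReqsPerKey: Int): Seq[Request[A]] = {
  // Count requests per key, then collect the keys that exceed the limit
  val countsByKey = group.groupMapReduce(_.apiKey)(_ => 1)(_ + _)
  val offendingKeys = countsByKey.collect { case (key, cnt) if cnt > maxReqsPerKey => key }.toSet
  group.filterNot(req => offendingKeys.contains(req.apiKey))
}

// One group of ten requests in which keys k-3 and k-4 each appear four times
val group = Vector(5, 4, 3, 3, 4, 4, 4, 1, 3, 3).zipWithIndex.map {
  case (key, i) => Request(s"k-$key", i + 21)
}

val survivors = dropOffenders(group, maxReqsPerKey = 3).map(r => (r.apiKey, r.apiParam))
println(survivors)  // Vector((k-5,21), (k-1,28))
```

Unlike the first strategy, no request from k-3 or k-4 survives, not even the first three of each.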

Grouping API requests in time windows using “groupedWithin()”

Now that we’re equipped with a couple of rate-limiting strategies, we’re going to come up with a stream operator that does the appropriate grouping of the API requests. To achieve that, we use the Akka Stream operator groupedWithin(), which divides up a stream into groups of up to a given number of elements received within a time window. It has the following method signature:

def groupedWithin(n: Int, d: FiniteDuration): Repr[Seq[Out]]

The function produces chunks of API requests that serve as properly-typed input to be ingested by one of the filtering Flows we’ve created. That seems to fit perfectly into what we need.

Well, there is a caveat though. The groupedWithin() operator emits when the given time interval (i.e. d, which corresponds to timeWindow in our use case) has elapsed since the previous emission or when the specified number of elements (i.e. n, which corresponds to our maxReqs) has been buffered, whichever happens first. In essence, if more than n elements are readily available upstream, the operator emits full groups back to back without waiting for the time window to elapse, so by itself it will not fulfill our requirement of at most n elements within the time window.

A work-around is to subsequently apply throttle() to the grouped requests, treating each group as a single batch, to enforce the time-windowed rate-limiting requirement.
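To see why the extra throttling step matters, consider the case where every request is already available upstream. Under that assumption each group fills up to n right away, so the grouping looks just like what the standard library’s grouped(n) would produce. The sketch below uses plain collections only (no Akka, no timing) to show the resulting group sizes for 38 requests with n = 10.

```scala
// Plain-collections illustration (no Akka): when all elements are immediately
// available, every group fills up to maxReqs before the time window can elapse.
val maxReqs = 10
val requestIds = (1 to 38).toVector

// Chunk the requests the way a count-triggered emission would
val groups = requestIds.grouped(maxReqs).toVector
val groupSizes = groups.map(_.size)

println(groupSizes)  // Vector(10, 10, 10, 8)
```

Without throttling, all four groups could reach the API service within a single timeWindow. Letting only one group through per timeWindow restores the intended limit of at most maxReqs requests per window.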

Test-running our API service rate-limiter

Let’s assemble a minuscule stream of requests to test-run our rate-limiter using the first filtering strategy. To make it easy to spot the dropped API requests, we assign the apiParam parameter of each request an integer value that reveals the request’s position in the input stream via zipWithIndex.

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ThrottleMode, OverflowStrategy}
import akka.NotUsed
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()  // for Akka stream v2.5 or below

case class Request[A](apiKey: String, apiParam: A)

def apiCall[A](key: String, param: A)(implicit ec: ExecutionContext): Future[String] =
  Future{s"apiResult($key, $param)"}

val timeWindow = 2.seconds
val maxReqs = 10
val maxReqsPerKey = 3

val requests: Iterator[Request[Int]] = Vector(
    5, 1, 1, 4, 5, 5, 5, 6, 2, 2,  // Rogue keys: `5`
    1, 5, 5, 2, 3, 4, 6, 6, 4, 4,
    5, 4, 3, 3, 4, 4, 4, 1, 3, 3,  // Rogue keys: `3` & `4` 
    6, 1, 1, 4, 4, 1, 1, 5         // Rogue keys: `1`
  ).
  zipWithIndex.
  map{ case (x, i) => Request(s"k-$x", i + 1) }.
  iterator

Source.fromIterator(() => requests).
  groupedWithin(maxReqs, timeWindow).
  via(keepToLimitPerKey()).  // Rate-limiting strategy #1
  throttle(1, timeWindow, 1, ThrottleMode.Shaping).
  mapConcat(_.map(req => apiCall(req.apiKey, req.apiParam))).
  runForeach(println)

| Future(Success(apiResult(k-5, 1)))
| Future(Success(apiResult(k-1, 2)))
| Future(Success(apiResult(k-1, 3)))
| Future(Success(apiResult(k-4, 4)))
| Future(Success(apiResult(k-5, 5)))
| Future(Success(apiResult(k-5, 6)))   // Request(k-5, 7) dropped
| Future(Success(apiResult(k-6, 8)))
| Future(Success(apiResult(k-2, 9)))
| Future(Success(apiResult(k-2, 10)))
v <-- ~2 seconds
| Future(Success(apiResult(k-1, 11)))
| Future(Success(apiResult(k-5, 12)))
| Future(Success(apiResult(k-5, 13)))
| Future(Success(apiResult(k-2, 14)))
| Future(Success(apiResult(k-3, 15)))
| Future(Success(apiResult(k-4, 16)))
| Future(Success(apiResult(k-6, 17)))
| Future(<not completed>)
| Future(<not completed>)
| Future(Success(apiResult(k-4, 20)))
v <-- ~2 seconds
| Future(Success(apiResult(k-5, 21)))
| Future(Success(apiResult(k-4, 22)))
| Future(Success(apiResult(k-3, 23)))
| Future(Success(apiResult(k-3, 24)))
| Future(Success(apiResult(k-4, 25)))
| Future(Success(apiResult(k-4, 26)))  // Request(k-4, 27) dropped
| Future(Success(apiResult(k-1, 28)))
| Future(Success(apiResult(k-3, 29)))  // Request(k-3, 30) dropped
v <-- ~2 seconds
| Future(<not completed>)
| Future(Success(apiResult(k-1, 32)))
| Future(Success(apiResult(k-1, 33)))
| Future(Success(apiResult(k-4, 34)))
| Future(Success(apiResult(k-4, 35)))
| Future(Success(apiResult(k-1, 36)))  // Request(k-1, 37) dropped
| Future(Success(apiResult(k-5, 38)))
v <-- ~2 seconds

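Note that mapConcat() is for flattening the stream of grouped API requests back to a stream of individual requests in their original order. Since the stream elements are the Futures returned by apiCall() rather than their results, an entry shows up as Future(<not completed>) whenever println runs before that Future has completed.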

Next, we test-run our rate-limiter using the alternative filtering strategy with the same input stream and timeWindow/maxReqs/maxReqsPerKey parameters:

val requests: Iterator[Request[Int]] = Vector(
    5, 1, 1, 4, 5, 5, 5, 6, 2, 2,  // Rogue keys: `5`
    1, 5, 5, 2, 3, 4, 6, 6, 4, 4,
    5, 4, 3, 3, 4, 4, 4, 1, 3, 3,  // Rogue keys: `3` & `4` 
    6, 1, 1, 4, 4, 1, 1, 5         // Rogue keys: `1`
  ).
  zipWithIndex.
  map{ case (x, i) => Request(s"k-$x", i + 1) }.
  iterator

Source.fromIterator(() => requests).
  groupedWithin(maxReqs, timeWindow).
  via(dropAllReqsByKey()).  // Rate-limiting strategy #2
  throttle(1, timeWindow, 1, ThrottleMode.Shaping).
  mapConcat(_.map(req => apiCall(req.apiKey, req.apiParam))).
  runForeach(println)

| Future(Success(apiResult(k-1, 2)))
| Future(<not completed>)
| Future(Success(apiResult(k-4, 4)))
| Future(Success(apiResult(k-6, 8)))
| Future(Success(apiResult(k-2, 9)))
| Future(Success(apiResult(k-2, 10)))  // All requests by k-5 dropped
v <-- ~2 seconds
| Future(<not completed>)
| Future(Success(apiResult(k-5, 12)))
| Future(Success(apiResult(k-5, 13)))
| Future(Success(apiResult(k-2, 14)))
| Future(Success(apiResult(k-3, 15)))
| Future(Success(apiResult(k-4, 16)))
| Future(Success(apiResult(k-6, 17)))
| Future(Success(apiResult(k-6, 18)))
| Future(Success(apiResult(k-4, 19)))
| Future(Success(apiResult(k-4, 20)))
v <-- ~2 seconds
| Future(Success(apiResult(k-5, 21)))
| Future(Success(apiResult(k-1, 28)))  // All requests by k-3 or k-4 dropped
v <-- ~2 seconds
| Future(Success(apiResult(k-6, 31)))
| Future(Success(apiResult(k-4, 34)))
| Future(Success(apiResult(k-4, 35)))
| Future(Success(apiResult(k-5, 38)))  // All requests by k-1 dropped
v <-- ~2 seconds

Wrapping the rate-limiter in a class

To generalize the rate-limiter, we can create a wrapper class that parameterizes apiCall and filteringStrategy along with the timeWindow, maxReqs, maxReqsPerKey parameters.

case class Request[A](apiKey: String, apiParam: A)

case class RateLimiter[A, B](apiCall: (String, A) => Future[B],
                             filteringStrategy: Int => Flow[Seq[Request[A]], Seq[Request[A]], NotUsed],
                             timeWindow: FiniteDuration,
                             maxReqs: Int,
                             maxReqsPerKey: Int)(implicit ec: ExecutionContext) {
  def flow(): Flow[Request[A], Future[B], NotUsed] =
    Flow[Request[A]].
      groupedWithin(maxReqs, timeWindow).
      via(filteringStrategy(maxReqsPerKey)).
      throttle(1, timeWindow, 1, ThrottleMode.Shaping).
      mapConcat(_.map(req => apiCall(req.apiKey, req.apiParam)))
}

object RateLimiter {
  // Rate-limiting flow that discards excessive API calls from any sources
  def keepToLimitPerKey[A](maxReqsPerKey: Int): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] =
    Flow[Seq[Request[A]]].map{ g =>
      g.foldLeft((List.empty[Request[A]], Map.empty[String, Int])){ case ((acc, m), req) =>
        val count = m.getOrElse(req.apiKey, 0) + 1
        if (count <= maxReqsPerKey) (req :: acc, m + (req.apiKey -> count))
        else (acc, m + (req.apiKey -> count))
      }._1.toSeq.reverse
    }

  // Rate-limiting flow that drops all API calls from any offending sources
  def dropAllReqsByKey[A](maxReqsPerKey: Int): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] =
    Flow[Seq[Request[A]]].map{ g =>
      val offendingKeys = g.groupMapReduce(_.apiKey)(_ => 1)(_ + _).
        collect{ case (key, cnt) if cnt > maxReqsPerKey => key }.toSeq
      g.filterNot(req => offendingKeys.contains(req.apiKey))
    }
}

Note that implementations of any available filtering strategies are now kept within the RateLimiter companion object.

A “biased” random-number function

Let’s also create a simple function for generating “biased” random integers for test-running the rate-limiter class.

def biasedRandNum(l: Int, u: Int, biasedNums: Set[Int], biasedFactor: Int = 1): Int = {
  def rand = java.util.concurrent.ThreadLocalRandom.current 
  Vector.
    iterate(rand.nextInt(l, u+1), biasedFactor)(_ => rand.nextInt(l, u+1)).
    dropWhile(!biasedNums.contains(_)).
    headOption match {
      case Some(n) => n
      case None => rand.nextInt(l, u+1)
    }
}

Method biasedRandNum() simply generates a random integer within a given range that skews towards elements in the provided biasedNums set. The biasedFactor (e.g. 0, 1, 2, …) influences the skew level by setting how many “biased” trials the random number generator may repeat, with 0 representing no bias. A larger biasedFactor value will increase the skew.

For example, biasedRandNum(0, 9, Set(1, 3, 5)) will generate a random integer between 0 and 9 (inclusive), skewing towards generating 1, 3 or 5 with the default biasedFactor = 1.
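To quantify the skew, the outcome probabilities can be worked out from the trial logic: the function returns the first of up to biasedFactor uniform draws that lands in biasedNums, and falls back to one more uniform draw if none does. The sketch below computes the resulting probabilities, assuming every element of biasedNums lies within the range and the set is non-empty. The helper names probOfBiasedNum and probOfOtherNum are hypothetical.

```scala
// Exact outcome probabilities implied by biasedRandNum's trial logic.
// rangeSize   = u - l + 1
// biasedCount = how many numbers in the range belong to biasedNums (assumed > 0)
def probOfBiasedNum(rangeSize: Int, biasedCount: Int, biasedFactor: Int): Double = {
  // Probability that none of the biased trials lands in biasedNums
  val missAll = math.pow(1.0 - biasedCount.toDouble / rangeSize, biasedFactor)
  // Either a biased trial hits (split evenly among the biased numbers),
  // or all trials miss and the final uniform draw picks this number
  (1.0 - missAll) / biasedCount + missAll / rangeSize
}

def probOfOtherNum(rangeSize: Int, biasedCount: Int, biasedFactor: Int): Double =
  // A non-biased number can only come from the final uniform draw
  math.pow(1.0 - biasedCount.toDouble / rangeSize, biasedFactor) / rangeSize

// biasedRandNum(0, 9, Set(1, 3, 5)) with the default biasedFactor = 1
val pBiased = probOfBiasedNum(rangeSize = 10, biasedCount = 3, biasedFactor = 1)
val pOther = probOfOtherNum(rangeSize = 10, biasedCount = 3, biasedFactor = 1)
// pBiased is about 0.17 and pOther about 0.07
```

So with the default biasedFactor, each of 1, 3 and 5 is roughly 2.4 times as likely as any other number in the range. With biasedFactor = 2 the same formulas give about 0.219 versus 0.049, and with biasedFactor = 0 both collapse to the uniform 0.1.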

Test-running the rate-limiter class with random data

def apiCall[A](key: String, param: A)(implicit ec: ExecutionContext): Future[String] =
  Future{s"apiResult($key, $param)"}

val requests: Iterator[Request[Int]] = Vector.tabulate(1200)(_ => biasedRandNum(0, 9, Set(1, 3, 5), 2)).
  zipWithIndex.
  map{ case (x, i) => Request(s"k-$x", i + 1) }.
  iterator

Source.fromIterator(() => requests).
  via(RateLimiter(apiCall, RateLimiter.dropAllReqsByKey[Int], 2.seconds, 500, 20).flow()).
  runForeach(println)

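In the above example, the requests are processed in groups of up to 500 elements, with the surviving requests of one group printed every couple of seconds. The biasedFactor is set to 2, significantly skewing the random apiKey values towards the biasedNums elements 1, 3 and 5, and since filtering strategy dropAllReqsByKey is chosen, a likely observation is that all requests with apiKey k-1, k-3 or k-5 will be dropped by the rate-limiter. Note that with these settings each of the remaining keys is also expected to average roughly 24 to 25 requests per full group of 500, slightly above the maxReqsPerKey limit of 20, so many of those keys will likely be dropped in a given window as well.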

I’ll leave it to the readers to experiment with the rate-limiter by changing the values of parameters in biasedRandNum() as well as constructor fields in class RateLimiter.