Rate-limiting is a common measure for preventing the resources of a given computing service (e.g. an API service) from being swamped by excessive requests. There are various strategies for achieving rate-limiting, but fundamentally it’s about limiting the frequency of requests from any given source within a set time window. While a rate-limiter can be implemented in many different ways, it is by nature well suited to being crafted as a stream operator.
Wouldn’t “throttle()” suffice?
Akka Stream’s versatile stream processing functions make it an appealing option for implementing rate-limiters. It provides stream operators like throttle(), which implements the token bucket model, for industry-standard rate-limiting. However, directly applying the function to the incoming request elements would mechanically throttle every request, thus “penalizing” requests from all sources when the excessive requests come from, say, just a single source.
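To make the token bucket model and the “penalizing” effect concrete, here is a minimal sketch in plain Scala. It is not Akka's implementation: the TokenBucket class, its parameters and the explicit nowMs clock argument are hypothetical names chosen for illustration, and the sketch rejects requests outright, whereas throttle() in ThrottleMode.Shaping delays elements instead.

```scala
// A minimal, illustrative token bucket (hypothetical; not Akka's internal implementation).
// `capacity` is the maximum burst size; `refillPerSec` is the steady-state rate.
final class TokenBucket(capacity: Int, refillPerSec: Double, startMs: Long = 0L) {
  private var tokens: Double = capacity.toDouble
  private var lastMs: Long = startMs

  // Returns true if a request arriving at `nowMs` may pass, consuming one token
  def tryAcquire(nowMs: Long): Boolean = {
    // Refill in proportion to the elapsed time, capped at `capacity`
    val elapsedSec = (nowMs - lastMs) / 1000.0
    tokens = math.min(capacity.toDouble, tokens + elapsedSec * refillPerSec)
    lastMs = nowMs
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }
}

// One bucket shared by all callers: it cannot tell the sources apart
val bucket = new TokenBucket(capacity = 3, refillPerSec = 1.0)

// (apiKey, arrival time in ms): k-1 sends a burst, k-2 arrives right after it
val arrivals = Seq("k-1" -> 0L, "k-1" -> 10L, "k-1" -> 20L, "k-2" -> 30L, "k-2" -> 1500L)
val decisions = arrivals.map { case (key, t) => (key, bucket.tryAcquire(t)) }

decisions.foreach(println)
// (k-1,true)
// (k-1,true)
// (k-1,true)
// (k-2,false)   <- k-2 is turned away because k-1 used up the burst
// (k-2,true)
```

The first request from k-2 is refused even though k-2 has sent nothing so far, which is the behavior a per-source rate-limiter is meant to avoid.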
We need a slightly more sophisticated rate-limiting solution for the computing service to efficiently serve “behaving” callers while not being swamped by “misbehaving” ones.
Rate-limiting calls to an API service
Let’s say we have an API service that we would like to equip with rate-limiting. Incoming requests arrive as elements of an input stream. Each incoming request consists of source-identifying information and the API call itself, represented as a simple case class instance with apiKey being the unique key/id for an API user and apiParam the submitted parameter for the API call:
case class Request[A](apiKey: String, apiParam: A)
A simplistic API call function that takes the apiKey and apiParam and returns a Future may look something like this:
def apiCall[A, B](key: String, param: A)(implicit ec: ExecutionContext): Future[B] = ???
For illustration purposes, we’ll trivialize it to return a String-type Future:
def apiCall[A](key: String, param: A)(implicit ec: ExecutionContext): Future[String] = Future{s"apiResult($key, $param)"}
Next, we define the following main attributes for the rate-limiter:
val timeWindow = 2.seconds
val maxReqs = 10        // Max overall requests within the timeWindow
val maxReqsPerKey = 3   // Max requests per apiKey within the timeWindow
Strategy #1: Discard excessive API calls from any sources
We’ll look into two different filtering strategies that rate-limit calls to our API service. One approach is to limit API calls within the predefined timeWindow from any given apiKey to no more than the maxReqsPerKey value. In other words, excessive incoming requests with a given apiKey above the maxReqsPerKey limit will be discarded. We can express such filtering logic as a Flow like below:
// Rate-limiting flow that discards excessive API calls from any sources
def keepToLimitPerKey[A](): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] =
  Flow[Seq[Request[A]]].
    map{ g =>
      g.foldLeft((List.empty[Request[A]], Map.empty[String, Int])){ case ((acc, m), req) =>
        val count = m.getOrElse(req.apiKey, 0) + 1
        if (count <= maxReqsPerKey) (req :: acc, m + (req.apiKey -> count))
        else (acc, m + (req.apiKey -> count))
      }._1.toSeq.reverse
    }
The filtering Flow takes a sequence of requests and returns a filtered sequence. By iterating through the input sequence with foldLeft while keeping track of the request count per apiKey with a Map, it keeps only the first maxReqsPerKey requests for any given apiKey.
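The per-key counting can be tried on a single batch without any stream machinery. The sketch below applies the same foldLeft logic to a plain Seq; the helper name keepFirstPerKey and the sample keys "a" and "b" are made up for illustration.

```scala
final case class Request[A](apiKey: String, apiParam: A)

// Same per-key counting logic as the Flow, applied to one batch (hypothetical helper name)
def keepFirstPerKey[A](batch: Seq[Request[A]], limit: Int): Seq[Request[A]] =
  batch.foldLeft((List.empty[Request[A]], Map.empty[String, Int])) {
    case ((kept, counts), req) =>
      val count = counts.getOrElse(req.apiKey, 0) + 1
      val updated = counts + (req.apiKey -> count)
      // Keep the request only while its key is still within the limit
      if (count <= limit) (req :: kept, updated) else (kept, updated)
  }._1.reverse  // requests were prepended, so restore the original order

// Key "a" appears 4 times (positions 1, 3, 4, 5), key "b" twice (positions 2, 6)
val batch = Seq("a", "b", "a", "a", "a", "b").zipWithIndex.map { case (k, i) => Request(k, i + 1) }
val kept = keepFirstPerKey(batch, limit = 3)

println(kept.map(r => s"${r.apiKey}-${r.apiParam}"))
// List(a-1, b-2, a-3, a-4, b-6)
```

Only the fourth request from "a" (position 5) is discarded; everything else passes through in its original order.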
Strategy #2: Drop all API calls from any “offending” sources
An alternative strategy is that for any given apiKey, all API calls with the key will be dropped if the count exceeds the maxReqsPerKey value within the timeWindow. Here’s the corresponding filtering Flow:
// Rate-limiting flow that drops all API calls from any offending sources
def dropAllReqsByKey[A](): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] =
  Flow[Seq[Request[A]]].
    map{ g =>
      val offendingKeys = g.groupMapReduce(_.apiKey)(_ => 1)(_ + _).
        collect{ case (key, cnt) if cnt > maxReqsPerKey => key }.toSeq
      g.filterNot(req => offendingKeys.contains(req.apiKey))
    }
As the code shows, this alternative filtering Flow simply identifies which apiKeys exceed the request count limit per timeWindow and filters out all of their requests.
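The same batch-level experiment works for this strategy. The following sketch assumes Scala 2.13 or later (for groupMapReduce); the helper name dropOffenders and the sample keys are hypothetical.

```scala
final case class Request[A](apiKey: String, apiParam: A)

// Drop every request whose key exceeds `limit` within the batch (hypothetical helper name).
// groupMapReduce requires Scala 2.13 or later.
def dropOffenders[A](batch: Seq[Request[A]], limit: Int): Seq[Request[A]] = {
  // Count requests per key, then collect the keys that went over the limit
  val counts: Map[String, Int] = batch.groupMapReduce(_.apiKey)(_ => 1)(_ + _)
  val offenders: Set[String] = counts.collect { case (key, n) if n > limit => key }.toSet
  batch.filterNot(req => offenders.contains(req.apiKey))
}

// Key "a" appears 4 times, key "b" twice
val batch = Seq("a", "b", "a", "a", "a", "b").zipWithIndex.map { case (k, i) => Request(k, i + 1) }
val kept = dropOffenders(batch, limit = 3)

println(kept.map(r => s"${r.apiKey}-${r.apiParam}"))
// List(b-2, b-6)
```

With the same input, the first strategy would let three of the four "a" requests through, while this one removes all of them. Collecting the offending keys into a Set rather than a Seq is a small design choice that keeps the membership check cheap for large batches.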
Grouping API requests in time windows using “groupedWithin()”
Now that we’re equipped with a couple of rate-limiting strategies, we’re going to come up with a stream operator that does the appropriate grouping of the API requests. To achieve that, we use the Akka Stream function groupedWithin(), which divides up a stream into groups of up to a given number of elements received within a time window. It has the following method signature:
def groupedWithin(n: Int, d: FiniteDuration): Repr[Seq[Out]]
The function produces chunks of API requests that serve as properly-typed input to be ingested by one of the filtering Flows we’ve created. That seems to fit perfectly into what we need.
Well, there is a caveat though. The groupedWithin() operator emits when the given time interval (i.e. d, which corresponds to timeWindow in our use case) elapses since the previous emission or the specified number of elements (i.e. n, which corresponds to our maxReqs) is buffered, whichever happens first. In essence, if more than n elements are readily available upstream, the operator emits full groups of n elements back-to-back without waiting for the time window to elapse, so on its own it will not fulfill our requirement of at most n elements within the time window.
A work-around is to subsequently apply throttle() to the grouped requests, treating each group as a single batch, to enforce the time-windowed rate-limiting requirement.
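The caveat can be pictured with plain collections. The sketch below is only a model, under the assumption that every element is already available upstream so that each group fills up before the timer fires; in that special case the size-or-time grouping behaves like fixed-size chunking, which grouped() reproduces. It says nothing about how Akka implements the operator.

```scala
// Plain-collections model of groupedWithin(n, d) for the special case where all
// elements are immediately available, so a group always reaches n elements before
// the duration d elapses. This is an assumption of the sketch, not Akka internals.
val maxReqs = 10
val burst: Vector[Int] = (1 to 38).toVector

val groups: List[Vector[Int]] = burst.grouped(maxReqs).toList

println(groups.map(_.size))
// List(10, 10, 10, 8)
```

Without further measures all four groups would be handed downstream one right after another, i.e. 38 requests within a single time window. Placing throttle(1, timeWindow, 1, ThrottleMode.Shaping) after the grouping lets only one group through per timeWindow.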
Test-running our API service rate-limiter
Let’s assemble a minuscule stream of requests to test-run our rate-limiter using the first filtering strategy. To make it easy to spot the dropped API requests, we assign the apiParam parameter of each request an integer value that reveals the request’s position in the input stream via zipWithIndex.
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ThrottleMode, OverflowStrategy}
import akka.NotUsed
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()  // for Akka stream v2.5 or below

case class Request[A](apiKey: String, apiParam: A)

def apiCall[A](key: String, param: A)(implicit ec: ExecutionContext): Future[String] =
  Future{s"apiResult($key, $param)"}

val timeWindow = 2.seconds
val maxReqs = 10
val maxReqsPerKey = 3

val requests: Iterator[Request[Int]] = Vector(
    5, 1, 1, 4, 5, 5, 5, 6, 2, 2,  // Rogue keys: `5`
    1, 5, 5, 2, 3, 4, 6, 6, 4, 4,
    5, 4, 3, 3, 4, 4, 4, 1, 3, 3,  // Rogue keys: `3` & `4`
    6, 1, 1, 4, 4, 1, 1, 5         // Rogue keys: `1`
  ).
  zipWithIndex.
  map{ case (x, i) => Request(s"k-$x", i + 1) }.
  iterator

Source.fromIterator(() => requests).
  groupedWithin(maxReqs, timeWindow).
  via(keepToLimitPerKey()).  // Rate-limiting strategy #1
  throttle(1, timeWindow, 1, ThrottleMode.Shaping).
  mapConcat(_.map(req => apiCall(req.apiKey, req.apiParam))).
  runForeach(println)

| Future(Success(apiResult(k-5, 1)))
| Future(Success(apiResult(k-1, 2)))
| Future(Success(apiResult(k-1, 3)))
| Future(Success(apiResult(k-4, 4)))
| Future(Success(apiResult(k-5, 5)))
| Future(Success(apiResult(k-5, 6)))  // Request(k-5, 7) dropped
| Future(Success(apiResult(k-6, 8)))
| Future(Success(apiResult(k-2, 9)))
| Future(Success(apiResult(k-2, 10)))
v <-- ~2 seconds
| Future(Success(apiResult(k-1, 11)))
| Future(Success(apiResult(k-5, 12)))
| Future(Success(apiResult(k-5, 13)))
| Future(Success(apiResult(k-2, 14)))
| Future(Success(apiResult(k-3, 15)))
| Future(Success(apiResult(k-4, 16)))
| Future(Success(apiResult(k-6, 17)))
| Future(<not completed>)
| Future(<not completed>)
| Future(Success(apiResult(k-4, 20)))
v <-- ~2 seconds
| Future(Success(apiResult(k-5, 21)))
| Future(Success(apiResult(k-4, 22)))
| Future(Success(apiResult(k-3, 23)))
| Future(Success(apiResult(k-3, 24)))
| Future(Success(apiResult(k-4, 25)))
| Future(Success(apiResult(k-4, 26)))  // Request(k-4, 27) dropped
| Future(Success(apiResult(k-1, 28)))
| Future(Success(apiResult(k-3, 29)))  // Request(k-3, 30) dropped
v <-- ~2 seconds
| Future(<not completed>)
| Future(Success(apiResult(k-1, 32)))
| Future(Success(apiResult(k-1, 33)))
| Future(Success(apiResult(k-4, 34)))
| Future(Success(apiResult(k-4, 35)))
| Future(Success(apiResult(k-1, 36)))  // Request(k-1, 37) dropped
| Future(Success(apiResult(k-5, 38)))
v <-- ~2 seconds
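Note that mapConcat() is for flattening the stream of grouped API requests back to a stream of individual requests in their original order. The occasional Future(<not completed>) in the output simply means that the Future returned by apiCall had not yet completed at the moment it was printed.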
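The Future(<not completed>) entries in the output above come from printing a Future before its result is available. The sketch below reproduces both printed forms deterministically with a Promise, outside of any stream; the result string is just a sample value.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// A Promise lets us control exactly when its Future completes
val promise = Promise[String]()
val pending: Future[String] = promise.future

val before = pending.toString          // rendered while still pending
promise.success("apiResult(k-6, 18)")  // complete it with a sample value
val after = pending.toString           // rendered once completed

println(before)  // Future(<not completed>)
println(after)   // Future(Success(apiResult(k-6, 18)))

// Blocking is acceptable in a small demo like this one
println(Await.result(pending, 1.second))  // apiResult(k-6, 18)
```

If completed values rather than Future instances are wanted downstream, one option is to let the stream resolve the Futures, for instance with Akka Streams' mapAsync operator, which emits results in upstream order.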
Next, we test-run our rate-limiter using the alternative filtering strategy with the same input stream and timeWindow/maxReqs/maxReqsPerKey parameters:
val requests: Iterator[Request[Int]] = Vector(
    5, 1, 1, 4, 5, 5, 5, 6, 2, 2,  // Rogue keys: `5`
    1, 5, 5, 2, 3, 4, 6, 6, 4, 4,
    5, 4, 3, 3, 4, 4, 4, 1, 3, 3,  // Rogue keys: `3` & `4`
    6, 1, 1, 4, 4, 1, 1, 5         // Rogue keys: `1`
  ).
  zipWithIndex.
  map{ case (x, i) => Request(s"k-$x", i + 1) }.
  iterator

Source.fromIterator(() => requests).
  groupedWithin(maxReqs, timeWindow).
  via(dropAllReqsByKey()).  // Rate-limiting strategy #2
  throttle(1, timeWindow, 1, ThrottleMode.Shaping).
  mapConcat(_.map(req => apiCall(req.apiKey, req.apiParam))).
  runForeach(println)

| Future(Success(apiResult(k-1, 2)))
| Future(<not completed>)
| Future(Success(apiResult(k-4, 4)))
| Future(Success(apiResult(k-6, 8)))
| Future(Success(apiResult(k-2, 9)))
| Future(Success(apiResult(k-2, 10)))  // All requests by k-5 dropped
v <-- ~2 seconds
| Future(<not completed>)
| Future(Success(apiResult(k-5, 12)))
| Future(Success(apiResult(k-5, 13)))
| Future(Success(apiResult(k-2, 14)))
| Future(Success(apiResult(k-3, 15)))
| Future(Success(apiResult(k-4, 16)))
| Future(Success(apiResult(k-6, 17)))
| Future(Success(apiResult(k-6, 18)))
| Future(Success(apiResult(k-4, 19)))
| Future(Success(apiResult(k-4, 20)))
v <-- ~2 seconds
| Future(Success(apiResult(k-5, 21)))
| Future(Success(apiResult(k-1, 28)))  // All requests by k-3 or k-4 dropped
v <-- ~2 seconds
| Future(Success(apiResult(k-6, 31)))
| Future(Success(apiResult(k-4, 34)))
| Future(Success(apiResult(k-4, 35)))
| Future(Success(apiResult(k-5, 38)))  // All requests by k-1 dropped
v <-- ~2 seconds
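As a cross-check of the two test runs, the dropped positions can be recomputed per window with plain collections. This sketch assumes that each window holds exactly maxReqs consecutive requests, which matches the outputs above since the whole input is available at once. The helper names droppedByStrategy1 and droppedByStrategy2 are hypothetical, and the helpers restate the two strategies in terms of groupBy rather than reusing the Flows.

```scala
final case class Request[A](apiKey: String, apiParam: A)

val maxReqs = 10
val maxReqsPerKey = 3

val requests: Vector[Request[Int]] = Vector(
  5, 1, 1, 4, 5, 5, 5, 6, 2, 2,
  1, 5, 5, 2, 3, 4, 6, 6, 4, 4,
  5, 4, 3, 3, 4, 4, 4, 1, 3, 3,
  6, 1, 1, 4, 4, 1, 1, 5
).zipWithIndex.map { case (x, i) => Request(s"k-$x", i + 1) }

// Assumption: every window is a full chunk of maxReqs consecutive requests
val windows: List[Vector[Request[Int]]] = requests.grouped(maxReqs).toList

// Strategy #1 drops whatever comes after the first maxReqsPerKey requests of a key
def droppedByStrategy1(batch: Seq[Request[Int]]): Seq[Int] =
  batch.groupBy(_.apiKey).values.flatMap(_.drop(maxReqsPerKey)).map(_.apiParam).toSeq.sorted

// Strategy #2 drops all requests of any key that exceeds maxReqsPerKey
def droppedByStrategy2(batch: Seq[Request[Int]]): Seq[Int] =
  batch.groupBy(_.apiKey).values.filter(_.size > maxReqsPerKey).flatten.map(_.apiParam).toSeq.sorted

def show(dropped: List[Seq[Int]]): String = dropped.map(_.mkString("[", ",", "]")).mkString(" ")

println(show(windows.map(droppedByStrategy1)))
// [7] [] [27,30] [37]
println(show(windows.map(droppedByStrategy2)))
// [1,5,6,7] [] [22,23,24,25,26,27,29,30] [32,33,36,37]
```

The positions agree with the gaps in the two printed outputs: request 7 in the first window, none in the second, and so on.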
Wrapping the rate-limiter in a class
To generalize the rate-limiter, we can create a wrapper class that parameterizes apiCall and filteringStrategy along with the timeWindow, maxReqs and maxReqsPerKey parameters.
case class Request[A](apiKey: String, apiParam: A)

case class RateLimiter[A, B](apiCall: (String, A) => Future[B],
                             filteringStrategy: Int => Flow[Seq[Request[A]], Seq[Request[A]], NotUsed],
                             timeWindow: FiniteDuration,
                             maxReqs: Int,
                             maxReqsPerKey: Int)(implicit ec: ExecutionContext) {
  def flow(): Flow[Request[A], Future[B], NotUsed] =
    Flow[Request[A]].
      groupedWithin(maxReqs, timeWindow).
      via(filteringStrategy(maxReqsPerKey)).
      throttle(1, timeWindow, 1, ThrottleMode.Shaping).
      mapConcat(_.map(req => apiCall(req.apiKey, req.apiParam)))
}

object RateLimiter {
  // Rate-limiting flow that discards excessive API calls from any sources
  def keepToLimitPerKey[A](maxReqsPerKey: Int): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] =
    Flow[Seq[Request[A]]].map{ g =>
      g.foldLeft((List.empty[Request[A]], Map.empty[String, Int])){ case ((acc, m), req) =>
        val count = m.getOrElse(req.apiKey, 0) + 1
        if (count <= maxReqsPerKey) (req :: acc, m + (req.apiKey -> count))
        else (acc, m + (req.apiKey -> count))
      }._1.toSeq.reverse
    }

  // Rate-limiting flow that drops all API calls from any offending sources
  def dropAllReqsByKey[A](maxReqsPerKey: Int): Flow[Seq[Request[A]], Seq[Request[A]], akka.NotUsed] =
    Flow[Seq[Request[A]]].map{ g =>
      val offendingKeys = g.groupMapReduce(_.apiKey)(_ => 1)(_ + _).
        collect{ case (key, cnt) if cnt > maxReqsPerKey => key }.toSeq
      g.filterNot(req => offendingKeys.contains(req.apiKey))
    }
}
Note that implementations of any available filtering strategies are now kept within the RateLimiter companion object.
A “biased” random-number function
Let’s also create a simple function for generating “biased” random integers for test-running the rate-limiter class.
def biasedRandNum(l: Int, u: Int, biasedNums: Set[Int], biasedFactor: Int = 1): Int = {
  def rand = java.util.concurrent.ThreadLocalRandom.current
  Vector.
    iterate(rand.nextInt(l, u+1), biasedFactor)(_ => rand.nextInt(l, u+1)).
    dropWhile(!biasedNums.contains(_)).
    headOption match {
      case Some(n) => n
      case None => rand.nextInt(l, u+1)
    }
}
Method biasedRandNum() simply generates a random integer within a given range that skews towards elements in the provided biasedNums set. The biasedFactor (e.g. 0, 1, 2, …) influences the skew level by making the random number generator repeat “biased” trials, with 0 representing no bias. A larger biasedFactor value increases the skew.
For example, biasedRandNum(0, 9, Set(1, 3, 5)) will generate a random integer between 0 and 9 (inclusive), skewing towards generating 1, 3 or 5 with the default biasedFactor = 1.
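How strong the skew is can be estimated empirically. The sketch below repeats the function so that the snippet runs on its own, then compares the observed share of biased results with the share one would expect if every draw is uniform: each of the biasedFactor trials and the fallback draw must miss the set for the result to be unbiased, which gives 1 - 0.7^(biasedFactor + 1) for 3 biased numbers out of 10. The helpers hitRate and expected are hypothetical names.

```scala
import java.util.concurrent.ThreadLocalRandom

// Same logic as biasedRandNum() above, repeated here so the snippet is self-contained
def biasedRandNum(l: Int, u: Int, biasedNums: Set[Int], biasedFactor: Int = 1): Int = {
  def rand = ThreadLocalRandom.current
  Vector.iterate(rand.nextInt(l, u + 1), biasedFactor)(_ => rand.nextInt(l, u + 1))
    .dropWhile(!biasedNums.contains(_))
    .headOption
    .getOrElse(rand.nextInt(l, u + 1))
}

val biased = Set(1, 3, 5)
val samples = 100000

// Observed share of draws that land in `biased` for a given biasedFactor
def hitRate(factor: Int): Double =
  Iterator.fill(samples)(biasedRandNum(0, 9, biased, factor)).count(biased.contains).toDouble / samples

// Expected share, assuming uniform draws: all (factor + 1) draws must miss to stay unbiased
def expected(factor: Int): Double = 1.0 - math.pow(0.7, factor + 1)

(0 to 3).foreach { k =>
  println(f"biasedFactor=$k observed=${hitRate(k)}%.3f expected=${expected(k)}%.3f")
}
```

Under that assumption the expected share is 0.3 for biasedFactor 0, about 0.51 for 1 and about 0.66 for 2; the observed values vary slightly from run to run.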
Test-running the rate-limiter class with random data
def apiCall[A](key: String, param: A)(implicit ec: ExecutionContext): Future[String] =
  Future{s"apiResult($key, $param)"}

val requests: Iterator[Request[Int]] =
  Vector.tabulate(1200)(_ => biasedRandNum(0, 9, Set(1, 3, 5), 2)).
    zipWithIndex.
    map{ case (x, i) => Request(s"k-$x", i + 1) }.
    iterator

Source.fromIterator(() => requests).
  via(RateLimiter(apiCall, RateLimiter.dropAllReqsByKey[Int], 2.seconds, 500, 20).flow()).
  runForeach(println)
In the above example, you’ll see in the output a batch of up to 500 elements printed every couple of seconds. The biasedFactor is set to 2, significantly skewing the random apiKey values towards the biasedNums elements 1, 3 and 5. Since the filtering strategy dropAllReqsByKey is chosen, a likely observation is that all requests with apiKey k-1, k-3 or k-5 will be dropped by the rate-limiter. With 500 requests spread over only 10 keys and maxReqsPerKey set to 20, other keys may exceed the limit in some windows as well.
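A rough expectation for what gets dropped can be worked out without running the stream. The sketch below computes the expected number of requests per key in a batch, assuming uniform draws inside biasedRandNum() and batches of 500, 500 and 200 requests (1200 inputs grouped by 500). The helper expectedPerKey is a hypothetical name, and the figures are averages, not guarantees for any single run.

```scala
val keys = (0 to 9).toSet
val biased = Set(1, 3, 5)
val biasedFactor = 2
val maxReqsPerKey = 20

// Probability that a single uniform draw misses the biased set (0.7 here)
val pMiss = 1.0 - biased.size.toDouble / keys.size
// Every biased trial plus the fallback draw must miss for an unbiased result
val pUnbiasedTotal = math.pow(pMiss, biasedFactor + 1)

// Expected request count for one key in a batch of `batchSize` requests
def expectedPerKey(key: Int, batchSize: Int): Double =
  if (biased(key)) batchSize * (1.0 - pUnbiasedTotal) / biased.size
  else batchSize * pUnbiasedTotal / (keys.size - biased.size)

Seq(500, 200).foreach { n =>
  println(f"batch=$n biasedKey=${expectedPerKey(1, n)}%.1f otherKey=${expectedPerKey(0, n)}%.1f limit=$maxReqsPerKey")
}
```

For a full batch of 500 this comes to roughly 109.5 requests per biased key and 24.5 per other key, both above the limit of 20, so non-biased keys can be dropped in full batches too. In the final batch of 200 the averages are about 43.8 and 9.8, so there mainly the biased keys should be affected.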
I’ll leave it to the readers to experiment with the rate-limiter by changing the values of parameters in biasedRandNum() as well as constructor fields in class RateLimiter.