blob: 3705f827584c182999d16e0a8a45908155aa2c0c [file] [log] [blame] [edit]
//! A retry "budget" for allowing only a certain amount of retries over time.
use std::{
fmt,
sync::{
atomic::{AtomicIsize, Ordering},
Mutex,
},
time::Duration,
};
use tokio::time::Instant;
/// Represents a "budget" for retrying requests.
///
/// This is useful for limiting the amount of retries a service can perform
/// over a period of time, or per a certain number of requests attempted.
///
/// Calls to `deposit` credit the budget and calls to `withdraw` debit it;
/// the amounts used for each are fixed when the budget is constructed.
pub struct Budget {
/// Windowed ledger that stores the balance and ages out old entries.
bucket: Bucket,
/// Amount credited to the bucket by every call to `deposit`.
deposit_amount: isize,
/// Amount that a `withdraw` must find in the bucket, and then debits.
withdraw_amount: isize,
}
/// Indicates that it is not currently allowed to "withdraw" another retry
/// from the [`Budget`].
///
/// The type carries no data; the private field only prevents construction
/// outside of this module. It implements [`std::error::Error`] so it can be
/// propagated with `?` into boxed or dynamic error types.
#[derive(Debug)]
pub struct Overdrawn {
    _inner: (),
}

impl fmt::Display for Overdrawn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("retry budget overdrawn")
    }
}

impl std::error::Error for Overdrawn {}
/// Ledger backing a `Budget`: a ring of per-window totals plus an
/// accumulator for the window that is currently open.
#[derive(Debug)]
struct Bucket {
/// Which slot is current and the instant it is measured from; locked
/// while expired windows are being rotated out.
generation: Mutex<Generation>,
/// Fixed baseline that is added to every computed balance.
///
/// `Budget::new` sets this to `min_per_sec` multiplied by the whole
/// seconds of the TTL and by the withdraw amount.
reserve: isize,
/// Slots of the TTL divided evenly; each holds the committed total of
/// one past window.
slots: Box<[AtomicIsize]>,
/// The amount of time represented by each slot.
window: Duration,
/// The changes for the current slot, to be committed
/// after the slot expires.
writer: AtomicIsize,
}
/// Position of the bucket's current window; guarded by `Bucket::generation`.
#[derive(Debug)]
struct Generation {
/// Index of the slot that the pending `writer` total is stored into the
/// next time the current window expires.
index: usize,
/// Reference instant the current window is measured from; elapsed time
/// since this instant is compared against `Bucket::window`.
time: Instant,
}
// ===== impl Budget =====
impl Budget {
    /// Create a [`Budget`] that allows for a certain percent of the total
    /// requests to be retried.
    ///
    /// - The `ttl` is the duration of how long a single `deposit` should be
    ///   considered. Must be between 1 and 60 seconds.
    /// - The `min_per_sec` is the minimum rate of retries allowed to accommodate
    ///   clients that have just started issuing requests, or clients that do
    ///   not issue many requests per window.
    /// - The `retry_percent` is the percentage of calls to `deposit` that can
    ///   be retried. This is in addition to any retries allowed for via
    ///   `min_per_sec`. Must be between 0 and 1000.
    ///
    ///   As an example, if `0.1` is used, then for every 10 calls to `deposit`,
    ///   1 retry will be allowed. If `2.0` is used, then every `deposit`
    ///   allows for 2 retries.
    ///
    /// # Panics
    ///
    /// Panics if `ttl` is shorter than 1 second or longer than 60 seconds, if
    /// `retry_percent` is NaN or outside `0.0..=1000.0`, or if `min_per_sec`
    /// is not below `i32::MAX`.
    pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
        /// Fixed-point units credited per deposit. Expressing the ratio as
        /// `SCALE / cost` keeps non-integral ratios accurate: with a unit of
        /// 1, a ratio such as `0.7` would truncate to a withdrawal cost of 1
        /// and silently allow a 100% retry rate.
        const SCALE: isize = 1000;
        /// Number of slots the TTL is divided into.
        const WINDOWS: u32 = 10;

        // assertions taken from finagle
        assert!(ttl >= Duration::from_secs(1));
        assert!(ttl <= Duration::from_secs(60));
        // NaN fails this comparison, so it is rejected here as well.
        assert!(retry_percent >= 0.0);
        assert!(retry_percent <= 1000.0);
        assert!(min_per_sec < i32::MAX as u32);

        let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
            // If there is no percent, then you gain nothing from deposits.
            // Withdrawals can only be made against the reserve, over time.
            (0, 1)
        } else {
            // Every deposit is worth SCALE units and every withdrawal costs
            // SCALE / retry_percent units, so D deposits permit roughly
            // D * retry_percent withdrawals. The float-to-int cast saturates
            // for extremely small ratios; the cost is never allowed below 1.
            let cost = (SCALE as f32 / retry_percent).round() as isize;
            (SCALE, cost.max(1))
        };

        // The reserve is expressed in withdrawal units so that it permits
        // `min_per_sec` retries for every second of the TTL.
        let reserve = (min_per_sec as isize)
            .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds
            .saturating_mul(withdraw_amount);

        // AtomicIsize isn't Clone, so each slot is constructed individually.
        let slots: Box<[AtomicIsize]> = (0..WINDOWS).map(|_| AtomicIsize::new(0)).collect();

        Budget {
            bucket: Bucket {
                generation: Mutex::new(Generation {
                    index: 0,
                    time: Instant::now(),
                }),
                reserve,
                slots,
                window: ttl / WINDOWS,
                writer: AtomicIsize::new(0),
            },
            deposit_amount,
            withdraw_amount,
        }
    }

    /// Store a "deposit" in the budget, which will be used to permit future
    /// withdrawals.
    pub fn deposit(&self) {
        self.bucket.put(self.deposit_amount);
    }

    /// Check whether there is enough "balance" in the budget to issue a new
    /// retry.
    ///
    /// On success the cost of one retry is debited from the budget.
    ///
    /// # Errors
    ///
    /// If there is not enough balance, an `Err(Overdrawn)` is returned and
    /// the budget is left unchanged.
    pub fn withdraw(&self) -> Result<(), Overdrawn> {
        if self.bucket.try_get(self.withdraw_amount) {
            Ok(())
        } else {
            Err(Overdrawn { _inner: () })
        }
    }
}
impl Default for Budget {
fn default() -> Budget {
Budget::new(Duration::from_secs(10), 10, 0.2)
}
}
impl fmt::Debug for Budget {
    /// Prints the per-call deposit and withdraw amounts together with the
    /// balance the bucket reports at the time of formatting.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let balance = self.bucket.sum();

        let mut out = f.debug_struct("Budget");
        out.field("deposit", &self.deposit_amount);
        out.field("withdraw", &self.withdraw_amount);
        out.field("balance", &balance);
        out.finish()
    }
}
// ===== impl Bucket =====
impl Bucket {
fn put(&self, amt: isize) {
self.expire();
self.writer.fetch_add(amt, Ordering::SeqCst);
}
fn try_get(&self, amt: isize) -> bool {
debug_assert!(amt >= 0);
self.expire();
let sum = self.sum();
if sum >= amt {
self.writer.fetch_add(-amt, Ordering::SeqCst);
true
} else {
false
}
}
fn expire(&self) {
let mut gen = self.generation.lock().expect("generation lock");
let now = Instant::now();
let diff = now.saturating_duration_since(gen.time);
if diff < self.window {
// not expired yet
return;
}
let to_commit = self.writer.swap(0, Ordering::SeqCst);
self.slots[gen.index].store(to_commit, Ordering::SeqCst);
let mut diff = diff;
let mut idx = (gen.index + 1) % self.slots.len();
while diff > self.window {
self.slots[idx].store(0, Ordering::SeqCst);
diff -= self.window;
idx = (idx + 1) % self.slots.len();
}
gen.index = idx;
gen.time = now;
}
fn sum(&self) -> isize {
let current = self.writer.load(Ordering::SeqCst);
let windowed_sum: isize = self
.slots
.iter()
.map(|slot| slot.load(Ordering::SeqCst))
// fold() is used instead of sum() to determine overflow behavior
.fold(0, isize::saturating_add);
current
.saturating_add(windowed_sum)
.saturating_add(self.reserve)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time;

    /// Builds a budget with a one second TTL, which gives 100ms slots.
    fn one_second_budget(min_per_sec: u32, retry_percent: f32) -> Budget {
        Budget::new(Duration::from_secs(1), min_per_sec, retry_percent)
    }

    /// A budget with no reserve and no deposits cannot be withdrawn from.
    #[test]
    fn empty() {
        let bgt = one_second_budget(0, 1.0);
        assert!(bgt.withdraw().is_err(), "nothing was ever deposited");
    }

    /// A deposit stops counting once the TTL has passed.
    #[tokio::test]
    async fn leaky() {
        time::pause();

        let bgt = one_second_budget(0, 1.0);
        bgt.deposit();

        time::advance(Duration::from_secs(3)).await;

        assert!(bgt.withdraw().is_err(), "deposit should have expired");
    }

    /// Deposits age out slot by slot rather than all at once.
    #[tokio::test]
    async fn slots() {
        time::pause();

        let bgt = one_second_budget(0, 0.5);
        for _ in 0..2 {
            bgt.deposit();
        }

        // just past 900ms, the deposits should still be valid
        time::advance(Duration::from_millis(901)).await;
        assert!(bgt.withdraw().is_ok(), "deposits are still within the TTL");

        // wait long enough that everything recorded so far has expired
        time::advance(Duration::from_millis(2001)).await;

        bgt.deposit();
        time::advance(Duration::from_millis(301)).await;
        bgt.deposit();
        time::advance(Duration::from_millis(801)).await;
        bgt.deposit();

        // the first deposit is expired, but the 2nd should still be valid,
        // combining with the 3rd
        assert!(bgt.withdraw().is_ok(), "2nd and 3rd deposits should combine");
    }

    /// The reserve permits exactly `min_per_sec * ttl` withdrawals up front.
    #[tokio::test]
    async fn reserve() {
        let bgt = one_second_budget(5, 1.0);

        for _ in 0..5 {
            assert!(bgt.withdraw().is_ok(), "reserve should cover this withdrawal");
        }

        assert!(bgt.withdraw().is_err(), "reserve should be used up");
    }
}