// blob: 5c7cd01598ad07ada6a1823df9d69a626af06d4d [file] [log] [blame] [edit]
use bencher::{benchmark_group, benchmark_main, Bencher};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body::Body;
use std::{
fmt::{Error, Formatter},
pin::Pin,
task::{Context, Poll},
};
use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};
/// Generates a benchmark function named `$name`.
///
/// The generated function builds a payload of `$message_count` frames, each carrying a
/// `$message_size`-byte message, wraps it in a `MockBody` that yields `$chunk_size`-byte
/// chunks, and then repeatedly drains a `Streaming` built over a clone of that body.
/// Every iteration checks the size of each decoded message, the number of messages and
/// that no trailers are reported.
macro_rules! bench {
    ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
        fn $name(b: &mut Bencher) {
            // Runtime and payload are created once, outside the measured closure.
            let runtime = tokio::runtime::Builder::new_multi_thread()
                .build()
                .expect("runtime");

            let body = MockBody::new(make_payload($message_size, $message_count), $chunk_size);
            // Bytes handled per iteration (presumably used by bencher to report throughput).
            b.bytes = body.len() as u64;

            b.iter(|| {
                runtime.block_on(async {
                    // Polling consumes the body, so each iteration works on a fresh clone.
                    let mut stream = Streaming::new_request(
                        MockDecoder::new($message_size),
                        body.clone(),
                        None,
                        None,
                    );

                    let mut decoded = 0;
                    while let Some(message) = stream.message().await.unwrap() {
                        assert_eq!($message_size, message.len());
                        decoded += 1;
                    }

                    assert_eq!(decoded, $message_count);
                    assert!(stream.trailers().await.unwrap().is_none());
                })
            })
        }
    };
}
/// In-memory body that feeds a pre-built payload to the code under benchmark.
///
/// Its `Body` implementation hands `data` out in pieces of at most `chunk_size` bytes.
#[derive(Clone)]
struct MockBody {
    /// Bytes that have not been yielded yet.
    data: Bytes,
    /// Upper bound, in bytes, on the size of each yielded chunk.
    chunk_size: usize,
}

impl MockBody {
    /// Wraps `data` in a body that yields chunks of up to `chunk_size` bytes.
    pub fn new(data: Bytes, chunk_size: usize) -> Self {
        Self { data, chunk_size }
    }

    /// Returns the number of bytes currently held by this body.
    pub fn len(&self) -> usize {
        self.data.len()
    }
}
impl Body for MockBody {
    type Data = Bytes;
    type Error = Status;

    /// Yields the next chunk of the payload, or `None` once it is exhausted.
    ///
    /// Always returns `Poll::Ready`; the chunk is `chunk_size` bytes long, or shorter
    /// when fewer bytes remain.
    fn poll_data(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        if !self.data.has_remaining() {
            return Poll::Ready(None);
        }

        // Never split past the end of the remaining payload.
        let take = self.chunk_size.min(self.data.remaining());
        let chunk = self.data.split_to(take);
        Poll::Ready(Some(Ok(chunk)))
    }

    /// This mock body never produces trailers.
    fn poll_trailers(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
        Poll::Ready(Ok(None))
    }
}
impl std::fmt::Debug for MockBody {
    /// Writes at most the first ten payload bytes, followed by `...` and the total
    /// number of bytes held in parentheses.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
        let total = self.data.len();
        // A byte slice renders exactly like a list of its elements.
        let head = &self.data[..total.min(10)];
        write!(f, "{:?}...({})", head, total)
    }
}
/// Decoder stand-in that treats every message as an opaque blob of a fixed size.
#[derive(Debug, Clone)]
struct MockDecoder {
    /// Number of bytes the decoder consumes from the buffer per message.
    message_size: usize,
}

impl MockDecoder {
    /// Creates a decoder that consumes `message_size` bytes per decoded message.
    fn new(message_size: usize) -> Self {
        Self { message_size }
    }
}
impl Decoder for MockDecoder {
    type Item = Vec<u8>;
    type Error = Status;

    /// Copies the buffer's current chunk into a new `Vec<u8>`, advances the buffer by
    /// `message_size` bytes and returns the copy.
    ///
    /// This implementation always returns `Ok(Some(_))`.
    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
        let message = buf.chunk().to_vec();
        buf.advance(self.message_size);
        Ok(Some(message))
    }
}
/// Builds a payload made of `message_count` identical length-prefixed frames.
///
/// Each frame consists of a 5-byte header followed by the message body:
/// a single `0` byte (presumably the gRPC "compressed" flag, i.e. uncompressed),
/// the body length written with `put_u32`, and `message_length` bytes of `b'a'`.
///
/// # Panics
///
/// Panics if `message_length` does not fit in the `u32` length prefix. The original
/// `as u32` cast would silently truncate the prefix and yield a corrupt frame.
fn make_payload(message_length: usize, message_count: usize) -> Bytes {
    assert!(
        message_length <= u32::MAX as usize,
        "message length must fit in the 4-byte length prefix"
    );
    // Checked above, so this cast cannot truncate.
    let length_prefix = message_length as u32;

    // Every frame carries the same body, so allocate it once instead of per iteration.
    let msg = vec![b'a'; message_length];

    let mut buf = BytesMut::new();
    for _ in 0..message_count {
        buf.reserve(msg.len() + 5);
        buf.put_u8(0);
        buf.put_u32(length_prefix);
        buf.put_slice(&msg);
    }
    buf.freeze()
}
// Benchmark definitions. Arguments to `bench!` are:
// (function name, message size in bytes, body chunk size in bytes, message count).
//
// change body chunk size only: one 1,000-byte message (1,005 bytes including the
// 5-byte frame header) delivered in 100-, 500- and 1,005-byte chunks
bench!(chunk_size_100, 1_000, 100, 1);
bench!(chunk_size_500, 1_000, 500, 1);
bench!(chunk_size_1005, 1_000, 1_005, 1);
// change message size only: two messages per run, always delivered in 1,005-byte chunks
bench!(message_size_1k, 1_000, 1_005, 2);
bench!(message_size_5k, 5_000, 1_005, 2);
bench!(message_size_10k, 10_000, 1_005, 2);
// change message count only: 500-byte messages delivered in 505-byte chunks,
// i.e. exactly one frame (5-byte header + body) per chunk
bench!(message_count_1, 500, 505, 1);
bench!(message_count_10, 500, 505, 10);
bench!(message_count_20, 500, 505, 20);
// Collect the generated benchmark functions into three bencher groups; the first
// argument of each `benchmark_group!` is the group name, the rest are its members.
benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005);
benchmark_group!(
message_size,
message_size_1k,
message_size_5k,
message_size_10k
);
benchmark_group!(
message_count,
message_count_1,
message_count_10,
message_count_20
);
// Hand all three groups to bencher's `benchmark_main!`.
benchmark_main!(chunk_size, message_size, message_count);