blob: 08f421598d5b0c32f58bf66ce3dc6316d8f8f1ce [file] [log] [blame]
#![cfg(not(target_arch = "wasm32"))]
use std::convert::Infallible;
use std::future::Future;
use std::net;
use std::sync::Arc;
use std::time::Duration;
use futures_util::FutureExt;
use http::{Request, Response};
use hyper::service::service_fn;
use hyper::Body;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::oneshot;
/// This server, unlike [`super::server::Server`], allows for delaying the
/// specified amount of time after each TCP connection is established. This is
/// useful for testing the behavior of the client when the server is slow.
///
/// For example, in case of HTTP/2, once the TCP/TLS connection is established,
/// both endpoints are supposed to send a preface and an initial `SETTINGS`
/// frame (See [RFC9113 3.4] for details). What if these frames are delayed for
/// whatever reason? This server allows for testing such scenarios.
///
/// [RFC9113 3.4]: https://www.rfc-editor.org/rfc/rfc9113.html#name-http-2-connection-preface
pub struct Server {
addr: net::SocketAddr,
shutdown_tx: Option<oneshot::Sender<()>>,
server_terminated_rx: oneshot::Receiver<()>,
}
impl Server {
pub async fn new<F1, Fut, F2>(func: F1, apply_config: F2, delay: Duration) -> Self
where
F1: Fn(Request<Body>) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Response<Body>> + Send + 'static,
F2: FnOnce(hyper::server::conn::Http) -> hyper::server::conn::Http + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (server_terminated_tx, server_terminated_rx) = oneshot::channel();
let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = tcp_listener.local_addr().unwrap();
tokio::spawn(async move {
let http = Arc::new(apply_config(hyper::server::conn::Http::new()));
tokio::spawn(async move {
let (connection_shutdown_tx, connection_shutdown_rx) = oneshot::channel();
let connection_shutdown_rx = connection_shutdown_rx.shared();
let mut shutdown_rx = std::pin::pin!(shutdown_rx);
let mut handles = Vec::new();
loop {
select! {
_ = shutdown_rx.as_mut() => {
connection_shutdown_tx.send(()).unwrap();
break;
}
res = tcp_listener.accept() => {
let (stream, _) = res.unwrap();
let handle = tokio::spawn({
let connection_shutdown_rx = connection_shutdown_rx.clone();
let http = http.clone();
let func = func.clone();
async move {
tokio::time::sleep(delay).await;
let mut conn = std::pin::pin!(http.serve_connection(
stream,
service_fn(move |req| {
let fut = func(req);
async move {
Ok::<_, Infallible>(fut.await)
}})
));
select! {
_ = conn.as_mut() => {}
_ = connection_shutdown_rx => {
conn.as_mut().graceful_shutdown();
conn.await.unwrap();
}
}
}
});
handles.push(handle);
}
}
}
futures_util::future::join_all(handles).await;
server_terminated_tx.send(()).unwrap();
});
});
Self {
addr,
shutdown_tx: Some(shutdown_tx),
server_terminated_rx,
}
}
pub async fn shutdown(mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
self.server_terminated_rx.await.unwrap();
}
pub fn addr(&self) -> net::SocketAddr {
self.addr
}
}