// blob: d1890159cc2ea2ab4f89947c44f5b396d96ca2c3
#![cfg(feature = "spawn-ready")]
#[path = "../support.rs"]
mod support;
use tokio::time;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
use tower::spawn_ready::{SpawnReady, SpawnReadyLayer};
use tower::util::ServiceExt;
use tower_test::mock;
// A service built through `SpawnReadyLayer` must report `Pending` while the
// mock inner service has no capacity, and `Ready(Ok)` once capacity has been
// granted and time has been allowed to advance.
#[tokio::test(flavor = "current_thread")]
async fn when_inner_is_not_ready() {
    // Pause the clock so the sleep below does not depend on wall-clock time.
    time::pause();
    let _t = support::trace_init();

    let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(SpawnReadyLayer::new());

    // Withhold all capacity: the service must not be ready.
    handle.allow(0);
    assert_pending!(service.poll_ready());

    // Grant capacity for a single request, then yield to the runtime for
    // 100ms of (paused) time before polling readiness again.
    handle.allow(1);
    let settle = time::Duration::from_millis(100);
    time::sleep(settle).await;
    assert_ready_ok!(service.poll_ready());
}
// An error injected into the mock inner service must come back out of
// `poll_ready` as a ready error whose message is unchanged.
#[tokio::test(flavor = "current_thread")]
async fn when_inner_fails() {
    let _t = support::trace_init();

    let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(SpawnReadyLayer::new());

    // Withhold capacity and make the mock report a failure.
    handle.allow(0);
    handle.send_error("foobar");

    // The very first readiness poll must already yield the injected error.
    let err = assert_ready_err!(service.poll_ready());
    assert_eq!(err.to_string(), "foobar");
}
// Wraps a span-checking service in `SpawnReady` and calls it once from a
// spawned task instrumented with the same span.
// NOTE(review): `support::AssertSpanSvc` is defined outside this file; it
// presumably fails the call when it is not running inside the span it was
// built with -- confirm against support.rs.
#[tokio::test(flavor = "current_thread")]
async fn propagates_trace_spans() {
    use tracing::Instrument;

    let _t = support::trace_init();

    let span = tracing::info_span!("my_span");
    let service = SpawnReady::new(support::AssertSpanSvc::new(span.clone()));

    // Issue a single request; the whole call future runs inside `span`.
    let call = service.oneshot(()).instrument(span);

    // The outer `expect` covers a panicked task, the inner one a failed call.
    tokio::spawn(call)
        .await
        .expect("service panicked")
        .expect("failed");
}
#[cfg(test)]
#[tokio::test(flavor = "current_thread")]
async fn abort_on_drop() {
let (mock, mut handle) = mock::pair::<(), ()>();
let mut svc = SpawnReady::new(mock);
handle.allow(0);
// Drive the service to readiness until we signal a drop.
let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
let mut task = tokio_test::task::spawn(async move {
tokio::select! {
_ = drop_rx => {}
_ = svc.ready() => unreachable!("Service must not become ready"),
}
});
assert_pending!(task.poll());
assert_pending!(handle.poll_request());
// End the task and ensure that the inner service has been dropped.
assert!(drop_tx.send(()).is_ok());
tokio_test::assert_ready!(task.poll());
tokio::task::yield_now().await;
assert!(tokio_test::assert_ready!(handle.poll_request()).is_none());
}