| #![warn(rust_2018_idioms)] |
| #![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))] |
| |
| use std::future::Future; |
| use std::sync::{Arc, Mutex}; |
| use std::task::Poll; |
| use tokio::macros::support::poll_fn; |
| |
| use tokio::runtime::Runtime; |
| use tokio::task::consume_budget; |
| use tokio::time::{self, Duration}; |
| |
/// The reported worker count must match the way each runtime was built.
#[test]
fn num_workers() {
    // The current-thread flavor is expected to report exactly one worker.
    let single = current_thread();
    let single_workers = single.metrics().num_workers();
    assert_eq!(1, single_workers);

    // `threaded()` asks the builder for two worker threads.
    let multi = threaded();
    let multi_workers = multi.metrics().num_workers();
    assert_eq!(2, multi_workers);
}
| |
/// Running one blocking job takes the blocking-thread count from zero to one.
#[test]
fn num_blocking_threads() {
    let rt = current_thread();

    // Nothing has been handed to the blocking pool yet.
    assert_eq!(0, rt.metrics().num_blocking_threads());

    // Run a no-op blocking job to completion; its output is irrelevant.
    let job = rt.spawn_blocking(|| {});
    let _ = rt.block_on(job);

    assert_eq!(1, rt.metrics().num_blocking_threads());
}
| |
/// After a blocking job finishes, its thread should show up as idle.
#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_idle_blocking_threads());

    // Run a trivial blocking job so that a blocking thread gets created.
    let job = rt.spawn_blocking(|| {});
    let _ = rt.block_on(job);

    // Short grace period. The sleep is created inside the async block so that
    // it is constructed while the runtime is being driven.
    rt.block_on(async {
        time::sleep(Duration::from_millis(5)).await;
    });

    // The blocking thread needs a moment to become idle. 5ms is usually
    // sufficient, but not always, so fall back to a one second wait only when
    // the short one was not enough. This keeps the common case fast.
    //
    // Idle threads are only killed after a 10 second timeout, so the longer
    // wait cannot make the thread disappear.
    let still_busy = rt.metrics().num_idle_blocking_threads() == 0;
    if still_busy {
        rt.block_on(async {
            time::sleep(Duration::from_secs(1)).await;
        });
    }

    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}
| |
/// With a single blocking thread, a second blocking job has to queue up.
#[test]
fn blocking_queue_depth() {
    // Cap the blocking pool at one thread so a second job cannot start.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all().max_blocking_threads(1);
    let rt = builder.build().unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    // The test holds the lock, so every job that tries to take it blocks until
    // the guard is released.
    let gate = Arc::new(Mutex::new(()));
    let held = gate.lock().unwrap();

    let job = {
        let gate = Arc::clone(&gate);
        move || {
            let _unused = gate.lock().unwrap();
        }
    };

    let first = rt.spawn_blocking(job.clone());
    let second = rt.spawn_blocking(job);
    assert!(rt.metrics().blocking_queue_depth() > 0);

    // Open the gate and let both jobs run to completion.
    drop(held);

    let _ = rt.block_on(first);
    let _ = rt.block_on(second);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}
| |
/// A task spawned from a thread outside the runtime counts as one remote
/// schedule, for both runtime flavors.
#[test]
fn remote_schedule_count() {
    use std::thread;

    // Current-thread runtime: spawn an empty task from a foreign thread.
    let rt = current_thread();
    let spawner = rt.handle().clone();
    let foreign = thread::spawn(move || spawner.spawn(async {}));
    let task = foreign.join().unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());

    // Multi-thread runtime: same scenario, same expectation.
    let rt = threaded();
    let spawner = rt.handle().clone();
    let foreign = thread::spawn(move || spawner.spawn(async {}));
    let task = foreign.join().unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());
}
| |
/// Every worker must have parked at least once by the time the runtime is
/// dropped after a short sleep.
#[test]
fn worker_park_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    // The metrics handle is read after the runtime has been dropped.
    drop(rt);
    assert!(metrics.worker_park_count(0) >= 1);

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    // `threaded()` builds two workers; check each of them.
    for worker in 0..2 {
        assert!(metrics.worker_park_count(worker) >= 1);
    }
}
| |
/// Every worker is expected to record at least one no-op park.
#[test]
fn worker_noop_count() {
    // No-op parks are false-positive wakeups that occur under concurrency, so
    // there is no direct way to provoke one; the test only checks that some
    // have been counted by the time the runtime is dropped.

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(metrics.worker_noop_count(0) > 0);

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    // `threaded()` builds two workers; check each of them.
    for worker in 0..2 {
        assert!(metrics.worker_noop_count(worker) > 0);
    }
}
| |
/// Exactly one steal is expected when one worker is stuck in a blocking
/// receive while a task it queued still has to run.
#[test]
fn worker_steal_count() {
    // Steal counts are only checked on the multi-threaded runtime.
    //
    // A blocking std channel is used to back up one worker thread.
    use std::sync::mpsc::channel;

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        let (tx, rx) = channel();

        // Hop onto a runtime worker.
        let on_worker = tokio::spawn(async move {
            // This task performs the send that releases the receive below.
            tokio::spawn(async move {
                tx.send(()).unwrap();
            });

            // A second spawn bumps the sender out of the "next scheduled"
            // slot.
            tokio::spawn(async {});

            // Block this worker until the sender has run.
            rx.recv().unwrap();
        });

        on_worker.await.unwrap();
    });

    drop(rt);

    // The steal may be recorded on either worker, so add them all up.
    let total_steals = (0..metrics.num_workers())
        .map(|worker| metrics.worker_steal_count(worker))
        .sum::<u64>();

    assert_eq!(1, total_steals);
}
| |
/// Awaiting `N` empty tasks one after another is expected to produce exactly
/// `N` worker polls.
#[test]
fn worker_poll_count() {
    const N: u64 = 5;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {});
            task.await.unwrap();
        }
    });
    drop(rt);
    assert_eq!(N, metrics.worker_poll_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {});
            task.await.unwrap();
        }
    });
    drop(rt);

    // Polls may be recorded on either worker, so compare against the total.
    let total_polls: u64 = (0..metrics.num_workers())
        .map(|worker| metrics.worker_poll_count(worker))
        .sum();

    assert_eq!(N, total_polls);
}
| |
/// After running a handful of yielding tasks, every worker must report a
/// non-zero busy duration.
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    let zero = Duration::from_millis(0);

    let rt = current_thread();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            // Each task yields once before finishing.
            let task = tokio::spawn(async {
                tokio::task::yield_now().await;
            });
            task.await.unwrap();
        }
    });

    drop(rt);

    assert!(metrics.worker_total_busy_duration(0) > zero);

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {
                tokio::task::yield_now().await;
            });
            task.await.unwrap();
        }
    });

    drop(rt);

    for worker in 0..metrics.num_workers() {
        assert!(metrics.worker_total_busy_duration(worker) > zero);
    }
}
| |
/// Checks how spawns are split between the local and remote schedule counters.
#[test]
fn worker_local_schedule_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        let task = tokio::spawn(async {});
        task.await.unwrap();
    });
    drop(rt);

    // Current-thread: one local schedule and no remote ones.
    assert_eq!(1, metrics.worker_local_schedule_count(0));
    assert_eq!(0, metrics.remote_schedule_count());

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Hop onto a runtime worker, then spawn again from there.
        let outer = tokio::spawn(async {
            let inner = tokio::spawn(async {});
            inner.await.unwrap();
        });
        outer.await.unwrap();
    });
    drop(rt);

    // Multi-thread: two local schedules across all workers and one remote.
    let local_total = (0..metrics.num_workers())
        .map(|worker| metrics.worker_local_schedule_count(worker))
        .sum::<u64>();

    assert_eq!(2, local_total);
    assert_eq!(1, metrics.remote_schedule_count());
}
| |
/// Spawning 300 tasks from one worker while the other is blocked is expected
/// to record exactly one overflow.
#[test]
fn worker_overflow_count() {
    // Overflow is only checked on the threaded runtime.
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Hop onto a runtime worker.
        let on_worker = tokio::spawn(async {
            let (started_tx, started_rx) = std::sync::mpsc::channel();
            let (release_tx, release_rx) = std::sync::mpsc::channel();

            // Keep the other worker blocked until every task below has been
            // spawned.
            tokio::spawn(async move {
                started_tx.send(()).unwrap();
                release_rx.recv().unwrap();
            });

            // Bump the next-run spawn.
            tokio::spawn(async {});

            // Wait until the blocker task is actually running.
            started_rx.recv().unwrap();

            // Spawn many tasks.
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            // Let the blocked worker go.
            release_tx.send(()).unwrap();
        });

        on_worker.await.unwrap();
    });
    drop(rt);

    let overflows = (0..metrics.num_workers())
        .map(|worker| metrics.worker_overflow_count(worker))
        .sum::<u64>();

    assert_eq!(1, overflows);
}
| |
/// Tasks spawned from outside the runtime must be visible in the injection
/// queue depth.
#[test]
fn injection_queue_depth() {
    use std::thread;

    let rt = current_thread();
    let spawner = rt.handle().clone();
    let metrics = rt.metrics();

    // Nothing is driving the runtime, so the task stays queued.
    let foreign = thread::spawn(move || {
        spawner.spawn(async {});
    });
    foreign.join().unwrap();

    assert_eq!(1, metrics.injection_queue_depth());

    let rt = threaded();
    let spawner = rt.handle().clone();
    let metrics = rt.metrics();

    // Tie up the runtime workers with blocking receives first.
    let (release_a, blocked_a) = std::sync::mpsc::channel();
    let (release_b, blocked_b) = std::sync::mpsc::channel();

    rt.spawn(async move { blocked_a.recv().unwrap() });
    rt.spawn(async move { blocked_b.recv().unwrap() });

    let foreign = thread::spawn(move || {
        spawner.spawn(async {});
    });
    foreign.join().unwrap();

    // Between one and three tasks are accepted here.
    let depth = metrics.injection_queue_depth();
    assert!(depth >= 1, "{}", depth);
    assert!(depth <= 3, "{}", depth);

    // Unblock the workers.
    release_a.send(()).unwrap();
    release_b.send(()).unwrap();
}
| |
/// Tasks spawned from within a worker must show up in the local queue depth.
///
/// `N` is both the number of tasks spawned and the depth asserted, in the
/// current-thread case and in the multi-thread case alike.
#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        // None of the spawned tasks can run while `block_on` is still busy
        // with this future, so all of them are queued.
        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            // Wait until the blocker task is running on the other worker.
            rx1.recv().unwrap();

            // Spawn some tasks. The bound is `N` rather than a literal so that
            // it cannot drift away from the value asserted below.
            for _ in 0..N {
                tokio::spawn(async {});
            }

            // The queued tasks may be split across workers, so sum the depths.
            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            // Release the blocked worker.
            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}
| |
/// Consuming budget until the task is forced to yield must bump the
/// forced-yield counter exactly once.
#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut yielded = false;

    // Drive a future that keeps consuming budget until `consume_budget`
    // returns pending, then completes on the following poll.
    rt.block_on(poll_fn(|cx| {
        if yielded {
            return Poll::Ready(());
        }

        loop {
            let fut = consume_budget();
            tokio::pin!(fut);

            if let Poll::Pending = fut.poll(cx) {
                yielded = true;
                return Poll::Pending;
            }
        }
    }));

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
| |
/// Two budget-consuming futures joined inside one task are expected to yield a
/// forced-yield count of one, not two.
#[test]
fn budget_exhaustion_yield_with_joins() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut first_yielded = false;
    let mut second_yielded = false;

    // Each branch consumes budget until `consume_budget` returns pending, then
    // completes on its next poll.
    rt.block_on(async {
        tokio::join!(
            poll_fn(|cx| {
                if first_yielded {
                    return Poll::Ready(());
                }

                loop {
                    let fut = consume_budget();
                    tokio::pin!(fut);

                    if let Poll::Pending = fut.poll(cx) {
                        first_yielded = true;
                        return Poll::Pending;
                    }
                }
            }),
            poll_fn(|cx| {
                if second_yielded {
                    return Poll::Ready(());
                }

                loop {
                    let fut = consume_budget();
                    tokio::pin!(fut);

                    if let Poll::Pending = fut.poll(cx) {
                        second_yielded = true;
                        return Poll::Pending;
                    }
                }
            })
        )
    });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
| |
/// Opening a TCP stream registers one fd with the I/O driver, and dropping the
/// stream deregisters it.
///
/// The stream connects to a listener on the loopback interface, so the test
/// needs neither DNS nor internet access, and there is only one address to
/// try, so only one socket is ever created.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    // A plain `std` listener is not registered with the runtime's I/O driver,
    // so it does not affect the counters. Port 0 lets the OS pick a free port.
    // The connection is completed by the OS without `accept` being called.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let stream = tokio::net::TcpStream::connect(addr);
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    // The registered count is cumulative and must not go back down.
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);

    // Keep the listener alive until all assertions have run.
    drop(listener);
}
| |
/// Establishing one TCP connection is expected to produce exactly one ready
/// event on the I/O driver.
///
/// The stream connects to a listener on the loopback interface, so the test
/// needs neither DNS nor internet access.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // A plain `std` listener is not registered with the runtime's I/O driver,
    // so it produces no ready events of its own. Port 0 lets the OS pick a
    // free port.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let stream = tokio::net::TcpStream::connect(addr);
    let _stream = rt.block_on(async move { stream.await.unwrap() });

    // NOTE(review): the single event is presumably the socket becoming
    // writable once the connection is established — confirm on CI.
    assert_eq!(metrics.io_driver_ready_count(), 1);

    // Keep the listener alive until the assertion has run.
    drop(listener);
}
| |
| fn current_thread() -> Runtime { |
| tokio::runtime::Builder::new_current_thread() |
| .enable_all() |
| .build() |
| .unwrap() |
| } |
| |
| fn threaded() -> Runtime { |
| tokio::runtime::Builder::new_multi_thread() |
| .worker_threads(2) |
| .enable_all() |
| .build() |
| .unwrap() |
| } |