| #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] |
| |
| //! A one-shot channel is used for sending a single message between |
| //! asynchronous tasks. The [`channel`] function is used to create a |
| //! [`Sender`] and [`Receiver`] handle pair that form the channel. |
| //! |
| //! The `Sender` handle is used by the producer to send the value. |
| //! The `Receiver` handle is used by the consumer to receive the value. |
| //! |
| //! Each handle can be used on separate tasks. |
| //! |
| //! Since the `send` method is not async, it can be used anywhere. This includes |
| //! sending between two runtimes, and using it from non-async code. |
| //! |
| //! If the [`Receiver`] is closed before receiving a message which has already |
| //! been sent, the message will remain in the channel until the receiver is |
| //! dropped, at which point the message will be dropped immediately. |
| //! |
| //! # Examples |
| //! |
| //! ``` |
| //! use tokio::sync::oneshot; |
| //! |
| //! #[tokio::main] |
| //! async fn main() { |
| //! let (tx, rx) = oneshot::channel(); |
| //! |
| //! tokio::spawn(async move { |
| //! if let Err(_) = tx.send(3) { |
| //! println!("the receiver dropped"); |
| //! } |
| //! }); |
| //! |
| //! match rx.await { |
| //! Ok(v) => println!("got = {:?}", v), |
| //! Err(_) => println!("the sender dropped"), |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! If the sender is dropped without sending, the receiver will fail with |
| //! [`error::RecvError`]: |
| //! |
| //! ``` |
| //! use tokio::sync::oneshot; |
| //! |
| //! #[tokio::main] |
| //! async fn main() { |
| //! let (tx, rx) = oneshot::channel::<u32>(); |
| //! |
| //! tokio::spawn(async move { |
| //! drop(tx); |
| //! }); |
| //! |
| //! match rx.await { |
| //! Ok(_) => panic!("This doesn't happen"), |
| //! Err(_) => println!("the sender dropped"), |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of |
| //! the channel. |
| //! |
| //! ``` |
| //! use tokio::sync::oneshot; |
| //! use tokio::time::{interval, sleep, Duration}; |
| //! |
| //! #[tokio::main] |
| //! # async fn _doc() {} |
| //! # #[tokio::main(flavor = "current_thread", start_paused = true)] |
| //! async fn main() { |
| //! let (send, mut recv) = oneshot::channel(); |
| //! let mut interval = interval(Duration::from_millis(100)); |
| //! |
| //! # let handle = |
| //! tokio::spawn(async move { |
| //! sleep(Duration::from_secs(1)).await; |
| //! send.send("shut down").unwrap(); |
| //! }); |
| //! |
| //! loop { |
| //! tokio::select! { |
| //! _ = interval.tick() => println!("Another 100ms"), |
| //! msg = &mut recv => { |
| //! println!("Got message: {}", msg.unwrap()); |
| //! break; |
| //! } |
| //! } |
| //! } |
| //! # handle.await.unwrap(); |
| //! } |
| //! ``` |
| //! |
| //! To use a `Sender` from a destructor, put it in an [`Option`] and call |
| //! [`Option::take`]. |
| //! |
| //! ``` |
| //! use tokio::sync::oneshot; |
| //! |
| //! struct SendOnDrop { |
| //! sender: Option<oneshot::Sender<&'static str>>, |
| //! } |
| //! impl Drop for SendOnDrop { |
| //! fn drop(&mut self) { |
| //! if let Some(sender) = self.sender.take() { |
| //! // Using `let _ =` to ignore send errors. |
| //! let _ = sender.send("I got dropped!"); |
| //! } |
| //! } |
| //! } |
| //! |
| //! #[tokio::main] |
| //! # async fn _doc() {} |
| //! # #[tokio::main(flavor = "current_thread")] |
| //! async fn main() { |
| //! let (send, recv) = oneshot::channel(); |
| //! |
| //! let send_on_drop = SendOnDrop { sender: Some(send) }; |
| //! drop(send_on_drop); |
| //! |
| //! assert_eq!(recv.await, Ok("I got dropped!")); |
| //! } |
| //! ``` |
| |
| use crate::loom::cell::UnsafeCell; |
| use crate::loom::sync::atomic::AtomicUsize; |
| use crate::loom::sync::Arc; |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| use crate::util::trace; |
| |
| use std::fmt; |
| use std::future::Future; |
| use std::mem::MaybeUninit; |
| use std::pin::Pin; |
| use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; |
| use std::task::Poll::{Pending, Ready}; |
| use std::task::{Context, Poll, Waker}; |
| |
| /// Sends a value to the associated [`Receiver`]. |
| /// |
| /// A pair of both a [`Sender`] and a [`Receiver`] are created by the |
| /// [`channel`](fn@channel) function. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel(); |
| /// |
| /// tokio::spawn(async move { |
| /// if let Err(_) = tx.send(3) { |
| /// println!("the receiver dropped"); |
| /// } |
| /// }); |
| /// |
| /// match rx.await { |
| /// Ok(v) => println!("got = {:?}", v), |
| /// Err(_) => println!("the sender dropped"), |
| /// } |
| /// } |
| /// ``` |
| /// |
| /// If the sender is dropped without sending, the receiver will fail with |
| /// [`error::RecvError`]: |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel::<u32>(); |
| /// |
| /// tokio::spawn(async move { |
| /// drop(tx); |
| /// }); |
| /// |
| /// match rx.await { |
| /// Ok(_) => panic!("This doesn't happen"), |
| /// Err(_) => println!("the sender dropped"), |
| /// } |
| /// } |
| /// ``` |
| /// |
| /// To use a `Sender` from a destructor, put it in an [`Option`] and call |
| /// [`Option::take`]. |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// struct SendOnDrop { |
| /// sender: Option<oneshot::Sender<&'static str>>, |
| /// } |
| /// impl Drop for SendOnDrop { |
| /// fn drop(&mut self) { |
| /// if let Some(sender) = self.sender.take() { |
| /// // Using `let _ =` to ignore send errors. |
| /// let _ = sender.send("I got dropped!"); |
| /// } |
| /// } |
| /// } |
| /// |
| /// #[tokio::main] |
| /// # async fn _doc() {} |
| /// # #[tokio::main(flavor = "current_thread")] |
| /// async fn main() { |
| /// let (send, recv) = oneshot::channel(); |
| /// |
| /// let send_on_drop = SendOnDrop { sender: Some(send) }; |
| /// drop(send_on_drop); |
| /// |
| /// assert_eq!(recv.await, Ok("I got dropped!")); |
| /// } |
| /// ``` |
| /// |
| /// [`Option`]: std::option::Option |
| /// [`Option::take`]: std::option::Option::take |
| #[derive(Debug)] |
| pub struct Sender<T> { |
| inner: Option<Arc<Inner<T>>>, |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| resource_span: tracing::Span, |
| } |
| |
| /// Receives a value from the associated [`Sender`]. |
| /// |
| /// A pair of both a [`Sender`] and a [`Receiver`] are created by the |
| /// [`channel`](fn@channel) function. |
| /// |
| /// This channel has no `recv` method because the receiver itself implements the |
| /// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly. |
| /// |
| /// The `poll` method on the `Future` trait is allowed to spuriously return |
| /// `Poll::Pending` even if the message has been sent. If such a spurious |
| /// failure happens, then the caller will be woken when the spurious failure has |
| /// been resolved so that the caller can attempt to receive the message again. |
| /// Note that receiving such a wakeup does not guarantee that the next call will |
| /// succeed — it could fail with another spurious failure. (A spurious failure |
| /// does not mean that the message is lost. It is just delayed.) |
| /// |
| /// [`Future`]: trait@std::future::Future |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel(); |
| /// |
| /// tokio::spawn(async move { |
| /// if let Err(_) = tx.send(3) { |
| /// println!("the receiver dropped"); |
| /// } |
| /// }); |
| /// |
| /// match rx.await { |
| /// Ok(v) => println!("got = {:?}", v), |
| /// Err(_) => println!("the sender dropped"), |
| /// } |
| /// } |
| /// ``` |
| /// |
| /// If the sender is dropped without sending, the receiver will fail with |
| /// [`error::RecvError`]: |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel::<u32>(); |
| /// |
| /// tokio::spawn(async move { |
| /// drop(tx); |
| /// }); |
| /// |
| /// match rx.await { |
| /// Ok(_) => panic!("This doesn't happen"), |
| /// Err(_) => println!("the sender dropped"), |
| /// } |
| /// } |
| /// ``` |
| /// |
| /// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the |
| /// channel. |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// use tokio::time::{interval, sleep, Duration}; |
| /// |
| /// #[tokio::main] |
| /// # async fn _doc() {} |
| /// # #[tokio::main(flavor = "current_thread", start_paused = true)] |
| /// async fn main() { |
| /// let (send, mut recv) = oneshot::channel(); |
| /// let mut interval = interval(Duration::from_millis(100)); |
| /// |
| /// # let handle = |
| /// tokio::spawn(async move { |
| /// sleep(Duration::from_secs(1)).await; |
| /// send.send("shut down").unwrap(); |
| /// }); |
| /// |
| /// loop { |
| /// tokio::select! { |
| /// _ = interval.tick() => println!("Another 100ms"), |
| /// msg = &mut recv => { |
| /// println!("Got message: {}", msg.unwrap()); |
| /// break; |
| /// } |
| /// } |
| /// } |
| /// # handle.await.unwrap(); |
| /// } |
| /// ``` |
| #[derive(Debug)] |
| pub struct Receiver<T> { |
| inner: Option<Arc<Inner<T>>>, |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| resource_span: tracing::Span, |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| async_op_span: tracing::Span, |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| async_op_poll_span: tracing::Span, |
| } |
| |
| pub mod error { |
| //! Oneshot error types. |
| |
| use std::fmt; |
| |
| /// Error returned by the `Future` implementation for `Receiver`. |
| /// |
| /// This error is returned by the receiver when the sender is dropped without sending. |
| #[derive(Debug, Eq, PartialEq, Clone)] |
| pub struct RecvError(pub(super) ()); |
| |
| /// Error returned by the `try_recv` function on `Receiver`. |
| #[derive(Debug, Eq, PartialEq, Clone)] |
| pub enum TryRecvError { |
| /// The send half of the channel has not yet sent a value. |
| Empty, |
| |
| /// The send half of the channel was dropped without sending a value. |
| Closed, |
| } |
| |
| // ===== impl RecvError ===== |
| |
| impl fmt::Display for RecvError { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(fmt, "channel closed") |
| } |
| } |
| |
| impl std::error::Error for RecvError {} |
| |
| // ===== impl TryRecvError ===== |
| |
| impl fmt::Display for TryRecvError { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| match self { |
| TryRecvError::Empty => write!(fmt, "channel empty"), |
| TryRecvError::Closed => write!(fmt, "channel closed"), |
| } |
| } |
| } |
| |
| impl std::error::Error for TryRecvError {} |
| } |
| |
| use self::error::*; |
| |
| struct Inner<T> { |
| /// Manages the state of the inner cell. |
| state: AtomicUsize, |
| |
| /// The value. This is set by `Sender` and read by `Receiver`. The state of |
| /// the cell is tracked by `state`. |
| value: UnsafeCell<Option<T>>, |
| |
| /// The task to notify when the receiver drops without consuming the value. |
| /// |
| /// ## Safety |
| /// |
| /// The `TX_TASK_SET` bit in the `state` field is set if this field is |
| /// initialized. If that bit is unset, this field may be uninitialized. |
| tx_task: Task, |
| |
| /// The task to notify when the value is sent. |
| /// |
| /// ## Safety |
| /// |
| /// The `RX_TASK_SET` bit in the `state` field is set if this field is |
| /// initialized. If that bit is unset, this field may be uninitialized. |
| rx_task: Task, |
| } |
| |
| struct Task(UnsafeCell<MaybeUninit<Waker>>); |
| |
| impl Task { |
| unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool { |
| self.with_task(|w| w.will_wake(cx.waker())) |
| } |
| |
| unsafe fn with_task<F, R>(&self, f: F) -> R |
| where |
| F: FnOnce(&Waker) -> R, |
| { |
| self.0.with(|ptr| { |
| let waker: *const Waker = (*ptr).as_ptr(); |
| f(&*waker) |
| }) |
| } |
| |
| unsafe fn drop_task(&self) { |
| self.0.with_mut(|ptr| { |
| let ptr: *mut Waker = (*ptr).as_mut_ptr(); |
| ptr.drop_in_place(); |
| }); |
| } |
| |
| unsafe fn set_task(&self, cx: &mut Context<'_>) { |
| self.0.with_mut(|ptr| { |
| let ptr: *mut Waker = (*ptr).as_mut_ptr(); |
| ptr.write(cx.waker().clone()); |
| }); |
| } |
| } |
| |
| #[derive(Clone, Copy)] |
| struct State(usize); |
| |
| /// Creates a new one-shot channel for sending single values across asynchronous |
| /// tasks. |
| /// |
| /// The function returns separate "send" and "receive" handles. The `Sender` |
| /// handle is used by the producer to send the value. The `Receiver` handle is |
| /// used by the consumer to receive the value. |
| /// |
| /// Each handle can be used on separate tasks. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel(); |
| /// |
| /// tokio::spawn(async move { |
| /// if let Err(_) = tx.send(3) { |
| /// println!("the receiver dropped"); |
| /// } |
| /// }); |
| /// |
| /// match rx.await { |
| /// Ok(v) => println!("got = {:?}", v), |
| /// Err(_) => println!("the sender dropped"), |
| /// } |
| /// } |
| /// ``` |
| #[track_caller] |
| pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let resource_span = { |
| let location = std::panic::Location::caller(); |
| |
| let resource_span = tracing::trace_span!( |
| "runtime.resource", |
| concrete_type = "Sender|Receiver", |
| kind = "Sync", |
| loc.file = location.file(), |
| loc.line = location.line(), |
| loc.col = location.column(), |
| ); |
| |
| resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| tx_dropped = false, |
| tx_dropped.op = "override", |
| ) |
| }); |
| |
| resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| rx_dropped = false, |
| rx_dropped.op = "override", |
| ) |
| }); |
| |
| resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| value_sent = false, |
| value_sent.op = "override", |
| ) |
| }); |
| |
| resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| value_received = false, |
| value_received.op = "override", |
| ) |
| }); |
| |
| resource_span |
| }; |
| |
| let inner = Arc::new(Inner { |
| state: AtomicUsize::new(State::new().as_usize()), |
| value: UnsafeCell::new(None), |
| tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), |
| rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), |
| }); |
| |
| let tx = Sender { |
| inner: Some(inner.clone()), |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| resource_span: resource_span.clone(), |
| }; |
| |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let async_op_span = resource_span |
| .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await")); |
| |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let async_op_poll_span = |
| async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); |
| |
| let rx = Receiver { |
| inner: Some(inner), |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| resource_span, |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| async_op_span, |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| async_op_poll_span, |
| }; |
| |
| (tx, rx) |
| } |
| |
| impl<T> Sender<T> { |
| /// Attempts to send a value on this channel, returning it back if it could |
| /// not be sent. |
| /// |
| /// This method consumes `self` as only one value may ever be sent on a oneshot |
| /// channel. It is not marked async because sending a message to an oneshot |
| /// channel never requires any form of waiting. Because of this, the `send` |
| /// method can be used in both synchronous and asynchronous code without |
| /// problems. |
| /// |
| /// A successful send occurs when it is determined that the other end of the |
| /// channel has not hung up already. An unsuccessful send would be one where |
| /// the corresponding receiver has already been deallocated. Note that a |
| /// return value of `Err` means that the data will never be received, but |
| /// a return value of `Ok` does *not* mean that the data will be received. |
| /// It is possible for the corresponding receiver to hang up immediately |
| /// after this function returns `Ok`. |
| /// |
| /// # Examples |
| /// |
| /// Send a value to another task |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel(); |
| /// |
| /// tokio::spawn(async move { |
| /// if let Err(_) = tx.send(3) { |
| /// println!("the receiver dropped"); |
| /// } |
| /// }); |
| /// |
| /// match rx.await { |
| /// Ok(v) => println!("got = {:?}", v), |
| /// Err(_) => println!("the sender dropped"), |
| /// } |
| /// } |
| /// ``` |
| pub fn send(mut self, t: T) -> Result<(), T> { |
| let inner = self.inner.take().unwrap(); |
| |
| inner.value.with_mut(|ptr| unsafe { |
| // SAFETY: The receiver will not access the `UnsafeCell` unless the |
| // channel has been marked as "complete" (the `VALUE_SENT` state bit |
| // is set). |
| // That bit is only set by the sender later on in this method, and |
| // calling this method consumes `self`. Therefore, if it was possible to |
| // call this method, we know that the `VALUE_SENT` bit is unset, and |
| // the receiver is not currently accessing the `UnsafeCell`. |
| *ptr = Some(t); |
| }); |
| |
| if !inner.complete() { |
| unsafe { |
| // SAFETY: The receiver will not access the `UnsafeCell` unless |
| // the channel has been marked as "complete". Calling |
| // `complete()` will return true if this bit is set, and false |
| // if it is not set. Thus, if `complete()` returned false, it is |
| // safe for us to access the value, because we know that the |
| // receiver will not. |
| return Err(inner.consume_value().unwrap()); |
| } |
| } |
| |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| self.resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| value_sent = true, |
| value_sent.op = "override", |
| ) |
| }); |
| |
| Ok(()) |
| } |
| |
| /// Waits for the associated [`Receiver`] handle to close. |
| /// |
| /// A [`Receiver`] is closed by either calling [`close`] explicitly or the |
| /// [`Receiver`] value is dropped. |
| /// |
| /// This function is useful when paired with `select!` to abort a |
| /// computation when the receiver is no longer interested in the result. |
| /// |
| /// # Return |
| /// |
| /// Returns a `Future` which must be awaited on. |
| /// |
| /// [`Receiver`]: Receiver |
| /// [`close`]: Receiver::close |
| /// |
| /// # Examples |
| /// |
| /// Basic usage |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (mut tx, rx) = oneshot::channel::<()>(); |
| /// |
| /// tokio::spawn(async move { |
| /// drop(rx); |
| /// }); |
| /// |
| /// tx.closed().await; |
| /// println!("the receiver dropped"); |
| /// } |
| /// ``` |
| /// |
| /// Paired with select |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// use tokio::time::{self, Duration}; |
| /// |
| /// async fn compute() -> String { |
| /// // Complex computation returning a `String` |
| /// # "hello".to_string() |
| /// } |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (mut tx, rx) = oneshot::channel(); |
| /// |
| /// tokio::spawn(async move { |
| /// tokio::select! { |
| /// _ = tx.closed() => { |
| /// // The receiver dropped, no need to do any further work |
| /// } |
| /// value = compute() => { |
| /// // The send can fail if the channel was closed at the exact same |
| /// // time as when compute() finished, so just ignore the failure. |
| /// let _ = tx.send(value); |
| /// } |
| /// } |
| /// }); |
| /// |
| /// // Wait for up to 10 seconds |
| /// let _ = time::timeout(Duration::from_secs(10), rx).await; |
| /// } |
| /// ``` |
| pub async fn closed(&mut self) { |
| use crate::future::poll_fn; |
| |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let resource_span = self.resource_span.clone(); |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let closed = trace::async_op( |
| || poll_fn(|cx| self.poll_closed(cx)), |
| resource_span, |
| "Sender::closed", |
| "poll_closed", |
| false, |
| ); |
| #[cfg(not(all(tokio_unstable, feature = "tracing")))] |
| let closed = poll_fn(|cx| self.poll_closed(cx)); |
| |
| closed.await |
| } |
| |
| /// Returns `true` if the associated [`Receiver`] handle has been dropped. |
| /// |
| /// A [`Receiver`] is closed by either calling [`close`] explicitly or the |
| /// [`Receiver`] value is dropped. |
| /// |
| /// If `true` is returned, a call to `send` will always result in an error. |
| /// |
| /// [`Receiver`]: Receiver |
| /// [`close`]: Receiver::close |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel(); |
| /// |
| /// assert!(!tx.is_closed()); |
| /// |
| /// drop(rx); |
| /// |
| /// assert!(tx.is_closed()); |
| /// assert!(tx.send("never received").is_err()); |
| /// } |
| /// ``` |
| pub fn is_closed(&self) -> bool { |
| let inner = self.inner.as_ref().unwrap(); |
| |
| let state = State::load(&inner.state, Acquire); |
| state.is_closed() |
| } |
| |
| /// Checks whether the oneshot channel has been closed, and if not, schedules the |
| /// `Waker` in the provided `Context` to receive a notification when the channel is |
| /// closed. |
| /// |
| /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the |
| /// [`Receiver`] value is dropped. |
| /// |
| /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed |
| /// to the most recent call will be scheduled to receive a wakeup. |
| /// |
| /// [`Receiver`]: struct@crate::sync::oneshot::Receiver |
| /// [`close`]: fn@crate::sync::oneshot::Receiver::close |
| /// |
| /// # Return value |
| /// |
| /// This function returns: |
| /// |
| /// * `Poll::Pending` if the channel is still open. |
| /// * `Poll::Ready(())` if the channel is closed. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// use futures::future::poll_fn; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (mut tx, mut rx) = oneshot::channel::<()>(); |
| /// |
| /// tokio::spawn(async move { |
| /// rx.close(); |
| /// }); |
| /// |
| /// poll_fn(|cx| tx.poll_closed(cx)).await; |
| /// |
| /// println!("the receiver dropped"); |
| /// } |
| /// ``` |
| pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
| // Keep track of task budget |
| let coop = ready!(crate::runtime::coop::poll_proceed(cx)); |
| |
| let inner = self.inner.as_ref().unwrap(); |
| |
| let mut state = State::load(&inner.state, Acquire); |
| |
| if state.is_closed() { |
| coop.made_progress(); |
| return Poll::Ready(()); |
| } |
| |
| if state.is_tx_task_set() { |
| let will_notify = unsafe { inner.tx_task.will_wake(cx) }; |
| |
| if !will_notify { |
| state = State::unset_tx_task(&inner.state); |
| |
| if state.is_closed() { |
| // Set the flag again so that the waker is released in drop |
| State::set_tx_task(&inner.state); |
| coop.made_progress(); |
| return Ready(()); |
| } else { |
| unsafe { inner.tx_task.drop_task() }; |
| } |
| } |
| } |
| |
| if !state.is_tx_task_set() { |
| // Attempt to set the task |
| unsafe { |
| inner.tx_task.set_task(cx); |
| } |
| |
| // Update the state |
| state = State::set_tx_task(&inner.state); |
| |
| if state.is_closed() { |
| coop.made_progress(); |
| return Ready(()); |
| } |
| } |
| |
| Pending |
| } |
| } |
| |
| impl<T> Drop for Sender<T> { |
| fn drop(&mut self) { |
| if let Some(inner) = self.inner.as_ref() { |
| inner.complete(); |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| self.resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| tx_dropped = true, |
| tx_dropped.op = "override", |
| ) |
| }); |
| } |
| } |
| } |
| |
| impl<T> Receiver<T> { |
| /// Prevents the associated [`Sender`] handle from sending a value. |
| /// |
| /// Any `send` operation which happens after calling `close` is guaranteed |
| /// to fail. After calling `close`, [`try_recv`] should be called to |
| /// receive a value if one was sent **before** the call to `close` |
| /// completed. |
| /// |
| /// This function is useful to perform a graceful shutdown and ensure that a |
| /// value will not be sent into the channel and never received. |
| /// |
| /// `close` is no-op if a message is already received or the channel |
| /// is already closed. |
| /// |
| /// [`Sender`]: Sender |
| /// [`try_recv`]: Receiver::try_recv |
| /// |
| /// # Examples |
| /// |
| /// Prevent a value from being sent |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// use tokio::sync::oneshot::error::TryRecvError; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, mut rx) = oneshot::channel(); |
| /// |
| /// assert!(!tx.is_closed()); |
| /// |
| /// rx.close(); |
| /// |
| /// assert!(tx.is_closed()); |
| /// assert!(tx.send("never received").is_err()); |
| /// |
| /// match rx.try_recv() { |
| /// Err(TryRecvError::Closed) => {} |
| /// _ => unreachable!(), |
| /// } |
| /// } |
| /// ``` |
| /// |
| /// Receive a value sent **before** calling `close` |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, mut rx) = oneshot::channel(); |
| /// |
| /// assert!(tx.send("will receive").is_ok()); |
| /// |
| /// rx.close(); |
| /// |
| /// let msg = rx.try_recv().unwrap(); |
| /// assert_eq!(msg, "will receive"); |
| /// } |
| /// ``` |
| pub fn close(&mut self) { |
| if let Some(inner) = self.inner.as_ref() { |
| inner.close(); |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| self.resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| rx_dropped = true, |
| rx_dropped.op = "override", |
| ) |
| }); |
| } |
| } |
| |
| /// Attempts to receive a value. |
| /// |
| /// If a pending value exists in the channel, it is returned. If no value |
| /// has been sent, the current task **will not** be registered for |
| /// future notification. |
| /// |
| /// This function is useful to call from outside the context of an |
| /// asynchronous task. |
| /// |
| /// Note that unlike the `poll` method, the `try_recv` method cannot fail |
| /// spuriously. Any send or close event that happens before this call to |
| /// `try_recv` will be correctly returned to the caller. |
| /// |
| /// # Return |
| /// |
| /// - `Ok(T)` if a value is pending in the channel. |
| /// - `Err(TryRecvError::Empty)` if no value has been sent yet. |
| /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending |
| /// a value, or if the message has already been received. |
| /// |
| /// # Examples |
| /// |
| /// `try_recv` before a value is sent, then after. |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// use tokio::sync::oneshot::error::TryRecvError; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, mut rx) = oneshot::channel(); |
| /// |
| /// match rx.try_recv() { |
| /// // The channel is currently empty |
| /// Err(TryRecvError::Empty) => {} |
| /// _ => unreachable!(), |
| /// } |
| /// |
| /// // Send a value |
| /// tx.send("hello").unwrap(); |
| /// |
| /// match rx.try_recv() { |
| /// Ok(value) => assert_eq!(value, "hello"), |
| /// _ => unreachable!(), |
| /// } |
| /// } |
| /// ``` |
| /// |
| /// `try_recv` when the sender dropped before sending a value |
| /// |
| /// ``` |
| /// use tokio::sync::oneshot; |
| /// use tokio::sync::oneshot::error::TryRecvError; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, mut rx) = oneshot::channel::<()>(); |
| /// |
| /// drop(tx); |
| /// |
| /// match rx.try_recv() { |
| /// // The channel will never receive a value. |
| /// Err(TryRecvError::Closed) => {} |
| /// _ => unreachable!(), |
| /// } |
| /// } |
| /// ``` |
| pub fn try_recv(&mut self) -> Result<T, TryRecvError> { |
| let result = if let Some(inner) = self.inner.as_ref() { |
| let state = State::load(&inner.state, Acquire); |
| |
| if state.is_complete() { |
| // SAFETY: If `state.is_complete()` returns true, then the |
| // `VALUE_SENT` bit has been set and the sender side of the |
| // channel will no longer attempt to access the inner |
| // `UnsafeCell`. Therefore, it is now safe for us to access the |
| // cell. |
| match unsafe { inner.consume_value() } { |
| Some(value) => { |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| self.resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| value_received = true, |
| value_received.op = "override", |
| ) |
| }); |
| Ok(value) |
| } |
| None => Err(TryRecvError::Closed), |
| } |
| } else if state.is_closed() { |
| Err(TryRecvError::Closed) |
| } else { |
| // Not ready, this does not clear `inner` |
| return Err(TryRecvError::Empty); |
| } |
| } else { |
| Err(TryRecvError::Closed) |
| }; |
| |
| self.inner = None; |
| result |
| } |
| |
| /// Blocking receive to call outside of asynchronous contexts. |
| /// |
| /// # Panics |
| /// |
| /// This function panics if called within an asynchronous execution |
| /// context. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use tokio::sync::oneshot; |
| /// |
| /// #[tokio::main] |
| /// async fn main() { |
| /// let (tx, rx) = oneshot::channel::<u8>(); |
| /// |
| /// let sync_code = thread::spawn(move || { |
| /// assert_eq!(Ok(10), rx.blocking_recv()); |
| /// }); |
| /// |
| /// let _ = tx.send(10); |
| /// sync_code.join().unwrap(); |
| /// } |
| /// ``` |
| #[track_caller] |
| #[cfg(feature = "sync")] |
| #[cfg_attr(docsrs, doc(alias = "recv_blocking"))] |
| pub fn blocking_recv(self) -> Result<T, RecvError> { |
| crate::future::block_on(self) |
| } |
| } |
| |
| impl<T> Drop for Receiver<T> { |
| fn drop(&mut self) { |
| if let Some(inner) = self.inner.as_ref() { |
| inner.close(); |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| self.resource_span.in_scope(|| { |
| tracing::trace!( |
| target: "runtime::resource::state_update", |
| rx_dropped = true, |
| rx_dropped.op = "override", |
| ) |
| }); |
| } |
| } |
| } |
| |
| impl<T> Future for Receiver<T> { |
| type Output = Result<T, RecvError>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| // If `inner` is `None`, then `poll()` has already completed. |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let _res_span = self.resource_span.clone().entered(); |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let _ao_span = self.async_op_span.clone().entered(); |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let _ao_poll_span = self.async_op_poll_span.clone().entered(); |
| |
| let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { |
| #[cfg(all(tokio_unstable, feature = "tracing"))] |
| let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; |
| |
| #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] |
| let res = ready!(inner.poll_recv(cx))?; |
| |
| res |
| } else { |
| panic!("called after complete"); |
| }; |
| |
| self.inner = None; |
| Ready(Ok(ret)) |
| } |
| } |
| |
| impl<T> Inner<T> { |
| fn complete(&self) -> bool { |
| let prev = State::set_complete(&self.state); |
| |
| if prev.is_closed() { |
| return false; |
| } |
| |
| if prev.is_rx_task_set() { |
| // TODO: Consume waker? |
| unsafe { |
| self.rx_task.with_task(Waker::wake_by_ref); |
| } |
| } |
| |
| true |
| } |
| |
| fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { |
| // Keep track of task budget |
| let coop = ready!(crate::runtime::coop::poll_proceed(cx)); |
| |
| // Load the state |
| let mut state = State::load(&self.state, Acquire); |
| |
| if state.is_complete() { |
| coop.made_progress(); |
| match unsafe { self.consume_value() } { |
| Some(value) => Ready(Ok(value)), |
| None => Ready(Err(RecvError(()))), |
| } |
| } else if state.is_closed() { |
| coop.made_progress(); |
| Ready(Err(RecvError(()))) |
| } else { |
| if state.is_rx_task_set() { |
| let will_notify = unsafe { self.rx_task.will_wake(cx) }; |
| |
| // Check if the task is still the same |
| if !will_notify { |
| // Unset the task |
| state = State::unset_rx_task(&self.state); |
| if state.is_complete() { |
| // Set the flag again so that the waker is released in drop |
| State::set_rx_task(&self.state); |
| |
| coop.made_progress(); |
| // SAFETY: If `state.is_complete()` returns true, then the |
| // `VALUE_SENT` bit has been set and the sender side of the |
| // channel will no longer attempt to access the inner |
| // `UnsafeCell`. Therefore, it is now safe for us to access the |
| // cell. |
| return match unsafe { self.consume_value() } { |
| Some(value) => Ready(Ok(value)), |
| None => Ready(Err(RecvError(()))), |
| }; |
| } else { |
| unsafe { self.rx_task.drop_task() }; |
| } |
| } |
| } |
| |
| if !state.is_rx_task_set() { |
| // Attempt to set the task |
| unsafe { |
| self.rx_task.set_task(cx); |
| } |
| |
| // Update the state |
| state = State::set_rx_task(&self.state); |
| |
| if state.is_complete() { |
| coop.made_progress(); |
| match unsafe { self.consume_value() } { |
| Some(value) => Ready(Ok(value)), |
| None => Ready(Err(RecvError(()))), |
| } |
| } else { |
| Pending |
| } |
| } else { |
| Pending |
| } |
| } |
| } |
| |
| /// Called by `Receiver` to indicate that the value will never be received. |
| fn close(&self) { |
| let prev = State::set_closed(&self.state); |
| |
| if prev.is_tx_task_set() && !prev.is_complete() { |
| unsafe { |
| self.tx_task.with_task(Waker::wake_by_ref); |
| } |
| } |
| } |
| |
| /// Consumes the value. This function does not check `state`. |
| /// |
| /// # Safety |
| /// |
| /// Calling this method concurrently on multiple threads will result in a |
| /// data race. The `VALUE_SENT` state bit is used to ensure that only the |
| /// sender *or* the receiver will call this method at a given point in time. |
| /// If `VALUE_SENT` is not set, then only the sender may call this method; |
| /// if it is set, then only the receiver may call this method. |
| unsafe fn consume_value(&self) -> Option<T> { |
| self.value.with_mut(|ptr| (*ptr).take()) |
| } |
| } |
| |
| unsafe impl<T: Send> Send for Inner<T> {} |
| unsafe impl<T: Send> Sync for Inner<T> {} |
| |
| fn mut_load(this: &mut AtomicUsize) -> usize { |
| this.with_mut(|v| *v) |
| } |
| |
| impl<T> Drop for Inner<T> { |
| fn drop(&mut self) { |
| let state = State(mut_load(&mut self.state)); |
| |
| if state.is_rx_task_set() { |
| unsafe { |
| self.rx_task.drop_task(); |
| } |
| } |
| |
| if state.is_tx_task_set() { |
| unsafe { |
| self.tx_task.drop_task(); |
| } |
| } |
| } |
| } |
| |
| impl<T: fmt::Debug> fmt::Debug for Inner<T> { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| use std::sync::atomic::Ordering::Relaxed; |
| |
| fmt.debug_struct("Inner") |
| .field("state", &State::load(&self.state, Relaxed)) |
| .finish() |
| } |
| } |
| |
| /// Indicates that a waker for the receiving task has been set. |
| /// |
| /// # Safety |
| /// |
| /// If this bit is not set, the `rx_task` field may be uninitialized. |
| const RX_TASK_SET: usize = 0b00001; |
| /// Indicates that a value has been stored in the channel's inner `UnsafeCell`. |
| /// |
| /// # Safety |
| /// |
| /// This bit controls which side of the channel is permitted to access the |
| /// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the |
| /// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by |
| /// the sender. |
| const VALUE_SENT: usize = 0b00010; |
| const CLOSED: usize = 0b00100; |
| |
| /// Indicates that a waker for the sending task has been set. |
| /// |
| /// # Safety |
| /// |
| /// If this bit is not set, the `tx_task` field may be uninitialized. |
| const TX_TASK_SET: usize = 0b01000; |
| |
| impl State { |
| fn new() -> State { |
| State(0) |
| } |
| |
| fn is_complete(self) -> bool { |
| self.0 & VALUE_SENT == VALUE_SENT |
| } |
| |
| fn set_complete(cell: &AtomicUsize) -> State { |
| // This method is a compare-and-swap loop rather than a fetch-or like |
| // other `set_$WHATEVER` methods on `State`. This is because we must |
| // check if the state has been closed before setting the `VALUE_SENT` |
| // bit. |
| // |
| // We don't want to set both the `VALUE_SENT` bit if the `CLOSED` |
| // bit is already set, because `VALUE_SENT` will tell the receiver that |
| // it's okay to access the inner `UnsafeCell`. Immediately after calling |
| // `set_complete`, if the channel was closed, the sender will _also_ |
| // access the `UnsafeCell` to take the value back out, so if a |
| // `poll_recv` or `try_recv` call is occurring concurrently, both |
| // threads may try to access the `UnsafeCell` if we were to set the |
| // `VALUE_SENT` bit on a closed channel. |
| let mut state = cell.load(Ordering::Relaxed); |
| loop { |
| if State(state).is_closed() { |
| break; |
| } |
| // TODO: This could be `Release`, followed by an `Acquire` fence *if* |
| // the `RX_TASK_SET` flag is set. However, `loom` does not support |
| // fences yet. |
| match cell.compare_exchange_weak( |
| state, |
| state | VALUE_SENT, |
| Ordering::AcqRel, |
| Ordering::Acquire, |
| ) { |
| Ok(_) => break, |
| Err(actual) => state = actual, |
| } |
| } |
| State(state) |
| } |
| |
| fn is_rx_task_set(self) -> bool { |
| self.0 & RX_TASK_SET == RX_TASK_SET |
| } |
| |
| fn set_rx_task(cell: &AtomicUsize) -> State { |
| let val = cell.fetch_or(RX_TASK_SET, AcqRel); |
| State(val | RX_TASK_SET) |
| } |
| |
| fn unset_rx_task(cell: &AtomicUsize) -> State { |
| let val = cell.fetch_and(!RX_TASK_SET, AcqRel); |
| State(val & !RX_TASK_SET) |
| } |
| |
| fn is_closed(self) -> bool { |
| self.0 & CLOSED == CLOSED |
| } |
| |
| fn set_closed(cell: &AtomicUsize) -> State { |
| // Acquire because we want all later writes (attempting to poll) to be |
| // ordered after this. |
| let val = cell.fetch_or(CLOSED, Acquire); |
| State(val) |
| } |
| |
| fn set_tx_task(cell: &AtomicUsize) -> State { |
| let val = cell.fetch_or(TX_TASK_SET, AcqRel); |
| State(val | TX_TASK_SET) |
| } |
| |
| fn unset_tx_task(cell: &AtomicUsize) -> State { |
| let val = cell.fetch_and(!TX_TASK_SET, AcqRel); |
| State(val & !TX_TASK_SET) |
| } |
| |
| fn is_tx_task_set(self) -> bool { |
| self.0 & TX_TASK_SET == TX_TASK_SET |
| } |
| |
| fn as_usize(self) -> usize { |
| self.0 |
| } |
| |
| fn load(cell: &AtomicUsize, order: Ordering) -> State { |
| let val = cell.load(order); |
| State(val) |
| } |
| } |
| |
| impl fmt::Debug for State { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| fmt.debug_struct("State") |
| .field("is_complete", &self.is_complete()) |
| .field("is_closed", &self.is_closed()) |
| .field("is_rx_task_set", &self.is_rx_task_set()) |
| .field("is_tx_task_set", &self.is_tx_task_set()) |
| .finish() |
| } |
| } |