blob: 7852b0cb1bf2505a6c2a7b08639f9b05d6a4a799 [file] [log] [blame]
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
//! A single-producer, multi-consumer channel that only retains the *last* sent
//! value.
//! This channel is useful for watching for changes to a value from multiple
//! points in the code base, for example, changes to configuration values.
//! # Usage
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and consumer halves of the channel. The channel is created with an initial
//! value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to be sent by the [`Sender`] half.
//! # Examples
//! ```
//! use tokio::sync::watch;
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//! let (tx, mut rx) = watch::channel("hello");
//! tokio::spawn(async move {
//! while rx.changed().await.is_ok() {
//! println!("received = {:?}", *rx.borrow());
//! }
//! });
//! tx.send("world")?;
//! # Ok(())
//! # }
//! ```
//! # Closing
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
//! when all [`Receiver`] handles have been dropped. This indicates that there
//! is no further interest in the values being produced and work can be stopped.
//! # Thread safety
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
//! handles may be moved to separate threads and also used concurrently.
//! [`Sender`]: crate::sync::watch::Sender
//! [`Receiver`]: crate::sync::watch::Receiver
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
//! [`channel`]: crate::sync::watch::channel
//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
use crate::sync::notify::Notify;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst};
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::ops;
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// Every receiver stores the last version it has observed, so each handle
/// tracks "has the value changed?" on its own.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
pub struct Receiver<T> {
/// Pointer to the shared state
shared: Arc<Shared<T>>,
/// Last observed version (stored without the `CLOSED` bit)
version: usize,
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function.
pub struct Sender<T> {
/// Pointer to the state shared with every `Receiver` of this channel
shared: Arc<Shared<T>>,
/// Returns a reference to the inner value.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short lived as possible.
pub struct Ref<'a, T> {
/// Read guard over the channel's value; the read lock stays held for as
/// long as this guard is alive.
inner: RwLockReadGuard<'a, T>,
/// State shared between the `Sender` and all `Receiver` handles of a channel.
struct Shared<T> {
/// The most recent value
value: RwLock<T>,
/// The current version
///
/// The lowest bit represents a "closed" state. The rest of the bits
/// represent the current version. Because the lowest bit is reserved,
/// `Sender::send` advances this counter in steps of 2.
version: AtomicUsize,
/// Tracks the number of `Receiver` instances
ref_count_rx: AtomicUsize,
/// Notifies waiting receivers that the value changed.
notify_rx: Notify,
/// Notifies any task listening for `Receiver` dropped events
notify_tx: Notify,
pub mod error {
    //! Watch error types.

    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The value that could not be delivered is kept inside the error so it
    /// is not lost.
    // `Debug` is required by the `std::error::Error` impl below.
    #[derive(Debug)]
    pub struct SendError<T> {
        pub(crate) inner: T,
    }

    // ===== impl SendError =====

    // The message never prints the payload, so no bound on `T` is needed.
    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(fmt, "channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    ///
    /// The private unit field prevents construction outside of the parent
    /// module.
    // `Debug` is required by the `std::error::Error` impl below.
    #[derive(Debug)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(fmt, "channel closed")
        }
    }

    impl std::error::Error for RecvError {}
}
/// Flag stored in the lowest bit of `Shared::version`.
///
/// It is set when the `Sender` is dropped and masked out whenever a version
/// number is compared or stored by a receiver.
const CLOSED: usize = 1;
/// Creates a new watch channel, returning the "send" and "receive" handles.
///
/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
/// Only the last value sent is made available to the [`Receiver`] half. All
/// intermediate values are dropped.
///
/// `init` is the value stored in the channel until the first send.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
///     while rx.changed().await.is_ok() {
///         println!("received = {:?}", *rx.borrow());
///     }
/// });
///
/// tx.send("world")?;
/// # Ok(())
/// # }
/// ```
///
/// [`Sender`]: struct@Sender
/// [`Receiver`]: struct@Receiver
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared {
value: RwLock::new(init),
// Version 0 with the `CLOSED` bit clear.
version: AtomicUsize::new(0),
// Exactly one receiver (the one returned below) exists at creation.
ref_count_rx: AtomicUsize::new(1),
notify_rx: Notify::new(),
notify_tx: Notify::new(),
let tx = Sender {
shared: shared.clone(),
// The receiver starts at the channel's initial version (0), so the initial
// value is not reported as a change.
let rx = Receiver { shared, version: 0 };
(tx, rx)
impl<T> Receiver<T> {
/// Builds a new `Receiver` on top of existing shared state, registering it
/// in the receiver count.
///
/// `version` is the version the new handle treats as already seen.
fn from_shared(version: usize, shared: Arc<Shared<T>>) -> Self {
// No synchronization necessary as this is only used as a counter and
// not memory access.
shared.ref_count_rx.fetch_add(1, Relaxed);
Self { shared, version }
/// Returns a reference to the most recently sent value.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// [`changed`]: Receiver::changed
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (_, rx) = watch::channel("hello");
/// assert_eq!(*rx.borrow(), "hello");
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
// NOTE(review): the right-hand side of this binding is missing. Given that
// `Ref::inner` is an `RwLockReadGuard`, it is presumably a read lock taken
// on `self.shared.value` — confirm and restore.
let inner =;
Ref { inner }
/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the value as seen, so [`changed`] will not return
/// immediately if the newest value is one previously returned by
/// `borrow_and_update`.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
// NOTE(review): the right-hand side of this binding is missing. Given that
// `Ref::inner` is an `RwLockReadGuard`, it is presumably a read lock taken
// on `self.shared.value` — confirm and restore.
let inner =;
// Record the current version (with the `CLOSED` bit masked out) as seen.
self.version = self.shared.version.load(SeqCst) & !CLOSED;
Ref { inner }
/// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// method sleeps until a new message is sent by the [`Sender`] connected to
/// this `Receiver`, or until the [`Sender`] is dropped.
///
/// This method returns an error if and only if the [`Sender`] is dropped.
///
/// # Cancel safety
///
/// This method is cancel safe. If you use it as the event in a
/// [`tokio::select!`](crate::select) statement and some other branch
/// completes first, then it is guaranteed that no values have been marked
/// seen by this call to `changed`.
///
/// [`Sender`]: struct@Sender
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, mut rx) = watch::channel("hello");
///
///     tokio::spawn(async move {
///         tx.send("goodbye").unwrap();
///     });
///
///     assert!(rx.changed().await.is_ok());
///     assert_eq!(*rx.borrow(), "goodbye");
///
///     // The `tx` handle has been dropped
///     assert!(rx.changed().await.is_err());
/// }
/// ```
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
loop {
// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped.
let notified = self.shared.notify_rx.notified();
if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
return ret;
// NOTE(review): `notified` is never awaited in the visible code; the
// statement that waits on it before looping appears to be missing — confirm.
// loop around again in case the wake-up was spurious
// Non-waiting variant of `changed`: performs a single `maybe_changed` check
// against this receiver's version and returns its result unchanged.
// NOTE(review): presumably only compiled when the process driver is enabled
// (the `cfg_process_driver!` macro is defined elsewhere) — confirm.
cfg_process_driver! {
pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
maybe_changed(&self.shared, &mut self.version)
/// Checks whether the channel holds a version newer than `*version`.
///
/// Returns `Some(Ok(()))` and updates `*version` when a newer version is
/// stored, or `Some(Err(RecvError))` when the version is unchanged and the
/// `CLOSED` bit is set. A pending change is therefore reported before the
/// closed state.
fn maybe_changed<T>(
shared: &Shared<T>,
version: &mut usize,
) -> Option<Result<(), error::RecvError>> {
// Load the version from the state
let state = shared.version.load(SeqCst);
// Strip the `CLOSED` flag so that only the version bits are compared.
let new_version = state & !CLOSED;
if *version != new_version {
// Observe the new version and return
*version = new_version;
return Some(Ok(()));
if CLOSED == state & CLOSED {
// The `CLOSED` bit is set, i.e. the `Sender` has been dropped.
return Some(Err(error::RecvError(())));
// NOTE(review): no fall-through value is visible here; given the return type
// the "nothing changed, still open" case presumably yields `None` — confirm.
impl<T> Clone for Receiver<T> {
/// Creates another receiver on the same channel.
///
/// The clone inherits this receiver's last observed version and is
/// counted as an additional receiver via `from_shared`.
fn clone(&self) -> Self {
let version = self.version;
let shared = self.shared.clone();
Self::from_shared(version, shared)
impl<T> Drop for Receiver<T> {
/// Decrements the receiver count when a handle goes away.
fn drop(&mut self) {
// No synchronization necessary as this is only used as a counter and
// not memory access.
// `fetch_sub` returns the previous value, so `1` means this was the last one.
if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
// This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
// need to be woken up.
// NOTE(review): the wake-up call itself is not present in the visible
// code (presumably on `notify_tx`) — confirm and restore.
impl<T> Sender<T> {
/// Sends a new value via the channel, notifying all receivers.
///
/// # Errors
///
/// Returns a `SendError` carrying `value` back when the receiver count is
/// zero at the time of the call.
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
// This is pretty much only useful as a hint anyway, so synchronization isn't critical.
if 0 == self.shared.ref_count_rx.load(Relaxed) {
return Err(error::SendError { inner: value });
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
*lock = value;
// Update the version. 2 is used so that the CLOSED bit is not set.
self.shared.version.fetch_add(2, SeqCst);
// Release the write lock.
//
// Incrementing the version counter while holding the lock ensures
// that receivers are able to figure out the version number of the
// value they are currently looking at.
// NOTE(review): the statement that releases the lock is not visible here.
// Notify all watchers
// NOTE(review): neither the notification call (presumably on `notify_rx`)
// nor the final `Ok(())` is visible in this copy — confirm and restore.
/// Returns a reference to the most recently sent value.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (tx, _) = watch::channel("hello");
/// assert_eq!(*tx.borrow(), "hello");
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
// NOTE(review): the right-hand side of this binding is missing. Given that
// `Ref::inner` is an `RwLockReadGuard`, it is presumably a read lock taken
// on `self.shared.value` — confirm and restore.
let inner =;
Ref { inner }
/// Checks if the channel has been closed. This happens when all receivers
/// have dropped.
///
/// # Examples
///
/// ```
/// let (tx, rx) = tokio::sync::watch::channel(());
/// assert!(!tx.is_closed());
///
/// drop(rx);
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
// Closed means that no `Receiver` handle is currently registered.
self.shared.ref_count_rx.load(Relaxed) == 0
/// Completes when all receivers have dropped.
///
/// This allows the producer to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Cancel safety
///
/// This method is cancel safe. Once the channel is closed, it stays closed
/// forever and all future calls to `closed` will return immediately.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx) = watch::channel("hello");
///
///     tokio::spawn(async move {
///         // use `rx`
///         drop(rx);
///     });
///
///     // Waits for `rx` to drop
///     tx.closed().await;
///     println!("the `rx` handles dropped")
/// }
/// ```
pub async fn closed(&self) {
// Request the notification before checking the count, mirroring the
// ordering used in `Receiver::changed`.
let notified = self.shared.notify_tx.notified();
if self.shared.ref_count_rx.load(Relaxed) == 0 {
// NOTE(review): the early return for the already-closed case and the wait
// on `notified` are not visible in this copy — confirm and restore.
debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed));
// Creates an additional `Receiver` for this channel, registered through
// `Receiver::from_shared` with the currently stored version as "seen".
// NOTE(review): the loaded version is passed on without masking out the
// `CLOSED` bit, unlike the other places that store a receiver version — confirm
// this is intended.
// NOTE(review): presumably only compiled for internal signal support (the
// `cfg_signal_internal!` macro is defined elsewhere) — confirm.
cfg_signal_internal! {
pub(crate) fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
let version = shared.version.load(SeqCst);
Receiver::from_shared(version, shared)
/// Returns the number of receivers that currently exist.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx1) = watch::channel("hello");
///
///     assert_eq!(1, tx.receiver_count());
///
///     let mut _rx2 = rx1.clone();
///
///     assert_eq!(2, tx.receiver_count());
/// }
/// ```
pub fn receiver_count(&self) -> usize {
// NOTE(review): the body is missing in this copy; presumably it loads
// `self.shared.ref_count_rx`, the counter used by `is_closed` — confirm.
impl<T> Drop for Sender<T> {
/// Marks the channel as closed when the sender goes away.
fn drop(&mut self) {
// Set the `CLOSED` bit while leaving the version bits untouched.
self.shared.version.fetch_or(CLOSED, SeqCst);
// NOTE(review): no wake-up of waiting receivers is visible after the flag is
// set; presumably a notification on `notify_rx` followed — confirm.
// ===== impl Ref =====
/// Lets a `Ref` be used like a `&T` pointing at the channel's value.
impl<T> ops::Deref for Ref<'_, T> {
type Target = T;
fn deref(&self) -> &T {
// NOTE(review): the body is missing in this copy; presumably it dereferences
// the `inner` read guard — confirm and restore.
// Loom model tests; compiled only when both the `test` and `loom` cfgs are set.
// NOTE(review): the test bodies below are heavily truncated in this copy
// (closures are opened but their contents and the `#[test]` attributes are not
// visible) — restore from the upstream file before relying on them.
#[cfg(all(test, loom))]
mod tests {
use futures::future::FutureExt;
use loom::thread;
// test for
// NOTE(review): the reference that followed "test for" is missing; judging by
// the test name it targets spurious wake-ups — presumably
// https://github.com/tokio-rs/tokio/issues/3168, confirm.
fn watch_spurious_wakeup() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);
let send_thread = thread::spawn(move || {
let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
let mut recv = recv_thread.join().unwrap();
let send_thread = thread::spawn(move || {
// Judging by its name, exercises borrowing the value under the loom model.
fn watch_borrow() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);
let send_thread = thread::spawn(move || {
let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
let recv = recv_thread.join().unwrap();
thread::spawn(move || {