| //! The channel interface. |
| |
| use std::fmt; |
| use std::iter::FusedIterator; |
| use std::mem; |
| use std::panic::{RefUnwindSafe, UnwindSafe}; |
| use std::sync::Arc; |
| use std::time::{Duration, Instant}; |
| |
| use crate::context::Context; |
| use crate::counter; |
| use crate::err::{ |
| RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError, |
| }; |
| use crate::flavors; |
| use crate::select::{Operation, SelectHandle, Token}; |
| |
| /// Creates a channel of unbounded capacity. |
| /// |
| /// This channel has a growable buffer that can hold any number of messages at a time. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// // Computes the n-th Fibonacci number. |
| /// fn fib(n: i32) -> i32 { |
| /// if n <= 1 { |
| /// n |
| /// } else { |
| /// fib(n - 1) + fib(n - 2) |
| /// } |
| /// } |
| /// |
| /// // Spawn an asynchronous computation. |
| /// thread::spawn(move || s.send(fib(20)).unwrap()); |
| /// |
| /// // Print the result of the computation. |
| /// println!("{}", r.recv().unwrap()); |
| /// ``` |
| pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { |
| let (s, r) = counter::new(flavors::list::Channel::new()); |
| let s = Sender { |
| flavor: SenderFlavor::List(s), |
| }; |
| let r = Receiver { |
| flavor: ReceiverFlavor::List(r), |
| }; |
| (s, r) |
| } |
| |
| /// Creates a channel of bounded capacity. |
| /// |
| /// This channel has a buffer that can hold at most `cap` messages at a time. |
| /// |
/// A special case is a zero-capacity channel, which cannot hold any messages. Instead, send and
/// receive operations must appear at the same time in order to pair up and pass the message over.
| /// |
| /// # Examples |
| /// |
| /// A channel of capacity 1: |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::bounded; |
| /// |
| /// let (s, r) = bounded(1); |
| /// |
| /// // This call returns immediately because there is enough space in the channel. |
| /// s.send(1).unwrap(); |
| /// |
| /// thread::spawn(move || { |
| /// // This call blocks the current thread because the channel is full. |
| /// // It will be able to complete only after the first message is received. |
| /// s.send(2).unwrap(); |
| /// }); |
| /// |
| /// thread::sleep(Duration::from_secs(1)); |
| /// assert_eq!(r.recv(), Ok(1)); |
| /// assert_eq!(r.recv(), Ok(2)); |
| /// ``` |
| /// |
| /// A zero-capacity channel: |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::bounded; |
| /// |
| /// let (s, r) = bounded(0); |
| /// |
| /// thread::spawn(move || { |
| /// // This call blocks the current thread until a receive operation appears |
| /// // on the other side of the channel. |
| /// s.send(1).unwrap(); |
| /// }); |
| /// |
| /// thread::sleep(Duration::from_secs(1)); |
| /// assert_eq!(r.recv(), Ok(1)); |
| /// ``` |
| pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { |
| if cap == 0 { |
| let (s, r) = counter::new(flavors::zero::Channel::new()); |
| let s = Sender { |
| flavor: SenderFlavor::Zero(s), |
| }; |
| let r = Receiver { |
| flavor: ReceiverFlavor::Zero(r), |
| }; |
| (s, r) |
| } else { |
| let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap)); |
| let s = Sender { |
| flavor: SenderFlavor::Array(s), |
| }; |
| let r = Receiver { |
| flavor: ReceiverFlavor::Array(r), |
| }; |
| (s, r) |
| } |
| } |
| |
| /// Creates a receiver that delivers a message after a certain duration of time. |
| /// |
| /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will |
| /// be sent into the channel after `duration` elapses. The message is the instant at which it is |
| /// sent. |
| /// |
| /// # Examples |
| /// |
| /// Using an `after` channel for timeouts: |
| /// |
| /// ``` |
| /// use std::time::Duration; |
| /// use crossbeam_channel::{after, select, unbounded}; |
| /// |
| /// let (s, r) = unbounded::<i32>(); |
| /// let timeout = Duration::from_millis(100); |
| /// |
| /// select! { |
| /// recv(r) -> msg => println!("received {:?}", msg), |
| /// recv(after(timeout)) -> _ => println!("timed out"), |
| /// } |
| /// ``` |
| /// |
| /// When the message gets sent: |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::{Duration, Instant}; |
| /// use crossbeam_channel::after; |
| /// |
| /// // Converts a number of milliseconds into a `Duration`. |
| /// let ms = |ms| Duration::from_millis(ms); |
| /// |
| /// // Returns `true` if `a` and `b` are very close `Instant`s. |
| /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; |
| /// |
| /// let start = Instant::now(); |
| /// let r = after(ms(100)); |
| /// |
| /// thread::sleep(ms(500)); |
| /// |
| /// // This message was sent 100 ms from the start and received 500 ms from the start. |
| /// assert!(eq(r.recv().unwrap(), start + ms(100))); |
| /// assert!(eq(Instant::now(), start + ms(500))); |
| /// ``` |
| pub fn after(duration: Duration) -> Receiver<Instant> { |
| Receiver { |
| flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))), |
| } |
| } |
| |
| /// Creates a receiver that delivers a message at a certain instant in time. |
| /// |
| /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will |
| /// be sent into the channel at the moment in time `when`. The message is the instant at which it |
| /// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered |
| /// instantly to the receiver. |
| /// |
| /// # Examples |
| /// |
| /// Using an `at` channel for timeouts: |
| /// |
| /// ``` |
| /// use std::time::{Instant, Duration}; |
| /// use crossbeam_channel::{at, select, unbounded}; |
| /// |
| /// let (s, r) = unbounded::<i32>(); |
| /// let deadline = Instant::now() + Duration::from_millis(500); |
| /// |
| /// select! { |
| /// recv(r) -> msg => println!("received {:?}", msg), |
| /// recv(at(deadline)) -> _ => println!("timed out"), |
| /// } |
| /// ``` |
| /// |
| /// When the message gets sent: |
| /// |
| /// ``` |
| /// use std::time::{Duration, Instant}; |
| /// use crossbeam_channel::at; |
| /// |
| /// // Converts a number of milliseconds into a `Duration`. |
| /// let ms = |ms| Duration::from_millis(ms); |
| /// |
| /// let start = Instant::now(); |
| /// let end = start + ms(100); |
| /// |
| /// let r = at(end); |
| /// |
| /// // This message was sent 100 ms from the start |
| /// assert_eq!(r.recv().unwrap(), end); |
| /// assert!(Instant::now() > start + ms(100)); |
| /// ``` |
| pub fn at(when: Instant) -> Receiver<Instant> { |
| Receiver { |
| flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))), |
| } |
| } |
| |
| /// Creates a receiver that never delivers messages. |
| /// |
| /// The channel is bounded with capacity of 0 and never gets disconnected. |
| /// |
| /// # Examples |
| /// |
| /// Using a `never` channel to optionally add a timeout to [`select!`]: |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::{after, select, never, unbounded}; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// thread::sleep(Duration::from_secs(1)); |
| /// s.send(1).unwrap(); |
| /// }); |
| /// |
| /// // Suppose this duration can be a `Some` or a `None`. |
| /// let duration = Some(Duration::from_millis(100)); |
| /// |
| /// // Create a channel that times out after the specified duration. |
| /// let timeout = duration |
| /// .map(|d| after(d)) |
| /// .unwrap_or(never()); |
| /// |
| /// select! { |
| /// recv(r) -> msg => assert_eq!(msg, Ok(1)), |
| /// recv(timeout) -> _ => println!("timed out"), |
| /// } |
| /// ``` |
| /// |
| /// [`select!`]: macro.select.html |
| pub fn never<T>() -> Receiver<T> { |
| Receiver { |
| flavor: ReceiverFlavor::Never(flavors::never::Channel::new()), |
| } |
| } |
| |
| /// Creates a receiver that delivers messages periodically. |
| /// |
| /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be |
| /// sent into the channel in intervals of `duration`. Each message is the instant at which it is |
| /// sent. |
| /// |
| /// # Examples |
| /// |
| /// Using a `tick` channel to periodically print elapsed time: |
| /// |
| /// ``` |
| /// use std::time::{Duration, Instant}; |
| /// use crossbeam_channel::tick; |
| /// |
| /// let start = Instant::now(); |
| /// let ticker = tick(Duration::from_millis(100)); |
| /// |
| /// for _ in 0..5 { |
| /// ticker.recv().unwrap(); |
| /// println!("elapsed: {:?}", start.elapsed()); |
| /// } |
| /// ``` |
| /// |
| /// When messages get sent: |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::{Duration, Instant}; |
| /// use crossbeam_channel::tick; |
| /// |
| /// // Converts a number of milliseconds into a `Duration`. |
| /// let ms = |ms| Duration::from_millis(ms); |
| /// |
| /// // Returns `true` if `a` and `b` are very close `Instant`s. |
| /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a; |
| /// |
| /// let start = Instant::now(); |
| /// let r = tick(ms(100)); |
| /// |
| /// // This message was sent 100 ms from the start and received 100 ms from the start. |
| /// assert!(eq(r.recv().unwrap(), start + ms(100))); |
| /// assert!(eq(Instant::now(), start + ms(100))); |
| /// |
| /// thread::sleep(ms(500)); |
| /// |
| /// // This message was sent 200 ms from the start and received 600 ms from the start. |
| /// assert!(eq(r.recv().unwrap(), start + ms(200))); |
| /// assert!(eq(Instant::now(), start + ms(600))); |
| /// |
| /// // This message was sent 700 ms from the start and received 700 ms from the start. |
| /// assert!(eq(r.recv().unwrap(), start + ms(700))); |
| /// assert!(eq(Instant::now(), start + ms(700))); |
| /// ``` |
| pub fn tick(duration: Duration) -> Receiver<Instant> { |
| Receiver { |
| flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))), |
| } |
| } |
| |
| /// The sending side of a channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s1, r) = unbounded(); |
| /// let s2 = s1.clone(); |
| /// |
| /// thread::spawn(move || s1.send(1).unwrap()); |
| /// thread::spawn(move || s2.send(2).unwrap()); |
| /// |
| /// let msg1 = r.recv().unwrap(); |
| /// let msg2 = r.recv().unwrap(); |
| /// |
| /// assert_eq!(msg1 + msg2, 3); |
| /// ``` |
| pub struct Sender<T> { |
| flavor: SenderFlavor<T>, |
| } |
| |
| /// Sender flavors. |
| enum SenderFlavor<T> { |
| /// Bounded channel based on a preallocated array. |
| Array(counter::Sender<flavors::array::Channel<T>>), |
| |
| /// Unbounded channel implemented as a linked list. |
| List(counter::Sender<flavors::list::Channel<T>>), |
| |
| /// Zero-capacity channel. |
| Zero(counter::Sender<flavors::zero::Channel<T>>), |
| } |
| |
| unsafe impl<T: Send> Send for Sender<T> {} |
| unsafe impl<T: Send> Sync for Sender<T> {} |
| |
| impl<T> UnwindSafe for Sender<T> {} |
| impl<T> RefUnwindSafe for Sender<T> {} |
| |
| impl<T> Sender<T> { |
| /// Attempts to send a message into the channel without blocking. |
| /// |
| /// This method will either send a message into the channel immediately or return an error if |
| /// the channel is full or disconnected. The returned error contains the original message. |
| /// |
| /// If called on a zero-capacity channel, this method will send the message only if there |
| /// happens to be a receive operation on the other side of the channel at the same time. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::{bounded, TrySendError}; |
| /// |
| /// let (s, r) = bounded(1); |
| /// |
| /// assert_eq!(s.try_send(1), Ok(())); |
| /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); |
| /// |
| /// drop(r); |
| /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3))); |
| /// ``` |
| pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.try_send(msg), |
| SenderFlavor::List(chan) => chan.try_send(msg), |
| SenderFlavor::Zero(chan) => chan.try_send(msg), |
| } |
| } |
| |
| /// Blocks the current thread until a message is sent or the channel is disconnected. |
| /// |
| /// If the channel is full and not disconnected, this call will block until the send operation |
| /// can proceed. If the channel becomes disconnected, this call will wake up and return an |
| /// error. The returned error contains the original message. |
| /// |
| /// If called on a zero-capacity channel, this method will wait for a receive operation to |
| /// appear on the other side of the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::{bounded, SendError}; |
| /// |
| /// let (s, r) = bounded(1); |
| /// assert_eq!(s.send(1), Ok(())); |
| /// |
| /// thread::spawn(move || { |
| /// assert_eq!(r.recv(), Ok(1)); |
| /// thread::sleep(Duration::from_secs(1)); |
| /// drop(r); |
| /// }); |
| /// |
| /// assert_eq!(s.send(2), Ok(())); |
| /// assert_eq!(s.send(3), Err(SendError(3))); |
| /// ``` |
| pub fn send(&self, msg: T) -> Result<(), SendError<T>> { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.send(msg, None), |
| SenderFlavor::List(chan) => chan.send(msg, None), |
| SenderFlavor::Zero(chan) => chan.send(msg, None), |
| } |
| .map_err(|err| match err { |
| SendTimeoutError::Disconnected(msg) => SendError(msg), |
| SendTimeoutError::Timeout(_) => unreachable!(), |
| }) |
| } |
| |
| /// Waits for a message to be sent into the channel, but only for a limited time. |
| /// |
| /// If the channel is full and not disconnected, this call will block until the send operation |
| /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
| /// wake up and return an error. The returned error contains the original message. |
| /// |
| /// If called on a zero-capacity channel, this method will wait for a receive operation to |
| /// appear on the other side of the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::{bounded, SendTimeoutError}; |
| /// |
| /// let (s, r) = bounded(0); |
| /// |
| /// thread::spawn(move || { |
| /// thread::sleep(Duration::from_secs(1)); |
| /// assert_eq!(r.recv(), Ok(2)); |
| /// drop(r); |
| /// }); |
| /// |
| /// assert_eq!( |
| /// s.send_timeout(1, Duration::from_millis(500)), |
| /// Err(SendTimeoutError::Timeout(1)), |
| /// ); |
| /// assert_eq!( |
| /// s.send_timeout(2, Duration::from_secs(1)), |
| /// Ok(()), |
| /// ); |
| /// assert_eq!( |
| /// s.send_timeout(3, Duration::from_millis(500)), |
| /// Err(SendTimeoutError::Disconnected(3)), |
| /// ); |
| /// ``` |
| pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { |
| self.send_deadline(msg, Instant::now() + timeout) |
| } |
| |
| /// Waits for a message to be sent into the channel, but only until a given deadline. |
| /// |
| /// If the channel is full and not disconnected, this call will block until the send operation |
| /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
| /// wake up and return an error. The returned error contains the original message. |
| /// |
| /// If called on a zero-capacity channel, this method will wait for a receive operation to |
| /// appear on the other side of the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::{Duration, Instant}; |
| /// use crossbeam_channel::{bounded, SendTimeoutError}; |
| /// |
| /// let (s, r) = bounded(0); |
| /// |
| /// thread::spawn(move || { |
| /// thread::sleep(Duration::from_secs(1)); |
| /// assert_eq!(r.recv(), Ok(2)); |
| /// drop(r); |
| /// }); |
| /// |
| /// let now = Instant::now(); |
| /// |
| /// assert_eq!( |
| /// s.send_deadline(1, now + Duration::from_millis(500)), |
| /// Err(SendTimeoutError::Timeout(1)), |
| /// ); |
| /// assert_eq!( |
| /// s.send_deadline(2, now + Duration::from_millis(1500)), |
| /// Ok(()), |
| /// ); |
| /// assert_eq!( |
| /// s.send_deadline(3, now + Duration::from_millis(2000)), |
| /// Err(SendTimeoutError::Disconnected(3)), |
| /// ); |
| /// ``` |
| pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), |
| SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), |
| SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), |
| } |
| } |
| |
| /// Returns `true` if the channel is empty. |
| /// |
| /// Note: Zero-capacity channels are always empty. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// assert!(s.is_empty()); |
| /// |
| /// s.send(0).unwrap(); |
| /// assert!(!s.is_empty()); |
| /// ``` |
| pub fn is_empty(&self) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.is_empty(), |
| SenderFlavor::List(chan) => chan.is_empty(), |
| SenderFlavor::Zero(chan) => chan.is_empty(), |
| } |
| } |
| |
| /// Returns `true` if the channel is full. |
| /// |
| /// Note: Zero-capacity channels are always full. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::bounded; |
| /// |
| /// let (s, r) = bounded(1); |
| /// |
| /// assert!(!s.is_full()); |
| /// s.send(0).unwrap(); |
| /// assert!(s.is_full()); |
| /// ``` |
| pub fn is_full(&self) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.is_full(), |
| SenderFlavor::List(chan) => chan.is_full(), |
| SenderFlavor::Zero(chan) => chan.is_full(), |
| } |
| } |
| |
| /// Returns the number of messages in the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// assert_eq!(s.len(), 0); |
| /// |
| /// s.send(1).unwrap(); |
| /// s.send(2).unwrap(); |
| /// assert_eq!(s.len(), 2); |
| /// ``` |
| pub fn len(&self) -> usize { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.len(), |
| SenderFlavor::List(chan) => chan.len(), |
| SenderFlavor::Zero(chan) => chan.len(), |
| } |
| } |
| |
| /// If the channel is bounded, returns its capacity. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::{bounded, unbounded}; |
| /// |
| /// let (s, _) = unbounded::<i32>(); |
| /// assert_eq!(s.capacity(), None); |
| /// |
| /// let (s, _) = bounded::<i32>(5); |
| /// assert_eq!(s.capacity(), Some(5)); |
| /// |
| /// let (s, _) = bounded::<i32>(0); |
| /// assert_eq!(s.capacity(), Some(0)); |
| /// ``` |
| pub fn capacity(&self) -> Option<usize> { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.capacity(), |
| SenderFlavor::List(chan) => chan.capacity(), |
| SenderFlavor::Zero(chan) => chan.capacity(), |
| } |
| } |
| |
| /// Returns `true` if senders belong to the same channel. |
| /// |
| /// # Examples |
| /// |
| /// ```rust |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, _) = unbounded::<usize>(); |
| /// |
| /// let s2 = s.clone(); |
| /// assert!(s.same_channel(&s2)); |
| /// |
| /// let (s3, _) = unbounded(); |
| /// assert!(!s.same_channel(&s3)); |
| /// ``` |
| pub fn same_channel(&self, other: &Sender<T>) -> bool { |
| match (&self.flavor, &other.flavor) { |
| (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, |
| (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, |
| (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, |
| _ => false, |
| } |
| } |
| } |
| |
| impl<T> Drop for Sender<T> { |
| fn drop(&mut self) { |
| unsafe { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), |
| SenderFlavor::List(chan) => chan.release(|c| c.disconnect()), |
| SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), |
| } |
| } |
| } |
| } |
| |
| impl<T> Clone for Sender<T> { |
| fn clone(&self) -> Self { |
| let flavor = match &self.flavor { |
| SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), |
| SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), |
| SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), |
| }; |
| |
| Sender { flavor } |
| } |
| } |
| |
| impl<T> fmt::Debug for Sender<T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("Sender { .. }") |
| } |
| } |
| |
| /// The receiving side of a channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// let _ = s.send(1); |
| /// thread::sleep(Duration::from_secs(1)); |
| /// let _ = s.send(2); |
| /// }); |
| /// |
| /// assert_eq!(r.recv(), Ok(1)); // Received immediately. |
| /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second. |
| /// ``` |
| pub struct Receiver<T> { |
| flavor: ReceiverFlavor<T>, |
| } |
| |
| /// Receiver flavors. |
| enum ReceiverFlavor<T> { |
| /// Bounded channel based on a preallocated array. |
| Array(counter::Receiver<flavors::array::Channel<T>>), |
| |
| /// Unbounded channel implemented as a linked list. |
| List(counter::Receiver<flavors::list::Channel<T>>), |
| |
| /// Zero-capacity channel. |
| Zero(counter::Receiver<flavors::zero::Channel<T>>), |
| |
| /// The after flavor. |
| At(Arc<flavors::at::Channel>), |
| |
| /// The tick flavor. |
| Tick(Arc<flavors::tick::Channel>), |
| |
| /// The never flavor. |
| Never(flavors::never::Channel<T>), |
| } |
| |
| unsafe impl<T: Send> Send for Receiver<T> {} |
| unsafe impl<T: Send> Sync for Receiver<T> {} |
| |
| impl<T> UnwindSafe for Receiver<T> {} |
| impl<T> RefUnwindSafe for Receiver<T> {} |
| |
| impl<T> Receiver<T> { |
| /// Attempts to receive a message from the channel without blocking. |
| /// |
| /// This method will either receive a message from the channel immediately or return an error |
| /// if the channel is empty. |
| /// |
| /// If called on a zero-capacity channel, this method will receive a message only if there |
| /// happens to be a send operation on the other side of the channel at the same time. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::{unbounded, TryRecvError}; |
| /// |
| /// let (s, r) = unbounded(); |
| /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); |
| /// |
| /// s.send(5).unwrap(); |
| /// drop(s); |
| /// |
| /// assert_eq!(r.try_recv(), Ok(5)); |
| /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); |
| /// ``` |
| pub fn try_recv(&self) -> Result<T, TryRecvError> { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.try_recv(), |
| ReceiverFlavor::List(chan) => chan.try_recv(), |
| ReceiverFlavor::Zero(chan) => chan.try_recv(), |
| ReceiverFlavor::At(chan) => { |
| let msg = chan.try_recv(); |
| unsafe { |
| mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>( |
| &msg, |
| ) |
| } |
| } |
| ReceiverFlavor::Tick(chan) => { |
| let msg = chan.try_recv(); |
| unsafe { |
| mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>( |
| &msg, |
| ) |
| } |
| } |
| ReceiverFlavor::Never(chan) => chan.try_recv(), |
| } |
| } |
| |
| /// Blocks the current thread until a message is received or the channel is empty and |
| /// disconnected. |
| /// |
| /// If the channel is empty and not disconnected, this call will block until the receive |
| /// operation can proceed. If the channel is empty and becomes disconnected, this call will |
| /// wake up and return an error. |
| /// |
| /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
| /// on the other side of the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::{unbounded, RecvError}; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// thread::sleep(Duration::from_secs(1)); |
| /// s.send(5).unwrap(); |
| /// drop(s); |
| /// }); |
| /// |
| /// assert_eq!(r.recv(), Ok(5)); |
| /// assert_eq!(r.recv(), Err(RecvError)); |
| /// ``` |
| pub fn recv(&self) -> Result<T, RecvError> { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.recv(None), |
| ReceiverFlavor::List(chan) => chan.recv(None), |
| ReceiverFlavor::Zero(chan) => chan.recv(None), |
| ReceiverFlavor::At(chan) => { |
| let msg = chan.recv(None); |
| unsafe { |
| mem::transmute_copy::< |
| Result<Instant, RecvTimeoutError>, |
| Result<T, RecvTimeoutError>, |
| >(&msg) |
| } |
| } |
| ReceiverFlavor::Tick(chan) => { |
| let msg = chan.recv(None); |
| unsafe { |
| mem::transmute_copy::< |
| Result<Instant, RecvTimeoutError>, |
| Result<T, RecvTimeoutError>, |
| >(&msg) |
| } |
| } |
| ReceiverFlavor::Never(chan) => chan.recv(None), |
| } |
| .map_err(|_| RecvError) |
| } |
| |
| /// Waits for a message to be received from the channel, but only for a limited time. |
| /// |
| /// If the channel is empty and not disconnected, this call will block until the receive |
| /// operation can proceed or the operation times out. If the channel is empty and becomes |
| /// disconnected, this call will wake up and return an error. |
| /// |
| /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
| /// on the other side of the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::{unbounded, RecvTimeoutError}; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// thread::sleep(Duration::from_secs(1)); |
| /// s.send(5).unwrap(); |
| /// drop(s); |
| /// }); |
| /// |
| /// assert_eq!( |
| /// r.recv_timeout(Duration::from_millis(500)), |
| /// Err(RecvTimeoutError::Timeout), |
| /// ); |
| /// assert_eq!( |
| /// r.recv_timeout(Duration::from_secs(1)), |
| /// Ok(5), |
| /// ); |
| /// assert_eq!( |
| /// r.recv_timeout(Duration::from_secs(1)), |
| /// Err(RecvTimeoutError::Disconnected), |
| /// ); |
| /// ``` |
| pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
| self.recv_deadline(Instant::now() + timeout) |
| } |
| |
| /// Waits for a message to be received from the channel, but only before a given deadline. |
| /// |
| /// If the channel is empty and not disconnected, this call will block until the receive |
| /// operation can proceed or the operation times out. If the channel is empty and becomes |
| /// disconnected, this call will wake up and return an error. |
| /// |
| /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
| /// on the other side of the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::{Instant, Duration}; |
| /// use crossbeam_channel::{unbounded, RecvTimeoutError}; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// thread::sleep(Duration::from_secs(1)); |
| /// s.send(5).unwrap(); |
| /// drop(s); |
| /// }); |
| /// |
| /// let now = Instant::now(); |
| /// |
| /// assert_eq!( |
| /// r.recv_deadline(now + Duration::from_millis(500)), |
| /// Err(RecvTimeoutError::Timeout), |
| /// ); |
| /// assert_eq!( |
| /// r.recv_deadline(now + Duration::from_millis(1500)), |
| /// Ok(5), |
| /// ); |
| /// assert_eq!( |
| /// r.recv_deadline(now + Duration::from_secs(5)), |
| /// Err(RecvTimeoutError::Disconnected), |
| /// ); |
| /// ``` |
| pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), |
| ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), |
| ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), |
| ReceiverFlavor::At(chan) => { |
| let msg = chan.recv(Some(deadline)); |
| unsafe { |
| mem::transmute_copy::< |
| Result<Instant, RecvTimeoutError>, |
| Result<T, RecvTimeoutError>, |
| >(&msg) |
| } |
| } |
| ReceiverFlavor::Tick(chan) => { |
| let msg = chan.recv(Some(deadline)); |
| unsafe { |
| mem::transmute_copy::< |
| Result<Instant, RecvTimeoutError>, |
| Result<T, RecvTimeoutError>, |
| >(&msg) |
| } |
| } |
| ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)), |
| } |
| } |
| |
| /// Returns `true` if the channel is empty. |
| /// |
| /// Note: Zero-capacity channels are always empty. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// assert!(r.is_empty()); |
| /// s.send(0).unwrap(); |
| /// assert!(!r.is_empty()); |
| /// ``` |
| pub fn is_empty(&self) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.is_empty(), |
| ReceiverFlavor::List(chan) => chan.is_empty(), |
| ReceiverFlavor::Zero(chan) => chan.is_empty(), |
| ReceiverFlavor::At(chan) => chan.is_empty(), |
| ReceiverFlavor::Tick(chan) => chan.is_empty(), |
| ReceiverFlavor::Never(chan) => chan.is_empty(), |
| } |
| } |
| |
| /// Returns `true` if the channel is full. |
| /// |
| /// Note: Zero-capacity channels are always full. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::bounded; |
| /// |
| /// let (s, r) = bounded(1); |
| /// |
| /// assert!(!r.is_full()); |
| /// s.send(0).unwrap(); |
| /// assert!(r.is_full()); |
| /// ``` |
| pub fn is_full(&self) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.is_full(), |
| ReceiverFlavor::List(chan) => chan.is_full(), |
| ReceiverFlavor::Zero(chan) => chan.is_full(), |
| ReceiverFlavor::At(chan) => chan.is_full(), |
| ReceiverFlavor::Tick(chan) => chan.is_full(), |
| ReceiverFlavor::Never(chan) => chan.is_full(), |
| } |
| } |
| |
| /// Returns the number of messages in the channel. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// assert_eq!(r.len(), 0); |
| /// |
| /// s.send(1).unwrap(); |
| /// s.send(2).unwrap(); |
| /// assert_eq!(r.len(), 2); |
| /// ``` |
| pub fn len(&self) -> usize { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.len(), |
| ReceiverFlavor::List(chan) => chan.len(), |
| ReceiverFlavor::Zero(chan) => chan.len(), |
| ReceiverFlavor::At(chan) => chan.len(), |
| ReceiverFlavor::Tick(chan) => chan.len(), |
| ReceiverFlavor::Never(chan) => chan.len(), |
| } |
| } |
| |
| /// If the channel is bounded, returns its capacity. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use crossbeam_channel::{bounded, unbounded}; |
| /// |
| /// let (_, r) = unbounded::<i32>(); |
| /// assert_eq!(r.capacity(), None); |
| /// |
| /// let (_, r) = bounded::<i32>(5); |
| /// assert_eq!(r.capacity(), Some(5)); |
| /// |
| /// let (_, r) = bounded::<i32>(0); |
| /// assert_eq!(r.capacity(), Some(0)); |
| /// ``` |
| pub fn capacity(&self) -> Option<usize> { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.capacity(), |
| ReceiverFlavor::List(chan) => chan.capacity(), |
| ReceiverFlavor::Zero(chan) => chan.capacity(), |
| ReceiverFlavor::At(chan) => chan.capacity(), |
| ReceiverFlavor::Tick(chan) => chan.capacity(), |
| ReceiverFlavor::Never(chan) => chan.capacity(), |
| } |
| } |
| |
| /// A blocking iterator over messages in the channel. |
| /// |
| /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if |
| /// the channel becomes empty and disconnected, it returns [`None`] without blocking. |
| /// |
| /// [`next`]: Iterator::next |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// s.send(1).unwrap(); |
| /// s.send(2).unwrap(); |
| /// s.send(3).unwrap(); |
| /// drop(s); // Disconnect the channel. |
| /// }); |
| /// |
| /// // Collect all messages from the channel. |
| /// // Note that the call to `collect` blocks until the sender is dropped. |
| /// let v: Vec<_> = r.iter().collect(); |
| /// |
| /// assert_eq!(v, [1, 2, 3]); |
| /// ``` |
| pub fn iter(&self) -> Iter<'_, T> { |
| Iter { receiver: self } |
| } |
| |
| /// A non-blocking iterator over messages in the channel. |
| /// |
| /// Each call to [`next`] returns a message if there is one ready to be received. The iterator |
| /// never blocks waiting for the next message. |
| /// |
| /// [`next`]: Iterator::next |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded::<i32>(); |
| /// |
| /// thread::spawn(move || { |
| /// s.send(1).unwrap(); |
| /// thread::sleep(Duration::from_secs(1)); |
| /// s.send(2).unwrap(); |
| /// thread::sleep(Duration::from_secs(2)); |
| /// s.send(3).unwrap(); |
| /// }); |
| /// |
| /// thread::sleep(Duration::from_secs(2)); |
| /// |
| /// // Collect all messages from the channel without blocking. |
| /// // The third message hasn't been sent yet so we'll collect only the first two. |
| /// let v: Vec<_> = r.try_iter().collect(); |
| /// |
| /// assert_eq!(v, [1, 2]); |
| /// ``` |
| pub fn try_iter(&self) -> TryIter<'_, T> { |
| TryIter { receiver: self } |
| } |
| |
| /// Returns `true` if receivers belong to the same channel. |
| /// |
| /// # Examples |
| /// |
| /// ```rust |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (_, r) = unbounded::<usize>(); |
| /// |
| /// let r2 = r.clone(); |
| /// assert!(r.same_channel(&r2)); |
| /// |
| /// let (_, r3) = unbounded(); |
| /// assert!(!r.same_channel(&r3)); |
| /// ``` |
| pub fn same_channel(&self, other: &Receiver<T>) -> bool { |
| match (&self.flavor, &other.flavor) { |
| (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, |
| (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, |
| (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, |
| (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b), |
| (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b), |
| (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true, |
| _ => false, |
| } |
| } |
| } |
| |
| impl<T> Drop for Receiver<T> { |
| fn drop(&mut self) { |
| unsafe { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), |
| ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()), |
| ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), |
| ReceiverFlavor::At(_) => {} |
| ReceiverFlavor::Tick(_) => {} |
| ReceiverFlavor::Never(_) => {} |
| } |
| } |
| } |
| } |
| |
| impl<T> Clone for Receiver<T> { |
| fn clone(&self) -> Self { |
| let flavor = match &self.flavor { |
| ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()), |
| ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()), |
| ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()), |
| ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()), |
| ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()), |
| ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()), |
| }; |
| |
| Receiver { flavor } |
| } |
| } |
| |
| impl<T> fmt::Debug for Receiver<T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("Receiver { .. }") |
| } |
| } |
| |
| impl<'a, T> IntoIterator for &'a Receiver<T> { |
| type Item = T; |
| type IntoIter = Iter<'a, T>; |
| |
| fn into_iter(self) -> Self::IntoIter { |
| self.iter() |
| } |
| } |
| |
| impl<T> IntoIterator for Receiver<T> { |
| type Item = T; |
| type IntoIter = IntoIter<T>; |
| |
| fn into_iter(self) -> Self::IntoIter { |
| IntoIter { receiver: self } |
| } |
| } |
| |
| /// A blocking iterator over messages in a channel. |
| /// |
| /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the |
| /// channel becomes empty and disconnected, it returns [`None`] without blocking. |
| /// |
| /// [`next`]: Iterator::next |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// s.send(1).unwrap(); |
| /// s.send(2).unwrap(); |
| /// s.send(3).unwrap(); |
| /// drop(s); // Disconnect the channel. |
| /// }); |
| /// |
| /// // Collect all messages from the channel. |
| /// // Note that the call to `collect` blocks until the sender is dropped. |
| /// let v: Vec<_> = r.iter().collect(); |
| /// |
| /// assert_eq!(v, [1, 2, 3]); |
| /// ``` |
pub struct Iter<'a, T> {
    // Borrowed receiver; `next` pulls each message from it with a blocking `recv`.
    receiver: &'a Receiver<T>,
}
| |
// Marker impl with no methods: `FusedIterator` promises that `next` keeps returning `None`
// after the first `None`.
// NOTE(review): presumably holds because `recv` cannot succeed again once the channel is empty
// and disconnected — confirm against `Receiver::recv`.
impl<T> FusedIterator for Iter<'_, T> {}
| |
| impl<T> Iterator for Iter<'_, T> { |
| type Item = T; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| self.receiver.recv().ok() |
| } |
| } |
| |
| impl<T> fmt::Debug for Iter<'_, T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("Iter { .. }") |
| } |
| } |
| |
| /// A non-blocking iterator over messages in a channel. |
| /// |
| /// Each call to [`next`] returns a message if there is one ready to be received. The iterator |
| /// never blocks waiting for the next message. |
| /// |
| /// [`next`]: Iterator::next |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use std::time::Duration; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded::<i32>(); |
| /// |
| /// thread::spawn(move || { |
| /// s.send(1).unwrap(); |
| /// thread::sleep(Duration::from_secs(1)); |
| /// s.send(2).unwrap(); |
| /// thread::sleep(Duration::from_secs(2)); |
| /// s.send(3).unwrap(); |
| /// }); |
| /// |
| /// thread::sleep(Duration::from_secs(2)); |
| /// |
| /// // Collect all messages from the channel without blocking. |
| /// // The third message hasn't been sent yet so we'll collect only the first two. |
| /// let v: Vec<_> = r.try_iter().collect(); |
| /// |
| /// assert_eq!(v, [1, 2]); |
| /// ``` |
pub struct TryIter<'a, T> {
    // Borrowed receiver; `next` polls it with a non-blocking `try_recv`.
    receiver: &'a Receiver<T>,
}
| |
| impl<T> Iterator for TryIter<'_, T> { |
| type Item = T; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| self.receiver.try_recv().ok() |
| } |
| } |
| |
| impl<T> fmt::Debug for TryIter<'_, T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("TryIter { .. }") |
| } |
| } |
| |
| /// A blocking iterator over messages in a channel. |
| /// |
| /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the |
| /// channel becomes empty and disconnected, it returns [`None`] without blocking. |
| /// |
| /// [`next`]: Iterator::next |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::thread; |
| /// use crossbeam_channel::unbounded; |
| /// |
| /// let (s, r) = unbounded(); |
| /// |
| /// thread::spawn(move || { |
| /// s.send(1).unwrap(); |
| /// s.send(2).unwrap(); |
| /// s.send(3).unwrap(); |
| /// drop(s); // Disconnect the channel. |
| /// }); |
| /// |
| /// // Collect all messages from the channel. |
| /// // Note that the call to `collect` blocks until the sender is dropped. |
| /// let v: Vec<_> = r.into_iter().collect(); |
| /// |
| /// assert_eq!(v, [1, 2, 3]); |
| /// ``` |
pub struct IntoIter<T> {
    // Owned receiver; `next` pulls each message from it with a blocking `recv`.
    receiver: Receiver<T>,
}
| |
// Marker impl with no methods: `FusedIterator` promises that `next` keeps returning `None`
// after the first `None`.
// NOTE(review): presumably holds because `recv` cannot succeed again once the channel is empty
// and disconnected — confirm against `Receiver::recv`.
impl<T> FusedIterator for IntoIter<T> {}
| |
| impl<T> Iterator for IntoIter<T> { |
| type Item = T; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| self.receiver.recv().ok() |
| } |
| } |
| |
| impl<T> fmt::Debug for IntoIter<T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("IntoIter { .. }") |
| } |
| } |
| |
| impl<T> SelectHandle for Sender<T> { |
| fn try_select(&self, token: &mut Token) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().try_select(token), |
| SenderFlavor::List(chan) => chan.sender().try_select(token), |
| SenderFlavor::Zero(chan) => chan.sender().try_select(token), |
| } |
| } |
| |
| fn deadline(&self) -> Option<Instant> { |
| None |
| } |
| |
| fn register(&self, oper: Operation, cx: &Context) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().register(oper, cx), |
| SenderFlavor::List(chan) => chan.sender().register(oper, cx), |
| SenderFlavor::Zero(chan) => chan.sender().register(oper, cx), |
| } |
| } |
| |
| fn unregister(&self, oper: Operation) { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().unregister(oper), |
| SenderFlavor::List(chan) => chan.sender().unregister(oper), |
| SenderFlavor::Zero(chan) => chan.sender().unregister(oper), |
| } |
| } |
| |
| fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().accept(token, cx), |
| SenderFlavor::List(chan) => chan.sender().accept(token, cx), |
| SenderFlavor::Zero(chan) => chan.sender().accept(token, cx), |
| } |
| } |
| |
| fn is_ready(&self) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().is_ready(), |
| SenderFlavor::List(chan) => chan.sender().is_ready(), |
| SenderFlavor::Zero(chan) => chan.sender().is_ready(), |
| } |
| } |
| |
| fn watch(&self, oper: Operation, cx: &Context) -> bool { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().watch(oper, cx), |
| SenderFlavor::List(chan) => chan.sender().watch(oper, cx), |
| SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx), |
| } |
| } |
| |
| fn unwatch(&self, oper: Operation) { |
| match &self.flavor { |
| SenderFlavor::Array(chan) => chan.sender().unwatch(oper), |
| SenderFlavor::List(chan) => chan.sender().unwatch(oper), |
| SenderFlavor::Zero(chan) => chan.sender().unwatch(oper), |
| } |
| } |
| } |
| |
| impl<T> SelectHandle for Receiver<T> { |
| fn try_select(&self, token: &mut Token) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().try_select(token), |
| ReceiverFlavor::List(chan) => chan.receiver().try_select(token), |
| ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token), |
| ReceiverFlavor::At(chan) => chan.try_select(token), |
| ReceiverFlavor::Tick(chan) => chan.try_select(token), |
| ReceiverFlavor::Never(chan) => chan.try_select(token), |
| } |
| } |
| |
| fn deadline(&self) -> Option<Instant> { |
| match &self.flavor { |
| ReceiverFlavor::Array(_) => None, |
| ReceiverFlavor::List(_) => None, |
| ReceiverFlavor::Zero(_) => None, |
| ReceiverFlavor::At(chan) => chan.deadline(), |
| ReceiverFlavor::Tick(chan) => chan.deadline(), |
| ReceiverFlavor::Never(chan) => chan.deadline(), |
| } |
| } |
| |
| fn register(&self, oper: Operation, cx: &Context) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx), |
| ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx), |
| ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx), |
| ReceiverFlavor::At(chan) => chan.register(oper, cx), |
| ReceiverFlavor::Tick(chan) => chan.register(oper, cx), |
| ReceiverFlavor::Never(chan) => chan.register(oper, cx), |
| } |
| } |
| |
| fn unregister(&self, oper: Operation) { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper), |
| ReceiverFlavor::List(chan) => chan.receiver().unregister(oper), |
| ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper), |
| ReceiverFlavor::At(chan) => chan.unregister(oper), |
| ReceiverFlavor::Tick(chan) => chan.unregister(oper), |
| ReceiverFlavor::Never(chan) => chan.unregister(oper), |
| } |
| } |
| |
| fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx), |
| ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx), |
| ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx), |
| ReceiverFlavor::At(chan) => chan.accept(token, cx), |
| ReceiverFlavor::Tick(chan) => chan.accept(token, cx), |
| ReceiverFlavor::Never(chan) => chan.accept(token, cx), |
| } |
| } |
| |
| fn is_ready(&self) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().is_ready(), |
| ReceiverFlavor::List(chan) => chan.receiver().is_ready(), |
| ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(), |
| ReceiverFlavor::At(chan) => chan.is_ready(), |
| ReceiverFlavor::Tick(chan) => chan.is_ready(), |
| ReceiverFlavor::Never(chan) => chan.is_ready(), |
| } |
| } |
| |
| fn watch(&self, oper: Operation, cx: &Context) -> bool { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx), |
| ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx), |
| ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx), |
| ReceiverFlavor::At(chan) => chan.watch(oper, cx), |
| ReceiverFlavor::Tick(chan) => chan.watch(oper, cx), |
| ReceiverFlavor::Never(chan) => chan.watch(oper, cx), |
| } |
| } |
| |
| fn unwatch(&self, oper: Operation) { |
| match &self.flavor { |
| ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper), |
| ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper), |
| ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper), |
| ReceiverFlavor::At(chan) => chan.unwatch(oper), |
| ReceiverFlavor::Tick(chan) => chan.unwatch(oper), |
| ReceiverFlavor::Never(chan) => chan.unwatch(oper), |
| } |
| } |
| } |
| |
| /// Writes a message into the channel. |
| pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> { |
| match &s.flavor { |
| SenderFlavor::Array(chan) => chan.write(token, msg), |
| SenderFlavor::List(chan) => chan.write(token, msg), |
| SenderFlavor::Zero(chan) => chan.write(token, msg), |
| } |
| } |
| |
| /// Reads a message from the channel. |
| pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> { |
| match &r.flavor { |
| ReceiverFlavor::Array(chan) => chan.read(token), |
| ReceiverFlavor::List(chan) => chan.read(token), |
| ReceiverFlavor::Zero(chan) => chan.read(token), |
| ReceiverFlavor::At(chan) => { |
| mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token)) |
| } |
| ReceiverFlavor::Tick(chan) => { |
| mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token)) |
| } |
| ReceiverFlavor::Never(chan) => chan.read(token), |
| } |
| } |