blob: ebcd6520189bc8b13ba26724f10279984ae8e2fe [file] [log] [blame]
//! The channel interface.
2
3use std::fmt;
4use std::iter::FusedIterator;
5use std::mem;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::context::Context;
11use crate::counter;
12use crate::err::{
13 RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14};
15use crate::flavors;
16use crate::select::{Operation, SelectHandle, Token};
17
18/// Creates a channel of unbounded capacity.
19///
20/// This channel has a growable buffer that can hold any number of messages at a time.
21///
22/// # Examples
23///
24/// ```
25/// use std::thread;
26/// use crossbeam_channel::unbounded;
27///
28/// let (s, r) = unbounded();
29///
30/// // Computes the n-th Fibonacci number.
31/// fn fib(n: i32) -> i32 {
32/// if n <= 1 {
33/// n
34/// } else {
35/// fib(n - 1) + fib(n - 2)
36/// }
37/// }
38///
39/// // Spawn an asynchronous computation.
40/// thread::spawn(move || s.send(fib(20)).unwrap());
41///
42/// // Print the result of the computation.
43/// println!("{}", r.recv().unwrap());
44/// ```
45pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46 let (s, r) = counter::new(flavors::list::Channel::new());
47 let s = Sender {
48 flavor: SenderFlavor::List(s),
49 };
50 let r = Receiver {
51 flavor: ReceiverFlavor::List(r),
52 };
53 (s, r)
54}
55
56/// Creates a channel of bounded capacity.
57///
58/// This channel has a buffer that can hold at most `cap` messages at a time.
59///
60/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61/// receive operations must appear at the same time in order to pair up and pass the message over.
62///
63/// # Examples
64///
65/// A channel of capacity 1:
66///
67/// ```
68/// use std::thread;
69/// use std::time::Duration;
70/// use crossbeam_channel::bounded;
71///
72/// let (s, r) = bounded(1);
73///
74/// // This call returns immediately because there is enough space in the channel.
75/// s.send(1).unwrap();
76///
77/// thread::spawn(move || {
78/// // This call blocks the current thread because the channel is full.
79/// // It will be able to complete only after the first message is received.
80/// s.send(2).unwrap();
81/// });
82///
83/// thread::sleep(Duration::from_secs(1));
84/// assert_eq!(r.recv(), Ok(1));
85/// assert_eq!(r.recv(), Ok(2));
86/// ```
87///
88/// A zero-capacity channel:
89///
90/// ```
91/// use std::thread;
92/// use std::time::Duration;
93/// use crossbeam_channel::bounded;
94///
95/// let (s, r) = bounded(0);
96///
97/// thread::spawn(move || {
98/// // This call blocks the current thread until a receive operation appears
99/// // on the other side of the channel.
100/// s.send(1).unwrap();
101/// });
102///
103/// thread::sleep(Duration::from_secs(1));
104/// assert_eq!(r.recv(), Ok(1));
105/// ```
106pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107 if cap == 0 {
108 let (s, r) = counter::new(flavors::zero::Channel::new());
109 let s = Sender {
110 flavor: SenderFlavor::Zero(s),
111 };
112 let r = Receiver {
113 flavor: ReceiverFlavor::Zero(r),
114 };
115 (s, r)
116 } else {
117 let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118 let s = Sender {
119 flavor: SenderFlavor::Array(s),
120 };
121 let r = Receiver {
122 flavor: ReceiverFlavor::Array(r),
123 };
124 (s, r)
125 }
126}
127
128/// Creates a receiver that delivers a message after a certain duration of time.
129///
130/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131/// be sent into the channel after `duration` elapses. The message is the instant at which it is
132/// sent.
133///
134/// # Examples
135///
136/// Using an `after` channel for timeouts:
137///
138/// ```
139/// use std::time::Duration;
140/// use crossbeam_channel::{after, select, unbounded};
141///
142/// let (s, r) = unbounded::<i32>();
143/// let timeout = Duration::from_millis(100);
144///
145/// select! {
146/// recv(r) -> msg => println!("received {:?}", msg),
147/// recv(after(timeout)) -> _ => println!("timed out"),
148/// }
149/// ```
150///
151/// When the message gets sent:
152///
153/// ```
154/// use std::thread;
155/// use std::time::{Duration, Instant};
156/// use crossbeam_channel::after;
157///
158/// // Converts a number of milliseconds into a `Duration`.
159/// let ms = |ms| Duration::from_millis(ms);
160///
161/// // Returns `true` if `a` and `b` are very close `Instant`s.
162/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
163///
164/// let start = Instant::now();
165/// let r = after(ms(100));
166///
167/// thread::sleep(ms(500));
168///
169/// // This message was sent 100 ms from the start and received 500 ms from the start.
170/// assert!(eq(r.recv().unwrap(), start + ms(100)));
171/// assert!(eq(Instant::now(), start + ms(500)));
172/// ```
173pub fn after(duration: Duration) -> Receiver<Instant> {
174 Receiver {
175 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
176 }
177}
178
179/// Creates a receiver that delivers a message at a certain instant in time.
180///
181/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
182/// be sent into the channel at the moment in time `when`. The message is the instant at which it
183/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
184/// instantly to the receiver.
185///
186/// # Examples
187///
188/// Using an `at` channel for timeouts:
189///
190/// ```
191/// use std::time::{Instant, Duration};
192/// use crossbeam_channel::{at, select, unbounded};
193///
194/// let (s, r) = unbounded::<i32>();
195/// let deadline = Instant::now() + Duration::from_millis(500);
196///
197/// select! {
198/// recv(r) -> msg => println!("received {:?}", msg),
199/// recv(at(deadline)) -> _ => println!("timed out"),
200/// }
201/// ```
202///
203/// When the message gets sent:
204///
205/// ```
206/// use std::time::{Duration, Instant};
207/// use crossbeam_channel::at;
208///
209/// // Converts a number of milliseconds into a `Duration`.
210/// let ms = |ms| Duration::from_millis(ms);
211///
212/// let start = Instant::now();
213/// let end = start + ms(100);
214///
215/// let r = at(end);
216///
217/// // This message was sent 100 ms from the start
218/// assert_eq!(r.recv().unwrap(), end);
219/// assert!(Instant::now() > start + ms(100));
220/// ```
221pub fn at(when: Instant) -> Receiver<Instant> {
222 Receiver {
223 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
224 }
225}
226
227/// Creates a receiver that never delivers messages.
228///
229/// The channel is bounded with capacity of 0 and never gets disconnected.
230///
231/// # Examples
232///
233/// Using a `never` channel to optionally add a timeout to [`select!`]:
234///
235/// ```
236/// use std::thread;
237/// use std::time::Duration;
238/// use crossbeam_channel::{after, select, never, unbounded};
239///
240/// let (s, r) = unbounded();
241///
242/// thread::spawn(move || {
243/// thread::sleep(Duration::from_secs(1));
244/// s.send(1).unwrap();
245/// });
246///
247/// // Suppose this duration can be a `Some` or a `None`.
248/// let duration = Some(Duration::from_millis(100));
249///
250/// // Create a channel that times out after the specified duration.
251/// let timeout = duration
252/// .map(|d| after(d))
253/// .unwrap_or(never());
254///
255/// select! {
256/// recv(r) -> msg => assert_eq!(msg, Ok(1)),
257/// recv(timeout) -> _ => println!("timed out"),
258/// }
259/// ```
260///
261/// [`select!`]: macro.select.html
262pub fn never<T>() -> Receiver<T> {
263 Receiver {
264 flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
265 }
266}
267
268/// Creates a receiver that delivers messages periodically.
269///
270/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
271/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
272/// sent.
273///
274/// # Examples
275///
276/// Using a `tick` channel to periodically print elapsed time:
277///
278/// ```
279/// use std::time::{Duration, Instant};
280/// use crossbeam_channel::tick;
281///
282/// let start = Instant::now();
283/// let ticker = tick(Duration::from_millis(100));
284///
285/// for _ in 0..5 {
286/// ticker.recv().unwrap();
287/// println!("elapsed: {:?}", start.elapsed());
288/// }
289/// ```
290///
291/// When messages get sent:
292///
293/// ```
294/// use std::thread;
295/// use std::time::{Duration, Instant};
296/// use crossbeam_channel::tick;
297///
298/// // Converts a number of milliseconds into a `Duration`.
299/// let ms = |ms| Duration::from_millis(ms);
300///
301/// // Returns `true` if `a` and `b` are very close `Instant`s.
302/// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
303///
304/// let start = Instant::now();
305/// let r = tick(ms(100));
306///
307/// // This message was sent 100 ms from the start and received 100 ms from the start.
308/// assert!(eq(r.recv().unwrap(), start + ms(100)));
309/// assert!(eq(Instant::now(), start + ms(100)));
310///
311/// thread::sleep(ms(500));
312///
313/// // This message was sent 200 ms from the start and received 600 ms from the start.
314/// assert!(eq(r.recv().unwrap(), start + ms(200)));
315/// assert!(eq(Instant::now(), start + ms(600)));
316///
317/// // This message was sent 700 ms from the start and received 700 ms from the start.
318/// assert!(eq(r.recv().unwrap(), start + ms(700)));
319/// assert!(eq(Instant::now(), start + ms(700)));
320/// ```
321pub fn tick(duration: Duration) -> Receiver<Instant> {
322 Receiver {
323 flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
324 }
325}
326
/// The sending side of a channel.
///
/// Senders are obtained from [`unbounded`] and [`bounded`]. A sender can be cloned, and all
/// clones send into the same channel.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s1, r) = unbounded();
/// let s2 = s1.clone();
///
/// thread::spawn(move || s1.send(1).unwrap());
/// thread::spawn(move || s2.send(2).unwrap());
///
/// let msg1 = r.recv().unwrap();
/// let msg2 = r.recv().unwrap();
///
/// assert_eq!(msg1 + msg2, 3);
/// ```
pub struct Sender<T> {
    // Which channel implementation this handle sends into; every method dispatches on it.
    flavor: SenderFlavor<T>,
}
349
/// Sender flavors.
///
/// One variant per channel implementation that has a sending side. Each variant wraps the
/// `counter::Sender` handle for that implementation.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array (created by `bounded` with `cap > 0`).
    Array(counter::Sender<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list (created by `unbounded`).
    List(counter::Sender<flavors::list::Channel<T>>),

    /// Zero-capacity channel (created by `bounded(0)`).
    Zero(counter::Sender<flavors::zero::Channel<T>>),
}
361
362unsafe impl<T: Send> Send for Sender<T> {}
363unsafe impl<T: Send> Sync for Sender<T> {}
364
365impl<T> UnwindSafe for Sender<T> {}
366impl<T> RefUnwindSafe for Sender<T> {}
367
368impl<T> Sender<T> {
369 /// Attempts to send a message into the channel without blocking.
370 ///
371 /// This method will either send a message into the channel immediately or return an error if
372 /// the channel is full or disconnected. The returned error contains the original message.
373 ///
374 /// If called on a zero-capacity channel, this method will send the message only if there
375 /// happens to be a receive operation on the other side of the channel at the same time.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// use crossbeam_channel::{bounded, TrySendError};
381 ///
382 /// let (s, r) = bounded(1);
383 ///
384 /// assert_eq!(s.try_send(1), Ok(()));
385 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
386 ///
387 /// drop(r);
388 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
389 /// ```
390 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
391 match &self.flavor {
392 SenderFlavor::Array(chan) => chan.try_send(msg),
393 SenderFlavor::List(chan) => chan.try_send(msg),
394 SenderFlavor::Zero(chan) => chan.try_send(msg),
395 }
396 }
397
398 /// Blocks the current thread until a message is sent or the channel is disconnected.
399 ///
400 /// If the channel is full and not disconnected, this call will block until the send operation
401 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
402 /// error. The returned error contains the original message.
403 ///
404 /// If called on a zero-capacity channel, this method will wait for a receive operation to
405 /// appear on the other side of the channel.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use std::thread;
411 /// use std::time::Duration;
412 /// use crossbeam_channel::{bounded, SendError};
413 ///
414 /// let (s, r) = bounded(1);
415 /// assert_eq!(s.send(1), Ok(()));
416 ///
417 /// thread::spawn(move || {
418 /// assert_eq!(r.recv(), Ok(1));
419 /// thread::sleep(Duration::from_secs(1));
420 /// drop(r);
421 /// });
422 ///
423 /// assert_eq!(s.send(2), Ok(()));
424 /// assert_eq!(s.send(3), Err(SendError(3)));
425 /// ```
426 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
427 match &self.flavor {
428 SenderFlavor::Array(chan) => chan.send(msg, None),
429 SenderFlavor::List(chan) => chan.send(msg, None),
430 SenderFlavor::Zero(chan) => chan.send(msg, None),
431 }
432 .map_err(|err| match err {
433 SendTimeoutError::Disconnected(msg) => SendError(msg),
434 SendTimeoutError::Timeout(_) => unreachable!(),
435 })
436 }
437
438 /// Waits for a message to be sent into the channel, but only for a limited time.
439 ///
440 /// If the channel is full and not disconnected, this call will block until the send operation
441 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
442 /// wake up and return an error. The returned error contains the original message.
443 ///
444 /// If called on a zero-capacity channel, this method will wait for a receive operation to
445 /// appear on the other side of the channel.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// use std::thread;
451 /// use std::time::Duration;
452 /// use crossbeam_channel::{bounded, SendTimeoutError};
453 ///
454 /// let (s, r) = bounded(0);
455 ///
456 /// thread::spawn(move || {
457 /// thread::sleep(Duration::from_secs(1));
458 /// assert_eq!(r.recv(), Ok(2));
459 /// drop(r);
460 /// });
461 ///
462 /// assert_eq!(
463 /// s.send_timeout(1, Duration::from_millis(500)),
464 /// Err(SendTimeoutError::Timeout(1)),
465 /// );
466 /// assert_eq!(
467 /// s.send_timeout(2, Duration::from_secs(1)),
468 /// Ok(()),
469 /// );
470 /// assert_eq!(
471 /// s.send_timeout(3, Duration::from_millis(500)),
472 /// Err(SendTimeoutError::Disconnected(3)),
473 /// );
474 /// ```
475 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
476 self.send_deadline(msg, Instant::now() + timeout)
477 }
478
479 /// Waits for a message to be sent into the channel, but only until a given deadline.
480 ///
481 /// If the channel is full and not disconnected, this call will block until the send operation
482 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
483 /// wake up and return an error. The returned error contains the original message.
484 ///
485 /// If called on a zero-capacity channel, this method will wait for a receive operation to
486 /// appear on the other side of the channel.
487 ///
488 /// # Examples
489 ///
490 /// ```
491 /// use std::thread;
492 /// use std::time::{Duration, Instant};
493 /// use crossbeam_channel::{bounded, SendTimeoutError};
494 ///
495 /// let (s, r) = bounded(0);
496 ///
497 /// thread::spawn(move || {
498 /// thread::sleep(Duration::from_secs(1));
499 /// assert_eq!(r.recv(), Ok(2));
500 /// drop(r);
501 /// });
502 ///
503 /// let now = Instant::now();
504 ///
505 /// assert_eq!(
506 /// s.send_deadline(1, now + Duration::from_millis(500)),
507 /// Err(SendTimeoutError::Timeout(1)),
508 /// );
509 /// assert_eq!(
510 /// s.send_deadline(2, now + Duration::from_millis(1500)),
511 /// Ok(()),
512 /// );
513 /// assert_eq!(
514 /// s.send_deadline(3, now + Duration::from_millis(2000)),
515 /// Err(SendTimeoutError::Disconnected(3)),
516 /// );
517 /// ```
518 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
519 match &self.flavor {
520 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
521 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
522 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
523 }
524 }
525
526 /// Returns `true` if the channel is empty.
527 ///
528 /// Note: Zero-capacity channels are always empty.
529 ///
530 /// # Examples
531 ///
532 /// ```
533 /// use crossbeam_channel::unbounded;
534 ///
535 /// let (s, r) = unbounded();
536 /// assert!(s.is_empty());
537 ///
538 /// s.send(0).unwrap();
539 /// assert!(!s.is_empty());
540 /// ```
541 pub fn is_empty(&self) -> bool {
542 match &self.flavor {
543 SenderFlavor::Array(chan) => chan.is_empty(),
544 SenderFlavor::List(chan) => chan.is_empty(),
545 SenderFlavor::Zero(chan) => chan.is_empty(),
546 }
547 }
548
549 /// Returns `true` if the channel is full.
550 ///
551 /// Note: Zero-capacity channels are always full.
552 ///
553 /// # Examples
554 ///
555 /// ```
556 /// use crossbeam_channel::bounded;
557 ///
558 /// let (s, r) = bounded(1);
559 ///
560 /// assert!(!s.is_full());
561 /// s.send(0).unwrap();
562 /// assert!(s.is_full());
563 /// ```
564 pub fn is_full(&self) -> bool {
565 match &self.flavor {
566 SenderFlavor::Array(chan) => chan.is_full(),
567 SenderFlavor::List(chan) => chan.is_full(),
568 SenderFlavor::Zero(chan) => chan.is_full(),
569 }
570 }
571
572 /// Returns the number of messages in the channel.
573 ///
574 /// # Examples
575 ///
576 /// ```
577 /// use crossbeam_channel::unbounded;
578 ///
579 /// let (s, r) = unbounded();
580 /// assert_eq!(s.len(), 0);
581 ///
582 /// s.send(1).unwrap();
583 /// s.send(2).unwrap();
584 /// assert_eq!(s.len(), 2);
585 /// ```
586 pub fn len(&self) -> usize {
587 match &self.flavor {
588 SenderFlavor::Array(chan) => chan.len(),
589 SenderFlavor::List(chan) => chan.len(),
590 SenderFlavor::Zero(chan) => chan.len(),
591 }
592 }
593
594 /// If the channel is bounded, returns its capacity.
595 ///
596 /// # Examples
597 ///
598 /// ```
599 /// use crossbeam_channel::{bounded, unbounded};
600 ///
601 /// let (s, _) = unbounded::<i32>();
602 /// assert_eq!(s.capacity(), None);
603 ///
604 /// let (s, _) = bounded::<i32>(5);
605 /// assert_eq!(s.capacity(), Some(5));
606 ///
607 /// let (s, _) = bounded::<i32>(0);
608 /// assert_eq!(s.capacity(), Some(0));
609 /// ```
610 pub fn capacity(&self) -> Option<usize> {
611 match &self.flavor {
612 SenderFlavor::Array(chan) => chan.capacity(),
613 SenderFlavor::List(chan) => chan.capacity(),
614 SenderFlavor::Zero(chan) => chan.capacity(),
615 }
616 }
617
618 /// Returns `true` if senders belong to the same channel.
619 ///
620 /// # Examples
621 ///
622 /// ```rust
623 /// use crossbeam_channel::unbounded;
624 ///
625 /// let (s, _) = unbounded::<usize>();
626 ///
627 /// let s2 = s.clone();
628 /// assert!(s.same_channel(&s2));
629 ///
630 /// let (s3, _) = unbounded();
631 /// assert!(!s.same_channel(&s3));
632 /// ```
633 pub fn same_channel(&self, other: &Sender<T>) -> bool {
634 match (&self.flavor, &other.flavor) {
635 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
636 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
637 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
638 _ => false,
639 }
640 }
641}
642
643impl<T> Drop for Sender<T> {
644 fn drop(&mut self) {
645 unsafe {
646 match &self.flavor {
647 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
648 SenderFlavor::List(chan) => chan.release(|c| c.disconnect()),
649 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
650 }
651 }
652 }
653}
654
655impl<T> Clone for Sender<T> {
656 fn clone(&self) -> Self {
657 let flavor = match &self.flavor {
658 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
659 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
660 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
661 };
662
663 Sender { flavor }
664 }
665}
666
667impl<T> fmt::Debug for Sender<T> {
668 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669 f.pad("Sender { .. }")
670 }
671}
672
/// The receiving side of a channel.
///
/// Receivers are obtained from [`unbounded`] and [`bounded`] together with a [`Sender`], or on
/// their own from [`after`], [`at`], [`tick`] and [`never`].
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     let _ = s.send(1);
///     thread::sleep(Duration::from_secs(1));
///     let _ = s.send(2);
/// });
///
/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
/// ```
pub struct Receiver<T> {
    // Which channel implementation this handle receives from; every method dispatches on it.
    flavor: ReceiverFlavor<T>,
}
696
/// Receiver flavors.
///
/// One variant per channel implementation. The first three mirror the sender flavors and wrap
/// a `counter::Receiver`; the remaining ones are receiver-only channels.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array (created by `bounded` with `cap > 0`).
    Array(counter::Receiver<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list (created by `unbounded`).
    List(counter::Receiver<flavors::list::Channel<T>>),

    /// Zero-capacity channel (created by `bounded(0)`).
    Zero(counter::Receiver<flavors::zero::Channel<T>>),

    /// The at flavor, used by both `after` and `at`.
    At(Arc<flavors::at::Channel>),

    /// The tick flavor (created by `tick`).
    Tick(Arc<flavors::tick::Channel>),

    /// The never flavor (created by `never`).
    Never(flavors::never::Channel<T>),
}
717
718unsafe impl<T: Send> Send for Receiver<T> {}
719unsafe impl<T: Send> Sync for Receiver<T> {}
720
721impl<T> UnwindSafe for Receiver<T> {}
722impl<T> RefUnwindSafe for Receiver<T> {}
723
724impl<T> Receiver<T> {
725 /// Attempts to receive a message from the channel without blocking.
726 ///
727 /// This method will either receive a message from the channel immediately or return an error
728 /// if the channel is empty.
729 ///
730 /// If called on a zero-capacity channel, this method will receive a message only if there
731 /// happens to be a send operation on the other side of the channel at the same time.
732 ///
733 /// # Examples
734 ///
735 /// ```
736 /// use crossbeam_channel::{unbounded, TryRecvError};
737 ///
738 /// let (s, r) = unbounded();
739 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
740 ///
741 /// s.send(5).unwrap();
742 /// drop(s);
743 ///
744 /// assert_eq!(r.try_recv(), Ok(5));
745 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
746 /// ```
747 pub fn try_recv(&self) -> Result<T, TryRecvError> {
748 match &self.flavor {
749 ReceiverFlavor::Array(chan) => chan.try_recv(),
750 ReceiverFlavor::List(chan) => chan.try_recv(),
751 ReceiverFlavor::Zero(chan) => chan.try_recv(),
752 ReceiverFlavor::At(chan) => {
753 let msg = chan.try_recv();
754 unsafe {
755 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
756 &msg,
757 )
758 }
759 }
760 ReceiverFlavor::Tick(chan) => {
761 let msg = chan.try_recv();
762 unsafe {
763 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
764 &msg,
765 )
766 }
767 }
768 ReceiverFlavor::Never(chan) => chan.try_recv(),
769 }
770 }
771
772 /// Blocks the current thread until a message is received or the channel is empty and
773 /// disconnected.
774 ///
775 /// If the channel is empty and not disconnected, this call will block until the receive
776 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
777 /// wake up and return an error.
778 ///
779 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
780 /// on the other side of the channel.
781 ///
782 /// # Examples
783 ///
784 /// ```
785 /// use std::thread;
786 /// use std::time::Duration;
787 /// use crossbeam_channel::{unbounded, RecvError};
788 ///
789 /// let (s, r) = unbounded();
790 ///
791 /// thread::spawn(move || {
792 /// thread::sleep(Duration::from_secs(1));
793 /// s.send(5).unwrap();
794 /// drop(s);
795 /// });
796 ///
797 /// assert_eq!(r.recv(), Ok(5));
798 /// assert_eq!(r.recv(), Err(RecvError));
799 /// ```
800 pub fn recv(&self) -> Result<T, RecvError> {
801 match &self.flavor {
802 ReceiverFlavor::Array(chan) => chan.recv(None),
803 ReceiverFlavor::List(chan) => chan.recv(None),
804 ReceiverFlavor::Zero(chan) => chan.recv(None),
805 ReceiverFlavor::At(chan) => {
806 let msg = chan.recv(None);
807 unsafe {
808 mem::transmute_copy::<
809 Result<Instant, RecvTimeoutError>,
810 Result<T, RecvTimeoutError>,
811 >(&msg)
812 }
813 }
814 ReceiverFlavor::Tick(chan) => {
815 let msg = chan.recv(None);
816 unsafe {
817 mem::transmute_copy::<
818 Result<Instant, RecvTimeoutError>,
819 Result<T, RecvTimeoutError>,
820 >(&msg)
821 }
822 }
823 ReceiverFlavor::Never(chan) => chan.recv(None),
824 }
825 .map_err(|_| RecvError)
826 }
827
828 /// Waits for a message to be received from the channel, but only for a limited time.
829 ///
830 /// If the channel is empty and not disconnected, this call will block until the receive
831 /// operation can proceed or the operation times out. If the channel is empty and becomes
832 /// disconnected, this call will wake up and return an error.
833 ///
834 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
835 /// on the other side of the channel.
836 ///
837 /// # Examples
838 ///
839 /// ```
840 /// use std::thread;
841 /// use std::time::Duration;
842 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
843 ///
844 /// let (s, r) = unbounded();
845 ///
846 /// thread::spawn(move || {
847 /// thread::sleep(Duration::from_secs(1));
848 /// s.send(5).unwrap();
849 /// drop(s);
850 /// });
851 ///
852 /// assert_eq!(
853 /// r.recv_timeout(Duration::from_millis(500)),
854 /// Err(RecvTimeoutError::Timeout),
855 /// );
856 /// assert_eq!(
857 /// r.recv_timeout(Duration::from_secs(1)),
858 /// Ok(5),
859 /// );
860 /// assert_eq!(
861 /// r.recv_timeout(Duration::from_secs(1)),
862 /// Err(RecvTimeoutError::Disconnected),
863 /// );
864 /// ```
865 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
866 self.recv_deadline(Instant::now() + timeout)
867 }
868
869 /// Waits for a message to be received from the channel, but only before a given deadline.
870 ///
871 /// If the channel is empty and not disconnected, this call will block until the receive
872 /// operation can proceed or the operation times out. If the channel is empty and becomes
873 /// disconnected, this call will wake up and return an error.
874 ///
875 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
876 /// on the other side of the channel.
877 ///
878 /// # Examples
879 ///
880 /// ```
881 /// use std::thread;
882 /// use std::time::{Instant, Duration};
883 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
884 ///
885 /// let (s, r) = unbounded();
886 ///
887 /// thread::spawn(move || {
888 /// thread::sleep(Duration::from_secs(1));
889 /// s.send(5).unwrap();
890 /// drop(s);
891 /// });
892 ///
893 /// let now = Instant::now();
894 ///
895 /// assert_eq!(
896 /// r.recv_deadline(now + Duration::from_millis(500)),
897 /// Err(RecvTimeoutError::Timeout),
898 /// );
899 /// assert_eq!(
900 /// r.recv_deadline(now + Duration::from_millis(1500)),
901 /// Ok(5),
902 /// );
903 /// assert_eq!(
904 /// r.recv_deadline(now + Duration::from_secs(5)),
905 /// Err(RecvTimeoutError::Disconnected),
906 /// );
907 /// ```
908 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
909 match &self.flavor {
910 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
911 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
912 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
913 ReceiverFlavor::At(chan) => {
914 let msg = chan.recv(Some(deadline));
915 unsafe {
916 mem::transmute_copy::<
917 Result<Instant, RecvTimeoutError>,
918 Result<T, RecvTimeoutError>,
919 >(&msg)
920 }
921 }
922 ReceiverFlavor::Tick(chan) => {
923 let msg = chan.recv(Some(deadline));
924 unsafe {
925 mem::transmute_copy::<
926 Result<Instant, RecvTimeoutError>,
927 Result<T, RecvTimeoutError>,
928 >(&msg)
929 }
930 }
931 ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
932 }
933 }
934
935 /// Returns `true` if the channel is empty.
936 ///
937 /// Note: Zero-capacity channels are always empty.
938 ///
939 /// # Examples
940 ///
941 /// ```
942 /// use crossbeam_channel::unbounded;
943 ///
944 /// let (s, r) = unbounded();
945 ///
946 /// assert!(r.is_empty());
947 /// s.send(0).unwrap();
948 /// assert!(!r.is_empty());
949 /// ```
950 pub fn is_empty(&self) -> bool {
951 match &self.flavor {
952 ReceiverFlavor::Array(chan) => chan.is_empty(),
953 ReceiverFlavor::List(chan) => chan.is_empty(),
954 ReceiverFlavor::Zero(chan) => chan.is_empty(),
955 ReceiverFlavor::At(chan) => chan.is_empty(),
956 ReceiverFlavor::Tick(chan) => chan.is_empty(),
957 ReceiverFlavor::Never(chan) => chan.is_empty(),
958 }
959 }
960
961 /// Returns `true` if the channel is full.
962 ///
963 /// Note: Zero-capacity channels are always full.
964 ///
965 /// # Examples
966 ///
967 /// ```
968 /// use crossbeam_channel::bounded;
969 ///
970 /// let (s, r) = bounded(1);
971 ///
972 /// assert!(!r.is_full());
973 /// s.send(0).unwrap();
974 /// assert!(r.is_full());
975 /// ```
976 pub fn is_full(&self) -> bool {
977 match &self.flavor {
978 ReceiverFlavor::Array(chan) => chan.is_full(),
979 ReceiverFlavor::List(chan) => chan.is_full(),
980 ReceiverFlavor::Zero(chan) => chan.is_full(),
981 ReceiverFlavor::At(chan) => chan.is_full(),
982 ReceiverFlavor::Tick(chan) => chan.is_full(),
983 ReceiverFlavor::Never(chan) => chan.is_full(),
984 }
985 }
986
987 /// Returns the number of messages in the channel.
988 ///
989 /// # Examples
990 ///
991 /// ```
992 /// use crossbeam_channel::unbounded;
993 ///
994 /// let (s, r) = unbounded();
995 /// assert_eq!(r.len(), 0);
996 ///
997 /// s.send(1).unwrap();
998 /// s.send(2).unwrap();
999 /// assert_eq!(r.len(), 2);
1000 /// ```
1001 pub fn len(&self) -> usize {
1002 match &self.flavor {
1003 ReceiverFlavor::Array(chan) => chan.len(),
1004 ReceiverFlavor::List(chan) => chan.len(),
1005 ReceiverFlavor::Zero(chan) => chan.len(),
1006 ReceiverFlavor::At(chan) => chan.len(),
1007 ReceiverFlavor::Tick(chan) => chan.len(),
1008 ReceiverFlavor::Never(chan) => chan.len(),
1009 }
1010 }
1011
1012 /// If the channel is bounded, returns its capacity.
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```
1017 /// use crossbeam_channel::{bounded, unbounded};
1018 ///
1019 /// let (_, r) = unbounded::<i32>();
1020 /// assert_eq!(r.capacity(), None);
1021 ///
1022 /// let (_, r) = bounded::<i32>(5);
1023 /// assert_eq!(r.capacity(), Some(5));
1024 ///
1025 /// let (_, r) = bounded::<i32>(0);
1026 /// assert_eq!(r.capacity(), Some(0));
1027 /// ```
1028 pub fn capacity(&self) -> Option<usize> {
1029 match &self.flavor {
1030 ReceiverFlavor::Array(chan) => chan.capacity(),
1031 ReceiverFlavor::List(chan) => chan.capacity(),
1032 ReceiverFlavor::Zero(chan) => chan.capacity(),
1033 ReceiverFlavor::At(chan) => chan.capacity(),
1034 ReceiverFlavor::Tick(chan) => chan.capacity(),
1035 ReceiverFlavor::Never(chan) => chan.capacity(),
1036 }
1037 }
1038
1039 /// A blocking iterator over messages in the channel.
1040 ///
1041 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1042 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1043 ///
1044 /// [`next`]: Iterator::next
1045 ///
1046 /// # Examples
1047 ///
1048 /// ```
1049 /// use std::thread;
1050 /// use crossbeam_channel::unbounded;
1051 ///
1052 /// let (s, r) = unbounded();
1053 ///
1054 /// thread::spawn(move || {
1055 /// s.send(1).unwrap();
1056 /// s.send(2).unwrap();
1057 /// s.send(3).unwrap();
1058 /// drop(s); // Disconnect the channel.
1059 /// });
1060 ///
1061 /// // Collect all messages from the channel.
1062 /// // Note that the call to `collect` blocks until the sender is dropped.
1063 /// let v: Vec<_> = r.iter().collect();
1064 ///
1065 /// assert_eq!(v, [1, 2, 3]);
1066 /// ```
1067 pub fn iter(&self) -> Iter<'_, T> {
1068 Iter { receiver: self }
1069 }
1070
1071 /// A non-blocking iterator over messages in the channel.
1072 ///
1073 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1074 /// never blocks waiting for the next message.
1075 ///
1076 /// [`next`]: Iterator::next
1077 ///
1078 /// # Examples
1079 ///
1080 /// ```
1081 /// use std::thread;
1082 /// use std::time::Duration;
1083 /// use crossbeam_channel::unbounded;
1084 ///
1085 /// let (s, r) = unbounded::<i32>();
1086 ///
1087 /// thread::spawn(move || {
1088 /// s.send(1).unwrap();
1089 /// thread::sleep(Duration::from_secs(1));
1090 /// s.send(2).unwrap();
1091 /// thread::sleep(Duration::from_secs(2));
1092 /// s.send(3).unwrap();
1093 /// });
1094 ///
1095 /// thread::sleep(Duration::from_secs(2));
1096 ///
1097 /// // Collect all messages from the channel without blocking.
1098 /// // The third message hasn't been sent yet so we'll collect only the first two.
1099 /// let v: Vec<_> = r.try_iter().collect();
1100 ///
1101 /// assert_eq!(v, [1, 2]);
1102 /// ```
1103 pub fn try_iter(&self) -> TryIter<'_, T> {
1104 TryIter { receiver: self }
1105 }
1106
1107 /// Returns `true` if receivers belong to the same channel.
1108 ///
1109 /// # Examples
1110 ///
1111 /// ```rust
1112 /// use crossbeam_channel::unbounded;
1113 ///
1114 /// let (_, r) = unbounded::<usize>();
1115 ///
1116 /// let r2 = r.clone();
1117 /// assert!(r.same_channel(&r2));
1118 ///
1119 /// let (_, r3) = unbounded();
1120 /// assert!(!r.same_channel(&r3));
1121 /// ```
1122 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1123 match (&self.flavor, &other.flavor) {
1124 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1125 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1126 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1127 (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1128 (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1129 (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1130 _ => false,
1131 }
1132 }
1133}
1134
1135impl<T> Drop for Receiver<T> {
1136 fn drop(&mut self) {
1137 unsafe {
1138 match &self.flavor {
1139 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1140 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()),
1141 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1142 ReceiverFlavor::At(_) => {}
1143 ReceiverFlavor::Tick(_) => {}
1144 ReceiverFlavor::Never(_) => {}
1145 }
1146 }
1147 }
1148}
1149
1150impl<T> Clone for Receiver<T> {
1151 fn clone(&self) -> Self {
1152 let flavor = match &self.flavor {
1153 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1154 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1155 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1156 ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1157 ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1158 ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1159 };
1160
1161 Receiver { flavor }
1162 }
1163}
1164
1165impl<T> fmt::Debug for Receiver<T> {
1166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1167 f.pad("Receiver { .. }")
1168 }
1169}
1170
1171impl<'a, T> IntoIterator for &'a Receiver<T> {
1172 type Item = T;
1173 type IntoIter = Iter<'a, T>;
1174
1175 fn into_iter(self) -> Self::IntoIter {
1176 self.iter()
1177 }
1178}
1179
1180impl<T> IntoIterator for Receiver<T> {
1181 type Item = T;
1182 type IntoIter = IntoIter<T>;
1183
1184 fn into_iter(self) -> Self::IntoIter {
1185 IntoIter { receiver: self }
1186 }
1187}
1188
/// A blocking iterator over messages in a channel.
///
/// Values of this type are produced by `Receiver::iter` and by iterating over
/// `&Receiver<T>`.
///
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     s.send(2).unwrap();
///     s.send(3).unwrap();
///     drop(s); // Disconnect the channel.
/// });
///
/// // Collect all messages from the channel.
/// // Note that the call to `collect` blocks until the sender is dropped.
/// let v: Vec<_> = r.iter().collect();
///
/// assert_eq!(v, [1, 2, 3]);
/// ```
pub struct Iter<'a, T> {
    // Borrowed receiver that `next` calls `recv` on.
    receiver: &'a Receiver<T>,
}
1220
// Marker impl: promises that after `next` has returned `None` it keeps returning `None`.
// NOTE(review): this relies on `recv` continuing to fail once it has failed — confirm.
impl<T> FusedIterator for Iter<'_, T> {}
1222
1223impl<T> Iterator for Iter<'_, T> {
1224 type Item = T;
1225
1226 fn next(&mut self) -> Option<Self::Item> {
1227 self.receiver.recv().ok()
1228 }
1229}
1230
1231impl<T> fmt::Debug for Iter<'_, T> {
1232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1233 f.pad("Iter { .. }")
1234 }
1235}
1236
/// A non-blocking iterator over messages in a channel.
///
/// Values of this type are produced by `Receiver::try_iter`.
///
/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
/// never blocks waiting for the next message.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded::<i32>();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     thread::sleep(Duration::from_secs(1));
///     s.send(2).unwrap();
///     thread::sleep(Duration::from_secs(2));
///     s.send(3).unwrap();
/// });
///
/// thread::sleep(Duration::from_secs(2));
///
/// // Collect all messages from the channel without blocking.
/// // The third message hasn't been sent yet so we'll collect only the first two.
/// let v: Vec<_> = r.try_iter().collect();
///
/// assert_eq!(v, [1, 2]);
/// ```
pub struct TryIter<'a, T> {
    // Borrowed receiver that `next` calls `try_recv` on.
    receiver: &'a Receiver<T>,
}
1272
1273impl<T> Iterator for TryIter<'_, T> {
1274 type Item = T;
1275
1276 fn next(&mut self) -> Option<Self::Item> {
1277 self.receiver.try_recv().ok()
1278 }
1279}
1280
1281impl<T> fmt::Debug for TryIter<'_, T> {
1282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1283 f.pad("TryIter { .. }")
1284 }
1285}
1286
/// A blocking iterator over messages in a channel.
///
/// Values of this type are produced by calling `into_iter` on a `Receiver<T>`; the iterator
/// takes ownership of that receiver.
///
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     s.send(2).unwrap();
///     s.send(3).unwrap();
///     drop(s); // Disconnect the channel.
/// });
///
/// // Collect all messages from the channel.
/// // Note that the call to `collect` blocks until the sender is dropped.
/// let v: Vec<_> = r.into_iter().collect();
///
/// assert_eq!(v, [1, 2, 3]);
/// ```
pub struct IntoIter<T> {
    // Owned receiver that `next` calls `recv` on.
    receiver: Receiver<T>,
}
1318
// Marker impl: promises that after `next` has returned `None` it keeps returning `None`.
// NOTE(review): this relies on `recv` continuing to fail once it has failed — confirm.
impl<T> FusedIterator for IntoIter<T> {}
1320
1321impl<T> Iterator for IntoIter<T> {
1322 type Item = T;
1323
1324 fn next(&mut self) -> Option<Self::Item> {
1325 self.receiver.recv().ok()
1326 }
1327}
1328
1329impl<T> fmt::Debug for IntoIter<T> {
1330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1331 f.pad("IntoIter { .. }")
1332 }
1333}
1334
1335impl<T> SelectHandle for Sender<T> {
1336 fn try_select(&self, token: &mut Token) -> bool {
1337 match &self.flavor {
1338 SenderFlavor::Array(chan) => chan.sender().try_select(token),
1339 SenderFlavor::List(chan) => chan.sender().try_select(token),
1340 SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1341 }
1342 }
1343
1344 fn deadline(&self) -> Option<Instant> {
1345 None
1346 }
1347
1348 fn register(&self, oper: Operation, cx: &Context) -> bool {
1349 match &self.flavor {
1350 SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1351 SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1352 SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1353 }
1354 }
1355
1356 fn unregister(&self, oper: Operation) {
1357 match &self.flavor {
1358 SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1359 SenderFlavor::List(chan) => chan.sender().unregister(oper),
1360 SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1361 }
1362 }
1363
1364 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1365 match &self.flavor {
1366 SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1367 SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1368 SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1369 }
1370 }
1371
1372 fn is_ready(&self) -> bool {
1373 match &self.flavor {
1374 SenderFlavor::Array(chan) => chan.sender().is_ready(),
1375 SenderFlavor::List(chan) => chan.sender().is_ready(),
1376 SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1377 }
1378 }
1379
1380 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1381 match &self.flavor {
1382 SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1383 SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1384 SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1385 }
1386 }
1387
1388 fn unwatch(&self, oper: Operation) {
1389 match &self.flavor {
1390 SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1391 SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1392 SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1393 }
1394 }
1395}
1396
1397impl<T> SelectHandle for Receiver<T> {
1398 fn try_select(&self, token: &mut Token) -> bool {
1399 match &self.flavor {
1400 ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1401 ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1402 ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1403 ReceiverFlavor::At(chan) => chan.try_select(token),
1404 ReceiverFlavor::Tick(chan) => chan.try_select(token),
1405 ReceiverFlavor::Never(chan) => chan.try_select(token),
1406 }
1407 }
1408
1409 fn deadline(&self) -> Option<Instant> {
1410 match &self.flavor {
1411 ReceiverFlavor::Array(_) => None,
1412 ReceiverFlavor::List(_) => None,
1413 ReceiverFlavor::Zero(_) => None,
1414 ReceiverFlavor::At(chan) => chan.deadline(),
1415 ReceiverFlavor::Tick(chan) => chan.deadline(),
1416 ReceiverFlavor::Never(chan) => chan.deadline(),
1417 }
1418 }
1419
1420 fn register(&self, oper: Operation, cx: &Context) -> bool {
1421 match &self.flavor {
1422 ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1423 ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1424 ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1425 ReceiverFlavor::At(chan) => chan.register(oper, cx),
1426 ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1427 ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1428 }
1429 }
1430
1431 fn unregister(&self, oper: Operation) {
1432 match &self.flavor {
1433 ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1434 ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1435 ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1436 ReceiverFlavor::At(chan) => chan.unregister(oper),
1437 ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1438 ReceiverFlavor::Never(chan) => chan.unregister(oper),
1439 }
1440 }
1441
1442 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1443 match &self.flavor {
1444 ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1445 ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1446 ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1447 ReceiverFlavor::At(chan) => chan.accept(token, cx),
1448 ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1449 ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1450 }
1451 }
1452
1453 fn is_ready(&self) -> bool {
1454 match &self.flavor {
1455 ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1456 ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1457 ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1458 ReceiverFlavor::At(chan) => chan.is_ready(),
1459 ReceiverFlavor::Tick(chan) => chan.is_ready(),
1460 ReceiverFlavor::Never(chan) => chan.is_ready(),
1461 }
1462 }
1463
1464 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1465 match &self.flavor {
1466 ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1467 ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1468 ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1469 ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1470 ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1471 ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1472 }
1473 }
1474
1475 fn unwatch(&self, oper: Operation) {
1476 match &self.flavor {
1477 ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1478 ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1479 ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1480 ReceiverFlavor::At(chan) => chan.unwatch(oper),
1481 ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1482 ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1483 }
1484 }
1485}
1486
1487/// Writes a message into the channel.
1488pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1489 match &s.flavor {
1490 SenderFlavor::Array(chan) => chan.write(token, msg),
1491 SenderFlavor::List(chan) => chan.write(token, msg),
1492 SenderFlavor::Zero(chan) => chan.write(token, msg),
1493 }
1494}
1495
/// Reads a message from the channel.
///
/// The call is forwarded to the `read` of whichever flavor backs the receiver `r`, using
/// the state carried by `token`. The `Array`, `List`, `Zero` and `Never` flavors return
/// their result directly. The `At` and `Tick` flavors produce a `Result<Instant, ()>`,
/// which is reinterpreted as `Result<T, ()>`.
///
/// # Safety
///
/// NOTE(review): the flavor-level `read` is invoked without any checks here. Presumably
/// `token` must come from a successful selection on this same receiver — confirm against
/// the flavor implementations.
///
/// For the `At` and `Tick` flavors the reinterpretation is only sound when `T` is
/// `Instant`; presumably those flavors are only ever constructed as `Receiver<Instant>` —
/// verify against the constructors.
pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
    match &r.flavor {
        ReceiverFlavor::Array(chan) => chan.read(token),
        ReceiverFlavor::List(chan) => chan.read(token),
        ReceiverFlavor::Zero(chan) => chan.read(token),
        ReceiverFlavor::At(chan) => {
            // Bitwise copy of the `Result<Instant, ()>` into a `Result<T, ()>`; the generic
            // signature cannot express that `T` is `Instant` on this path.
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
        }
        ReceiverFlavor::Tick(chan) => {
            // Same reinterpretation as the `At` arm above.
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
        }
        ReceiverFlavor::Never(chan) => chan.read(token),
    }
}