blob: 908d1842a14c64854fda781c6fe0dfb890e42949 [file] [log] [blame] [edit]
use std::ops::Deref;
use std::ptr;
use std::sync::mpsc::{SendError, RecvError, TryRecvError};
use std::sync::atomic::Ordering;
use loom::sync::{Arc, Mutex, CausalCell, Condvar};
use loom::sync::atomic::{AtomicPtr, AtomicBool, AtomicUsize};
use loom::thread;
/// Create a new SPMC channel.
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(Inner::new());
(Sender::new(a.clone()), Receiver::new(a))
}
/// The Sending side of a SPMC channel.
pub struct Sender<T: Send> {
inner: Arc<Inner<T>>,
}
unsafe impl<T: Send> Send for Sender<T> {}
impl<T: Send> Sender<T> {
fn new(inner: Arc<Inner<T>>) -> Sender<T> {
Sender { inner: inner }
}
/// Send a message to the receivers.
///
/// Returns a SendError if there are no more receivers listening.
pub fn send(&mut self, t: T) -> Result<(), SendError<T>> {
if self.inner.is_disconnected.load(Ordering::SeqCst) {
Err(SendError(t))
} else {
unsafe {
// Only safe from a single thread...
//
// But we have `&mut self`, so we're good!
self.inner.queue.push(t);
}
if self.inner.num_sleeping.load(Ordering::SeqCst) > 0 {
*self.inner.sleeping_guard.lock().unwrap() = true;
self.inner.sleeping_condvar.notify_one();
}
Ok(())
}
}
}
impl<T: Send> Drop for Sender<T> {
fn drop(&mut self) {
self.inner.is_disconnected.store(true, Ordering::SeqCst);
if self.inner.num_sleeping.load(Ordering::SeqCst) > 0 {
*self.inner.sleeping_guard.lock().unwrap() = true;
self.inner.sleeping_condvar.notify_all();
}
}
}
/// The receiving side of a SPMC channel.
///
/// There may be many of these, and the Receiver itself is Sync, so it can be
/// placed in an Arc, or cloned itself.
pub struct Receiver<T: Send> {
inner: Arc<RecvInner<T>>,
}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T: Send> Clone for Receiver<T> {
fn clone(&self) -> Receiver<T> {
Receiver { inner: self.inner.clone() }
}
}
impl<T: Send> Receiver<T> {
fn new(inner: Arc<Inner<T>>) -> Receiver<T> {
Receiver { inner: Arc::new(RecvInner { inner: inner }) }
}
/// Try to receive a message, without blocking.
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match self.inner.queue.pop() {
Some(t) => Ok(t),
None => {
if self.inner.is_disconnected.load(Ordering::SeqCst) {
// Check that it didn't fill in a message inbetween
// trying to pop and us checking is_disconnected
match self.inner.queue.pop() {
Some(t) => Ok(t),
None => Err(TryRecvError::Disconnected),
}
} else {
Err(TryRecvError::Empty)
}
}
}
}
/// Receive a message from the channel.
///
/// If no message is available, this will block the current thread until a
/// message is sent.
pub fn recv(&self) -> Result<T, RecvError> {
match self.try_recv() {
Ok(t) => return Ok(t),
Err(TryRecvError::Disconnected) => return Err(RecvError),
Err(TryRecvError::Empty) => {},
}
let ret;
let mut guard = self.inner.sleeping_guard.lock().unwrap();
self.inner.num_sleeping.fetch_add(1, Ordering::SeqCst);
loop {
match self.try_recv() {
Ok(t) => {
ret = Ok(t);
break;
},
Err(TryRecvError::Disconnected) => {
ret = Err(RecvError);
break;
},
Err(TryRecvError::Empty) => {}
}
guard = self.inner.sleeping_condvar.wait(guard).unwrap();
}
self.inner.num_sleeping.fetch_sub(1, Ordering::SeqCst);
ret
}
}
struct Inner<T: Send> {
queue: Queue<T>,
is_disconnected: AtomicBool,
// ohai there. this is all just to allow the blocking functionality
// of recv(). The existance of this mutex is only because the condvar
// needs one. A lock is not used elsewhere, its still a lock-free queue.
sleeping_guard: Mutex<bool>,
sleeping_condvar: Condvar,
num_sleeping: AtomicUsize,
}
impl<T: Send> Inner<T> {
fn new() -> Inner<T> {
Inner {
queue: Queue::new(),
is_disconnected: AtomicBool::new(false),
sleeping_guard: Mutex::new(false),
sleeping_condvar: Condvar::new(),
num_sleeping: AtomicUsize::new(0),
}
}
}
struct RecvInner<T: Send> {
inner: Arc<Inner<T>>,
}
impl<T: Send> Deref for RecvInner<T> {
type Target = Arc<Inner<T>>;
fn deref(&self) -> &Arc<Inner<T>> {
&self.inner
}
}
impl<T: Send> Drop for RecvInner<T> {
fn drop(&mut self) {
self.inner.is_disconnected.store(true, Ordering::SeqCst);
}
}
pub(super) struct Queue<T: Send> {
head: CausalCell<*mut Node<T>>,
tail: AtomicPtr<Node<T>>,
}
impl<T: Send> Queue<T> {
pub(super) fn new() -> Queue<T> {
let stub = Node::new(None);
Queue {
head: CausalCell::new(stub),
tail: AtomicPtr::new(stub),
}
}
// Not safe to call from multiple threads.
pub(super) unsafe fn push(&self, t: T) {
let end = Node::new(None);
let node = self.head.with_mut(|p| {
::std::mem::replace(&mut *p, end)
});
(*node).value = Some(t);
(*node).next.store(end, Ordering::SeqCst);
}
pub(super) fn pop(&self) -> Option<T> {
unsafe {
let mut tail = ptr::null_mut();
loop {
tail = self.tail.swap(tail, Ordering::SeqCst);
if tail.is_null() {
thread::yield_now();
continue;
} else {
break;
}
}
let mut node = Box::from_raw(tail);
let next = node.next.load(Ordering::SeqCst);
if !next.is_null() {
self.tail.store(next, Ordering::SeqCst);
return node.value.take();
} else {
self.tail.store(Box::into_raw(node), Ordering::SeqCst);
return None;
}
}
}
}
impl<T: Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let head = self.tail.swap(ptr::null_mut(), Ordering::SeqCst);
if head != ptr::null_mut() {
let mut node = Box::from_raw(head);
loop {
let next = node.next.load(Ordering::SeqCst);
if !next.is_null() {
node = Box::from_raw(next);
} else {
break;
}
}
}
}
}
}
struct Node<T> {
value: Option<T>,
next: AtomicPtr<Node<T>>,
}
impl<T> Node<T> {
fn new(v: Option<T>) -> *mut Node<T> {
let b = Box::new(Node {
value: v,
next: AtomicPtr::new(ptr::null_mut()),
});
Box::into_raw(b)
}
}