| //! Thread-local context used in select. |
| |
| use std::cell::Cell; |
| use std::ptr; |
| use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; |
| use std::sync::Arc; |
| use std::thread::{self, Thread, ThreadId}; |
| use std::time::Instant; |
| |
| use crossbeam_utils::Backoff; |
| |
| use crate::select::Selected; |
| |
| /// Thread-local context used in select. |
| // This is a private API that is used by the select macro. |
| #[derive(Debug, Clone)] |
| pub struct Context { |
| inner: Arc<Inner>, |
| } |
| |
/// Shared state behind a `Context` handle.
#[derive(Debug)]
struct Inner {
    /// The currently selected operation, stored in its `usize` encoding.
    select: AtomicUsize,

    /// Slot in which another thread may leave a pointer to its `Packet`;
    /// null while no packet has been provided.
    packet: AtomicPtr<()>,

    /// Handle of the owning thread, used to unpark it.
    thread: Thread,

    /// Id of the owning thread.
    thread_id: ThreadId,
}
| |
| impl Context { |
| /// Creates a new context for the duration of the closure. |
| #[inline] |
| pub fn with<F, R>(f: F) -> R |
| where |
| F: FnOnce(&Context) -> R, |
| { |
| std::thread_local! { |
| /// Cached thread-local context. |
| static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new())); |
| } |
| |
| let mut f = Some(f); |
| let mut f = |cx: &Context| -> R { |
| let f = f.take().unwrap(); |
| f(cx) |
| }; |
| |
| CONTEXT |
| .try_with(|cell| match cell.take() { |
| None => f(&Context::new()), |
| Some(cx) => { |
| cx.reset(); |
| let res = f(&cx); |
| cell.set(Some(cx)); |
| res |
| } |
| }) |
| .unwrap_or_else(|_| f(&Context::new())) |
| } |
| |
| /// Creates a new `Context`. |
| #[cold] |
| fn new() -> Context { |
| Context { |
| inner: Arc::new(Inner { |
| select: AtomicUsize::new(Selected::Waiting.into()), |
| packet: AtomicPtr::new(ptr::null_mut()), |
| thread: thread::current(), |
| thread_id: thread::current().id(), |
| }), |
| } |
| } |
| |
| /// Resets `select` and `packet`. |
| #[inline] |
| fn reset(&self) { |
| self.inner |
| .select |
| .store(Selected::Waiting.into(), Ordering::Release); |
| self.inner.packet.store(ptr::null_mut(), Ordering::Release); |
| } |
| |
| /// Attempts to select an operation. |
| /// |
| /// On failure, the previously selected operation is returned. |
| #[inline] |
| pub fn try_select(&self, select: Selected) -> Result<(), Selected> { |
| self.inner |
| .select |
| .compare_exchange( |
| Selected::Waiting.into(), |
| select.into(), |
| Ordering::AcqRel, |
| Ordering::Acquire, |
| ) |
| .map(|_| ()) |
| .map_err(|e| e.into()) |
| } |
| |
| /// Returns the selected operation. |
| #[inline] |
| pub fn selected(&self) -> Selected { |
| Selected::from(self.inner.select.load(Ordering::Acquire)) |
| } |
| |
| /// Stores a packet. |
| /// |
| /// This method must be called after `try_select` succeeds and there is a packet to provide. |
| #[inline] |
| pub fn store_packet(&self, packet: *mut ()) { |
| if !packet.is_null() { |
| self.inner.packet.store(packet, Ordering::Release); |
| } |
| } |
| |
| /// Waits until a packet is provided and returns it. |
| #[inline] |
| pub fn wait_packet(&self) -> *mut () { |
| let backoff = Backoff::new(); |
| loop { |
| let packet = self.inner.packet.load(Ordering::Acquire); |
| if !packet.is_null() { |
| return packet; |
| } |
| backoff.snooze(); |
| } |
| } |
| |
| /// Waits until an operation is selected and returns it. |
| /// |
| /// If the deadline is reached, `Selected::Aborted` will be selected. |
| #[inline] |
| pub fn wait_until(&self, deadline: Option<Instant>) -> Selected { |
| // Spin for a short time, waiting until an operation is selected. |
| let backoff = Backoff::new(); |
| loop { |
| let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); |
| if sel != Selected::Waiting { |
| return sel; |
| } |
| |
| if backoff.is_completed() { |
| break; |
| } else { |
| backoff.snooze(); |
| } |
| } |
| |
| loop { |
| // Check whether an operation has been selected. |
| let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); |
| if sel != Selected::Waiting { |
| return sel; |
| } |
| |
| // If there's a deadline, park the current thread until the deadline is reached. |
| if let Some(end) = deadline { |
| let now = Instant::now(); |
| |
| if now < end { |
| thread::park_timeout(end - now); |
| } else { |
| // The deadline has been reached. Try aborting select. |
| return match self.try_select(Selected::Aborted) { |
| Ok(()) => Selected::Aborted, |
| Err(s) => s, |
| }; |
| } |
| } else { |
| thread::park(); |
| } |
| } |
| } |
| |
| /// Unparks the thread this context belongs to. |
| #[inline] |
| pub fn unpark(&self) { |
| self.inner.thread.unpark(); |
| } |
| |
| /// Returns the id of the thread this context belongs to. |
| #[inline] |
| pub fn thread_id(&self) -> ThreadId { |
| self.inner.thread_id |
| } |
| } |