| use crate::FromEnvErrorInner; |
| use std::ffi::CString; |
| use std::io; |
| use std::process::Command; |
| use std::ptr; |
| use std::sync::Arc; |
| use std::thread::{Builder, JoinHandle}; |
| |
| #[derive(Debug)] |
| pub struct Client { |
| sem: Handle, |
| name: String, |
| } |
| |
| #[derive(Debug)] |
| pub struct Acquired; |
| |
| #[allow(clippy::upper_case_acronyms)] |
| type BOOL = i32; |
| #[allow(clippy::upper_case_acronyms)] |
| type DWORD = u32; |
| #[allow(clippy::upper_case_acronyms)] |
| type HANDLE = *mut u8; |
| #[allow(clippy::upper_case_acronyms)] |
| type LONG = i32; |
| |
| const ERROR_ALREADY_EXISTS: DWORD = 183; |
| const FALSE: BOOL = 0; |
| const INFINITE: DWORD = 0xffffffff; |
| const SEMAPHORE_MODIFY_STATE: DWORD = 0x2; |
| const SYNCHRONIZE: DWORD = 0x00100000; |
| const TRUE: BOOL = 1; |
| |
| const WAIT_ABANDONED: DWORD = 128u32; |
| const WAIT_FAILED: DWORD = 4294967295u32; |
| const WAIT_OBJECT_0: DWORD = 0u32; |
| const WAIT_TIMEOUT: DWORD = 258u32; |
| |
| extern "system" { |
| fn CloseHandle(handle: HANDLE) -> BOOL; |
| fn SetEvent(hEvent: HANDLE) -> BOOL; |
| fn WaitForMultipleObjects( |
| ncount: DWORD, |
| lpHandles: *const HANDLE, |
| bWaitAll: BOOL, |
| dwMilliseconds: DWORD, |
| ) -> DWORD; |
| fn CreateEventA( |
| lpEventAttributes: *mut u8, |
| bManualReset: BOOL, |
| bInitialState: BOOL, |
| lpName: *const i8, |
| ) -> HANDLE; |
| fn ReleaseSemaphore( |
| hSemaphore: HANDLE, |
| lReleaseCount: LONG, |
| lpPreviousCount: *mut LONG, |
| ) -> BOOL; |
| fn CreateSemaphoreA( |
| lpEventAttributes: *mut u8, |
| lInitialCount: LONG, |
| lMaximumCount: LONG, |
| lpName: *const i8, |
| ) -> HANDLE; |
| fn OpenSemaphoreA(dwDesiredAccess: DWORD, bInheritHandle: BOOL, lpName: *const i8) -> HANDLE; |
| fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; |
| #[link_name = "SystemFunction036"] |
| fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8; |
| } |
| |
| // Note that we ideally would use the `getrandom` crate, but unfortunately |
| // that causes build issues when this crate is used in rust-lang/rust (see |
| // rust-lang/rust#65014 for more information). As a result we just inline |
| // the pretty simple Windows-specific implementation of generating |
| // randomness. |
| fn getrandom(dest: &mut [u8]) -> io::Result<()> { |
| // Prevent overflow of u32 |
| for chunk in dest.chunks_mut(u32::MAX as usize) { |
| let ret = unsafe { RtlGenRandom(chunk.as_mut_ptr(), chunk.len() as u32) }; |
| if ret == 0 { |
| return Err(io::Error::new( |
| io::ErrorKind::Other, |
| "failed to generate random bytes", |
| )); |
| } |
| } |
| Ok(()) |
| } |
| |
| impl Client { |
| pub fn new(limit: usize) -> io::Result<Client> { |
| // Try a bunch of random semaphore names until we get a unique one, |
| // but don't try for too long. |
| // |
| // Note that `limit == 0` is a valid argument above but Windows |
| // won't let us create a semaphore with 0 slots available to it. Get |
| // `limit == 0` working by creating a semaphore instead with one |
| // slot and then immediately acquire it (without ever releaseing it |
| // back). |
| for _ in 0..100 { |
| let mut bytes = [0; 4]; |
| getrandom(&mut bytes)?; |
| let mut name = format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes)); |
| unsafe { |
| let create_limit = if limit == 0 { 1 } else { limit }; |
| let r = CreateSemaphoreA( |
| ptr::null_mut(), |
| create_limit as LONG, |
| create_limit as LONG, |
| name.as_ptr() as *const _, |
| ); |
| if r.is_null() { |
| return Err(io::Error::last_os_error()); |
| } |
| let handle = Handle(r); |
| |
| let err = io::Error::last_os_error(); |
| if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) { |
| continue; |
| } |
| name.pop(); // chop off the trailing nul |
| let client = Client { sem: handle, name }; |
| if create_limit != limit { |
| client.acquire()?; |
| } |
| return Ok(client); |
| } |
| } |
| |
| Err(io::Error::new( |
| io::ErrorKind::Other, |
| "failed to find a unique name for a semaphore", |
| )) |
| } |
| |
| pub(crate) unsafe fn open(s: &str, _check_pipe: bool) -> Result<Client, FromEnvErrorInner> { |
| let name = match CString::new(s) { |
| Ok(s) => s, |
| Err(e) => return Err(FromEnvErrorInner::CannotParse(e.to_string())), |
| }; |
| |
| let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr()); |
| if sem.is_null() { |
| Err(FromEnvErrorInner::CannotOpenPath( |
| s.to_string(), |
| io::Error::last_os_error(), |
| )) |
| } else { |
| Ok(Client { |
| sem: Handle(sem), |
| name: s.to_string(), |
| }) |
| } |
| } |
| |
| pub fn acquire(&self) -> io::Result<Acquired> { |
| unsafe { |
| let r = WaitForSingleObject(self.sem.0, INFINITE); |
| if r == WAIT_OBJECT_0 { |
| Ok(Acquired) |
| } else { |
| Err(io::Error::last_os_error()) |
| } |
| } |
| } |
| |
| pub fn try_acquire(&self) -> io::Result<Option<Acquired>> { |
| match unsafe { WaitForSingleObject(self.sem.0, 0) } { |
| WAIT_OBJECT_0 => Ok(Some(Acquired)), |
| WAIT_TIMEOUT => Ok(None), |
| WAIT_FAILED => Err(io::Error::last_os_error()), |
| // We believe this should be impossible for a semaphore, but still |
| // check the error code just in case it happens. |
| WAIT_ABANDONED => Err(io::Error::new( |
| io::ErrorKind::Other, |
| "Wait on jobserver semaphore returned WAIT_ABANDONED", |
| )), |
| _ => unreachable!("Unexpected return value from WaitForSingleObject"), |
| } |
| } |
| |
| pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> { |
| unsafe { |
| let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut()); |
| if r != 0 { |
| Ok(()) |
| } else { |
| Err(io::Error::last_os_error()) |
| } |
| } |
| } |
| |
| pub fn string_arg(&self) -> String { |
| self.name.clone() |
| } |
| |
| pub fn available(&self) -> io::Result<usize> { |
| // Can't read value of a semaphore on Windows, so |
| // try to acquire without sleeping, since we can find out the |
| // old value on release. If acquisiton fails, then available is 0. |
| unsafe { |
| let r = WaitForSingleObject(self.sem.0, 0); |
| if r != WAIT_OBJECT_0 { |
| Ok(0) |
| } else { |
| let mut prev: LONG = 0; |
| let r = ReleaseSemaphore(self.sem.0, 1, &mut prev); |
| if r != 0 { |
| Ok(prev as usize + 1) |
| } else { |
| Err(io::Error::last_os_error()) |
| } |
| } |
| } |
| } |
| |
| pub fn configure(&self, _cmd: &mut Command) { |
| // nothing to do here, we gave the name of our semaphore to the |
| // child above |
| } |
| } |
| |
| #[derive(Debug)] |
| struct Handle(HANDLE); |
| // HANDLE is a raw ptr, but we're send/sync |
| unsafe impl Sync for Handle {} |
| unsafe impl Send for Handle {} |
| |
| impl Drop for Handle { |
| fn drop(&mut self) { |
| unsafe { |
| CloseHandle(self.0); |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| pub struct Helper { |
| event: Arc<Handle>, |
| thread: JoinHandle<()>, |
| } |
| |
| pub(crate) fn spawn_helper( |
| client: crate::Client, |
| state: Arc<super::HelperState>, |
| mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>, |
| ) -> io::Result<Helper> { |
| let event = unsafe { |
| let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null()); |
| if r.is_null() { |
| return Err(io::Error::last_os_error()); |
| } else { |
| Handle(r) |
| } |
| }; |
| let event = Arc::new(event); |
| let event2 = event.clone(); |
| let thread = Builder::new().spawn(move || { |
| let objects = [event2.0, client.inner.sem.0]; |
| state.for_each_request(|_| { |
| const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1; |
| match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } { |
| WAIT_OBJECT_0 => {} |
| WAIT_OBJECT_1 => f(Ok(crate::Acquired { |
| client: client.inner.clone(), |
| data: Acquired, |
| disabled: false, |
| })), |
| _ => f(Err(io::Error::last_os_error())), |
| } |
| }); |
| })?; |
| Ok(Helper { thread, event }) |
| } |
| |
| impl Helper { |
| pub fn join(self) { |
| // Unlike unix this logic is much easier. If our thread was blocked |
| // in waiting for requests it should already be woken up and |
| // exiting. Otherwise it's waiting for a token, so we wake it up |
| // with a different event that it's also waiting on here. After |
| // these two we should be guaranteed the thread is on its way out, |
| // so we can safely `join`. |
| let r = unsafe { SetEvent(self.event.0) }; |
| if r == 0 { |
| panic!("failed to set event: {}", io::Error::last_os_error()); |
| } |
| drop(self.thread.join()); |
| } |
| } |