| //! Tests copied from Go and manually rewritten in Rust. |
| //! |
| //! Source: |
| //! - https://github.com/golang/go |
| //! |
| //! Copyright & License: |
| //! - Copyright (c) 2009 The Go Authors |
| //! - https://golang.org/AUTHORS |
| //! - https://golang.org/LICENSE |
| //! - https://golang.org/PATENTS |
| |
| #![allow(clippy::redundant_clone)] |
| |
| use std::alloc::{GlobalAlloc, Layout, System}; |
| use std::any::Any; |
| use std::cell::Cell; |
| use std::collections::HashMap; |
| use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst}; |
| use std::sync::{Arc, Condvar, Mutex}; |
| use std::thread; |
| use std::time::Duration; |
| |
| use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender}; |
| |
| fn ms(ms: u64) -> Duration { |
| Duration::from_millis(ms) |
| } |
| |
| struct Chan<T> { |
| inner: Arc<Mutex<ChanInner<T>>>, |
| } |
| |
| struct ChanInner<T> { |
| s: Option<Sender<T>>, |
| r: Option<Receiver<T>>, |
| // Receiver to use when r is None (Go blocks on receiving from nil) |
| nil_r: Receiver<T>, |
| // Sender to use when s is None (Go blocks on sending to nil) |
| nil_s: Sender<T>, |
| // Hold this receiver to prevent nil sender channel from disconnection |
| _nil_sr: Receiver<T>, |
| } |
| |
| impl<T> Clone for Chan<T> { |
| fn clone(&self) -> Chan<T> { |
| Chan { |
| inner: self.inner.clone(), |
| } |
| } |
| } |
| |
| impl<T> Chan<T> { |
| fn send(&self, msg: T) { |
| let s = self |
| .inner |
| .lock() |
| .unwrap() |
| .s |
| .as_ref() |
| .expect("sending into closed channel") |
| .clone(); |
| let _ = s.send(msg); |
| } |
| |
| fn try_recv(&self) -> Option<T> { |
| let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); |
| r.try_recv().ok() |
| } |
| |
| fn recv(&self) -> Option<T> { |
| let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); |
| r.recv().ok() |
| } |
| |
| fn close_s(&self) { |
| self.inner |
| .lock() |
| .unwrap() |
| .s |
| .take() |
| .expect("channel sender already closed"); |
| } |
| |
| fn close_r(&self) { |
| self.inner |
| .lock() |
| .unwrap() |
| .r |
| .take() |
| .expect("channel receiver already closed"); |
| } |
| |
| fn has_rx(&self) -> bool { |
| self.inner.lock().unwrap().r.is_some() |
| } |
| |
| fn has_tx(&self) -> bool { |
| self.inner.lock().unwrap().s.is_some() |
| } |
| |
| fn rx(&self) -> Receiver<T> { |
| let inner = self.inner.lock().unwrap(); |
| match inner.r.as_ref() { |
| None => inner.nil_r.clone(), |
| Some(r) => r.clone(), |
| } |
| } |
| |
| fn tx(&self) -> Sender<T> { |
| let inner = self.inner.lock().unwrap(); |
| match inner.s.as_ref() { |
| None => inner.nil_s.clone(), |
| Some(s) => s.clone(), |
| } |
| } |
| } |
| |
| impl<T> Iterator for Chan<T> { |
| type Item = T; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| self.recv() |
| } |
| } |
| |
| impl<'a, T> IntoIterator for &'a Chan<T> { |
| type Item = T; |
| type IntoIter = Chan<T>; |
| |
| fn into_iter(self) -> Self::IntoIter { |
| self.clone() |
| } |
| } |
| |
| fn make<T>(cap: usize) -> Chan<T> { |
| let (s, r) = bounded(cap); |
| let (nil_s, _nil_sr) = bounded(0); |
| Chan { |
| inner: Arc::new(Mutex::new(ChanInner { |
| s: Some(s), |
| r: Some(r), |
| nil_r: never(), |
| nil_s, |
| _nil_sr, |
| })), |
| } |
| } |
| |
| fn make_unbounded<T>() -> Chan<T> { |
| let (s, r) = unbounded(); |
| let (nil_s, _nil_sr) = bounded(0); |
| Chan { |
| inner: Arc::new(Mutex::new(ChanInner { |
| s: Some(s), |
| r: Some(r), |
| nil_r: never(), |
| nil_s, |
| _nil_sr, |
| })), |
| } |
| } |
| |
| #[derive(Clone)] |
| struct WaitGroup(Arc<WaitGroupInner>); |
| |
| struct WaitGroupInner { |
| cond: Condvar, |
| count: Mutex<i32>, |
| } |
| |
| impl WaitGroup { |
| fn new() -> WaitGroup { |
| WaitGroup(Arc::new(WaitGroupInner { |
| cond: Condvar::new(), |
| count: Mutex::new(0), |
| })) |
| } |
| |
| fn add(&self, delta: i32) { |
| let mut count = self.0.count.lock().unwrap(); |
| *count += delta; |
| assert!(*count >= 0); |
| self.0.cond.notify_all(); |
| } |
| |
| fn done(&self) { |
| self.add(-1); |
| } |
| |
| fn wait(&self) { |
| let mut count = self.0.count.lock().unwrap(); |
| while *count > 0 { |
| count = self.0.cond.wait(count).unwrap(); |
| } |
| } |
| } |
| |
| struct Defer<F: FnOnce()> { |
| f: Option<Box<F>>, |
| } |
| |
| impl<F: FnOnce()> Drop for Defer<F> { |
| fn drop(&mut self) { |
| let f = self.f.take().unwrap(); |
| let mut f = Some(f); |
| let mut f = move || f.take().unwrap()(); |
| f(); |
| } |
| } |
| |
| struct Counter; |
| |
| static ALLOCATED: AtomicUsize = AtomicUsize::new(0); |
| unsafe impl GlobalAlloc for Counter { |
| unsafe fn alloc(&self, layout: Layout) -> *mut u8 { |
| let ret = System.alloc(layout); |
| if !ret.is_null() { |
| ALLOCATED.fetch_add(layout.size(), SeqCst); |
| } |
| ret |
| } |
| |
| unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { |
| System.dealloc(ptr, layout); |
| ALLOCATED.fetch_sub(layout.size(), SeqCst); |
| } |
| } |
| |
| #[global_allocator] |
| static A: Counter = Counter; |
| |
| macro_rules! defer { |
| ($body:expr) => { |
| let _defer = Defer { |
| f: Some(Box::new(|| $body)), |
| }; |
| }; |
| } |
| |
| macro_rules! go { |
| (@parse $v:ident, $($tail:tt)*) => {{ |
| let $v = $v.clone(); |
| go!(@parse $($tail)*) |
| }}; |
| (@parse $body:expr) => { |
| ::std::thread::spawn(move || { |
| let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| { |
| $body |
| })); |
| if res.is_err() { |
| eprintln!("goroutine panicked: {:?}", res); |
| ::std::process::abort(); |
| } |
| }) |
| }; |
| (@parse $($tail:tt)*) => { |
| compile_error!("invalid `go!` syntax") |
| }; |
| ($($tail:tt)*) => {{ |
| go!(@parse $($tail)*) |
| }}; |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/doubleselect.go |
| mod doubleselect { |
| use super::*; |
| |
| #[cfg(miri)] |
| const ITERATIONS: i32 = 100; |
| #[cfg(not(miri))] |
| const ITERATIONS: i32 = 10_000; |
| |
| fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { |
| defer! { c1.close_s() } |
| defer! { c2.close_s() } |
| defer! { c3.close_s() } |
| defer! { c4.close_s() } |
| |
| for i in 0..n { |
| select! { |
| send(c1.tx(), i) -> _ => {} |
| send(c2.tx(), i) -> _ => {} |
| send(c3.tx(), i) -> _ => {} |
| send(c4.tx(), i) -> _ => {} |
| } |
| } |
| } |
| |
| fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) { |
| for v in inp { |
| out.send(v); |
| } |
| done.send(true); |
| } |
| |
| fn recver(inp: Chan<i32>) { |
| let mut seen = HashMap::new(); |
| |
| for v in &inp { |
| if seen.contains_key(&v) { |
| panic!("got duplicate value for {}", v); |
| } |
| seen.insert(v, true); |
| } |
| } |
| |
| #[test] |
| fn main() { |
| let c1 = make::<i32>(0); |
| let c2 = make::<i32>(0); |
| let c3 = make::<i32>(0); |
| let c4 = make::<i32>(0); |
| let done = make::<bool>(0); |
| let cmux = make::<i32>(0); |
| |
| go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4)); |
| go!(cmux, c1, done, mux(cmux, c1, done)); |
| go!(cmux, c2, done, mux(cmux, c2, done)); |
| go!(cmux, c3, done, mux(cmux, c3, done)); |
| go!(cmux, c4, done, mux(cmux, c4, done)); |
| go!(done, cmux, { |
| done.recv(); |
| done.recv(); |
| done.recv(); |
| done.recv(); |
| cmux.close_s(); |
| }); |
| recver(cmux); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/fifo.go |
| mod fifo { |
| use super::*; |
| |
| const N: i32 = 10; |
| |
| #[test] |
| fn asynch_fifo() { |
| let ch = make::<i32>(N as usize); |
| for i in 0..N { |
| ch.send(i); |
| } |
| for i in 0..N { |
| if ch.recv() != Some(i) { |
| panic!("bad receive"); |
| } |
| } |
| } |
| |
| fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) { |
| inp.recv(); |
| if ch.recv() != Some(val) { |
| panic!("{}", val); |
| } |
| out.send(1); |
| } |
| |
| #[test] |
| fn synch_fifo() { |
| let ch = make::<i32>(0); |
| let mut inp = make::<i32>(0); |
| let start = inp.clone(); |
| |
| for i in 0..N { |
| let out = make::<i32>(0); |
| go!(ch, i, inp, out, chain(ch, i, inp, out)); |
| inp = out; |
| } |
| |
| start.send(0); |
| for i in 0..N { |
| ch.send(i); |
| } |
| inp.recv(); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/goroutines.go |
| mod goroutines { |
| use super::*; |
| |
| fn f(left: Chan<i32>, right: Chan<i32>) { |
| left.send(right.recv().unwrap()); |
| } |
| |
| #[test] |
| fn main() { |
| let n = 100i32; |
| |
| let leftmost = make::<i32>(0); |
| let mut right = leftmost.clone(); |
| let mut left = leftmost.clone(); |
| |
| for _ in 0..n { |
| right = make::<i32>(0); |
| go!(left, right, f(left, right)); |
| left = right.clone(); |
| } |
| |
| go!(right, right.send(1)); |
| leftmost.recv().unwrap(); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/nonblock.go |
| mod nonblock { |
| use super::*; |
| |
| fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) { |
| if c.recv().unwrap() != 123 { |
| panic!("i32 value"); |
| } |
| strobe.send(true); |
| } |
| |
| fn i32sender(c: Chan<i32>, strobe: Chan<bool>) { |
| c.send(234); |
| strobe.send(true); |
| } |
| |
| fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) { |
| if c.recv().unwrap() != 123456 { |
| panic!("i64 value"); |
| } |
| strobe.send(true); |
| } |
| |
| fn i64sender(c: Chan<i64>, strobe: Chan<bool>) { |
| c.send(234567); |
| strobe.send(true); |
| } |
| |
| fn breceiver(c: Chan<bool>, strobe: Chan<bool>) { |
| if !c.recv().unwrap() { |
| panic!("b value"); |
| } |
| strobe.send(true); |
| } |
| |
| fn bsender(c: Chan<bool>, strobe: Chan<bool>) { |
| c.send(true); |
| strobe.send(true); |
| } |
| |
| fn sreceiver(c: Chan<String>, strobe: Chan<bool>) { |
| if c.recv().unwrap() != "hello" { |
| panic!("x value"); |
| } |
| strobe.send(true); |
| } |
| |
| fn ssender(c: Chan<String>, strobe: Chan<bool>) { |
| c.send("hello again".to_string()); |
| strobe.send(true); |
| } |
| |
| const MAX_TRIES: usize = 10000; // Up to 100ms per test. |
| |
| #[test] |
| fn main() { |
| let ticker = tick(Duration::new(0, 10_000)); // 10 us |
| let sleep = || { |
| ticker.recv().unwrap(); |
| ticker.recv().unwrap(); |
| thread::yield_now(); |
| thread::yield_now(); |
| thread::yield_now(); |
| }; |
| |
| let sync = make::<bool>(0); |
| |
| for buffer in 0..2 { |
| let c32 = make::<i32>(buffer); |
| let c64 = make::<i64>(buffer); |
| let cb = make::<bool>(buffer); |
| let cs = make::<String>(buffer); |
| |
| select! { |
| recv(c32.rx()) -> _ => panic!("blocked i32sender"), |
| default => {} |
| } |
| |
| select! { |
| recv(c64.rx()) -> _ => panic!("blocked i64sender"), |
| default => {} |
| } |
| |
| select! { |
| recv(cb.rx()) -> _ => panic!("blocked bsender"), |
| default => {} |
| } |
| |
| select! { |
| recv(cs.rx()) -> _ => panic!("blocked ssender"), |
| default => {} |
| } |
| |
| go!(c32, sync, i32receiver(c32, sync)); |
| let mut r#try = 0; |
| loop { |
| select! { |
| send(c32.tx(), 123) -> _ => break, |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("i32receiver buffer={}", buffer); |
| panic!("fail") |
| } |
| sleep(); |
| } |
| } |
| } |
| sync.recv(); |
| go!(c32, sync, i32sender(c32, sync)); |
| if buffer > 0 { |
| sync.recv(); |
| } |
| let mut r#try = 0; |
| loop { |
| select! { |
| recv(c32.rx()) -> v => { |
| if v != Ok(234) { |
| panic!("i32sender value"); |
| } |
| break; |
| } |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("i32sender buffer={}", buffer); |
| panic!("fail"); |
| } |
| sleep(); |
| } |
| } |
| } |
| if buffer == 0 { |
| sync.recv(); |
| } |
| |
| go!(c64, sync, i64receiver(c64, sync)); |
| let mut r#try = 0; |
| loop { |
| select! { |
| send(c64.tx(), 123456) -> _ => break, |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("i64receiver buffer={}", buffer); |
| panic!("fail") |
| } |
| sleep(); |
| } |
| } |
| } |
| sync.recv(); |
| go!(c64, sync, i64sender(c64, sync)); |
| if buffer > 0 { |
| sync.recv(); |
| } |
| let mut r#try = 0; |
| loop { |
| select! { |
| recv(c64.rx()) -> v => { |
| if v != Ok(234567) { |
| panic!("i64sender value"); |
| } |
| break; |
| } |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("i64sender buffer={}", buffer); |
| panic!("fail"); |
| } |
| sleep(); |
| } |
| } |
| } |
| if buffer == 0 { |
| sync.recv(); |
| } |
| |
| go!(cb, sync, breceiver(cb, sync)); |
| let mut r#try = 0; |
| loop { |
| select! { |
| send(cb.tx(), true) -> _ => break, |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("breceiver buffer={}", buffer); |
| panic!("fail") |
| } |
| sleep(); |
| } |
| } |
| } |
| sync.recv(); |
| go!(cb, sync, bsender(cb, sync)); |
| if buffer > 0 { |
| sync.recv(); |
| } |
| let mut r#try = 0; |
| loop { |
| select! { |
| recv(cb.rx()) -> v => { |
| if v != Ok(true) { |
| panic!("bsender value"); |
| } |
| break; |
| } |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("bsender buffer={}", buffer); |
| panic!("fail"); |
| } |
| sleep(); |
| } |
| } |
| } |
| if buffer == 0 { |
| sync.recv(); |
| } |
| |
| go!(cs, sync, sreceiver(cs, sync)); |
| let mut r#try = 0; |
| loop { |
| select! { |
| send(cs.tx(), "hello".to_string()) -> _ => break, |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("sreceiver buffer={}", buffer); |
| panic!("fail") |
| } |
| sleep(); |
| } |
| } |
| } |
| sync.recv(); |
| go!(cs, sync, ssender(cs, sync)); |
| if buffer > 0 { |
| sync.recv(); |
| } |
| let mut r#try = 0; |
| loop { |
| select! { |
| recv(cs.rx()) -> v => { |
| if v != Ok("hello again".to_string()) { |
| panic!("ssender value"); |
| } |
| break; |
| } |
| default => { |
| r#try += 1; |
| if r#try > MAX_TRIES { |
| println!("ssender buffer={}", buffer); |
| panic!("fail"); |
| } |
| sleep(); |
| } |
| } |
| } |
| if buffer == 0 { |
| sync.recv(); |
| } |
| } |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/select.go |
| mod select { |
| use super::*; |
| |
| #[test] |
| fn main() { |
| let shift = Cell::new(0); |
| let counter = Cell::new(0); |
| |
| let get_value = || { |
| counter.set(counter.get() + 1); |
| 1 << shift.get() |
| }; |
| |
| let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| { |
| let mut i = 0; |
| let never = make::<u32>(0); |
| loop { |
| let nil1 = never.tx(); |
| let nil2 = never.tx(); |
| let v1 = get_value(); |
| let v2 = get_value(); |
| select! { |
| send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => { |
| i += 1; |
| a = None; |
| } |
| send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => { |
| i += 1; |
| b = None; |
| } |
| default => break, |
| } |
| shift.set(shift.get() + 1); |
| } |
| i |
| }; |
| |
| let a = make::<u32>(1); |
| let b = make::<u32>(1); |
| |
| assert_eq!(send(Some(&a), Some(&b)), 2); |
| |
| let av = a.recv().unwrap(); |
| let bv = b.recv().unwrap(); |
| assert_eq!(av | bv, 3); |
| |
| assert_eq!(send(Some(&a), None), 1); |
| assert_eq!(counter.get(), 10); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/select2.go |
| mod select2 { |
| use super::*; |
| |
| #[cfg(miri)] |
| const N: i32 = 200; |
| #[cfg(not(miri))] |
| const N: i32 = 100000; |
| |
| #[test] |
| fn main() { |
| fn sender(c: &Chan<i32>, n: i32) { |
| for _ in 0..n { |
| c.send(1); |
| } |
| } |
| |
| fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) { |
| for _ in 0..n { |
| select! { |
| recv(c.rx()) -> _ => {} |
| recv(dummy.rx()) -> _ => { |
| panic!("dummy"); |
| } |
| } |
| } |
| } |
| |
| let c = make_unbounded::<i32>(); |
| let dummy = make_unbounded::<i32>(); |
| |
| ALLOCATED.store(0, SeqCst); |
| |
| go!(c, sender(&c, N)); |
| receiver(&c, &dummy, N); |
| |
| let alloc = ALLOCATED.load(SeqCst); |
| |
| go!(c, sender(&c, N)); |
| receiver(&c, &dummy, N); |
| |
| assert!( |
| !(ALLOCATED.load(SeqCst) > alloc |
| && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000)) |
| ) |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/select3.go |
| mod select3 { |
| // TODO |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/select4.go |
| mod select4 { |
| use super::*; |
| |
| #[test] |
| fn main() { |
| let c = make::<i32>(1); |
| let c1 = make::<i32>(0); |
| c.send(42); |
| select! { |
| recv(c1.rx()) -> _ => panic!("BUG"), |
| recv(c.rx()) -> v => assert_eq!(v, Ok(42)), |
| } |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/select6.go |
| mod select6 { |
| use super::*; |
| |
| #[test] |
| fn main() { |
| let c1 = make::<bool>(0); |
| let c2 = make::<bool>(0); |
| let c3 = make::<bool>(0); |
| |
| go!(c1, c1.recv()); |
| go!(c1, c2, c3, { |
| select! { |
| recv(c1.rx()) -> _ => panic!("dummy"), |
| recv(c2.rx()) -> _ => c3.send(true), |
| } |
| c1.recv(); |
| }); |
| go!(c2, c2.send(true)); |
| |
| c3.recv(); |
| c1.send(true); |
| c1.send(true); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/select7.go |
| mod select7 { |
| use super::*; |
| |
| fn recv1(c: Chan<i32>) { |
| c.recv().unwrap(); |
| } |
| |
| fn recv2(c: Chan<i32>) { |
| select! { |
| recv(c.rx()) -> _ => () |
| } |
| } |
| |
| fn recv3(c: Chan<i32>) { |
| let c2 = make::<i32>(1); |
| select! { |
| recv(c.rx()) -> _ => (), |
| recv(c2.rx()) -> _ => () |
| } |
| } |
| |
| fn send1(recv: fn(Chan<i32>)) { |
| let c = make::<i32>(1); |
| go!(c, recv(c)); |
| thread::yield_now(); |
| c.send(1); |
| } |
| |
| fn send2(recv: fn(Chan<i32>)) { |
| let c = make::<i32>(1); |
| go!(c, recv(c)); |
| thread::yield_now(); |
| select! { |
| send(c.tx(), 1) -> _ => () |
| } |
| } |
| |
| fn send3(recv: fn(Chan<i32>)) { |
| let c = make::<i32>(1); |
| go!(c, recv(c)); |
| thread::yield_now(); |
| let c2 = make::<i32>(1); |
| select! { |
| send(c.tx(), 1) -> _ => (), |
| send(c2.tx(), 1) -> _ => () |
| } |
| } |
| |
| #[test] |
| fn main() { |
| send1(recv1); |
| send2(recv1); |
| send3(recv1); |
| send1(recv2); |
| send2(recv2); |
| send3(recv2); |
| send1(recv3); |
| send2(recv3); |
| send3(recv3); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/sieve1.go |
| mod sieve1 { |
| use super::*; |
| |
| fn generate(ch: Chan<i32>) { |
| let mut i = 2; |
| loop { |
| ch.send(i); |
| i += 1; |
| } |
| } |
| |
| fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) { |
| for i in in_ch { |
| if i % prime != 0 { |
| out_ch.send(i); |
| } |
| } |
| } |
| |
| fn sieve(primes: Chan<i32>) { |
| let mut ch = make::<i32>(1); |
| go!(ch, generate(ch)); |
| loop { |
| let prime = ch.recv().unwrap(); |
| primes.send(prime); |
| |
| let ch1 = make::<i32>(1); |
| go!(ch, ch1, prime, filter(ch, ch1, prime)); |
| ch = ch1; |
| } |
| } |
| |
| #[test] |
| fn main() { |
| let primes = make::<i32>(1); |
| go!(primes, sieve(primes)); |
| |
| let a = [ |
| 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, |
| 89, 97, |
| ]; |
| #[cfg(miri)] |
| let a = &a[..10]; |
| |
| for item in a.iter() { |
| let x = primes.recv().unwrap(); |
| if x != *item { |
| println!("{} != {}", x, item); |
| panic!("fail"); |
| } |
| } |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/chan/zerosize.go |
| mod zerosize { |
| use super::*; |
| |
| #[test] |
| fn zero_size_struct() { |
| struct ZeroSize; |
| let _ = make::<ZeroSize>(0); |
| } |
| |
| #[test] |
| fn zero_size_array() { |
| let _ = make::<[u8; 0]>(0); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/src/runtime/chan_test.go |
| mod chan_test { |
| use super::*; |
| |
| #[test] |
| fn test_chan() { |
| #[cfg(miri)] |
| const N: i32 = 12; |
| #[cfg(not(miri))] |
| const N: i32 = 200; |
| |
| #[cfg(miri)] |
| const MESSAGES_COUNT: i32 = 20; |
| #[cfg(not(miri))] |
| const MESSAGES_COUNT: i32 = 100; |
| |
| for cap in 0..N { |
| { |
| // Ensure that receive from empty chan blocks. |
| let c = make::<i32>(cap as usize); |
| |
| let recv1 = Arc::new(Mutex::new(false)); |
| go!(c, recv1, { |
| c.recv(); |
| *recv1.lock().unwrap() = true; |
| }); |
| |
| let recv2 = Arc::new(Mutex::new(false)); |
| go!(c, recv2, { |
| c.recv(); |
| *recv2.lock().unwrap() = true; |
| }); |
| |
| thread::sleep(ms(1)); |
| |
| if *recv1.lock().unwrap() || *recv2.lock().unwrap() { |
| panic!(); |
| } |
| |
| // Ensure that non-blocking receive does not block. |
| select! { |
| recv(c.rx()) -> _ => panic!(), |
| default => {} |
| } |
| select! { |
| recv(c.rx()) -> _ => panic!(), |
| default => {} |
| } |
| |
| c.send(0); |
| c.send(0); |
| } |
| |
| { |
| // Ensure that send to full chan blocks. |
| let c = make::<i32>(cap as usize); |
| for i in 0..cap { |
| c.send(i); |
| } |
| |
| let sent = Arc::new(Mutex::new(0)); |
| go!(sent, c, { |
| c.send(0); |
| *sent.lock().unwrap() = 1; |
| }); |
| |
| thread::sleep(ms(1)); |
| |
| if *sent.lock().unwrap() != 0 { |
| panic!(); |
| } |
| |
| // Ensure that non-blocking send does not block. |
| select! { |
| send(c.tx(), 0) -> _ => panic!(), |
| default => {} |
| } |
| c.recv(); |
| } |
| |
| { |
| // Ensure that we receive 0 from closed chan. |
| let c = make::<i32>(cap as usize); |
| for i in 0..cap { |
| c.send(i); |
| } |
| c.close_s(); |
| |
| for i in 0..cap { |
| let v = c.recv(); |
| if v != Some(i) { |
| panic!(); |
| } |
| } |
| |
| if c.recv() != None { |
| panic!(); |
| } |
| if c.try_recv() != None { |
| panic!(); |
| } |
| } |
| |
| { |
| // Ensure that close unblocks receive. |
| let c = make::<i32>(cap as usize); |
| let done = make::<bool>(0); |
| |
| go!(c, done, { |
| let v = c.try_recv(); |
| done.send(v.is_none()); |
| }); |
| |
| thread::sleep(ms(1)); |
| c.close_s(); |
| |
| if !done.recv().unwrap() { |
| panic!(); |
| } |
| } |
| |
| { |
| // Send many integers, |
| // ensure that we receive them non-corrupted in FIFO order. |
| let c = make::<i32>(cap as usize); |
| go!(c, { |
| for i in 0..MESSAGES_COUNT { |
| c.send(i); |
| } |
| }); |
| for i in 0..MESSAGES_COUNT { |
| if c.recv() != Some(i) { |
| panic!(); |
| } |
| } |
| |
| // Same, but using recv2. |
| go!(c, { |
| for i in 0..MESSAGES_COUNT { |
| c.send(i); |
| } |
| }); |
| for i in 0..MESSAGES_COUNT { |
| if c.recv() != Some(i) { |
| panic!(); |
| } |
| } |
| } |
| } |
| } |
| |
| #[test] |
| fn test_nonblock_recv_race() { |
| #[cfg(miri)] |
| const N: usize = 100; |
| #[cfg(not(miri))] |
| const N: usize = 1000; |
| |
| for _ in 0..N { |
| let c = make::<i32>(1); |
| c.send(1); |
| |
| let t = go!(c, { |
| select! { |
| recv(c.rx()) -> _ => {} |
| default => panic!("chan is not ready"), |
| } |
| }); |
| |
| c.close_s(); |
| c.recv(); |
| t.join().unwrap(); |
| } |
| } |
| |
| #[test] |
| fn test_nonblock_select_race() { |
| #[cfg(miri)] |
| const N: usize = 100; |
| #[cfg(not(miri))] |
| const N: usize = 1000; |
| |
| let done = make::<bool>(1); |
| for _ in 0..N { |
| let c1 = make::<i32>(1); |
| let c2 = make::<i32>(1); |
| c1.send(1); |
| |
| go!(c1, c2, done, { |
| select! { |
| recv(c1.rx()) -> _ => {} |
| recv(c2.rx()) -> _ => {} |
| default => { |
| done.send(false); |
| return; |
| } |
| } |
| done.send(true); |
| }); |
| |
| c2.send(1); |
| select! { |
| recv(c1.rx()) -> _ => {} |
| default => {} |
| } |
| if !done.recv().unwrap() { |
| panic!("no chan is ready"); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_nonblock_select_race2() { |
| #[cfg(miri)] |
| const N: usize = 100; |
| #[cfg(not(miri))] |
| const N: usize = 1000; |
| |
| let done = make::<bool>(1); |
| for _ in 0..N { |
| let c1 = make::<i32>(1); |
| let c2 = make::<i32>(0); |
| c1.send(1); |
| |
| go!(c1, c2, done, { |
| select! { |
| recv(c1.rx()) -> _ => {} |
| recv(c2.rx()) -> _ => {} |
| default => { |
| done.send(false); |
| return; |
| } |
| } |
| done.send(true); |
| }); |
| |
| c2.close_s(); |
| select! { |
| recv(c1.rx()) -> _ => {} |
| default => {} |
| } |
| if !done.recv().unwrap() { |
| panic!("no chan is ready"); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_self_select() { |
| // Ensure that send/recv on the same chan in select |
| // does not crash nor deadlock. |
| |
| #[cfg(miri)] |
| const N: usize = 100; |
| #[cfg(not(miri))] |
| const N: usize = 1000; |
| |
| for &cap in &[0, 10] { |
| let wg = WaitGroup::new(); |
| wg.add(2); |
| let c = make::<i32>(cap); |
| |
| for p in 0..2 { |
| let p = p; |
| go!(wg, p, c, { |
| defer! { wg.done() } |
| for i in 0..N { |
| if p == 0 || i % 2 == 0 { |
| select! { |
| send(c.tx(), p) -> _ => {} |
| recv(c.rx()) -> v => { |
| if cap == 0 && v.ok() == Some(p) { |
| panic!("self receive"); |
| } |
| } |
| } |
| } else { |
| select! { |
| recv(c.rx()) -> v => { |
| if cap == 0 && v.ok() == Some(p) { |
| panic!("self receive"); |
| } |
| } |
| send(c.tx(), p) -> _ => {} |
| } |
| } |
| } |
| }); |
| } |
| wg.wait(); |
| } |
| } |
| |
| #[test] |
| fn test_select_stress() { |
| #[cfg(miri)] |
| const N: usize = 100; |
| #[cfg(not(miri))] |
| const N: usize = 10000; |
| |
| let c = vec![ |
| make::<i32>(0), |
| make::<i32>(0), |
| make::<i32>(2), |
| make::<i32>(3), |
| ]; |
| |
| // There are 4 goroutines that send N values on each of the chans, |
| // + 4 goroutines that receive N values on each of the chans, |
| // + 1 goroutine that sends N values on each of the chans in a single select, |
| // + 1 goroutine that receives N values on each of the chans in a single select. |
| // All these sends, receives and selects interact chaotically at runtime, |
| // but we are careful that this whole construct does not deadlock. |
| let wg = WaitGroup::new(); |
| wg.add(10); |
| |
| for k in 0..4 { |
| go!(k, c, wg, { |
| for _ in 0..N { |
| c[k].send(0); |
| } |
| wg.done(); |
| }); |
| go!(k, c, wg, { |
| for _ in 0..N { |
| c[k].recv(); |
| } |
| wg.done(); |
| }); |
| } |
| |
| go!(c, wg, { |
| let mut n = [0; 4]; |
| let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>(); |
| |
| for _ in 0..4 * N { |
| let index = { |
| let mut sel = Select::new(); |
| let mut opers = [!0; 4]; |
| for &i in &[3, 2, 0, 1] { |
| if let Some(c) = &c1[i] { |
| opers[i] = sel.recv(c); |
| } |
| } |
| |
| let oper = sel.select(); |
| let mut index = !0; |
| for i in 0..4 { |
| if opers[i] == oper.index() { |
| index = i; |
| let _ = oper.recv(c1[i].as_ref().unwrap()); |
| break; |
| } |
| } |
| index |
| }; |
| |
| n[index] += 1; |
| if n[index] == N { |
| c1[index] = None; |
| } |
| } |
| wg.done(); |
| }); |
| |
| go!(c, wg, { |
| let mut n = [0; 4]; |
| let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>(); |
| |
| for _ in 0..4 * N { |
| let index = { |
| let mut sel = Select::new(); |
| let mut opers = [!0; 4]; |
| for &i in &[0, 1, 2, 3] { |
| if let Some(c) = &c1[i] { |
| opers[i] = sel.send(c); |
| } |
| } |
| |
| let oper = sel.select(); |
| let mut index = !0; |
| for i in 0..4 { |
| if opers[i] == oper.index() { |
| index = i; |
| let _ = oper.send(c1[i].as_ref().unwrap(), 0); |
| break; |
| } |
| } |
| index |
| }; |
| |
| n[index] += 1; |
| if n[index] == N { |
| c1[index] = None; |
| } |
| } |
| wg.done(); |
| }); |
| |
| wg.wait(); |
| } |
| |
| #[test] |
| fn test_select_fairness() { |
| #[cfg(miri)] |
| const TRIALS: usize = 100; |
| #[cfg(not(miri))] |
| const TRIALS: usize = 10000; |
| |
| let c1 = make::<u8>(TRIALS + 1); |
| let c2 = make::<u8>(TRIALS + 1); |
| |
| for _ in 0..TRIALS + 1 { |
| c1.send(1); |
| c2.send(2); |
| } |
| |
| let c3 = make::<u8>(0); |
| let c4 = make::<u8>(0); |
| let out = make::<u8>(0); |
| let done = make::<u8>(0); |
| let wg = WaitGroup::new(); |
| |
| wg.add(1); |
| go!(wg, c1, c2, c3, c4, out, done, { |
| defer! { wg.done() }; |
| loop { |
| let b; |
| select! { |
| recv(c3.rx()) -> m => b = m.unwrap(), |
| recv(c4.rx()) -> m => b = m.unwrap(), |
| recv(c1.rx()) -> m => b = m.unwrap(), |
| recv(c2.rx()) -> m => b = m.unwrap(), |
| } |
| select! { |
| send(out.tx(), b) -> _ => {} |
| recv(done.rx()) -> _ => return, |
| } |
| } |
| }); |
| |
| let (mut cnt1, mut cnt2) = (0, 0); |
| for _ in 0..TRIALS { |
| match out.recv() { |
| Some(1) => cnt1 += 1, |
| Some(2) => cnt2 += 1, |
| b => panic!("unexpected value {:?} on channel", b), |
| } |
| } |
| |
| // If the select in the goroutine is fair, |
| // cnt1 and cnt2 should be about the same value. |
| // With 10,000 trials, the expected margin of error at |
| // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)). |
| |
| let r = cnt1 as f64 / TRIALS as f64; |
| let e = (r - 0.5).abs(); |
| |
| if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) { |
| panic!( |
| "unfair select: in {} trials, results were {}, {}", |
| TRIALS, cnt1, cnt2, |
| ); |
| } |
| |
| done.close_s(); |
| wg.wait(); |
| } |
| |
| #[test] |
| fn test_chan_send_interface() { |
| struct Mt; |
| |
| let c = make::<Box<dyn Any>>(1); |
| c.send(Box::new(Mt)); |
| |
| select! { |
| send(c.tx(), Box::new(Mt)) -> _ => {} |
| default => {} |
| } |
| |
| select! { |
| send(c.tx(), Box::new(Mt)) -> _ => {} |
| send(c.tx(), Box::new(Mt)) -> _ => {} |
| default => {} |
| } |
| } |
| |
| #[test] |
| fn test_pseudo_random_send() { |
| #[cfg(miri)] |
| const N: usize = 20; |
| #[cfg(not(miri))] |
| const N: usize = 100; |
| |
| for cap in 0..N { |
| let c = make::<i32>(cap); |
| let l = Arc::new(Mutex::new(vec![0i32; N])); |
| let done = make::<bool>(0); |
| |
| go!(c, done, l, { |
| let mut l = l.lock().unwrap(); |
| for i in 0..N { |
| thread::yield_now(); |
| l[i] = c.recv().unwrap(); |
| } |
| done.send(true); |
| }); |
| |
| for _ in 0..N { |
| select! { |
| send(c.tx(), 1) -> _ => {} |
| send(c.tx(), 0) -> _ => {} |
| } |
| } |
| done.recv(); |
| |
| let mut n0 = 0; |
| let mut n1 = 0; |
| for &i in l.lock().unwrap().iter() { |
| n0 += (i + 1) % 2; |
| n1 += i; |
| } |
| |
| if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 { |
| panic!( |
| "Want pseudorandom, got {} zeros and {} ones (chan cap {})", |
| n0, n1, cap, |
| ); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_multi_consumer() { |
| const NWORK: usize = 23; |
| #[cfg(miri)] |
| const NITER: usize = 50; |
| #[cfg(not(miri))] |
| const NITER: usize = 271828; |
| |
| let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31]; |
| |
| let q = make::<i32>(NWORK * 3); |
| let r = make::<i32>(NWORK * 3); |
| |
| let wg = WaitGroup::new(); |
| for i in 0..NWORK { |
| wg.add(1); |
| let w = i; |
| go!(q, r, wg, pn, { |
| for v in &q { |
| if pn[w % pn.len()] == v { |
| thread::yield_now(); |
| } |
| r.send(v); |
| } |
| wg.done(); |
| }); |
| } |
| |
| let expect = Arc::new(Mutex::new(0)); |
| go!(q, r, expect, wg, pn, { |
| for i in 0..NITER { |
| let v = pn[i % pn.len()]; |
| *expect.lock().unwrap() += v; |
| q.send(v); |
| } |
| q.close_s(); |
| wg.wait(); |
| r.close_s(); |
| }); |
| |
| let mut n = 0; |
| let mut s = 0; |
| for v in &r { |
| n += 1; |
| s += v; |
| } |
| |
| if n != NITER || s != *expect.lock().unwrap() { |
| panic!(); |
| } |
| } |
| |
| #[test] |
| fn test_select_duplicate_channel() { |
| // This test makes sure we can queue a G on |
| // the same channel multiple times. |
| let c = make::<i32>(0); |
| let d = make::<i32>(0); |
| let e = make::<i32>(0); |
| |
| go!(c, d, e, { |
| select! { |
| recv(c.rx()) -> _ => {} |
| recv(d.rx()) -> _ => {} |
| recv(e.rx()) -> _ => {} |
| } |
| e.send(9); |
| }); |
| thread::sleep(ms(1)); |
| |
| go!(c, c.recv()); |
| thread::sleep(ms(1)); |
| |
| d.send(7); |
| e.recv(); |
| c.send(8); |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/closedchan.go |
| mod closedchan { |
| // TODO |
| } |
| |
| // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go |
| mod chanbarrier_test { |
| // TODO |
| } |
| |
| // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go |
| mod race_chan_test { |
| // TODO |
| } |
| |
| // https://github.com/golang/go/blob/master/test/ken/chan.go |
| mod chan { |
| use super::*; |
| |
| const MESSAGES_PER_CHANEL: u32 = 76; |
| const MESSAGES_RANGE_LEN: u32 = 100; |
| const END: i32 = 10000; |
| |
| struct ChanWithVals { |
| chan: Chan<i32>, |
| /// Next value to send |
| sv: Arc<AtomicI32>, |
| /// Next value to receive |
| rv: Arc<AtomicI32>, |
| } |
| |
| struct Totals { |
| /// Total sent messages |
| tots: u32, |
| /// Total received messages |
| totr: u32, |
| } |
| |
| struct Context { |
| nproc: Arc<Mutex<i32>>, |
| cval: Arc<Mutex<i32>>, |
| tot: Arc<Mutex<Totals>>, |
| nc: ChanWithVals, |
| randx: Arc<Mutex<i32>>, |
| } |
| |
| impl ChanWithVals { |
| fn with_capacity(capacity: usize) -> Self { |
| ChanWithVals { |
| chan: make(capacity), |
| sv: Arc::new(AtomicI32::new(0)), |
| rv: Arc::new(AtomicI32::new(0)), |
| } |
| } |
| |
| fn closed() -> Self { |
| let ch = ChanWithVals::with_capacity(0); |
| ch.chan.close_r(); |
| ch.chan.close_s(); |
| ch |
| } |
| |
| fn rv(&self) -> i32 { |
| self.rv.load(SeqCst) |
| } |
| |
| fn sv(&self) -> i32 { |
| self.sv.load(SeqCst) |
| } |
| |
| fn send(&mut self, tot: &Mutex<Totals>) -> bool { |
| { |
| let mut tot = tot.lock().unwrap(); |
| tot.tots += 1 |
| } |
| let esv = expect(self.sv(), self.sv()); |
| self.sv.store(esv, SeqCst); |
| if self.sv() == END { |
| self.chan.close_s(); |
| return true; |
| } |
| false |
| } |
| |
| fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool { |
| { |
| let mut tot = tot.lock().unwrap(); |
| tot.totr += 1 |
| } |
| let erv = expect(self.rv(), v); |
| self.rv.store(erv, SeqCst); |
| if self.rv() == END { |
| self.chan.close_r(); |
| return true; |
| } |
| false |
| } |
| } |
| |
| impl Clone for ChanWithVals { |
| fn clone(&self) -> Self { |
| ChanWithVals { |
| chan: self.chan.clone(), |
| sv: self.sv.clone(), |
| rv: self.rv.clone(), |
| } |
| } |
| } |
| |
| impl Context { |
| fn nproc(&self) -> &Mutex<i32> { |
| self.nproc.as_ref() |
| } |
| |
| fn cval(&self) -> &Mutex<i32> { |
| self.cval.as_ref() |
| } |
| |
| fn tot(&self) -> &Mutex<Totals> { |
| self.tot.as_ref() |
| } |
| |
| fn randx(&self) -> &Mutex<i32> { |
| self.randx.as_ref() |
| } |
| } |
| |
| impl Clone for Context { |
| fn clone(&self) -> Self { |
| Context { |
| nproc: self.nproc.clone(), |
| cval: self.cval.clone(), |
| tot: self.tot.clone(), |
| nc: self.nc.clone(), |
| randx: self.randx.clone(), |
| } |
| } |
| } |
| |
| fn nrand(n: i32, randx: &Mutex<i32>) -> i32 { |
| let mut randx = randx.lock().unwrap(); |
| *randx += 10007; |
| if *randx >= 1000000 { |
| *randx -= 1000000 |
| } |
| *randx % n |
| } |
| |
| fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 { |
| let mut nproc = nproc.lock().unwrap(); |
| *nproc += adjust; |
| *nproc |
| } |
| |
| fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> { |
| let mut ca = Vec::<ChanWithVals>::with_capacity(n); |
| let mut cval = cval.lock().unwrap(); |
| for _ in 0..n { |
| *cval += MESSAGES_RANGE_LEN as i32; |
| let chl = ChanWithVals::with_capacity(c); |
| chl.sv.store(*cval, SeqCst); |
| chl.rv.store(*cval, SeqCst); |
| ca.push(chl); |
| } |
| ca |
| } |
| |
| fn expect(v: i32, v0: i32) -> i32 { |
| if v == v0 { |
| return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 { |
| END |
| } else { |
| v + 1 |
| }; |
| } |
| panic!("got {}, expected {}", v, v0 + 1); |
| } |
| |
| fn send(mut c: ChanWithVals, ctx: Context) { |
| loop { |
| for _ in 0..=nrand(10, ctx.randx()) { |
| thread::yield_now(); |
| } |
| c.chan.tx().send(c.sv()).unwrap(); |
| if c.send(ctx.tot()) { |
| break; |
| } |
| } |
| change_nproc(-1, ctx.nproc()); |
| } |
| |
| fn recv(mut c: ChanWithVals, ctx: Context) { |
| loop { |
| for _ in (0..nrand(10, ctx.randx())).rev() { |
| thread::yield_now(); |
| } |
| let v = c.chan.rx().recv().unwrap(); |
| if c.recv(v, ctx.tot()) { |
| break; |
| } |
| } |
| change_nproc(-1, ctx.nproc()); |
| } |
| |
| #[allow(clippy::too_many_arguments)] |
| fn sel( |
| mut r0: ChanWithVals, |
| mut r1: ChanWithVals, |
| mut r2: ChanWithVals, |
| mut r3: ChanWithVals, |
| mut s0: ChanWithVals, |
| mut s1: ChanWithVals, |
| mut s2: ChanWithVals, |
| mut s3: ChanWithVals, |
| ctx: Context, |
| ) { |
| let mut a = 0; // local chans running |
| |
| if r0.chan.has_rx() { |
| a += 1; |
| } |
| if r1.chan.has_rx() { |
| a += 1; |
| } |
| if r2.chan.has_rx() { |
| a += 1; |
| } |
| if r3.chan.has_rx() { |
| a += 1; |
| } |
| if s0.chan.has_tx() { |
| a += 1; |
| } |
| if s1.chan.has_tx() { |
| a += 1; |
| } |
| if s2.chan.has_tx() { |
| a += 1; |
| } |
| if s3.chan.has_tx() { |
| a += 1; |
| } |
| |
| loop { |
| for _ in 0..=nrand(5, ctx.randx()) { |
| thread::yield_now(); |
| } |
| select! { |
| recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 }, |
| recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 }, |
| recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 }, |
| recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 }, |
| send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 }, |
| send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 }, |
| send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 }, |
| send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 }, |
| } |
| if a == 0 { |
| break; |
| } |
| } |
| change_nproc(-1, ctx.nproc()); |
| } |
| |
| fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals { |
| vec.get(idx).unwrap().clone() |
| } |
| |
| /// Direct send to direct recv |
| fn test1(c: ChanWithVals, ctx: &mut Context) { |
| change_nproc(2, ctx.nproc()); |
| go!(c, ctx, send(c, ctx)); |
| go!(c, ctx, recv(c, ctx)); |
| } |
| |
| /// Direct send to select recv |
| fn test2(c: usize, ctx: &mut Context) { |
| let ca = mkchan(c, 4, ctx.cval()); |
| |
| change_nproc(4, ctx.nproc()); |
| go!(ca, ctx, send(get(&ca, 0), ctx)); |
| go!(ca, ctx, send(get(&ca, 1), ctx)); |
| go!(ca, ctx, send(get(&ca, 2), ctx)); |
| go!(ca, ctx, send(get(&ca, 3), ctx)); |
| |
| change_nproc(1, ctx.nproc()); |
| go!( |
| ca, |
| ctx, |
| sel( |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx, |
| ) |
| ); |
| } |
| |
| /// Select send to direct recv |
| fn test3(c: usize, ctx: &mut Context) { |
| let ca = mkchan(c, 4, ctx.cval()); |
| |
| change_nproc(4, ctx.nproc()); |
| go!(ca, ctx, recv(get(&ca, 0), ctx)); |
| go!(ca, ctx, recv(get(&ca, 1), ctx)); |
| go!(ca, ctx, recv(get(&ca, 2), ctx)); |
| go!(ca, ctx, recv(get(&ca, 3), ctx)); |
| |
| change_nproc(1, ctx.nproc()); |
| go!( |
| ca, |
| ctx, |
| sel( |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| ctx, |
| ) |
| ); |
| } |
| |
| /// Select send to select recv, 4 channels |
| fn test4(c: usize, ctx: &mut Context) { |
| let ca = mkchan(c, 4, ctx.cval()); |
| |
| change_nproc(2, ctx.nproc()); |
| go!( |
| ca, |
| ctx, |
| sel( |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| ctx, |
| ) |
| ); |
| go!( |
| ca, |
| ctx, |
| sel( |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx.nc.clone(), |
| ctx, |
| ) |
| ); |
| } |
| |
| /// Select send to select recv, 8 channels |
| fn test5(c: usize, ctx: &mut Context) { |
| let ca = mkchan(c, 8, ctx.cval()); |
| |
| change_nproc(2, ctx.nproc()); |
| go!( |
| ca, |
| ctx, |
| sel( |
| get(&ca, 4), |
| get(&ca, 5), |
| get(&ca, 6), |
| get(&ca, 7), |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| ctx, |
| ) |
| ); |
| go!( |
| ca, |
| ctx, |
| sel( |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| get(&ca, 4), |
| get(&ca, 5), |
| get(&ca, 6), |
| get(&ca, 7), |
| ctx, |
| ) |
| ); |
| } |
| |
| // Direct and select send to direct and select recv |
| fn test6(c: usize, ctx: &mut Context) { |
| let ca = mkchan(c, 12, ctx.cval()); |
| |
| change_nproc(4, ctx.nproc()); |
| go!(ca, ctx, send(get(&ca, 4), ctx)); |
| go!(ca, ctx, send(get(&ca, 5), ctx)); |
| go!(ca, ctx, send(get(&ca, 6), ctx)); |
| go!(ca, ctx, send(get(&ca, 7), ctx)); |
| |
| change_nproc(4, ctx.nproc()); |
| go!(ca, ctx, recv(get(&ca, 8), ctx)); |
| go!(ca, ctx, recv(get(&ca, 9), ctx)); |
| go!(ca, ctx, recv(get(&ca, 10), ctx)); |
| go!(ca, ctx, recv(get(&ca, 11), ctx)); |
| |
| change_nproc(2, ctx.nproc()); |
| go!( |
| ca, |
| ctx, |
| sel( |
| get(&ca, 4), |
| get(&ca, 5), |
| get(&ca, 6), |
| get(&ca, 7), |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| ctx, |
| ) |
| ); |
| go!( |
| ca, |
| ctx, |
| sel( |
| get(&ca, 0), |
| get(&ca, 1), |
| get(&ca, 2), |
| get(&ca, 3), |
| get(&ca, 8), |
| get(&ca, 9), |
| get(&ca, 10), |
| get(&ca, 11), |
| ctx, |
| ) |
| ); |
| } |
| |
| fn wait(ctx: &mut Context) { |
| thread::yield_now(); |
| while change_nproc(0, ctx.nproc()) != 0 { |
| thread::yield_now(); |
| } |
| } |
| |
| fn tests(c: usize, ctx: &mut Context) { |
| let ca = mkchan(c, 4, ctx.cval()); |
| test1(get(&ca, 0), ctx); |
| test1(get(&ca, 1), ctx); |
| test1(get(&ca, 2), ctx); |
| test1(get(&ca, 3), ctx); |
| wait(ctx); |
| |
| test2(c, ctx); |
| wait(ctx); |
| |
| test3(c, ctx); |
| wait(ctx); |
| |
| test4(c, ctx); |
| wait(ctx); |
| |
| test5(c, ctx); |
| wait(ctx); |
| |
| test6(c, ctx); |
| wait(ctx); |
| } |
| |
| #[test] |
| #[cfg_attr(miri, ignore)] // Miri is too slow |
| fn main() { |
| let mut ctx = Context { |
| nproc: Arc::new(Mutex::new(0)), |
| cval: Arc::new(Mutex::new(0)), |
| tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })), |
| nc: ChanWithVals::closed(), |
| randx: Arc::new(Mutex::new(0)), |
| }; |
| |
| tests(0, &mut ctx); |
| tests(1, &mut ctx); |
| tests(10, &mut ctx); |
| tests(100, &mut ctx); |
| |
| #[rustfmt::skip] |
| let t = 4 * // buffer sizes |
| (4*4 + // tests 1,2,3,4 channels |
| 8 + // test 5 channels |
| 12) * // test 6 channels |
| MESSAGES_PER_CHANEL; // sends/recvs on a channel |
| |
| let tot = ctx.tot.lock().unwrap(); |
| if tot.tots != t || tot.totr != t { |
| panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t); |
| } |
| } |
| } |
| |
| // https://github.com/golang/go/blob/master/test/ken/chan1.go |
| mod chan1 { |
| use super::*; |
| |
| // sent messages |
| #[cfg(miri)] |
| const N: usize = 20; |
| #[cfg(not(miri))] |
| const N: usize = 1000; |
| // receiving "goroutines" |
| const M: usize = 10; |
| // channel buffering |
| const W: usize = 2; |
| |
| fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) { |
| loop { |
| select! { |
| recv(c.rx()) -> rr => { |
| let r = rr.unwrap(); |
| let mut data = h.lock().unwrap(); |
| if data[r] != 1 { |
| println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]); |
| panic!("fail") |
| } |
| data[r] = 2; |
| } |
| } |
| } |
| } |
| |
| fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) { |
| for n in 0..N { |
| let r = n; |
| let mut data = h.lock().unwrap(); |
| if data[r] != 0 { |
| println!("s"); |
| panic!("fail"); |
| } |
| data[r] = 1; |
| // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094 |
| drop(data); |
| c.send(r); |
| } |
| } |
| |
| #[test] |
| fn main() { |
| let h = Arc::new(Mutex::new([0usize; N])); |
| let c = make::<usize>(W); |
| for m in 0..M { |
| go!(c, h, { |
| r(c, m, h); |
| }); |
| thread::yield_now(); |
| } |
| thread::yield_now(); |
| thread::yield_now(); |
| s(c, h); |
| } |
| } |