diff options
Diffstat (limited to 'tests/golang.rs')
-rw-r--r-- | tests/golang.rs | 623 |
1 files changed, 579 insertions, 44 deletions
diff --git a/tests/golang.rs b/tests/golang.rs index 05d67f6..6a46c03 100644 --- a/tests/golang.rs +++ b/tests/golang.rs @@ -15,12 +15,12 @@ use std::alloc::{GlobalAlloc, Layout, System}; use std::any::Any; use std::cell::Cell; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering::SeqCst}; +use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; -use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender}; +use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender}; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) @@ -32,7 +32,13 @@ struct Chan<T> { struct ChanInner<T> { s: Option<Sender<T>>, - r: Receiver<T>, + r: Option<Receiver<T>>, + // Receiver to use when r is None (Go blocks on receiving from nil) + nil_r: Receiver<T>, + // Sender to use when s is None (Go blocks on sending to nil) + nil_s: Sender<T>, + // Hold this receiver to prevent nil sender channel from disconnection + _nil_sr: Receiver<T>, } impl<T> Clone for Chan<T> { @@ -57,35 +63,53 @@ impl<T> Chan<T> { } fn try_recv(&self) -> Option<T> { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.try_recv().ok() } fn recv(&self) -> Option<T> { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.recv().ok() } - fn close(&self) { + fn close_s(&self) { self.inner .lock() .unwrap() .s .take() - .expect("channel already closed"); + .expect("channel sender already closed"); + } + + fn close_r(&self) { + self.inner + .lock() + .unwrap() + .r + .take() + .expect("channel receiver already closed"); + } + + fn has_rx(&self) -> bool { + self.inner.lock().unwrap().r.is_some() + } + + fn has_tx(&self) -> bool { + self.inner.lock().unwrap().s.is_some() } fn rx(&self) -> Receiver<T> { - self.inner.lock().unwrap().r.clone() + let inner = self.inner.lock().unwrap(); + match inner.r.as_ref() { + None => inner.nil_r.clone(), + Some(r) => r.clone(), + } } fn tx(&self) -> Sender<T> { - match self.inner.lock().unwrap().s.as_ref() { - None => { - let (s, r) = bounded(0); - std::mem::forget(r); - s - } + let inner = self.inner.lock().unwrap(); + match inner.s.as_ref() { + None => inner.nil_s.clone(), Some(s) => s.clone(), } } @@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan<T> { fn make<T>(cap: usize) -> Chan<T> { let (s, r) = bounded(cap); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } fn make_unbounded<T>() -> Chan<T> { let (s, r) = unbounded(); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } + #[derive(Clone)] struct WaitGroup(Arc<WaitGroupInner>); @@ -199,14 +238,6 @@ macro_rules! defer { } macro_rules! go { - (@parse ref $v:ident, $($tail:tt)*) => {{ - let ref $v = $v; - go!(@parse $($tail)*) - }}; - (@parse move $v:ident, $($tail:tt)*) => {{ - let $v = $v; - go!(@parse $($tail)*) - }}; (@parse $v:ident, $($tail:tt)*) => {{ let $v = $v.clone(); go!(@parse $($tail)*) @@ -240,10 +271,10 @@ mod doubleselect { const ITERATIONS: i32 = 10_000; fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { - defer! { c1.close() } - defer! { c2.close() } - defer! { c3.close() } - defer! { c4.close() } + defer! { c1.close_s() } + defer! { c2.close_s() } + defer! { c3.close_s() } + defer! { c4.close_s() } for i in 0..n { select! { @@ -292,7 +323,7 @@ mod doubleselect { done.recv(); done.recv(); done.recv(); - cmux.close(); + cmux.close_s(); }); recver(cmux); } @@ -697,7 +728,7 @@ mod select2 { use super::*; #[cfg(miri)] - const N: i32 = 1000; + const N: i32 = 200; #[cfg(not(miri))] const N: i32 = 100000; @@ -892,6 +923,9 @@ mod sieve1 { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, ]; + #[cfg(miri)] + let a = &a[..10]; + for item in a.iter() { let x = primes.recv().unwrap(); if x != *item { @@ -929,6 +963,11 @@ mod chan_test { #[cfg(not(miri))] const N: i32 = 200; + #[cfg(miri)] + const MESSAGES_COUNT: i32 = 20; + #[cfg(not(miri))] + const MESSAGES_COUNT: i32 = 100; + for cap in 0..N { { // Ensure that receive from empty chan blocks. @@ -999,7 +1038,7 @@ mod chan_test { for i in 0..cap { c.send(i); } - c.close(); + c.close_s(); for i in 0..cap { let v = c.recv(); @@ -1027,7 +1066,7 @@ mod chan_test { }); thread::sleep(ms(1)); - c.close(); + c.close_s(); if !done.recv().unwrap() { panic!(); @@ -1035,15 +1074,15 @@ mod chan_test { } { - // Send 100 integers, + // Send many integers, // ensure that we receive them non-corrupted in FIFO order. let c = make::<i32>(cap as usize); go!(c, { - for i in 0..100 { + for i in 0..MESSAGES_COUNT { c.send(i); } }); - for i in 0..100 { + for i in 0..MESSAGES_COUNT { if c.recv() != Some(i) { panic!(); } @@ -1051,11 +1090,11 @@ mod chan_test { // Same, but using recv2. go!(c, { - for i in 0..100 { + for i in 0..MESSAGES_COUNT { c.send(i); } }); - for i in 0..100 { + for i in 0..MESSAGES_COUNT { if c.recv() != Some(i) { panic!(); } @@ -1082,7 +1121,7 @@ mod chan_test { } }); - c.close(); + c.close_s(); c.recv(); t.join().unwrap(); } @@ -1149,7 +1188,7 @@ mod chan_test { done.send(true); }); - c2.close(); + c2.close_s(); select! { recv(c1.rx()) -> _ => {} default => {} @@ -1378,7 +1417,7 @@ mod chan_test { ); } - done.close(); + done.close_s(); wg.wait(); } @@ -1481,9 +1520,9 @@ mod chan_test { *expect.lock().unwrap() += v; q.send(v); } - q.close(); + q.close_s(); wg.wait(); - r.close(); + r.close_s(); }); let mut n = 0; @@ -1541,8 +1580,504 @@ mod race_chan_test { } // https://github.com/golang/go/blob/master/test/ken/chan.go +#[cfg(not(miri))] // Miri is too slow mod chan { - // TODO + + use super::*; + + const MESSAGES_PER_CHANEL: u32 = 76; + const MESSAGES_RANGE_LEN: u32 = 100; + const END: i32 = 10000; + + struct ChanWithVals { + chan: Chan<i32>, + /// Next value to send + sv: Arc<AtomicI32>, + /// Next value to receive + rv: Arc<AtomicI32>, + } + + struct Totals { + /// Total sent messages + tots: u32, + /// Total received messages + totr: u32, + } + + struct Context { + nproc: Arc<Mutex<i32>>, + cval: Arc<Mutex<i32>>, + tot: Arc<Mutex<Totals>>, + nc: ChanWithVals, + randx: Arc<Mutex<i32>>, + } + + impl ChanWithVals { + fn with_capacity(capacity: usize) -> Self { + ChanWithVals { + chan: make(capacity), + sv: Arc::new(AtomicI32::new(0)), + rv: Arc::new(AtomicI32::new(0)), + } + } + + fn closed() -> Self { + let ch = ChanWithVals::with_capacity(0); + ch.chan.close_r(); + ch.chan.close_s(); + ch + } + + fn rv(&self) -> i32 { + self.rv.load(SeqCst) + } + + fn sv(&self) -> i32 { + self.sv.load(SeqCst) + } + + fn send(&mut self, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.tots += 1 + } + let esv = expect(self.sv(), self.sv()); + self.sv.store(esv, SeqCst); + if self.sv() == END { + self.chan.close_s(); + return true; + } + false + } + + fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.totr += 1 + } + let erv = expect(self.rv(), v); + self.rv.store(erv, SeqCst); + if self.rv() == END { + self.chan.close_r(); + return true; + } + false + } + } + + impl Clone for ChanWithVals { + fn clone(&self) -> Self { + ChanWithVals { + chan: self.chan.clone(), + sv: self.sv.clone(), + rv: self.rv.clone(), + } + } + } + + impl Context { + fn nproc(&self) -> &Mutex<i32> { + self.nproc.as_ref() + } + + fn cval(&self) -> &Mutex<i32> { + self.cval.as_ref() + } + + fn tot(&self) -> &Mutex<Totals> { + self.tot.as_ref() + } + + fn randx(&self) -> &Mutex<i32> { + self.randx.as_ref() + } + } + + impl Clone for Context { + fn clone(&self) -> Self { + Context { + nproc: self.nproc.clone(), + cval: self.cval.clone(), + tot: self.tot.clone(), + nc: self.nc.clone(), + randx: self.randx.clone(), + } + } + } + + fn nrand(n: i32, randx: &Mutex<i32>) -> i32 { + let mut randx = randx.lock().unwrap(); + *randx += 10007; + if *randx >= 1000000 { + *randx -= 1000000 + } + *randx % n + } + + fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 { + let mut nproc = nproc.lock().unwrap(); + *nproc += adjust; + *nproc + } + + fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> { + let mut ca = Vec::<ChanWithVals>::with_capacity(n); + let mut cval = cval.lock().unwrap(); + for _ in 0..n { + *cval += MESSAGES_RANGE_LEN as i32; + let chl = ChanWithVals::with_capacity(c); + chl.sv.store(*cval, SeqCst); + chl.rv.store(*cval, SeqCst); + ca.push(chl); + } + ca + } + + fn expect(v: i32, v0: i32) -> i32 { + if v == v0 { + return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 { + END + } else { + v + 1 + }; + } + panic!("got {}, expected {}", v, v0 + 1); + } + + fn send(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in 0..=nrand(10, ctx.randx()) { + thread::yield_now(); + } + c.chan.tx().send(c.sv()).unwrap(); + if c.send(ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn recv(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in (0..nrand(10, ctx.randx())).rev() { + thread::yield_now(); + } + let v = c.chan.rx().recv().unwrap(); + if c.recv(v, ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + #[allow(clippy::too_many_arguments)] + fn sel( + mut r0: ChanWithVals, + mut r1: ChanWithVals, + mut r2: ChanWithVals, + mut r3: ChanWithVals, + mut s0: ChanWithVals, + mut s1: ChanWithVals, + mut s2: ChanWithVals, + mut s3: ChanWithVals, + ctx: Context, + ) { + let mut a = 0; // local chans running + + if r0.chan.has_rx() { + a += 1; + } + if r1.chan.has_rx() { + a += 1; + } + if r2.chan.has_rx() { + a += 1; + } + if r3.chan.has_rx() { + a += 1; + } + if s0.chan.has_tx() { + a += 1; + } + if s1.chan.has_tx() { + a += 1; + } + if s2.chan.has_tx() { + a += 1; + } + if s3.chan.has_tx() { + a += 1; + } + + loop { + for _ in 0..=nrand(5, ctx.randx()) { + thread::yield_now(); + } + select! { + recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 }, + send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 }, + send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 }, + send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 }, + } + if a == 0 { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals { + vec.get(idx).unwrap().clone() + } + + /// Direct send to direct recv + fn test1(c: ChanWithVals, ctx: &mut Context) { + change_nproc(2, ctx.nproc()); + go!(c, ctx, send(c, ctx)); + go!(c, ctx, recv(c, ctx)); + } + + /// Direct send to select recv + fn test2(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 0), ctx)); + go!(ca, ctx, send(get(&ca, 1), ctx)); + go!(ca, ctx, send(get(&ca, 2), ctx)); + go!(ca, ctx, send(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to direct recv + fn test3(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 0), ctx)); + go!(ca, ctx, recv(get(&ca, 1), ctx)); + go!(ca, ctx, recv(get(&ca, 2), ctx)); + go!(ca, ctx, recv(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + } + + /// Select send to select recv, 4 channels + fn test4(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to select recv, 8 channels + fn test5(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 8, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + ctx, + ) + ); + } + + // Direct and select send to direct and select recv + fn test6(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 12, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 4), ctx)); + go!(ca, ctx, send(get(&ca, 5), ctx)); + go!(ca, ctx, send(get(&ca, 6), ctx)); + go!(ca, ctx, send(get(&ca, 7), ctx)); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 8), ctx)); + go!(ca, ctx, recv(get(&ca, 9), ctx)); + go!(ca, ctx, recv(get(&ca, 10), ctx)); + go!(ca, ctx, recv(get(&ca, 11), ctx)); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 8), + get(&ca, 9), + get(&ca, 10), + get(&ca, 11), + ctx, + ) + ); + } + + fn wait(ctx: &mut Context) { + thread::yield_now(); + while change_nproc(0, ctx.nproc()) != 0 { + thread::yield_now(); + } + } + + fn tests(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + test1(get(&ca, 0), ctx); + test1(get(&ca, 1), ctx); + test1(get(&ca, 2), ctx); + test1(get(&ca, 3), ctx); + wait(ctx); + + test2(c, ctx); + wait(ctx); + + test3(c, ctx); + wait(ctx); + + test4(c, ctx); + wait(ctx); + + test5(c, ctx); + wait(ctx); + + test6(c, ctx); + wait(ctx); + } + + #[test] + fn main() { + let mut ctx = Context { + nproc: Arc::new(Mutex::new(0)), + cval: Arc::new(Mutex::new(0)), + tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })), + nc: ChanWithVals::closed(), + randx: Arc::new(Mutex::new(0)), + }; + + tests(0, &mut ctx); + tests(1, &mut ctx); + tests(10, &mut ctx); + tests(100, &mut ctx); + + #[rustfmt::skip] + let t = 4 * // buffer sizes + (4*4 + // tests 1,2,3,4 channels + 8 + // test 5 channels + 12) * // test 6 channels + MESSAGES_PER_CHANEL; // sends/recvs on a channel + + let tot = ctx.tot.lock().unwrap(); + if tot.tots != t || tot.totr != t { + panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t); + } + } } // https://github.com/golang/go/blob/master/test/ken/chan1.go @@ -1551,7 +2086,7 @@ mod chan1 { // sent messages #[cfg(miri)] - const N: usize = 100; + const N: usize = 20; #[cfg(not(miri))] const N: usize = 1000; // receiving "goroutines" |