aboutsummaryrefslogtreecommitdiff
path: root/tests/golang.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/golang.rs')
-rw-r--r--tests/golang.rs623
1 files changed, 579 insertions, 44 deletions
diff --git a/tests/golang.rs b/tests/golang.rs
index 05d67f6..6a46c03 100644
--- a/tests/golang.rs
+++ b/tests/golang.rs
@@ -15,12 +15,12 @@ use std::alloc::{GlobalAlloc, Layout, System};
use std::any::Any;
use std::cell::Cell;
use std::collections::HashMap;
-use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
+use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
-use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender};
+use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender};
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
@@ -32,7 +32,13 @@ struct Chan<T> {
struct ChanInner<T> {
s: Option<Sender<T>>,
- r: Receiver<T>,
+ r: Option<Receiver<T>>,
+ // Receiver to use when r is None (Go blocks on receiving from nil)
+ nil_r: Receiver<T>,
+ // Sender to use when s is None (Go blocks on sending to nil)
+ nil_s: Sender<T>,
+ // Hold this receiver to prevent nil sender channel from disconnection
+ _nil_sr: Receiver<T>,
}
impl<T> Clone for Chan<T> {
@@ -57,35 +63,53 @@ impl<T> Chan<T> {
}
fn try_recv(&self) -> Option<T> {
- let r = self.inner.lock().unwrap().r.clone();
+ let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
r.try_recv().ok()
}
fn recv(&self) -> Option<T> {
- let r = self.inner.lock().unwrap().r.clone();
+ let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
r.recv().ok()
}
- fn close(&self) {
+ fn close_s(&self) {
self.inner
.lock()
.unwrap()
.s
.take()
- .expect("channel already closed");
+ .expect("channel sender already closed");
+ }
+
+ fn close_r(&self) {
+ self.inner
+ .lock()
+ .unwrap()
+ .r
+ .take()
+ .expect("channel receiver already closed");
+ }
+
+ fn has_rx(&self) -> bool {
+ self.inner.lock().unwrap().r.is_some()
+ }
+
+ fn has_tx(&self) -> bool {
+ self.inner.lock().unwrap().s.is_some()
}
fn rx(&self) -> Receiver<T> {
- self.inner.lock().unwrap().r.clone()
+ let inner = self.inner.lock().unwrap();
+ match inner.r.as_ref() {
+ None => inner.nil_r.clone(),
+ Some(r) => r.clone(),
+ }
}
fn tx(&self) -> Sender<T> {
- match self.inner.lock().unwrap().s.as_ref() {
- None => {
- let (s, r) = bounded(0);
- std::mem::forget(r);
- s
- }
+ let inner = self.inner.lock().unwrap();
+ match inner.s.as_ref() {
+ None => inner.nil_s.clone(),
Some(s) => s.clone(),
}
}
@@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan<T> {
fn make<T>(cap: usize) -> Chan<T> {
let (s, r) = bounded(cap);
+ let (nil_s, _nil_sr) = bounded(0);
Chan {
- inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
+ inner: Arc::new(Mutex::new(ChanInner {
+ s: Some(s),
+ r: Some(r),
+ nil_r: never(),
+ nil_s,
+ _nil_sr,
+ })),
}
}
fn make_unbounded<T>() -> Chan<T> {
let (s, r) = unbounded();
+ let (nil_s, _nil_sr) = bounded(0);
Chan {
- inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
+ inner: Arc::new(Mutex::new(ChanInner {
+ s: Some(s),
+ r: Some(r),
+ nil_r: never(),
+ nil_s,
+ _nil_sr,
+ })),
}
}
+
#[derive(Clone)]
struct WaitGroup(Arc<WaitGroupInner>);
@@ -199,14 +238,6 @@ macro_rules! defer {
}
macro_rules! go {
- (@parse ref $v:ident, $($tail:tt)*) => {{
- let ref $v = $v;
- go!(@parse $($tail)*)
- }};
- (@parse move $v:ident, $($tail:tt)*) => {{
- let $v = $v;
- go!(@parse $($tail)*)
- }};
(@parse $v:ident, $($tail:tt)*) => {{
let $v = $v.clone();
go!(@parse $($tail)*)
@@ -240,10 +271,10 @@ mod doubleselect {
const ITERATIONS: i32 = 10_000;
fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
- defer! { c1.close() }
- defer! { c2.close() }
- defer! { c3.close() }
- defer! { c4.close() }
+ defer! { c1.close_s() }
+ defer! { c2.close_s() }
+ defer! { c3.close_s() }
+ defer! { c4.close_s() }
for i in 0..n {
select! {
@@ -292,7 +323,7 @@ mod doubleselect {
done.recv();
done.recv();
done.recv();
- cmux.close();
+ cmux.close_s();
});
recver(cmux);
}
@@ -697,7 +728,7 @@ mod select2 {
use super::*;
#[cfg(miri)]
- const N: i32 = 1000;
+ const N: i32 = 200;
#[cfg(not(miri))]
const N: i32 = 100000;
@@ -892,6 +923,9 @@ mod sieve1 {
2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
89, 97,
];
+ #[cfg(miri)]
+ let a = &a[..10];
+
for item in a.iter() {
let x = primes.recv().unwrap();
if x != *item {
@@ -929,6 +963,11 @@ mod chan_test {
#[cfg(not(miri))]
const N: i32 = 200;
+ #[cfg(miri)]
+ const MESSAGES_COUNT: i32 = 20;
+ #[cfg(not(miri))]
+ const MESSAGES_COUNT: i32 = 100;
+
for cap in 0..N {
{
// Ensure that receive from empty chan blocks.
@@ -999,7 +1038,7 @@ mod chan_test {
for i in 0..cap {
c.send(i);
}
- c.close();
+ c.close_s();
for i in 0..cap {
let v = c.recv();
@@ -1027,7 +1066,7 @@ mod chan_test {
});
thread::sleep(ms(1));
- c.close();
+ c.close_s();
if !done.recv().unwrap() {
panic!();
@@ -1035,15 +1074,15 @@ mod chan_test {
}
{
- // Send 100 integers,
+ // Send many integers,
// ensure that we receive them non-corrupted in FIFO order.
let c = make::<i32>(cap as usize);
go!(c, {
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
c.send(i);
}
});
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
if c.recv() != Some(i) {
panic!();
}
@@ -1051,11 +1090,11 @@ mod chan_test {
// Same, but using recv2.
go!(c, {
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
c.send(i);
}
});
- for i in 0..100 {
+ for i in 0..MESSAGES_COUNT {
if c.recv() != Some(i) {
panic!();
}
@@ -1082,7 +1121,7 @@ mod chan_test {
}
});
- c.close();
+ c.close_s();
c.recv();
t.join().unwrap();
}
@@ -1149,7 +1188,7 @@ mod chan_test {
done.send(true);
});
- c2.close();
+ c2.close_s();
select! {
recv(c1.rx()) -> _ => {}
default => {}
@@ -1378,7 +1417,7 @@ mod chan_test {
);
}
- done.close();
+ done.close_s();
wg.wait();
}
@@ -1481,9 +1520,9 @@ mod chan_test {
*expect.lock().unwrap() += v;
q.send(v);
}
- q.close();
+ q.close_s();
wg.wait();
- r.close();
+ r.close_s();
});
let mut n = 0;
@@ -1541,8 +1580,504 @@ mod race_chan_test {
}
// https://github.com/golang/go/blob/master/test/ken/chan.go
+#[cfg(not(miri))] // Miri is too slow
mod chan {
- // TODO
+
+ use super::*;
+
+ const MESSAGES_PER_CHANEL: u32 = 76;
+ const MESSAGES_RANGE_LEN: u32 = 100;
+ const END: i32 = 10000;
+
+ struct ChanWithVals {
+ chan: Chan<i32>,
+ /// Next value to send
+ sv: Arc<AtomicI32>,
+ /// Next value to receive
+ rv: Arc<AtomicI32>,
+ }
+
+ struct Totals {
+ /// Total sent messages
+ tots: u32,
+ /// Total received messages
+ totr: u32,
+ }
+
+ struct Context {
+ nproc: Arc<Mutex<i32>>,
+ cval: Arc<Mutex<i32>>,
+ tot: Arc<Mutex<Totals>>,
+ nc: ChanWithVals,
+ randx: Arc<Mutex<i32>>,
+ }
+
+ impl ChanWithVals {
+ fn with_capacity(capacity: usize) -> Self {
+ ChanWithVals {
+ chan: make(capacity),
+ sv: Arc::new(AtomicI32::new(0)),
+ rv: Arc::new(AtomicI32::new(0)),
+ }
+ }
+
+ fn closed() -> Self {
+ let ch = ChanWithVals::with_capacity(0);
+ ch.chan.close_r();
+ ch.chan.close_s();
+ ch
+ }
+
+ fn rv(&self) -> i32 {
+ self.rv.load(SeqCst)
+ }
+
+ fn sv(&self) -> i32 {
+ self.sv.load(SeqCst)
+ }
+
+ fn send(&mut self, tot: &Mutex<Totals>) -> bool {
+ {
+ let mut tot = tot.lock().unwrap();
+ tot.tots += 1
+ }
+ let esv = expect(self.sv(), self.sv());
+ self.sv.store(esv, SeqCst);
+ if self.sv() == END {
+ self.chan.close_s();
+ return true;
+ }
+ false
+ }
+
+ fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool {
+ {
+ let mut tot = tot.lock().unwrap();
+ tot.totr += 1
+ }
+ let erv = expect(self.rv(), v);
+ self.rv.store(erv, SeqCst);
+ if self.rv() == END {
+ self.chan.close_r();
+ return true;
+ }
+ false
+ }
+ }
+
+ impl Clone for ChanWithVals {
+ fn clone(&self) -> Self {
+ ChanWithVals {
+ chan: self.chan.clone(),
+ sv: self.sv.clone(),
+ rv: self.rv.clone(),
+ }
+ }
+ }
+
+ impl Context {
+ fn nproc(&self) -> &Mutex<i32> {
+ self.nproc.as_ref()
+ }
+
+ fn cval(&self) -> &Mutex<i32> {
+ self.cval.as_ref()
+ }
+
+ fn tot(&self) -> &Mutex<Totals> {
+ self.tot.as_ref()
+ }
+
+ fn randx(&self) -> &Mutex<i32> {
+ self.randx.as_ref()
+ }
+ }
+
+ impl Clone for Context {
+ fn clone(&self) -> Self {
+ Context {
+ nproc: self.nproc.clone(),
+ cval: self.cval.clone(),
+ tot: self.tot.clone(),
+ nc: self.nc.clone(),
+ randx: self.randx.clone(),
+ }
+ }
+ }
+
+ fn nrand(n: i32, randx: &Mutex<i32>) -> i32 {
+ let mut randx = randx.lock().unwrap();
+ *randx += 10007;
+ if *randx >= 1000000 {
+ *randx -= 1000000
+ }
+ *randx % n
+ }
+
+ fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 {
+ let mut nproc = nproc.lock().unwrap();
+ *nproc += adjust;
+ *nproc
+ }
+
+ fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> {
+ let mut ca = Vec::<ChanWithVals>::with_capacity(n);
+ let mut cval = cval.lock().unwrap();
+ for _ in 0..n {
+ *cval += MESSAGES_RANGE_LEN as i32;
+ let chl = ChanWithVals::with_capacity(c);
+ chl.sv.store(*cval, SeqCst);
+ chl.rv.store(*cval, SeqCst);
+ ca.push(chl);
+ }
+ ca
+ }
+
+ fn expect(v: i32, v0: i32) -> i32 {
+ if v == v0 {
+ return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 {
+ END
+ } else {
+ v + 1
+ };
+ }
+ panic!("got {}, expected {}", v, v0 + 1);
+ }
+
+ fn send(mut c: ChanWithVals, ctx: Context) {
+ loop {
+ for _ in 0..=nrand(10, ctx.randx()) {
+ thread::yield_now();
+ }
+ c.chan.tx().send(c.sv()).unwrap();
+ if c.send(ctx.tot()) {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ fn recv(mut c: ChanWithVals, ctx: Context) {
+ loop {
+ for _ in (0..nrand(10, ctx.randx())).rev() {
+ thread::yield_now();
+ }
+ let v = c.chan.rx().recv().unwrap();
+ if c.recv(v, ctx.tot()) {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ fn sel(
+ mut r0: ChanWithVals,
+ mut r1: ChanWithVals,
+ mut r2: ChanWithVals,
+ mut r3: ChanWithVals,
+ mut s0: ChanWithVals,
+ mut s1: ChanWithVals,
+ mut s2: ChanWithVals,
+ mut s3: ChanWithVals,
+ ctx: Context,
+ ) {
+ let mut a = 0; // local chans running
+
+ if r0.chan.has_rx() {
+ a += 1;
+ }
+ if r1.chan.has_rx() {
+ a += 1;
+ }
+ if r2.chan.has_rx() {
+ a += 1;
+ }
+ if r3.chan.has_rx() {
+ a += 1;
+ }
+ if s0.chan.has_tx() {
+ a += 1;
+ }
+ if s1.chan.has_tx() {
+ a += 1;
+ }
+ if s2.chan.has_tx() {
+ a += 1;
+ }
+ if s3.chan.has_tx() {
+ a += 1;
+ }
+
+ loop {
+ for _ in 0..=nrand(5, ctx.randx()) {
+ thread::yield_now();
+ }
+ select! {
+ recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 },
+ send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 },
+ send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 },
+ send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 },
+ send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 },
+ }
+ if a == 0 {
+ break;
+ }
+ }
+ change_nproc(-1, ctx.nproc());
+ }
+
+ fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals {
+ vec.get(idx).unwrap().clone()
+ }
+
+ /// Direct send to direct recv
+ fn test1(c: ChanWithVals, ctx: &mut Context) {
+ change_nproc(2, ctx.nproc());
+ go!(c, ctx, send(c, ctx));
+ go!(c, ctx, recv(c, ctx));
+ }
+
+ /// Direct send to select recv
+ fn test2(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, send(get(&ca, 0), ctx));
+ go!(ca, ctx, send(get(&ca, 1), ctx));
+ go!(ca, ctx, send(get(&ca, 2), ctx));
+ go!(ca, ctx, send(get(&ca, 3), ctx));
+
+ change_nproc(1, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to direct recv
+ fn test3(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, recv(get(&ca, 0), ctx));
+ go!(ca, ctx, recv(get(&ca, 1), ctx));
+ go!(ca, ctx, recv(get(&ca, 2), ctx));
+ go!(ca, ctx, recv(get(&ca, 3), ctx));
+
+ change_nproc(1, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to select recv, 4 channels
+ fn test4(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx.nc.clone(),
+ ctx,
+ )
+ );
+ }
+
+ /// Select send to select recv, 8 channels
+ fn test5(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 8, ctx.cval());
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ ctx,
+ )
+ );
+ }
+
+ // Direct and select send to direct and select recv
+ fn test6(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 12, ctx.cval());
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, send(get(&ca, 4), ctx));
+ go!(ca, ctx, send(get(&ca, 5), ctx));
+ go!(ca, ctx, send(get(&ca, 6), ctx));
+ go!(ca, ctx, send(get(&ca, 7), ctx));
+
+ change_nproc(4, ctx.nproc());
+ go!(ca, ctx, recv(get(&ca, 8), ctx));
+ go!(ca, ctx, recv(get(&ca, 9), ctx));
+ go!(ca, ctx, recv(get(&ca, 10), ctx));
+ go!(ca, ctx, recv(get(&ca, 11), ctx));
+
+ change_nproc(2, ctx.nproc());
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 4),
+ get(&ca, 5),
+ get(&ca, 6),
+ get(&ca, 7),
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ ctx,
+ )
+ );
+ go!(
+ ca,
+ ctx,
+ sel(
+ get(&ca, 0),
+ get(&ca, 1),
+ get(&ca, 2),
+ get(&ca, 3),
+ get(&ca, 8),
+ get(&ca, 9),
+ get(&ca, 10),
+ get(&ca, 11),
+ ctx,
+ )
+ );
+ }
+
+ fn wait(ctx: &mut Context) {
+ thread::yield_now();
+ while change_nproc(0, ctx.nproc()) != 0 {
+ thread::yield_now();
+ }
+ }
+
+ fn tests(c: usize, ctx: &mut Context) {
+ let ca = mkchan(c, 4, ctx.cval());
+ test1(get(&ca, 0), ctx);
+ test1(get(&ca, 1), ctx);
+ test1(get(&ca, 2), ctx);
+ test1(get(&ca, 3), ctx);
+ wait(ctx);
+
+ test2(c, ctx);
+ wait(ctx);
+
+ test3(c, ctx);
+ wait(ctx);
+
+ test4(c, ctx);
+ wait(ctx);
+
+ test5(c, ctx);
+ wait(ctx);
+
+ test6(c, ctx);
+ wait(ctx);
+ }
+
+ #[test]
+ fn main() {
+ let mut ctx = Context {
+ nproc: Arc::new(Mutex::new(0)),
+ cval: Arc::new(Mutex::new(0)),
+ tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })),
+ nc: ChanWithVals::closed(),
+ randx: Arc::new(Mutex::new(0)),
+ };
+
+ tests(0, &mut ctx);
+ tests(1, &mut ctx);
+ tests(10, &mut ctx);
+ tests(100, &mut ctx);
+
+ #[rustfmt::skip]
+ let t = 4 * // buffer sizes
+ (4*4 + // tests 1,2,3,4 channels
+ 8 + // test 5 channels
+ 12) * // test 6 channels
+ MESSAGES_PER_CHANEL; // sends/recvs on a channel
+
+ let tot = ctx.tot.lock().unwrap();
+ if tot.tots != t || tot.totr != t {
+ panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t);
+ }
+ }
}
// https://github.com/golang/go/blob/master/test/ken/chan1.go
@@ -1551,7 +2086,7 @@ mod chan1 {
// sent messages
#[cfg(miri)]
- const N: usize = 100;
+ const N: usize = 20;
#[cfg(not(miri))]
const N: usize = 1000;
// receiving "goroutines"