diff options
Diffstat (limited to 'src/runtime/tests')
-rw-r--r-- | src/runtime/tests/loom_blocking.rs | 50 | ||||
-rw-r--r-- | src/runtime/tests/loom_current_thread_scheduler.rs (renamed from src/runtime/tests/loom_basic_scheduler.rs) | 20 | ||||
-rw-r--r-- | src/runtime/tests/loom_join_set.rs | 82 | ||||
-rw-r--r-- | src/runtime/tests/loom_queue.rs | 41 | ||||
-rw-r--r-- | src/runtime/tests/loom_yield.rs | 37 | ||||
-rw-r--r-- | src/runtime/tests/mod.rs | 16 | ||||
-rw-r--r-- | src/runtime/tests/queue.rs | 84 | ||||
-rw-r--r-- | src/runtime/tests/task.rs | 50 | ||||
-rw-r--r-- | src/runtime/tests/task_combinations.rs | 153 |
9 files changed, 455 insertions, 78 deletions
diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs index 8fb54c5..89de85e 100644 --- a/src/runtime/tests/loom_blocking.rs +++ b/src/runtime/tests/loom_blocking.rs @@ -23,6 +23,56 @@ fn blocking_shutdown() { }); } +#[test] +fn spawn_mandatory_blocking_should_always_run() { + use crate::runtime::tests::loom_oneshot; + loom::model(|| { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + + let (tx, rx) = loom_oneshot::channel(); + let _enter = rt.enter(); + runtime::spawn_blocking(|| {}); + runtime::spawn_mandatory_blocking(move || { + let _ = tx.send(()); + }) + .unwrap(); + + drop(rt); + + // This call will deadlock if `spawn_mandatory_blocking` doesn't run. + let () = rx.recv(); + }); +} + +#[test] +fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread() { + use crate::runtime::tests::loom_oneshot; + loom::model(|| { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + let handle = rt.handle().clone(); + + // Drop the runtime in a different thread + { + loom::thread::spawn(move || { + drop(rt); + }); + } + + let _enter = handle.enter(); + let (tx, rx) = loom_oneshot::channel(); + let handle = runtime::spawn_mandatory_blocking(move || { + let _ = tx.send(()); + }); + + // handle.is_some() means that `spawn_mandatory_blocking` + // promised us to run the blocking task + if handle.is_some() { + // This call will deadlock if `spawn_mandatory_blocking` doesn't run. + let () = rx.recv(); + } + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/src/runtime/tests/loom_basic_scheduler.rs b/src/runtime/tests/loom_current_thread_scheduler.rs index d2894b9..a772603 100644 --- a/src/runtime/tests/loom_basic_scheduler.rs +++ b/src/runtime/tests/loom_current_thread_scheduler.rs @@ -34,20 +34,22 @@ fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) { #[test] fn block_on_num_polls() { loom::model(|| { - // we expect at most 3 number of polls because there are - // three points at which we poll the future. At any of these - // points it can be ready: + // we expect at most 4 number of polls because there are three points at + // which we poll the future and an opportunity for a false-positive.. At + // any of these points it can be ready: // - // - when we fail to steal the parker and we block on a - // notification that it is available. + // - when we fail to steal the parker and we block on a notification + // that it is available. // // - when we steal the parker and we schedule the future // - // - when the future is woken up and we have ran the max - // number of tasks for the current tick or there are no - // more tasks to run. + // - when the future is woken up and we have ran the max number of tasks + // for the current tick or there are no more tasks to run. // - let at_most = 3; + // - a thread is notified that the parker is available but a third + // thread acquires it before the notified thread can. + // + let at_most = 4; let rt1 = Arc::new(Builder::new_current_thread().build().unwrap()); let rt2 = rt1.clone(); diff --git a/src/runtime/tests/loom_join_set.rs b/src/runtime/tests/loom_join_set.rs new file mode 100644 index 0000000..bd34387 --- /dev/null +++ b/src/runtime/tests/loom_join_set.rs @@ -0,0 +1,82 @@ +use crate::runtime::Builder; +use crate::task::JoinSet; + +#[test] +fn test_join_set() { + loom::model(|| { + let rt = Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + let mut set = JoinSet::new(); + + rt.block_on(async { + assert_eq!(set.len(), 0); + set.spawn(async { () }); + assert_eq!(set.len(), 1); + set.spawn(async { () }); + assert_eq!(set.len(), 2); + let () = set.join_next().await.unwrap().unwrap(); + assert_eq!(set.len(), 1); + set.spawn(async { () }); + assert_eq!(set.len(), 2); + let () = set.join_next().await.unwrap().unwrap(); + assert_eq!(set.len(), 1); + let () = set.join_next().await.unwrap().unwrap(); + assert_eq!(set.len(), 0); + set.spawn(async { () }); + assert_eq!(set.len(), 1); + }); + + drop(set); + drop(rt); + }); +} + +#[test] +fn abort_all_during_completion() { + use std::sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }; + + // These booleans assert that at least one execution had the task complete first, and that at + // least one execution had the task be cancelled before it completed. + let complete_happened = Arc::new(AtomicBool::new(false)); + let cancel_happened = Arc::new(AtomicBool::new(false)); + + { + let complete_happened = complete_happened.clone(); + let cancel_happened = cancel_happened.clone(); + loom::model(move || { + let rt = Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + let mut set = JoinSet::new(); + + rt.block_on(async { + set.spawn(async { () }); + set.abort_all(); + + match set.join_next().await { + Some(Ok(())) => complete_happened.store(true, SeqCst), + Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst), + Some(Err(err)) => panic!("fail: {}", err), + None => { + unreachable!("Aborting the task does not remove it from the JoinSet.") + } + } + + assert!(matches!(set.join_next().await, None)); + }); + + drop(set); + drop(rt); + }); + } + + assert!(complete_happened.load(SeqCst)); + assert!(cancel_happened.load(SeqCst)); +} diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs index 2cbb0a1..8d4e1d3 100644 --- a/src/runtime/tests/loom_queue.rs +++ b/src/runtime/tests/loom_queue.rs @@ -1,7 +1,7 @@ use crate::runtime::blocking::NoopSchedule; -use crate::runtime::queue; -use crate::runtime::stats::WorkerStatsBatcher; +use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; +use crate::runtime::MetricsBatch; use loom::thread; @@ -10,14 +10,15 @@ fn basic() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut local, &mut metrics).is_some() { n += 1; } @@ -34,7 +35,7 @@ fn basic() { for _ in 0..2 { for _ in 0..2 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } if local.pop().is_some() { @@ -43,7 +44,7 @@ fn basic() { // Push another task let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); while local.pop().is_some() { n += 1; @@ -65,13 +66,14 @@ fn steal_overflow() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut local, &mut metrics).is_some() { n += 1; } @@ -86,7 +88,7 @@ fn steal_overflow() { // push a task, pop a task let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); if local.pop().is_some() { n += 1; @@ -94,7 +96,7 @@ fn steal_overflow() { for _ in 0..6 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } n += th.join().unwrap(); @@ -116,10 +118,10 @@ fn multi_stealer() { const NUM_TASKS: usize = 5; fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); - if steal.steal_into(&mut local, &mut stats).is_none() { + if steal.steal_into(&mut local, &mut metrics).is_none() { return 0; } @@ -135,11 +137,12 @@ fn multi_stealer() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); // Push work for _ in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } let th1 = { @@ -169,7 +172,7 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); let inject = Inject::new(); @@ -177,17 +180,17 @@ fn chained_steal() { // Load up some tasks for _ in 0..4 { let (task, _) = super::unowned(async {}); - l1.push_back(task, &inject); + l1.push_back(task, &inject, &mut metrics); let (task, _) = super::unowned(async {}); - l2.push_back(task, &inject); + l2.push_back(task, &inject, &mut metrics); } // Spawn a task to steal from **our** queue let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); - s1.steal_into(&mut local, &mut stats); + s1.steal_into(&mut local, &mut metrics); while local.pop().is_some() {} }); @@ -195,7 +198,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1, &mut stats); + s2.steal_into(&mut l1, &mut metrics); th.join().unwrap(); diff --git a/src/runtime/tests/loom_yield.rs b/src/runtime/tests/loom_yield.rs new file mode 100644 index 0000000..ba506e5 --- /dev/null +++ b/src/runtime/tests/loom_yield.rs @@ -0,0 +1,37 @@ +use crate::runtime::park; +use crate::runtime::tests::loom_oneshot as oneshot; +use crate::runtime::{self, Runtime}; + +#[test] +fn yield_calls_park_before_scheduling_again() { + // Don't need to check all permutations + let mut loom = loom::model::Builder::default(); + loom.max_permutations = Some(1); + loom.check(|| { + let rt = mk_runtime(2); + let (tx, rx) = oneshot::channel::<()>(); + + rt.spawn(async { + let tid = loom::thread::current().id(); + let park_count = park::current_thread_park_count(); + + crate::task::yield_now().await; + + if tid == loom::thread::current().id() { + let new_park_count = park::current_thread_park_count(); + assert_eq!(park_count + 1, new_park_count); + } + + tx.send(()); + }); + + rx.recv(); + }); +} + +fn mk_runtime(num_threads: usize) -> Runtime { + runtime::Builder::new_multi_thread() + .worker_threads(num_threads) + .build() + .unwrap() +} diff --git a/src/runtime/tests/mod.rs b/src/runtime/tests/mod.rs index be36d6f..1c67dfe 100644 --- a/src/runtime/tests/mod.rs +++ b/src/runtime/tests/mod.rs @@ -1,8 +1,12 @@ +// Enable dead_code / unreachable_pub here. It has been disabled in lib.rs for +// other code when running loom tests. +#![cfg_attr(loom, warn(dead_code, unreachable_pub))] + use self::unowned_wrapper::unowned; mod unowned_wrapper { use crate::runtime::blocking::NoopSchedule; - use crate::runtime::task::{JoinHandle, Notified}; + use crate::runtime::task::{Id, JoinHandle, Notified}; #[cfg(all(tokio_unstable, feature = "tracing"))] pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>) @@ -13,7 +17,7 @@ mod unowned_wrapper { use tracing::Instrument; let span = tracing::trace_span!("test_span"); let task = task.instrument(span); - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); (task.into_notified(), handle) } @@ -23,19 +27,21 @@ mod unowned_wrapper { T: std::future::Future + Send + 'static, T::Output: Send + 'static, { - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); (task.into_notified(), handle) } } cfg_loom! { - mod loom_basic_scheduler; - mod loom_local; mod loom_blocking; + mod loom_current_thread_scheduler; + mod loom_local; mod loom_oneshot; mod loom_pool; mod loom_queue; mod loom_shutdown_join; + mod loom_join_set; + mod loom_yield; } cfg_not_loom! { diff --git a/src/runtime/tests/queue.rs b/src/runtime/tests/queue.rs index 47f1b01..68d2e89 100644 --- a/src/runtime/tests/queue.rs +++ b/src/runtime/tests/queue.rs @@ -1,18 +1,39 @@ -use crate::runtime::queue; -use crate::runtime::stats::WorkerStatsBatcher; +use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::MetricsBatch; use std::thread; use std::time::Duration; +#[allow(unused)] +macro_rules! assert_metrics { + ($metrics:ident, $field:ident == $v:expr) => {{ + use crate::runtime::WorkerMetrics; + use std::sync::atomic::Ordering::Relaxed; + + let worker = WorkerMetrics::new(); + $metrics.submit(&worker); + + let expect = $v; + let actual = worker.$field.load(Relaxed); + + assert!(actual == expect, "expect = {}; actual = {}", expect, actual) + }}; +} + #[test] fn fits_256() { let (_, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); for _ in 0..256 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); + } + + cfg_metrics! { + assert_metrics!(metrics, overflow_count == 0); } assert!(inject.pop().is_none()); @@ -24,10 +45,15 @@ fn fits_256() { fn overflow() { let (_, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); for _ in 0..257 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); + } + + cfg_metrics! { + assert_metrics!(metrics, overflow_count == 1); } let mut n = 0; @@ -45,7 +71,7 @@ fn overflow() { #[test] fn steal_batch() { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); @@ -53,10 +79,14 @@ fn steal_batch() { for _ in 0..4 { let (task, _) = super::unowned(async {}); - local1.push_back(task, &inject); + local1.push_back(task, &inject, &mut metrics); } - assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); + assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + + cfg_metrics! { + assert_metrics!(metrics, steal_count == 2); + } for _ in 0..1 { assert!(local2.pop().is_some()); @@ -71,25 +101,35 @@ fn steal_batch() { assert!(local1.pop().is_none()); } +const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } +} + #[test] fn stress1() { - const NUM_ITER: usize = 1; - const NUM_STEAL: usize = 1_000; - const NUM_LOCAL: usize = 1_000; - const NUM_PUSH: usize = 500; - const NUM_POP: usize = 250; + const NUM_ITER: usize = 5; + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + const NUM_LOCAL: usize = normal_or_miri(1_000, 10); + const NUM_PUSH: usize = normal_or_miri(500, 10); + const NUM_POP: usize = normal_or_miri(250, 10); + + let mut metrics = MetricsBatch::new(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut local, &mut metrics).is_some() { n += 1; } @@ -100,6 +140,10 @@ fn stress1() { thread::yield_now(); } + cfg_metrics! { + assert_metrics!(metrics, steal_count == n as _); + } + n }); @@ -108,7 +152,7 @@ fn stress1() { for _ in 0..NUM_LOCAL { for _ in 0..NUM_PUSH { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } for _ in 0..NUM_POP { @@ -133,15 +177,17 @@ fn stress1() { #[test] fn stress2() { const NUM_ITER: usize = 1; - const NUM_TASKS: usize = 1_000_000; - const NUM_STEAL: usize = 1_000; + const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + + let mut metrics = MetricsBatch::new(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut stats = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; @@ -164,7 +210,7 @@ fn stress2() { for i in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); if i % 128 == 0 && local.pop().is_some() { num_pop += 1; diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs index 04e1b56..173e5b0 100644 --- a/src/runtime/tests/task.rs +++ b/src/runtime/tests/task.rs @@ -1,5 +1,5 @@ use crate::runtime::blocking::NoopSchedule; -use crate::runtime::task::{self, unowned, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; use crate::util::TryLock; use std::collections::VecDeque; @@ -55,6 +55,7 @@ fn create_drop1() { unreachable!() }, NoopSchedule, + Id::next(), ); drop(notified); handle.assert_not_dropped(); @@ -71,6 +72,7 @@ fn create_drop2() { unreachable!() }, NoopSchedule, + Id::next(), ); drop(join); handle.assert_not_dropped(); @@ -78,6 +80,46 @@ fn create_drop2() { handle.assert_dropped(); } +#[test] +fn drop_abort_handle1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + let abort = join.abort_handle(); + drop(join); + handle.assert_not_dropped(); + drop(notified); + handle.assert_not_dropped(); + drop(abort); + handle.assert_dropped(); +} + +#[test] +fn drop_abort_handle2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + let abort = join.abort_handle(); + drop(notified); + handle.assert_not_dropped(); + drop(abort); + handle.assert_not_dropped(); + drop(join); + handle.assert_dropped(); +} + // Shutting down through Notified works #[test] fn create_shutdown1() { @@ -88,6 +130,7 @@ fn create_shutdown1() { unreachable!() }, NoopSchedule, + Id::next(), ); drop(join); handle.assert_not_dropped(); @@ -104,6 +147,7 @@ fn create_shutdown2() { unreachable!() }, NoopSchedule, + Id::next(), ); handle.assert_not_dropped(); notified.shutdown(); @@ -113,7 +157,7 @@ fn create_shutdown2() { #[test] fn unowned_poll() { - let (task, _) = unowned(async {}, NoopSchedule); + let (task, _) = unowned(async {}, NoopSchedule, Id::next()); task.run(); } @@ -228,7 +272,7 @@ impl Runtime { T: 'static + Send + Future, T::Output: 'static + Send, { - let (handle, notified) = self.0.owned.bind(future, self.clone()); + let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next()); if let Some(notified) = notified { self.schedule(notified); diff --git a/src/runtime/tests/task_combinations.rs b/src/runtime/tests/task_combinations.rs index 76ce233..73a20d9 100644 --- a/src/runtime/tests/task_combinations.rs +++ b/src/runtime/tests/task_combinations.rs @@ -1,8 +1,10 @@ +use std::fmt; use std::future::Future; use std::panic; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::runtime::task::AbortHandle; use crate::runtime::Builder; use crate::sync::oneshot; use crate::task::JoinHandle; @@ -56,6 +58,12 @@ enum CombiAbort { AbortedAfterConsumeOutput = 4, } +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiAbortSource { + JoinHandle, + AbortHandle, +} + #[test] fn test_combinations() { let mut rt = &[ @@ -90,6 +98,13 @@ fn test_combinations() { CombiAbort::AbortedAfterFinish, CombiAbort::AbortedAfterConsumeOutput, ]; + let ah = [ + None, + Some(CombiJoinHandle::DropImmediately), + Some(CombiJoinHandle::DropFirstPoll), + Some(CombiJoinHandle::DropAfterNoConsume), + Some(CombiJoinHandle::DropAfterConsume), + ]; for rt in rt.iter().copied() { for ls in ls.iter().copied() { @@ -98,7 +113,34 @@ fn test_combinations() { for ji in ji.iter().copied() { for jh in jh.iter().copied() { for abort in abort.iter().copied() { - test_combination(rt, ls, task, output, ji, jh, abort); + // abort via join handle --- abort handles + // may be dropped at any point + for ah in ah.iter().copied() { + test_combination( + rt, + ls, + task, + output, + ji, + jh, + ah, + abort, + CombiAbortSource::JoinHandle, + ); + } + // if aborting via AbortHandle, it will + // never be dropped. + test_combination( + rt, + ls, + task, + output, + ji, + jh, + None, + abort, + CombiAbortSource::AbortHandle, + ); } } } @@ -108,6 +150,9 @@ fn test_combinations() { } } +fn is_debug<T: fmt::Debug>(_: &T) {} + +#[allow(clippy::too_many_arguments)] fn test_combination( rt: CombiRuntime, ls: CombiLocalSet, @@ -115,12 +160,24 @@ fn test_combination( output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, + ah: Option<CombiJoinHandle>, abort: CombiAbort, + abort_src: CombiAbortSource, ) { - if (jh as usize) < (abort as usize) { - // drop before abort not possible - return; + match (abort_src, ah) { + (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => { + // join handle dropped prior to abort + return; + } + (CombiAbortSource::AbortHandle, Some(_)) => { + // abort handle dropped, we can't abort through the + // abort handle + return; + } + + _ => {} } + if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { // this causes double panic return; @@ -130,7 +187,15 @@ fn test_combination( return; } - println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + is_debug(&rt); + is_debug(&ls); + is_debug(&task); + is_debug(&output); + is_debug(&ji); + is_debug(&jh); + is_debug(&ah); + is_debug(&abort); + is_debug(&abort_src); // A runtime optionally with a LocalSet struct Rt { @@ -282,8 +347,24 @@ fn test_combination( ); } + // If we are either aborting the task via an abort handle, or dropping via + // an abort handle, do that now. + let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle { + handle.as_ref().map(JoinHandle::abort_handle) + } else { + None + }; + + let do_abort = |abort_handle: &mut Option<AbortHandle>, + join_handle: Option<&mut JoinHandle<_>>| { + match abort_src { + CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(), + CombiAbortSource::JoinHandle => join_handle.unwrap().abort(), + } + }; + if abort == CombiAbort::AbortedImmediately { - handle.as_mut().unwrap().abort(); + do_abort(&mut abort_handle, handle.as_mut()); aborted = true; } if jh == CombiJoinHandle::DropImmediately { @@ -301,12 +382,15 @@ fn test_combination( } if abort == CombiAbort::AbortedFirstPoll { - handle.as_mut().unwrap().abort(); + do_abort(&mut abort_handle, handle.as_mut()); aborted = true; } if jh == CombiJoinHandle::DropFirstPoll { drop(handle.take().unwrap()); } + if ah == Some(CombiJoinHandle::DropFirstPoll) { + drop(abort_handle.take().unwrap()); + } // Signal the future that it can return now let _ = on_complete.send(()); @@ -318,23 +402,42 @@ fn test_combination( if abort == CombiAbort::AbortedAfterFinish { // Don't set aborted to true here as the task already finished - handle.as_mut().unwrap().abort(); + do_abort(&mut abort_handle, handle.as_mut()); } if jh == CombiJoinHandle::DropAfterNoConsume { - // The runtime will usually have dropped every ref-count at this point, - // in which case dropping the JoinHandle drops the output. - // - // (But it might race and still hold a ref-count) - let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + if ah == Some(CombiJoinHandle::DropAfterNoConsume) { drop(handle.take().unwrap()); - })); - if panic.is_err() { - assert!( - (output == CombiOutput::PanicOnDrop) - && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) - && !aborted, - "Dropping JoinHandle shouldn't panic here" - ); + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the AbortHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(abort_handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping AbortHandle shouldn't panic here" + ); + } + } else { + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the JoinHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping JoinHandle shouldn't panic here" + ); + } } } @@ -362,11 +465,15 @@ fn test_combination( _ => unreachable!(), } - let handle = handle.take().unwrap(); + let mut handle = handle.take().unwrap(); if abort == CombiAbort::AbortedAfterConsumeOutput { - handle.abort(); + do_abort(&mut abort_handle, Some(&mut handle)); } drop(handle); + + if ah == Some(CombiJoinHandle::DropAfterConsume) { + drop(abort_handle.take()); + } } // The output should have been dropped now. Check whether the output |