aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime/tests')
-rw-r--r--src/runtime/tests/loom_blocking.rs50
-rw-r--r--src/runtime/tests/loom_current_thread_scheduler.rs (renamed from src/runtime/tests/loom_basic_scheduler.rs)20
-rw-r--r--src/runtime/tests/loom_join_set.rs82
-rw-r--r--src/runtime/tests/loom_queue.rs41
-rw-r--r--src/runtime/tests/loom_yield.rs37
-rw-r--r--src/runtime/tests/mod.rs16
-rw-r--r--src/runtime/tests/queue.rs84
-rw-r--r--src/runtime/tests/task.rs50
-rw-r--r--src/runtime/tests/task_combinations.rs153
9 files changed, 455 insertions, 78 deletions
diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs
index 8fb54c5..89de85e 100644
--- a/src/runtime/tests/loom_blocking.rs
+++ b/src/runtime/tests/loom_blocking.rs
@@ -23,6 +23,56 @@ fn blocking_shutdown() {
});
}
+#[test]
+fn spawn_mandatory_blocking_should_always_run() {
+ use crate::runtime::tests::loom_oneshot;
+ loom::model(|| {
+ let rt = runtime::Builder::new_current_thread().build().unwrap();
+
+ let (tx, rx) = loom_oneshot::channel();
+ let _enter = rt.enter();
+ runtime::spawn_blocking(|| {});
+ runtime::spawn_mandatory_blocking(move || {
+ let _ = tx.send(());
+ })
+ .unwrap();
+
+ drop(rt);
+
+ // This call will deadlock if `spawn_mandatory_blocking` doesn't run.
+ let () = rx.recv();
+ });
+}
+
+#[test]
+fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread() {
+ use crate::runtime::tests::loom_oneshot;
+ loom::model(|| {
+ let rt = runtime::Builder::new_current_thread().build().unwrap();
+ let handle = rt.handle().clone();
+
+ // Drop the runtime in a different thread
+ {
+ loom::thread::spawn(move || {
+ drop(rt);
+ });
+ }
+
+ let _enter = handle.enter();
+ let (tx, rx) = loom_oneshot::channel();
+ let handle = runtime::spawn_mandatory_blocking(move || {
+ let _ = tx.send(());
+ });
+
+ // handle.is_some() means that `spawn_mandatory_blocking`
+ // promised us to run the blocking task
+ if handle.is_some() {
+ // This call will deadlock if `spawn_mandatory_blocking` doesn't run.
+ let () = rx.recv();
+ }
+ });
+}
+
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
diff --git a/src/runtime/tests/loom_basic_scheduler.rs b/src/runtime/tests/loom_current_thread_scheduler.rs
index d2894b9..a772603 100644
--- a/src/runtime/tests/loom_basic_scheduler.rs
+++ b/src/runtime/tests/loom_current_thread_scheduler.rs
@@ -34,20 +34,22 @@ fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
#[test]
fn block_on_num_polls() {
loom::model(|| {
- // we expect at most 3 number of polls because there are
- // three points at which we poll the future. At any of these
- // points it can be ready:
+ // we expect at most 4 number of polls because there are three points at
+ // which we poll the future and an opportunity for a false-positive.. At
+ // any of these points it can be ready:
//
- // - when we fail to steal the parker and we block on a
- // notification that it is available.
+ // - when we fail to steal the parker and we block on a notification
+ // that it is available.
//
// - when we steal the parker and we schedule the future
//
- // - when the future is woken up and we have ran the max
- // number of tasks for the current tick or there are no
- // more tasks to run.
+ // - when the future is woken up and we have ran the max number of tasks
+ // for the current tick or there are no more tasks to run.
//
- let at_most = 3;
+ // - a thread is notified that the parker is available but a third
+ // thread acquires it before the notified thread can.
+ //
+ let at_most = 4;
let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
let rt2 = rt1.clone();
diff --git a/src/runtime/tests/loom_join_set.rs b/src/runtime/tests/loom_join_set.rs
new file mode 100644
index 0000000..bd34387
--- /dev/null
+++ b/src/runtime/tests/loom_join_set.rs
@@ -0,0 +1,82 @@
+use crate::runtime::Builder;
+use crate::task::JoinSet;
+
+#[test]
+fn test_join_set() {
+ loom::model(|| {
+ let rt = Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+ let mut set = JoinSet::new();
+
+ rt.block_on(async {
+ assert_eq!(set.len(), 0);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 1);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 2);
+ let () = set.join_next().await.unwrap().unwrap();
+ assert_eq!(set.len(), 1);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 2);
+ let () = set.join_next().await.unwrap().unwrap();
+ assert_eq!(set.len(), 1);
+ let () = set.join_next().await.unwrap().unwrap();
+ assert_eq!(set.len(), 0);
+ set.spawn(async { () });
+ assert_eq!(set.len(), 1);
+ });
+
+ drop(set);
+ drop(rt);
+ });
+}
+
+#[test]
+fn abort_all_during_completion() {
+ use std::sync::{
+ atomic::{AtomicBool, Ordering::SeqCst},
+ Arc,
+ };
+
+ // These booleans assert that at least one execution had the task complete first, and that at
+ // least one execution had the task be cancelled before it completed.
+ let complete_happened = Arc::new(AtomicBool::new(false));
+ let cancel_happened = Arc::new(AtomicBool::new(false));
+
+ {
+ let complete_happened = complete_happened.clone();
+ let cancel_happened = cancel_happened.clone();
+ loom::model(move || {
+ let rt = Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let mut set = JoinSet::new();
+
+ rt.block_on(async {
+ set.spawn(async { () });
+ set.abort_all();
+
+ match set.join_next().await {
+ Some(Ok(())) => complete_happened.store(true, SeqCst),
+ Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
+ Some(Err(err)) => panic!("fail: {}", err),
+ None => {
+ unreachable!("Aborting the task does not remove it from the JoinSet.")
+ }
+ }
+
+ assert!(matches!(set.join_next().await, None));
+ });
+
+ drop(set);
+ drop(rt);
+ });
+ }
+
+ assert!(complete_happened.load(SeqCst));
+ assert!(cancel_happened.load(SeqCst));
+}
diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs
index 2cbb0a1..8d4e1d3 100644
--- a/src/runtime/tests/loom_queue.rs
+++ b/src/runtime/tests/loom_queue.rs
@@ -1,7 +1,7 @@
use crate::runtime::blocking::NoopSchedule;
-use crate::runtime::queue;
-use crate::runtime::stats::WorkerStatsBatcher;
+use crate::runtime::scheduler::multi_thread::queue;
use crate::runtime::task::Inject;
+use crate::runtime::MetricsBatch;
use loom::thread;
@@ -10,14 +10,15 @@ fn basic() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = Inject::new();
+ let mut metrics = MetricsBatch::new();
let th = thread::spawn(move || {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..3 {
- if steal.steal_into(&mut local, &mut stats).is_some() {
+ if steal.steal_into(&mut local, &mut metrics).is_some() {
n += 1;
}
@@ -34,7 +35,7 @@ fn basic() {
for _ in 0..2 {
for _ in 0..2 {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
}
if local.pop().is_some() {
@@ -43,7 +44,7 @@ fn basic() {
// Push another task
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
while local.pop().is_some() {
n += 1;
@@ -65,13 +66,14 @@ fn steal_overflow() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = Inject::new();
+ let mut metrics = MetricsBatch::new();
let th = thread::spawn(move || {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (_, mut local) = queue::local();
let mut n = 0;
- if steal.steal_into(&mut local, &mut stats).is_some() {
+ if steal.steal_into(&mut local, &mut metrics).is_some() {
n += 1;
}
@@ -86,7 +88,7 @@ fn steal_overflow() {
// push a task, pop a task
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
if local.pop().is_some() {
n += 1;
@@ -94,7 +96,7 @@ fn steal_overflow() {
for _ in 0..6 {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
}
n += th.join().unwrap();
@@ -116,10 +118,10 @@ fn multi_stealer() {
const NUM_TASKS: usize = 5;
fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (_, mut local) = queue::local();
- if steal.steal_into(&mut local, &mut stats).is_none() {
+ if steal.steal_into(&mut local, &mut metrics).is_none() {
return 0;
}
@@ -135,11 +137,12 @@ fn multi_stealer() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = Inject::new();
+ let mut metrics = MetricsBatch::new();
// Push work
for _ in 0..NUM_TASKS {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
}
let th1 = {
@@ -169,7 +172,7 @@ fn multi_stealer() {
#[test]
fn chained_steal() {
loom::model(|| {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (s1, mut l1) = queue::local();
let (s2, mut l2) = queue::local();
let inject = Inject::new();
@@ -177,17 +180,17 @@ fn chained_steal() {
// Load up some tasks
for _ in 0..4 {
let (task, _) = super::unowned(async {});
- l1.push_back(task, &inject);
+ l1.push_back(task, &inject, &mut metrics);
let (task, _) = super::unowned(async {});
- l2.push_back(task, &inject);
+ l2.push_back(task, &inject, &mut metrics);
}
// Spawn a task to steal from **our** queue
let th = thread::spawn(move || {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (_, mut local) = queue::local();
- s1.steal_into(&mut local, &mut stats);
+ s1.steal_into(&mut local, &mut metrics);
while local.pop().is_some() {}
});
@@ -195,7 +198,7 @@ fn chained_steal() {
// Drain our tasks, then attempt to steal
while l1.pop().is_some() {}
- s2.steal_into(&mut l1, &mut stats);
+ s2.steal_into(&mut l1, &mut metrics);
th.join().unwrap();
diff --git a/src/runtime/tests/loom_yield.rs b/src/runtime/tests/loom_yield.rs
new file mode 100644
index 0000000..ba506e5
--- /dev/null
+++ b/src/runtime/tests/loom_yield.rs
@@ -0,0 +1,37 @@
+use crate::runtime::park;
+use crate::runtime::tests::loom_oneshot as oneshot;
+use crate::runtime::{self, Runtime};
+
+#[test]
+fn yield_calls_park_before_scheduling_again() {
+ // Don't need to check all permutations
+ let mut loom = loom::model::Builder::default();
+ loom.max_permutations = Some(1);
+ loom.check(|| {
+ let rt = mk_runtime(2);
+ let (tx, rx) = oneshot::channel::<()>();
+
+ rt.spawn(async {
+ let tid = loom::thread::current().id();
+ let park_count = park::current_thread_park_count();
+
+ crate::task::yield_now().await;
+
+ if tid == loom::thread::current().id() {
+ let new_park_count = park::current_thread_park_count();
+ assert_eq!(park_count + 1, new_park_count);
+ }
+
+ tx.send(());
+ });
+
+ rx.recv();
+ });
+}
+
+fn mk_runtime(num_threads: usize) -> Runtime {
+ runtime::Builder::new_multi_thread()
+ .worker_threads(num_threads)
+ .build()
+ .unwrap()
+}
diff --git a/src/runtime/tests/mod.rs b/src/runtime/tests/mod.rs
index be36d6f..1c67dfe 100644
--- a/src/runtime/tests/mod.rs
+++ b/src/runtime/tests/mod.rs
@@ -1,8 +1,12 @@
+// Enable dead_code / unreachable_pub here. It has been disabled in lib.rs for
+// other code when running loom tests.
+#![cfg_attr(loom, warn(dead_code, unreachable_pub))]
+
use self::unowned_wrapper::unowned;
mod unowned_wrapper {
use crate::runtime::blocking::NoopSchedule;
- use crate::runtime::task::{JoinHandle, Notified};
+ use crate::runtime::task::{Id, JoinHandle, Notified};
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
@@ -13,7 +17,7 @@ mod unowned_wrapper {
use tracing::Instrument;
let span = tracing::trace_span!("test_span");
let task = task.instrument(span);
- let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule);
+ let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next());
(task.into_notified(), handle)
}
@@ -23,19 +27,21 @@ mod unowned_wrapper {
T: std::future::Future + Send + 'static,
T::Output: Send + 'static,
{
- let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule);
+ let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next());
(task.into_notified(), handle)
}
}
cfg_loom! {
- mod loom_basic_scheduler;
- mod loom_local;
mod loom_blocking;
+ mod loom_current_thread_scheduler;
+ mod loom_local;
mod loom_oneshot;
mod loom_pool;
mod loom_queue;
mod loom_shutdown_join;
+ mod loom_join_set;
+ mod loom_yield;
}
cfg_not_loom! {
diff --git a/src/runtime/tests/queue.rs b/src/runtime/tests/queue.rs
index 47f1b01..68d2e89 100644
--- a/src/runtime/tests/queue.rs
+++ b/src/runtime/tests/queue.rs
@@ -1,18 +1,39 @@
-use crate::runtime::queue;
-use crate::runtime::stats::WorkerStatsBatcher;
+use crate::runtime::scheduler::multi_thread::queue;
use crate::runtime::task::{self, Inject, Schedule, Task};
+use crate::runtime::MetricsBatch;
use std::thread;
use std::time::Duration;
+#[allow(unused)]
+macro_rules! assert_metrics {
+ ($metrics:ident, $field:ident == $v:expr) => {{
+ use crate::runtime::WorkerMetrics;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ let worker = WorkerMetrics::new();
+ $metrics.submit(&worker);
+
+ let expect = $v;
+ let actual = worker.$field.load(Relaxed);
+
+ assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
+ }};
+}
+
#[test]
fn fits_256() {
let (_, mut local) = queue::local();
let inject = Inject::new();
+ let mut metrics = MetricsBatch::new();
for _ in 0..256 {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
+ }
+
+ cfg_metrics! {
+ assert_metrics!(metrics, overflow_count == 0);
}
assert!(inject.pop().is_none());
@@ -24,10 +45,15 @@ fn fits_256() {
fn overflow() {
let (_, mut local) = queue::local();
let inject = Inject::new();
+ let mut metrics = MetricsBatch::new();
for _ in 0..257 {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
+ }
+
+ cfg_metrics! {
+ assert_metrics!(metrics, overflow_count == 1);
}
let mut n = 0;
@@ -45,7 +71,7 @@ fn overflow() {
#[test]
fn steal_batch() {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (steal1, mut local1) = queue::local();
let (_, mut local2) = queue::local();
@@ -53,10 +79,14 @@ fn steal_batch() {
for _ in 0..4 {
let (task, _) = super::unowned(async {});
- local1.push_back(task, &inject);
+ local1.push_back(task, &inject, &mut metrics);
}
- assert!(steal1.steal_into(&mut local2, &mut stats).is_some());
+ assert!(steal1.steal_into(&mut local2, &mut metrics).is_some());
+
+ cfg_metrics! {
+ assert_metrics!(metrics, steal_count == 2);
+ }
for _ in 0..1 {
assert!(local2.pop().is_some());
@@ -71,25 +101,35 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}
+const fn normal_or_miri(normal: usize, miri: usize) -> usize {
+ if cfg!(miri) {
+ miri
+ } else {
+ normal
+ }
+}
+
#[test]
fn stress1() {
- const NUM_ITER: usize = 1;
- const NUM_STEAL: usize = 1_000;
- const NUM_LOCAL: usize = 1_000;
- const NUM_PUSH: usize = 500;
- const NUM_POP: usize = 250;
+ const NUM_ITER: usize = 5;
+ const NUM_STEAL: usize = normal_or_miri(1_000, 10);
+ const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
+ const NUM_PUSH: usize = normal_or_miri(500, 10);
+ const NUM_POP: usize = normal_or_miri(250, 10);
+
+ let mut metrics = MetricsBatch::new();
for _ in 0..NUM_ITER {
let (steal, mut local) = queue::local();
let inject = Inject::new();
let th = thread::spawn(move || {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut metrics = MetricsBatch::new();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local, &mut stats).is_some() {
+ if steal.steal_into(&mut local, &mut metrics).is_some() {
n += 1;
}
@@ -100,6 +140,10 @@ fn stress1() {
thread::yield_now();
}
+ cfg_metrics! {
+ assert_metrics!(metrics, steal_count == n as _);
+ }
+
n
});
@@ -108,7 +152,7 @@ fn stress1() {
for _ in 0..NUM_LOCAL {
for _ in 0..NUM_PUSH {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
}
for _ in 0..NUM_POP {
@@ -133,15 +177,17 @@ fn stress1() {
#[test]
fn stress2() {
const NUM_ITER: usize = 1;
- const NUM_TASKS: usize = 1_000_000;
- const NUM_STEAL: usize = 1_000;
+ const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
+ const NUM_STEAL: usize = normal_or_miri(1_000, 10);
+
+ let mut metrics = MetricsBatch::new();
for _ in 0..NUM_ITER {
let (steal, mut local) = queue::local();
let inject = Inject::new();
let th = thread::spawn(move || {
- let mut stats = WorkerStatsBatcher::new(0);
+ let mut stats = MetricsBatch::new();
let (_, mut local) = queue::local();
let mut n = 0;
@@ -164,7 +210,7 @@ fn stress2() {
for i in 0..NUM_TASKS {
let (task, _) = super::unowned(async {});
- local.push_back(task, &inject);
+ local.push_back(task, &inject, &mut metrics);
if i % 128 == 0 && local.pop().is_some() {
num_pop += 1;
diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs
index 04e1b56..173e5b0 100644
--- a/src/runtime/tests/task.rs
+++ b/src/runtime/tests/task.rs
@@ -1,5 +1,5 @@
use crate::runtime::blocking::NoopSchedule;
-use crate::runtime::task::{self, unowned, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
use crate::util::TryLock;
use std::collections::VecDeque;
@@ -55,6 +55,7 @@ fn create_drop1() {
unreachable!()
},
NoopSchedule,
+ Id::next(),
);
drop(notified);
handle.assert_not_dropped();
@@ -71,6 +72,7 @@ fn create_drop2() {
unreachable!()
},
NoopSchedule,
+ Id::next(),
);
drop(join);
handle.assert_not_dropped();
@@ -78,6 +80,46 @@ fn create_drop2() {
handle.assert_dropped();
}
+#[test]
+fn drop_abort_handle1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ let abort = join.abort_handle();
+ drop(join);
+ handle.assert_not_dropped();
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(abort);
+ handle.assert_dropped();
+}
+
+#[test]
+fn drop_abort_handle2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ let abort = join.abort_handle();
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(abort);
+ handle.assert_not_dropped();
+ drop(join);
+ handle.assert_dropped();
+}
+
// Shutting down through Notified works
#[test]
fn create_shutdown1() {
@@ -88,6 +130,7 @@ fn create_shutdown1() {
unreachable!()
},
NoopSchedule,
+ Id::next(),
);
drop(join);
handle.assert_not_dropped();
@@ -104,6 +147,7 @@ fn create_shutdown2() {
unreachable!()
},
NoopSchedule,
+ Id::next(),
);
handle.assert_not_dropped();
notified.shutdown();
@@ -113,7 +157,7 @@ fn create_shutdown2() {
#[test]
fn unowned_poll() {
- let (task, _) = unowned(async {}, NoopSchedule);
+ let (task, _) = unowned(async {}, NoopSchedule, Id::next());
task.run();
}
@@ -228,7 +272,7 @@ impl Runtime {
T: 'static + Send + Future,
T::Output: 'static + Send,
{
- let (handle, notified) = self.0.owned.bind(future, self.clone());
+ let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
if let Some(notified) = notified {
self.schedule(notified);
diff --git a/src/runtime/tests/task_combinations.rs b/src/runtime/tests/task_combinations.rs
index 76ce233..73a20d9 100644
--- a/src/runtime/tests/task_combinations.rs
+++ b/src/runtime/tests/task_combinations.rs
@@ -1,8 +1,10 @@
+use std::fmt;
use std::future::Future;
use std::panic;
use std::pin::Pin;
use std::task::{Context, Poll};
+use crate::runtime::task::AbortHandle;
use crate::runtime::Builder;
use crate::sync::oneshot;
use crate::task::JoinHandle;
@@ -56,6 +58,12 @@ enum CombiAbort {
AbortedAfterConsumeOutput = 4,
}
+#[derive(Copy, Clone, Debug, PartialEq)]
+enum CombiAbortSource {
+ JoinHandle,
+ AbortHandle,
+}
+
#[test]
fn test_combinations() {
let mut rt = &[
@@ -90,6 +98,13 @@ fn test_combinations() {
CombiAbort::AbortedAfterFinish,
CombiAbort::AbortedAfterConsumeOutput,
];
+ let ah = [
+ None,
+ Some(CombiJoinHandle::DropImmediately),
+ Some(CombiJoinHandle::DropFirstPoll),
+ Some(CombiJoinHandle::DropAfterNoConsume),
+ Some(CombiJoinHandle::DropAfterConsume),
+ ];
for rt in rt.iter().copied() {
for ls in ls.iter().copied() {
@@ -98,7 +113,34 @@ fn test_combinations() {
for ji in ji.iter().copied() {
for jh in jh.iter().copied() {
for abort in abort.iter().copied() {
- test_combination(rt, ls, task, output, ji, jh, abort);
+ // abort via join handle --- abort handles
+ // may be dropped at any point
+ for ah in ah.iter().copied() {
+ test_combination(
+ rt,
+ ls,
+ task,
+ output,
+ ji,
+ jh,
+ ah,
+ abort,
+ CombiAbortSource::JoinHandle,
+ );
+ }
+ // if aborting via AbortHandle, it will
+ // never be dropped.
+ test_combination(
+ rt,
+ ls,
+ task,
+ output,
+ ji,
+ jh,
+ None,
+ abort,
+ CombiAbortSource::AbortHandle,
+ );
}
}
}
@@ -108,6 +150,9 @@ fn test_combinations() {
}
}
+fn is_debug<T: fmt::Debug>(_: &T) {}
+
+#[allow(clippy::too_many_arguments)]
fn test_combination(
rt: CombiRuntime,
ls: CombiLocalSet,
@@ -115,12 +160,24 @@ fn test_combination(
output: CombiOutput,
ji: CombiJoinInterest,
jh: CombiJoinHandle,
+ ah: Option<CombiJoinHandle>,
abort: CombiAbort,
+ abort_src: CombiAbortSource,
) {
- if (jh as usize) < (abort as usize) {
- // drop before abort not possible
- return;
+ match (abort_src, ah) {
+ (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
+ // join handle dropped prior to abort
+ return;
+ }
+ (CombiAbortSource::AbortHandle, Some(_)) => {
+ // abort handle dropped, we can't abort through the
+ // abort handle
+ return;
+ }
+
+ _ => {}
}
+
if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
// this causes double panic
return;
@@ -130,7 +187,15 @@ fn test_combination(
return;
}
- println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort);
+ is_debug(&rt);
+ is_debug(&ls);
+ is_debug(&task);
+ is_debug(&output);
+ is_debug(&ji);
+ is_debug(&jh);
+ is_debug(&ah);
+ is_debug(&abort);
+ is_debug(&abort_src);
// A runtime optionally with a LocalSet
struct Rt {
@@ -282,8 +347,24 @@ fn test_combination(
);
}
+ // If we are either aborting the task via an abort handle, or dropping via
+ // an abort handle, do that now.
+ let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
+ handle.as_ref().map(JoinHandle::abort_handle)
+ } else {
+ None
+ };
+
+ let do_abort = |abort_handle: &mut Option<AbortHandle>,
+ join_handle: Option<&mut JoinHandle<_>>| {
+ match abort_src {
+ CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
+ CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
+ }
+ };
+
if abort == CombiAbort::AbortedImmediately {
- handle.as_mut().unwrap().abort();
+ do_abort(&mut abort_handle, handle.as_mut());
aborted = true;
}
if jh == CombiJoinHandle::DropImmediately {
@@ -301,12 +382,15 @@ fn test_combination(
}
if abort == CombiAbort::AbortedFirstPoll {
- handle.as_mut().unwrap().abort();
+ do_abort(&mut abort_handle, handle.as_mut());
aborted = true;
}
if jh == CombiJoinHandle::DropFirstPoll {
drop(handle.take().unwrap());
}
+ if ah == Some(CombiJoinHandle::DropFirstPoll) {
+ drop(abort_handle.take().unwrap());
+ }
// Signal the future that it can return now
let _ = on_complete.send(());
@@ -318,23 +402,42 @@ fn test_combination(
if abort == CombiAbort::AbortedAfterFinish {
// Don't set aborted to true here as the task already finished
- handle.as_mut().unwrap().abort();
+ do_abort(&mut abort_handle, handle.as_mut());
}
if jh == CombiJoinHandle::DropAfterNoConsume {
- // The runtime will usually have dropped every ref-count at this point,
- // in which case dropping the JoinHandle drops the output.
- //
- // (But it might race and still hold a ref-count)
- let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
drop(handle.take().unwrap());
- }));
- if panic.is_err() {
- assert!(
- (output == CombiOutput::PanicOnDrop)
- && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
- && !aborted,
- "Dropping JoinHandle shouldn't panic here"
- );
+ // The runtime will usually have dropped every ref-count at this point,
+ // in which case dropping the AbortHandle drops the output.
+ //
+ // (But it might race and still hold a ref-count)
+ let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ drop(abort_handle.take().unwrap());
+ }));
+ if panic.is_err() {
+ assert!(
+ (output == CombiOutput::PanicOnDrop)
+ && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
+ && !aborted,
+ "Dropping AbortHandle shouldn't panic here"
+ );
+ }
+ } else {
+ // The runtime will usually have dropped every ref-count at this point,
+ // in which case dropping the JoinHandle drops the output.
+ //
+ // (But it might race and still hold a ref-count)
+ let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ drop(handle.take().unwrap());
+ }));
+ if panic.is_err() {
+ assert!(
+ (output == CombiOutput::PanicOnDrop)
+ && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
+ && !aborted,
+ "Dropping JoinHandle shouldn't panic here"
+ );
+ }
}
}
@@ -362,11 +465,15 @@ fn test_combination(
_ => unreachable!(),
}
- let handle = handle.take().unwrap();
+ let mut handle = handle.take().unwrap();
if abort == CombiAbort::AbortedAfterConsumeOutput {
- handle.abort();
+ do_abort(&mut abort_handle, Some(&mut handle));
}
drop(handle);
+
+ if ah == Some(CombiJoinHandle::DropAfterConsume) {
+ drop(abort_handle.take());
+ }
}
// The output should have been dropped now. Check whether the output