use std::future::Future; use std::panic; use std::pin::Pin; use std::task::{Context, Poll}; use crate::runtime::Builder; use crate::sync::oneshot; use crate::task::JoinHandle; use futures::future::FutureExt; // Enums for each option in the combinations being tested #[derive(Copy, Clone, Debug, PartialEq)] enum CombiRuntime { CurrentThread, Multi1, Multi2, } #[derive(Copy, Clone, Debug, PartialEq)] enum CombiLocalSet { Yes, No, } #[derive(Copy, Clone, Debug, PartialEq)] enum CombiTask { PanicOnRun, PanicOnDrop, PanicOnRunAndDrop, NoPanic, } #[derive(Copy, Clone, Debug, PartialEq)] enum CombiOutput { PanicOnDrop, NoPanic, } #[derive(Copy, Clone, Debug, PartialEq)] enum CombiJoinInterest { Polled, NotPolled, } #[allow(clippy::enum_variant_names)] // we aren't using glob imports #[derive(Copy, Clone, Debug, PartialEq)] enum CombiJoinHandle { DropImmediately = 1, DropFirstPoll = 2, DropAfterNoConsume = 3, DropAfterConsume = 4, } #[derive(Copy, Clone, Debug, PartialEq)] enum CombiAbort { NotAborted = 0, AbortedImmediately = 1, AbortedFirstPoll = 2, AbortedAfterFinish = 3, AbortedAfterConsumeOutput = 4, } #[test] fn test_combinations() { let mut rt = &[ CombiRuntime::CurrentThread, CombiRuntime::Multi1, CombiRuntime::Multi2, ][..]; if cfg!(miri) { rt = &[CombiRuntime::CurrentThread]; } let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; let task = [ CombiTask::NoPanic, CombiTask::PanicOnRun, CombiTask::PanicOnDrop, CombiTask::PanicOnRunAndDrop, ]; let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop]; let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled]; let jh = [ CombiJoinHandle::DropImmediately, CombiJoinHandle::DropFirstPoll, CombiJoinHandle::DropAfterNoConsume, CombiJoinHandle::DropAfterConsume, ]; let abort = [ CombiAbort::NotAborted, CombiAbort::AbortedImmediately, CombiAbort::AbortedFirstPoll, CombiAbort::AbortedAfterFinish, CombiAbort::AbortedAfterConsumeOutput, ]; for rt in rt.iter().copied() { for ls in ls.iter().copied() { for task in task.iter().copied() { for output in output.iter().copied() { for ji in ji.iter().copied() { for jh in jh.iter().copied() { for abort in abort.iter().copied() { test_combination(rt, ls, task, output, ji, jh, abort); } } } } } } } } fn test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, abort: CombiAbort, ) { if (jh as usize) < (abort as usize) { // drop before abort not possible return; } if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { // this causes double panic return; } if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { // this causes double panic return; } println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); // A runtime optionally with a LocalSet struct Rt { rt: crate::runtime::Runtime, ls: Option, } impl Rt { fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self { let rt = match rt { CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(), CombiRuntime::Multi1 => Builder::new_multi_thread() .worker_threads(1) .build() .unwrap(), CombiRuntime::Multi2 => Builder::new_multi_thread() .worker_threads(2) .build() .unwrap(), }; let ls = match ls { CombiLocalSet::Yes => Some(crate::task::LocalSet::new()), CombiLocalSet::No => None, }; Self { rt, ls } } fn block_on(&self, task: T) -> T::Output where T: Future, { match &self.ls { Some(ls) => ls.block_on(&self.rt, task), None => self.rt.block_on(task), } } fn spawn(&self, task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { match &self.ls { Some(ls) => ls.spawn_local(task), None => self.rt.spawn(task), } } } // The type used for the output of the future struct Output { panic_on_drop: bool, on_drop: Option>, } impl Output { fn disarm(&mut self) { self.panic_on_drop = false; } } impl Drop for Output { fn drop(&mut self) { let _ = self.on_drop.take().unwrap().send(()); if self.panic_on_drop { panic!("Panicking in Output"); } } } // A wrapper around the future that is spawned struct FutWrapper { inner: F, on_drop: Option>, panic_on_drop: bool, } impl Future for FutWrapper { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unsafe { let me = Pin::into_inner_unchecked(self); let inner = Pin::new_unchecked(&mut me.inner); inner.poll(cx) } } } impl Drop for FutWrapper { fn drop(&mut self) { let _: Result<(), ()> = self.on_drop.take().unwrap().send(()); if self.panic_on_drop { panic!("Panicking in FutWrapper"); } } } // The channels passed to the task struct Signals { on_first_poll: Option>, wait_complete: Option>, on_output_drop: Option>, } // The task we will spawn async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output { // Signal that we have been polled once let _ = signal.on_first_poll.take().unwrap().send(()); // Wait for a signal, then complete the future let _ = signal.wait_complete.take().unwrap().await; // If the task gets past wait_complete without yielding, then aborts // may not be caught without this yield_now. crate::task::yield_now().await; if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { panic!("Panicking in my_task on {:?}", std::thread::current().id()); } Output { panic_on_drop: out == CombiOutput::PanicOnDrop, on_drop: signal.on_output_drop.take(), } } let rt = Rt::new(rt, ls); let (on_first_poll, wait_first_poll) = oneshot::channel(); let (on_complete, wait_complete) = oneshot::channel(); let (on_future_drop, wait_future_drop) = oneshot::channel(); let (on_output_drop, wait_output_drop) = oneshot::channel(); let signal = Signals { on_first_poll: Some(on_first_poll), wait_complete: Some(wait_complete), on_output_drop: Some(on_output_drop), }; // === Spawn task === let mut handle = Some(rt.spawn(FutWrapper { inner: my_task(signal, task, output), on_drop: Some(on_future_drop), panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop, })); // Keep track of whether the task has been killed with an abort let mut aborted = false; // If we want to poll the JoinHandle, do it now if ji == CombiJoinInterest::Polled { assert!( handle.as_mut().unwrap().now_or_never().is_none(), "Polling handle succeeded" ); } if abort == CombiAbort::AbortedImmediately { handle.as_mut().unwrap().abort(); aborted = true; } if jh == CombiJoinHandle::DropImmediately { drop(handle.take().unwrap()); } // === Wait for first poll === let got_polled = rt.block_on(wait_first_poll).is_ok(); if !got_polled { // it's possible that we are aborted but still got polled assert!( aborted, "Task completed without ever being polled but was not aborted." ); } if abort == CombiAbort::AbortedFirstPoll { handle.as_mut().unwrap().abort(); aborted = true; } if jh == CombiJoinHandle::DropFirstPoll { drop(handle.take().unwrap()); } // Signal the future that it can return now let _ = on_complete.send(()); // === Wait for future to be dropped === assert!( rt.block_on(wait_future_drop).is_ok(), "The future should always be dropped." ); if abort == CombiAbort::AbortedAfterFinish { // Don't set aborted to true here as the task already finished handle.as_mut().unwrap().abort(); } if jh == CombiJoinHandle::DropAfterNoConsume { // The runtime will usually have dropped every ref-count at this point, // in which case dropping the JoinHandle drops the output. // // (But it might race and still hold a ref-count) let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { drop(handle.take().unwrap()); })); if panic.is_err() { assert!( (output == CombiOutput::PanicOnDrop) && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, "Dropping JoinHandle shouldn't panic here" ); } } // Check whether we drop after consuming the output if jh == CombiJoinHandle::DropAfterConsume { // Using as_mut() to not immediately drop the handle let result = rt.block_on(handle.as_mut().unwrap()); match result { Ok(mut output) => { // Don't panic here. output.disarm(); assert!(!aborted, "Task was aborted but returned output"); } Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"), Err(err) if err.is_panic() => { assert!( (task == CombiTask::PanicOnRun) || (task == CombiTask::PanicOnDrop) || (task == CombiTask::PanicOnRunAndDrop) || (output == CombiOutput::PanicOnDrop), "Panic but nothing should panic" ); } _ => unreachable!(), } let handle = handle.take().unwrap(); if abort == CombiAbort::AbortedAfterConsumeOutput { handle.abort(); } drop(handle); } // The output should have been dropped now. Check whether the output // object was created at all. let output_created = rt.block_on(wait_output_drop).is_ok(); assert_eq!( output_created, (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, "Creation of output object" ); }