diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2020-04-14 14:33:37 +0200 |
---|---|---|
committer | Stjepan Glavina <stjepang@gmail.com> | 2020-04-14 14:33:37 +0200 |
commit | fad623a06dc0fef34149bb473db9dc31fa99b4d3 (patch) | |
tree | 2ad26992e5291a0d03e8beaca7d1ff6d3d3c4725 | |
parent | 4ab094c83cfd2e6cf206d0d3f2929b4b9a809bd5 (diff) | |
download | async-task-fad623a06dc0fef34149bb473db9dc31fa99b4d3.tar.gz |
Task only becomes ready when the future is dropped
-rw-r--r-- | src/header.rs | 17 | ||||
-rw-r--r-- | src/join_handle.rs | 31 | ||||
-rw-r--r-- | src/raw.rs | 46 | ||||
-rw-r--r-- | src/task.rs | 10 | ||||
-rw-r--r-- | tests/basic.rs | 40 | ||||
-rw-r--r-- | tests/waker_panic.rs | 42 |
6 files changed, 149 insertions, 37 deletions
diff --git a/src/header.rs b/src/header.rs index e676bef..5d808c7 100644 --- a/src/header.rs +++ b/src/header.rs @@ -32,8 +32,8 @@ pub(crate) struct Header { impl Header { /// Cancels the task. /// - /// This method will mark the task as closed and notify the awaiter, but it won't reschedule - /// the task if it's not completed. + /// This method will mark the task as closed, but it won't reschedule the task or drop its + /// future. pub(crate) fn cancel(&self) { let mut state = self.state.load(Ordering::Acquire); @@ -50,14 +50,7 @@ impl Header { Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => { - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - self.notify(None); - } - - break; - } + Ok(_) => break, Err(s) => state = s, } } @@ -107,7 +100,7 @@ impl Header { // If we're in the notifying state at this moment, just wake and return without // registering. if state & NOTIFYING != 0 { - waker.wake_by_ref(); + abort_on_panic(|| waker.wake_by_ref()); return; } @@ -139,7 +132,7 @@ impl Header { // If there was a notification, take the waker out of the awaiter field. if state & NOTIFYING != 0 { if let Some(w) = unsafe { (*self.awaiter.get()).take() } { - waker = Some(w); + abort_on_panic(|| waker = Some(w)); } } diff --git a/src/join_handle.rs b/src/join_handle.rs index f4710cc..10a189e 100644 --- a/src/join_handle.rs +++ b/src/join_handle.rs @@ -8,7 +8,6 @@ use core::task::{Context, Poll, Waker}; use crate::header::Header; use crate::state::*; -use crate::utils::abort_on_panic; /// A handle that awaits the result of a task. /// @@ -199,6 +198,23 @@ impl<R, T> Future for JoinHandle<R, T> { loop { // If the task has been closed, notify the awaiter and return `None`. if state & CLOSED != 0 { + // If the task is scheduled or running, we need to wait until its future is + // dropped. + if state & (SCHEDULED | RUNNING) != 0 { + // Replace the waker with one associated with the current task. + (*header).register(cx.waker()); + + // Reload the state after registering. It is possible changes occurred just + // before registration so we need to check for that. + state = (*header).state.load(Ordering::Acquire); + + // If the task is still scheduled or running, we need to wait because its + // future is not dropped yet. + if state & (SCHEDULED | RUNNING) != 0 { + return Poll::Pending; + } + } + // Even though the awaiter is most likely the current task, it could also be // another task. (*header).notify(Some(cx.waker())); @@ -207,21 +223,16 @@ impl<R, T> Future for JoinHandle<R, T> { // If the task is not completed, register the current task. if state & COMPLETED == 0 { - // Replace the waker with one associated with the current task. We need a - // safeguard against panics because dropping the previous waker can panic. - abort_on_panic(|| { - (*header).register(cx.waker()); - }); + // Replace the waker with one associated with the current task. + (*header).register(cx.waker()); // Reload the state after registering. It is possible that the task became // completed or closed just before registration so we need to check for that. state = (*header).state.load(Ordering::Acquire); - // If the task has been closed, return `None`. We do not need to notify the - // awaiter here, since we have replaced the waker above, and the executor can - // only set it back to `None`. + // If the task has been closed, restart. if state & CLOSED != 0 { - return Poll::Ready(None); + continue; } // If the task is still not completed, we're blocked on it. @@ -464,14 +464,17 @@ where loop { // If the task has already been closed, drop the task reference and return. if state & CLOSED != 0 { - // Notify the awaiter that the task has been closed. + // Drop the future. + Self::drop_future(ptr); + + // Mark the task as unscheduled. + let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Notify the awaiter that the future has been dropped. if state & AWAITER != 0 { (*raw.header).notify(None); } - // Drop the future. - Self::drop_future(ptr); - // Drop the task reference. Self::drop_task(ptr); return false; @@ -549,6 +552,8 @@ where drop(output); } Poll::Pending => { + let mut future_dropped = false; + // The task is still not completed. loop { // If the task was closed while running, we'll need to unschedule in case it @@ -559,6 +564,13 @@ where state & !RUNNING }; + if state & CLOSED != 0 && !future_dropped { + // The thread that closed the task didn't drop the future because it was + // running so now it's our responsibility to do so. + Self::drop_future(ptr); + future_dropped = true; + } + // Mark the task as not running. match (*raw.header).state.compare_exchange_weak( state, @@ -567,14 +579,14 @@ where Ordering::Acquire, ) { Ok(state) => { - // If the task was closed while running, we need to drop its future. + // If the task was closed while running, we need to notify the awaiter. // If the task was woken up while running, we need to schedule it. // Otherwise, we just drop the task reference. if state & CLOSED != 0 { - // The thread that closed the task didn't drop the future because - // it was running so now it's our responsibility to do so. - Self::drop_future(ptr); - + // Notify the awaiter that the future has been dropped. + if state & AWAITER != 0 { + (*raw.header).notify(None); + } // Drop the task reference. Self::drop_task(ptr); } else if state & SCHEDULED != 0 { @@ -618,14 +630,20 @@ where // If the task was closed while running, then unschedule it, drop its // future, and drop the task reference. if state & CLOSED != 0 { - // We still need to unschedule the task because it is possible it was - // woken up while running. - (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); - // The thread that closed the task didn't drop the future because it // was running so now it's our responsibility to do so. RawTask::<F, R, S, T>::drop_future(ptr); + // Mark the task as not running and not scheduled. + (*raw.header) + .state + .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + + // Notify the awaiter that the future has been dropped. + if state & AWAITER != 0 { + (*raw.header).notify(None); + } + // Drop the task reference. RawTask::<F, R, S, T>::drop_task(ptr); break; @@ -642,7 +660,7 @@ where // Drop the future because the task is now closed. RawTask::<F, R, S, T>::drop_future(ptr); - // Notify the awaiter that the task has been closed. + // Notify the awaiter that the future has been dropped. if state & AWAITER != 0 { (*raw.header).notify(None); } diff --git a/src/task.rs b/src/task.rs index 7a1a5e0..d4da255 100644 --- a/src/task.rs +++ b/src/task.rs @@ -4,10 +4,12 @@ use core::marker::PhantomData; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; +use core::sync::atomic::Ordering; use core::task::{Context, Poll, Waker}; use crate::header::Header; use crate::raw::RawTask; +use crate::state::*; use crate::JoinHandle; /// Creates a new task. @@ -315,6 +317,14 @@ impl<T> Drop for Task<T> { // Drop the future. ((*header).vtable.drop_future)(ptr); + // Mark the task as unscheduled. + let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Notify the awaiter that the future has been dropped. + if state & AWAITER != 0 { + (*header).notify(None); + } + // Drop the task reference. ((*header).vtable.drop_task)(ptr); } diff --git a/tests/basic.rs b/tests/basic.rs index 432e14c..0c0a10f 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use async_task::Task; use crossbeam::atomic::AtomicCell; use crossbeam::channel; -use futures::future; +use futures::future::{self, FutureExt}; use lazy_static::lazy_static; // Creates a future with event counters. @@ -238,6 +238,44 @@ fn run_and_cancel() { } #[test] +fn cancel_and_poll() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_T); + + handle.cancel(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_T.load(), 0); + + let mut handle = handle; + assert!((&mut handle).now_or_never().is_none()); + + task.run(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_T.load(), 0); + + assert!((&mut handle).now_or_never().is_some()); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_T.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_T.load(), 1); +} + +#[test] fn schedule() { let (s, r) = channel::unbounded(); let schedule = move |t| s.send(t).unwrap(); diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs index eb77912..60b7fd6 100644 --- a/tests/waker_panic.rs +++ b/tests/waker_panic.rs @@ -10,6 +10,7 @@ use std::time::Duration; use async_task::Task; use crossbeam::atomic::AtomicCell; use crossbeam::channel; +use futures::future::FutureExt; use lazy_static::lazy_static; // Creates a future with event counters. @@ -353,3 +354,44 @@ fn cancel_and_wake_during_run() { }) .unwrap(); } + +#[test] +fn panic_and_poll() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_T); + + task.run(); + waker().wake(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_T.load(), 0); + + let mut handle = handle; + assert!((&mut handle).now_or_never().is_none()); + + let task = chan.recv().unwrap(); + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_T.load(), 0); + + assert!((&mut handle).now_or_never().is_some()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_T.load(), 0); + + drop(waker()); + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_T.load(), 1); +} |