aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStjepan Glavina <stjepang@gmail.com>2020-04-14 14:33:37 +0200
committerStjepan Glavina <stjepang@gmail.com>2020-04-14 14:33:37 +0200
commitfad623a06dc0fef34149bb473db9dc31fa99b4d3 (patch)
tree2ad26992e5291a0d03e8beaca7d1ff6d3d3c4725
parent4ab094c83cfd2e6cf206d0d3f2929b4b9a809bd5 (diff)
downloadasync-task-fad623a06dc0fef34149bb473db9dc31fa99b4d3.tar.gz
Task only becomes ready when the future is dropped
-rw-r--r--src/header.rs17
-rw-r--r--src/join_handle.rs31
-rw-r--r--src/raw.rs46
-rw-r--r--src/task.rs10
-rw-r--r--tests/basic.rs40
-rw-r--r--tests/waker_panic.rs42
6 files changed, 149 insertions, 37 deletions
diff --git a/src/header.rs b/src/header.rs
index e676bef..5d808c7 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -32,8 +32,8 @@ pub(crate) struct Header {
impl Header {
/// Cancels the task.
///
- /// This method will mark the task as closed and notify the awaiter, but it won't reschedule
- /// the task if it's not completed.
+ /// This method will mark the task as closed, but it won't reschedule the task or drop its
+ /// future.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);
@@ -50,14 +50,7 @@ impl Header {
Ordering::AcqRel,
Ordering::Acquire,
) {
- Ok(_) => {
- // Notify the awaiter that the task has been closed.
- if state & AWAITER != 0 {
- self.notify(None);
- }
-
- break;
- }
+ Ok(_) => break,
Err(s) => state = s,
}
}
@@ -107,7 +100,7 @@ impl Header {
// If we're in the notifying state at this moment, just wake and return without
// registering.
if state & NOTIFYING != 0 {
- waker.wake_by_ref();
+ abort_on_panic(|| waker.wake_by_ref());
return;
}
@@ -139,7 +132,7 @@ impl Header {
// If there was a notification, take the waker out of the awaiter field.
if state & NOTIFYING != 0 {
if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
- waker = Some(w);
+ abort_on_panic(|| waker = Some(w));
}
}
diff --git a/src/join_handle.rs b/src/join_handle.rs
index f4710cc..10a189e 100644
--- a/src/join_handle.rs
+++ b/src/join_handle.rs
@@ -8,7 +8,6 @@ use core::task::{Context, Poll, Waker};
use crate::header::Header;
use crate::state::*;
-use crate::utils::abort_on_panic;
/// A handle that awaits the result of a task.
///
@@ -199,6 +198,23 @@ impl<R, T> Future for JoinHandle<R, T> {
loop {
// If the task has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
+ // If the task is scheduled or running, we need to wait until its future is
+ // dropped.
+ if state & (SCHEDULED | RUNNING) != 0 {
+ // Replace the waker with one associated with the current task.
+ (*header).register(cx.waker());
+
+ // Reload the state after registering. It is possible changes occurred just
+ // before registration so we need to check for that.
+ state = (*header).state.load(Ordering::Acquire);
+
+ // If the task is still scheduled or running, we need to wait because its
+ // future is not dropped yet.
+ if state & (SCHEDULED | RUNNING) != 0 {
+ return Poll::Pending;
+ }
+ }
+
// Even though the awaiter is most likely the current task, it could also be
// another task.
(*header).notify(Some(cx.waker()));
@@ -207,21 +223,16 @@ impl<R, T> Future for JoinHandle<R, T> {
// If the task is not completed, register the current task.
if state & COMPLETED == 0 {
- // Replace the waker with one associated with the current task. We need a
- // safeguard against panics because dropping the previous waker can panic.
- abort_on_panic(|| {
- (*header).register(cx.waker());
- });
+ // Replace the waker with one associated with the current task.
+ (*header).register(cx.waker());
// Reload the state after registering. It is possible that the task became
// completed or closed just before registration so we need to check for that.
state = (*header).state.load(Ordering::Acquire);
- // If the task has been closed, return `None`. We do not need to notify the
- // awaiter here, since we have replaced the waker above, and the executor can
- // only set it back to `None`.
+ // If the task has been closed, restart.
if state & CLOSED != 0 {
- return Poll::Ready(None);
+ continue;
}
// If the task is still not completed, we're blocked on it.
diff --git a/src/raw.rs b/src/raw.rs
index ff02373..05891d7 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -464,14 +464,17 @@ where
loop {
// If the task has already been closed, drop the task reference and return.
if state & CLOSED != 0 {
- // Notify the awaiter that the task has been closed.
+ // Drop the future.
+ Self::drop_future(ptr);
+
+ // Mark the task as unscheduled.
+ let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // Notify the awaiter that the future has been dropped.
if state & AWAITER != 0 {
(*raw.header).notify(None);
}
- // Drop the future.
- Self::drop_future(ptr);
-
// Drop the task reference.
Self::drop_task(ptr);
return false;
@@ -549,6 +552,8 @@ where
drop(output);
}
Poll::Pending => {
+ let mut future_dropped = false;
+
// The task is still not completed.
loop {
// If the task was closed while running, we'll need to unschedule in case it
@@ -559,6 +564,13 @@ where
state & !RUNNING
};
+ if state & CLOSED != 0 && !future_dropped {
+ // The thread that closed the task didn't drop the future because it was
+ // running so now it's our responsibility to do so.
+ Self::drop_future(ptr);
+ future_dropped = true;
+ }
+
// Mark the task as not running.
match (*raw.header).state.compare_exchange_weak(
state,
@@ -567,14 +579,14 @@ where
Ordering::Acquire,
) {
Ok(state) => {
- // If the task was closed while running, we need to drop its future.
+ // If the task was closed while running, we need to notify the awaiter.
// If the task was woken up while running, we need to schedule it.
// Otherwise, we just drop the task reference.
if state & CLOSED != 0 {
- // The thread that closed the task didn't drop the future because
- // it was running so now it's our responsibility to do so.
- Self::drop_future(ptr);
-
+ // Notify the awaiter that the future has been dropped.
+ if state & AWAITER != 0 {
+ (*raw.header).notify(None);
+ }
// Drop the task reference.
Self::drop_task(ptr);
} else if state & SCHEDULED != 0 {
@@ -618,14 +630,20 @@ where
// If the task was closed while running, then unschedule it, drop its
// future, and drop the task reference.
if state & CLOSED != 0 {
- // We still need to unschedule the task because it is possible it was
- // woken up while running.
- (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
-
// The thread that closed the task didn't drop the future because it
// was running so now it's our responsibility to do so.
RawTask::<F, R, S, T>::drop_future(ptr);
+ // Mark the task as not running and not scheduled.
+ (*raw.header)
+ .state
+ .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
+
+ // Notify the awaiter that the future has been dropped.
+ if state & AWAITER != 0 {
+ (*raw.header).notify(None);
+ }
+
// Drop the task reference.
RawTask::<F, R, S, T>::drop_task(ptr);
break;
@@ -642,7 +660,7 @@ where
// Drop the future because the task is now closed.
RawTask::<F, R, S, T>::drop_future(ptr);
- // Notify the awaiter that the task has been closed.
+ // Notify the awaiter that the future has been dropped.
if state & AWAITER != 0 {
(*raw.header).notify(None);
}
diff --git a/src/task.rs b/src/task.rs
index 7a1a5e0..d4da255 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -4,10 +4,12 @@ use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
+use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};
use crate::header::Header;
use crate::raw::RawTask;
+use crate::state::*;
use crate::JoinHandle;
/// Creates a new task.
@@ -315,6 +317,14 @@ impl<T> Drop for Task<T> {
// Drop the future.
((*header).vtable.drop_future)(ptr);
+ // Mark the task as unscheduled.
+ let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // Notify the awaiter that the future has been dropped.
+ if state & AWAITER != 0 {
+ (*header).notify(None);
+ }
+
// Drop the task reference.
((*header).vtable.drop_task)(ptr);
}
diff --git a/tests/basic.rs b/tests/basic.rs
index 432e14c..0c0a10f 100644
--- a/tests/basic.rs
+++ b/tests/basic.rs
@@ -6,7 +6,7 @@ use std::task::{Context, Poll};
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel;
-use futures::future;
+use futures::future::{self, FutureExt};
use lazy_static::lazy_static;
// Creates a future with event counters.
@@ -238,6 +238,44 @@ fn run_and_cancel() {
}
#[test]
+fn cancel_and_poll() {
+ future!(f, POLL, DROP_F);
+ schedule!(s, SCHEDULE, DROP_S);
+ task!(task, handle, f, s, DROP_T);
+
+ handle.cancel();
+ assert_eq!(POLL.load(), 0);
+ assert_eq!(SCHEDULE.load(), 0);
+ assert_eq!(DROP_F.load(), 0);
+ assert_eq!(DROP_S.load(), 0);
+ assert_eq!(DROP_T.load(), 0);
+
+ let mut handle = handle;
+ assert!((&mut handle).now_or_never().is_none());
+
+ task.run();
+ assert_eq!(POLL.load(), 0);
+ assert_eq!(SCHEDULE.load(), 0);
+ assert_eq!(DROP_F.load(), 1);
+ assert_eq!(DROP_S.load(), 0);
+ assert_eq!(DROP_T.load(), 0);
+
+ assert!((&mut handle).now_or_never().is_some());
+ assert_eq!(POLL.load(), 0);
+ assert_eq!(SCHEDULE.load(), 0);
+ assert_eq!(DROP_F.load(), 1);
+ assert_eq!(DROP_S.load(), 0);
+ assert_eq!(DROP_T.load(), 0);
+
+ drop(handle);
+ assert_eq!(POLL.load(), 0);
+ assert_eq!(SCHEDULE.load(), 0);
+ assert_eq!(DROP_F.load(), 1);
+ assert_eq!(DROP_S.load(), 1);
+ assert_eq!(DROP_T.load(), 1);
+}
+
+#[test]
fn schedule() {
let (s, r) = channel::unbounded();
let schedule = move |t| s.send(t).unwrap();
diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs
index eb77912..60b7fd6 100644
--- a/tests/waker_panic.rs
+++ b/tests/waker_panic.rs
@@ -10,6 +10,7 @@ use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel;
+use futures::future::FutureExt;
use lazy_static::lazy_static;
// Creates a future with event counters.
@@ -353,3 +354,44 @@ fn cancel_and_wake_during_run() {
})
.unwrap();
}
+
+#[test]
+fn panic_and_poll() {
+ future!(f, waker, POLL, DROP_F);
+ schedule!(s, chan, SCHEDULE, DROP_S);
+ task!(task, handle, f, s, DROP_T);
+
+ task.run();
+ waker().wake();
+ assert_eq!(POLL.load(), 1);
+ assert_eq!(SCHEDULE.load(), 1);
+ assert_eq!(DROP_F.load(), 0);
+ assert_eq!(DROP_S.load(), 0);
+ assert_eq!(DROP_T.load(), 0);
+
+ let mut handle = handle;
+ assert!((&mut handle).now_or_never().is_none());
+
+ let task = chan.recv().unwrap();
+ assert!(catch_unwind(|| task.run()).is_err());
+ assert_eq!(POLL.load(), 2);
+ assert_eq!(SCHEDULE.load(), 1);
+ assert_eq!(DROP_F.load(), 1);
+ assert_eq!(DROP_S.load(), 0);
+ assert_eq!(DROP_T.load(), 0);
+
+ assert!((&mut handle).now_or_never().is_some());
+ assert_eq!(POLL.load(), 2);
+ assert_eq!(SCHEDULE.load(), 1);
+ assert_eq!(DROP_F.load(), 1);
+ assert_eq!(DROP_S.load(), 0);
+ assert_eq!(DROP_T.load(), 0);
+
+ drop(waker());
+ drop(handle);
+ assert_eq!(POLL.load(), 2);
+ assert_eq!(SCHEDULE.load(), 1);
+ assert_eq!(DROP_F.load(), 1);
+ assert_eq!(DROP_S.load(), 1);
+ assert_eq!(DROP_T.load(), 1);
+}