diff options
author | Joel Galenson <jgalenson@google.com> | 2021-08-17 08:33:38 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-08-17 08:40:48 -0700 |
commit | 642961436a727d51930e5839e3dbfee04ba4af95 (patch) | |
tree | 9da006d6d1c0e4667e8d848673b13cc7d2bb62ca /src/runtime/task/state.rs | |
parent | 1c33108b3901dd464f81acf08b5268ec294b3876 (diff) | |
download | tokio-642961436a727d51930e5839e3dbfee04ba4af95.tar.gz |
Upgrade rust/crates/tokio to 1.10.0
Test: make
Change-Id: I4ec984178af20297aae0ed51f0b1c6410876a51b
Diffstat (limited to 'src/runtime/task/state.rs')
-rw-r--r-- | src/runtime/task/state.rs | 298 |
1 files changed, 222 insertions, 76 deletions
diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs index 1f08d6d..059a7f9 100644 --- a/src/runtime/task/state.rs +++ b/src/runtime/task/state.rs @@ -54,22 +54,52 @@ const REF_ONE: usize = 1 << REF_COUNT_SHIFT; /// State a task is initialized with /// -/// A task is initialized with two references: one for the scheduler and one for -/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is -/// set. A new task is immediately pushed into the run queue for execution and -/// starts with the `NOTIFIED` flag set. -const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED; +/// A task is initialized with three references: +/// +/// * A reference that will be stored in an OwnedTasks or LocalOwnedTasks. +/// * A reference that will be sent to the scheduler as an ordinary notification. +/// * A reference for the JoinHandle. +/// +/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set. +/// As the task starts with a `Notified`, `NOTIFIED` is set. +const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED; + +#[must_use] +pub(super) enum TransitionToRunning { + Success, + Cancelled, + Failed, + Dealloc, +} + +#[must_use] +pub(super) enum TransitionToIdle { + Ok, + OkNotified, + OkDealloc, + Cancelled, +} + +#[must_use] +pub(super) enum TransitionToNotifiedByVal { + DoNothing, + Submit, + Dealloc, +} + +#[must_use] +pub(super) enum TransitionToNotifiedByRef { + DoNothing, + Submit, +} /// All transitions are performed via RMW operations. This establishes an /// unambiguous modification order. impl State { /// Return a task's initial state pub(super) fn new() -> State { - // A task is initialized with three references: one for the scheduler, - // one for the `JoinHandle`, one for the task handle made available in - // release. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is - // set. A new task is immediately pushed into the run queue for - // execution and starts with the `NOTIFIED` flag set. + // The raw task returned by this method has a ref-count of three. See + // the comment on INITIAL_STATE for more. State { val: AtomicUsize::new(INITIAL_STATE), } @@ -80,57 +110,72 @@ impl State { Snapshot(self.val.load(Acquire)) } - /// Attempt to transition the lifecycle to `Running`. - /// - /// If `ref_inc` is set, the reference count is also incremented. - /// - /// The `NOTIFIED` bit is always unset. - pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult { - self.fetch_update(|curr| { - assert!(curr.is_notified()); - - let mut next = curr; + /// Attempt to transition the lifecycle to `Running`. This sets the + /// notified bit to false so notifications during the poll can be detected. + pub(super) fn transition_to_running(&self) -> TransitionToRunning { + self.fetch_update_action(|mut next| { + let action; + assert!(next.is_notified()); if !next.is_idle() { - return None; - } - - if ref_inc { - next.ref_inc(); + // This happens if the task is either currently running or if it + // has already completed, e.g. if it was cancelled during + // shutdown. Consume the ref-count and return. + next.ref_dec(); + if next.ref_count() == 0 { + action = TransitionToRunning::Dealloc; + } else { + action = TransitionToRunning::Failed; + } + } else { + // We are able to lock the RUNNING bit. + next.set_running(); + next.unset_notified(); + + if next.is_cancelled() { + action = TransitionToRunning::Cancelled; + } else { + action = TransitionToRunning::Success; + } } - - next.set_running(); - next.unset_notified(); - Some(next) + (action, Some(next)) }) } /// Transitions the task from `Running` -> `Idle`. /// - /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise. - /// In both cases, a snapshot of the state from **after** the transition is - /// returned. - /// + /// Returns `true` if the transition to `Idle` is successful, `false` otherwise. /// The transition to `Idle` fails if the task has been flagged to be /// cancelled. - pub(super) fn transition_to_idle(&self) -> UpdateResult { - self.fetch_update(|curr| { + pub(super) fn transition_to_idle(&self) -> TransitionToIdle { + self.fetch_update_action(|curr| { assert!(curr.is_running()); if curr.is_cancelled() { - return None; + return (TransitionToIdle::Cancelled, None); } let mut next = curr; + let action; next.unset_running(); - if next.is_notified() { - // The caller needs to schedule the task. To do this, it needs a - // waker. The waker requires a ref count. + if !next.is_notified() { + // Polling the future consumes the ref-count of the Notified. + next.ref_dec(); + if next.ref_count() == 0 { + action = TransitionToIdle::OkDealloc; + } else { + action = TransitionToIdle::Ok; + } + } else { + // The caller will schedule a new notification, so we create a + // new ref-count for the notification. Our own ref-count is kept + // for now, and the caller will drop it shortly. next.ref_inc(); + action = TransitionToIdle::OkNotified; } - Some(next) + (action, Some(next)) }) } @@ -146,38 +191,119 @@ impl State { } /// Transition from `Complete` -> `Terminal`, decrementing the reference - /// count by 1. + /// count the specified number of times. /// - /// When `ref_dec` is set, an additional ref count decrement is performed. - /// This is used to batch atomic ops when possible. - pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot { - self.fetch_update(|mut snapshot| { - if complete { - snapshot.set_complete(); - } else { - assert!(snapshot.is_complete()); - } + /// Returns true if the task should be deallocated. + pub(super) fn transition_to_terminal(&self, count: usize) -> bool { + let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel)); + assert!( + prev.ref_count() >= count, + "current: {}, sub: {}", + prev.ref_count(), + count + ); + prev.ref_count() == count + } + + /// Transitions the state to `NOTIFIED`. + /// + /// If no task needs to be submitted, a ref-count is consumed. + /// + /// If a task needs to be submitted, the ref-count is incremented for the + /// new Notified. + pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal { + self.fetch_update_action(|mut snapshot| { + let action; + + if snapshot.is_running() { + // If the task is running, we mark it as notified, but we should + // not submit anything as the thread currently running the + // future is responsible for that. + snapshot.set_notified(); + snapshot.ref_dec(); - // Decrement the primary handle - snapshot.ref_dec(); + // The thread that set the running bit also holds a ref-count. + assert!(snapshot.ref_count() > 0); - if ref_dec { - // Decrement a second time + action = TransitionToNotifiedByVal::DoNothing; + } else if snapshot.is_complete() || snapshot.is_notified() { + // We do not need to submit any notifications, but we have to + // decrement the ref-count. snapshot.ref_dec(); + + if snapshot.ref_count() == 0 { + action = TransitionToNotifiedByVal::Dealloc; + } else { + action = TransitionToNotifiedByVal::DoNothing; + } + } else { + // We create a new notified that we can submit. The caller + // retains ownership of the ref-count they passed in. + snapshot.set_notified(); + snapshot.ref_inc(); + action = TransitionToNotifiedByVal::Submit; } - Some(snapshot) + (action, Some(snapshot)) }) - .unwrap() } /// Transitions the state to `NOTIFIED`. + pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef { + self.fetch_update_action(|mut snapshot| { + if snapshot.is_complete() || snapshot.is_notified() { + // There is nothing to do in this case. + (TransitionToNotifiedByRef::DoNothing, None) + } else if snapshot.is_running() { + // If the task is running, we mark it as notified, but we should + // not submit as the thread currently running the future is + // responsible for that. + snapshot.set_notified(); + (TransitionToNotifiedByRef::DoNothing, Some(snapshot)) + } else { + // The task is idle and not notified. We should submit a + // notification. + snapshot.set_notified(); + snapshot.ref_inc(); + (TransitionToNotifiedByRef::Submit, Some(snapshot)) + } + }) + } + + /// Set the cancelled bit and transition the state to `NOTIFIED` if idle. /// /// Returns `true` if the task needs to be submitted to the pool for /// execution - pub(super) fn transition_to_notified(&self) -> bool { - let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel)); - prev.will_need_queueing() + pub(super) fn transition_to_notified_and_cancel(&self) -> bool { + self.fetch_update_action(|mut snapshot| { + if snapshot.is_cancelled() || snapshot.is_complete() { + // Aborts to completed or cancelled tasks are no-ops. + (false, None) + } else if snapshot.is_running() { + // If the task is running, we mark it as cancelled. The thread + // running the task will notice the cancelled bit when it + // stops polling and it will kill the task. + // + // The set_notified() call is not strictly necessary but it will + // in some cases let a wake_by_ref call return without having + // to perform a compare_exchange. + snapshot.set_notified(); + snapshot.set_cancelled(); + (false, Some(snapshot)) + } else { + // The task is idle. We set the cancelled and notified bits and + // submit a notification if the notified bit was not already + // set. + snapshot.set_cancelled(); + if !snapshot.is_notified() { + snapshot.set_notified(); + snapshot.ref_inc(); + (true, Some(snapshot)) + } else { + (false, Some(snapshot)) + } + } + }) } /// Set the `CANCELLED` bit and attempt to transition to `Running`. @@ -191,17 +317,11 @@ impl State { if snapshot.is_idle() { snapshot.set_running(); - - if snapshot.is_notified() { - // If the task is idle and notified, this indicates the task is - // in the run queue and is considered owned by the scheduler. - // The shutdown operation claims ownership of the task, which - // means we need to assign an additional ref-count to the task - // in the queue. - snapshot.ref_inc(); - } } + // If the task was not idle, the thread currently running the task + // will notice the cancelled bit and cancel it once the poll + // completes. snapshot.set_cancelled(); Some(snapshot) }); @@ -317,9 +437,39 @@ impl State { /// Returns `true` if the task should be released. pub(super) fn ref_dec(&self) -> bool { let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); + assert!(prev.ref_count() >= 1); prev.ref_count() == 1 } + /// Returns `true` if the task should be released. + pub(super) fn ref_dec_twice(&self) -> bool { + let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel)); + assert!(prev.ref_count() >= 2); + prev.ref_count() == 2 + } + + fn fetch_update_action<F, T>(&self, mut f: F) -> T + where + F: FnMut(Snapshot) -> (T, Option<Snapshot>), + { + let mut curr = self.load(); + + loop { + let (output, next) = f(curr); + let next = match next { + Some(next) => next, + None => return output, + }; + + let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); + + match res { + Ok(_) => return output, + Err(actual) => curr = Snapshot(actual), + } + } + } + fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> where F: FnMut(Snapshot) -> Option<Snapshot>, @@ -359,6 +509,10 @@ impl Snapshot { self.0 &= !NOTIFIED } + fn set_notified(&mut self) { + self.0 |= NOTIFIED + } + pub(super) fn is_running(self) -> bool { self.0 & RUNNING == RUNNING } @@ -379,10 +533,6 @@ impl Snapshot { self.0 |= CANCELLED; } - fn set_complete(&mut self) { - self.0 |= COMPLETE; - } - /// Returns `true` if the task's future has completed execution. pub(super) fn is_complete(self) -> bool { self.0 & COMPLETE == COMPLETE @@ -421,10 +571,6 @@ impl Snapshot { assert!(self.ref_count() > 0); self.0 -= REF_ONE } - - fn will_need_queueing(self) -> bool { - !self.is_notified() && self.is_idle() - } } impl fmt::Debug for State { |