diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2019-08-12 20:18:51 +0200 |
---|---|---|
committer | Stjepan Glavina <stjepang@gmail.com> | 2019-08-12 20:18:51 +0200 |
commit | 1479e86ca668fa9205d6ef50867d9688ffb4d804 (patch) | |
tree | 6d22c859485aacbf81ffee52150a4675febda75f | |
download | async-task-1479e86ca668fa9205d6ef50867d9688ffb4d804.tar.gz |
Initial commit
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | Cargo.toml | 20 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 23 | ||||
-rw-r--r-- | README.md | 21 | ||||
-rw-r--r-- | benches/bench.rs | 43 | ||||
-rw-r--r-- | examples/panic-propagation.rs | 75 | ||||
-rw-r--r-- | examples/panic-result.rs | 74 | ||||
-rw-r--r-- | examples/spawn-on-thread.rs | 55 | ||||
-rw-r--r-- | examples/spawn.rs | 52 | ||||
-rw-r--r-- | examples/task-id.rs | 88 | ||||
-rw-r--r-- | src/header.rs | 158 | ||||
-rw-r--r-- | src/join_handle.rs | 333 | ||||
-rw-r--r-- | src/lib.rs | 149 | ||||
-rw-r--r-- | src/raw.rs | 629 | ||||
-rw-r--r-- | src/state.rs | 65 | ||||
-rw-r--r-- | src/task.rs | 390 | ||||
-rw-r--r-- | src/utils.rs | 48 | ||||
-rw-r--r-- | tests/basic.rs | 314 | ||||
-rw-r--r-- | tests/join.rs | 454 | ||||
-rw-r--r-- | tests/panic.rs | 288 | ||||
-rw-r--r-- | tests/ready.rs | 265 | ||||
-rw-r--r-- | tests/waker_panic.rs | 357 | ||||
-rw-r--r-- | tests/waker_pending.rs | 348 | ||||
-rw-r--r-- | tests/waker_ready.rs | 328 |
25 files changed, 4781 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6936990 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +**/*.rs.bk +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..2286f1e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "async-task" +version = "0.1.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +edition = "2018" +license = "Apache-2.0/MIT" +repository = "https://github.com/stjepang/async-task" +homepage = "https://github.com/stjepang/async-task" +documentation = "https://docs.rs/async-task" +description = "Task abstraction for building executors" +keywords = ["future", "task", "executor", "spawn"] +categories = ["asynchronous", "concurrency"] + +[dependencies] +crossbeam-utils = "0.6.5" + +[dev-dependencies] +crossbeam = "0.7.1" +futures-preview = "0.3.0-alpha.17" +lazy_static = "1.3.0" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..22fe9f2 --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# async-task + +A task abstraction for building executors. + +This crate makes it possible to build an efficient and extendable executor in few lines of +code. + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/benches/bench.rs b/benches/bench.rs new file mode 100644 index 0000000..6fd7935 --- /dev/null +++ b/benches/bench.rs @@ -0,0 +1,43 @@ +#![feature(async_await, test)] + +extern crate test; + +use futures::channel::oneshot; +use futures::executor; +use futures::future::TryFutureExt; +use test::Bencher; + +#[bench] +fn task_create(b: &mut Bencher) { + b.iter(|| { + async_task::spawn(async {}, drop, ()); + }); +} + +#[bench] +fn task_run(b: &mut Bencher) { + b.iter(|| { + let (task, handle) = async_task::spawn(async {}, drop, ()); + task.run(); + executor::block_on(handle).unwrap(); + }); +} + +#[bench] +fn oneshot_create(b: &mut Bencher) { + b.iter(|| { + let (tx, _rx) = oneshot::channel::<()>(); + let _task = Box::new(async move { tx.send(()).map_err(|_| ()) }); + }); +} + +#[bench] +fn oneshot_run(b: &mut Bencher) { + b.iter(|| { + let (tx, rx) = oneshot::channel::<()>(); + let task = Box::new(async move { tx.send(()).map_err(|_| ()) }); + + let future = task.and_then(|_| rx.map_err(|_| ())); + executor::block_on(future).unwrap(); + }); +} diff --git a/examples/panic-propagation.rs b/examples/panic-propagation.rs new file mode 100644 index 0000000..9c4f081 --- /dev/null +++ b/examples/panic-propagation.rs @@ -0,0 +1,75 @@ +//! A single-threaded executor where join handles propagate panics from tasks. + +#![feature(async_await)] + +use std::future::Future; +use std::panic::{resume_unwind, AssertUnwindSafe}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; + +use crossbeam::channel::{unbounded, Sender}; +use futures::executor; +use futures::future::FutureExt; +use lazy_static::lazy_static; + +/// Spawns a future on the executor. +fn spawn<F, R>(future: F) -> JoinHandle<R> +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, +{ + lazy_static! { + // A channel that holds scheduled tasks. + static ref QUEUE: Sender<async_task::Task<()>> = { + let (sender, receiver) = unbounded::<async_task::Task<()>>(); + + // Start the executor thread. + thread::spawn(|| { + for task in receiver { + // No need for `catch_unwind()` here because panics are already caught. + task.run(); + } + }); + + sender + }; + } + + // Create a future that catches panics within itself. + let future = AssertUnwindSafe(future).catch_unwind(); + + // Create a task that is scheduled by sending itself into the channel. + let schedule = |t| QUEUE.send(t).unwrap(); + let (task, handle) = async_task::spawn(future, schedule, ()); + + // Schedule the task by sending it into the channel. + task.schedule(); + + // Wrap the handle into one that propagates panics. + JoinHandle(handle) +} + +/// A join handle that propagates panics inside the task. +struct JoinHandle<R>(async_task::JoinHandle<thread::Result<R>, ()>); + +impl<R> Future for JoinHandle<R> { + type Output = Option<R>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.0).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)), + Poll::Ready(Some(Err(err))) => resume_unwind(err), + } + } +} + +fn main() { + // Spawn a future that panics and block on it. + let handle = spawn(async { + panic!("Ooops!"); + }); + executor::block_on(handle); +} diff --git a/examples/panic-result.rs b/examples/panic-result.rs new file mode 100644 index 0000000..b1200a3 --- /dev/null +++ b/examples/panic-result.rs @@ -0,0 +1,74 @@ +//! A single-threaded executor where join handles catch panics inside tasks. + +#![feature(async_await)] + +use std::future::Future; +use std::panic::AssertUnwindSafe; +use std::thread; + +use crossbeam::channel::{unbounded, Sender}; +use futures::executor; +use futures::future::FutureExt; +use lazy_static::lazy_static; + +/// Spawns a future on the executor. +fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()> +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, +{ + lazy_static! { + // A channel that holds scheduled tasks. + static ref QUEUE: Sender<async_task::Task<()>> = { + let (sender, receiver) = unbounded::<async_task::Task<()>>(); + + // Start the executor thread. + thread::spawn(|| { + for task in receiver { + // No need for `catch_unwind()` here because panics are already caught. + task.run(); + } + }); + + sender + }; + } + + // Create a future that catches panics within itself. + let future = AssertUnwindSafe(future).catch_unwind(); + + // Create a task that is scheduled by sending itself into the channel. + let schedule = |t| QUEUE.send(t).unwrap(); + let (task, handle) = async_task::spawn(future, schedule, ()); + + // Schedule the task by sending it into the channel. + task.schedule(); + + handle +} + +fn main() { + // Spawn a future that completes succesfully. + let handle = spawn(async { + println!("Hello, world!"); + }); + + // Block on the future and report its result. + match executor::block_on(handle) { + None => println!("The task was cancelled."), + Some(Ok(val)) => println!("The task completed with {:?}", val), + Some(Err(_)) => println!("The task has panicked"), + } + + // Spawn a future that panics. + let handle = spawn(async { + panic!("Ooops!"); + }); + + // Block on the future and report its result. + match executor::block_on(handle) { + None => println!("The task was cancelled."), + Some(Ok(val)) => println!("The task completed with {:?}", val), + Some(Err(_)) => println!("The task has panicked"), + } +} diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs new file mode 100644 index 0000000..6d5b9a2 --- /dev/null +++ b/examples/spawn-on-thread.rs @@ -0,0 +1,55 @@ +//! A function that runs a future to completion on a dedicated thread. + +#![feature(async_await)] + +use std::future::Future; +use std::sync::Arc; +use std::thread; + +use crossbeam::channel; +use futures::executor; + +/// Spawns a future on a new dedicated thread. +/// +/// The returned handle can be used to await the output of the future. +fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()> +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, +{ + // Create a channel that holds the task when it is scheduled for running. + let (sender, receiver) = channel::unbounded(); + let sender = Arc::new(sender); + let s = Arc::downgrade(&sender); + + // Wrap the future into one that disconnects the channel on completion. + let future = async move { + // When the inner future completes, the sender gets dropped and disconnects the channel. + let _sender = sender; + future.await + }; + + // Create a task that is scheduled by sending itself into the channel. + let schedule = move |t| s.upgrade().unwrap().send(t).unwrap(); + let (task, handle) = async_task::spawn(future, schedule, ()); + + // Schedule the task by sending it into the channel. + task.schedule(); + + // Spawn a thread running the task to completion. + thread::spawn(move || { + // Keep taking the task from the channel and running it until completion. + for task in receiver { + task.run(); + } + }); + + handle +} + +fn main() { + // Spawn a future on a dedicated thread. + executor::block_on(spawn_on_thread(async { + println!("Hello, world!"); + })); +} diff --git a/examples/spawn.rs b/examples/spawn.rs new file mode 100644 index 0000000..6e798c0 --- /dev/null +++ b/examples/spawn.rs @@ -0,0 +1,52 @@ +//! A simple single-threaded executor. + +#![feature(async_await)] + +use std::future::Future; +use std::panic::catch_unwind; +use std::thread; + +use crossbeam::channel::{unbounded, Sender}; +use futures::executor; +use lazy_static::lazy_static; + +/// Spawns a future on the executor. +fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, +{ + lazy_static! { + // A channel that holds scheduled tasks. + static ref QUEUE: Sender<async_task::Task<()>> = { + let (sender, receiver) = unbounded::<async_task::Task<()>>(); + + // Start the executor thread. + thread::spawn(|| { + for task in receiver { + // Ignore panics for simplicity. + let _ignore_panic = catch_unwind(|| task.run()); + } + }); + + sender + }; + } + + // Create a task that is scheduled by sending itself into the channel. + let schedule = |t| QUEUE.send(t).unwrap(); + let (task, handle) = async_task::spawn(future, schedule, ()); + + // Schedule the task by sending it into the channel. + task.schedule(); + + handle +} + +fn main() { + // Spawn a future and await its result. + let handle = spawn(async { + println!("Hello, world!"); + }); + executor::block_on(handle); +} diff --git a/examples/task-id.rs b/examples/task-id.rs new file mode 100644 index 0000000..b3832d0 --- /dev/null +++ b/examples/task-id.rs @@ -0,0 +1,88 @@ +//! An executor that assigns an ID to every spawned task. + +#![feature(async_await)] + +use std::cell::Cell; +use std::future::Future; +use std::panic::catch_unwind; +use std::thread; + +use crossbeam::atomic::AtomicCell; +use crossbeam::channel::{unbounded, Sender}; +use futures::executor; +use lazy_static::lazy_static; + +#[derive(Clone, Copy, Debug)] +struct TaskId(usize); + +thread_local! { + /// The ID of the current task. + static TASK_ID: Cell<Option<TaskId>> = Cell::new(None); +} + +/// Returns the ID of the currently executing task. +/// +/// Returns `None` if called outside the runtime. +fn task_id() -> Option<TaskId> { + TASK_ID.with(|id| id.get()) +} + +/// Spawns a future on the executor. +fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId> +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, +{ + lazy_static! { + // A channel that holds scheduled tasks. + static ref QUEUE: Sender<async_task::Task<TaskId>> = { + let (sender, receiver) = unbounded::<async_task::Task<TaskId>>(); + + // Start the executor thread. + thread::spawn(|| { + TASK_ID.with(|id| { + for task in receiver { + // Store the task ID into the thread-local before running. + id.set(Some(*task.tag())); + + // Ignore panics for simplicity. + let _ignore_panic = catch_unwind(|| task.run()); + } + }) + }); + + sender + }; + + // A counter that assigns IDs to spawned tasks. + static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0); + } + + // Reserve an ID for the new task. + let id = TaskId(COUNTER.fetch_add(1)); + + // Create a task that is scheduled by sending itself into the channel. + let schedule = |task| QUEUE.send(task).unwrap(); + let (task, handle) = async_task::spawn(future, schedule, id); + + // Schedule the task by sending it into the channel. + task.schedule(); + + handle +} + +fn main() { + let mut handles = vec![]; + + // Spawn a bunch of tasks. + for _ in 0..10 { + handles.push(spawn(async move { + println!("Hello from task with {:?}", task_id()); + })); + } + + // Wait for the tasks to finish. + for handle in handles { + executor::block_on(handle); + } +} diff --git a/src/header.rs b/src/header.rs new file mode 100644 index 0000000..0ce5164 --- /dev/null +++ b/src/header.rs @@ -0,0 +1,158 @@ +use std::alloc::Layout; +use std::cell::Cell; +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::Waker; + +use crossbeam_utils::Backoff; + +use crate::raw::TaskVTable; +use crate::state::*; +use crate::utils::{abort_on_panic, extend}; + +/// The header of a task. +/// +/// This header is stored right at the beginning of every heap-allocated task. +pub(crate) struct Header { + /// Current state of the task. + /// + /// Contains flags representing the current state and the reference count. + pub(crate) state: AtomicUsize, + + /// The task that is blocked on the `JoinHandle`. + /// + /// This waker needs to be woken once the task completes or is closed. + pub(crate) awaiter: Cell<Option<Waker>>, + + /// The virtual table. + /// + /// In addition to the actual waker virtual table, it also contains pointers to several other + /// methods necessary for bookkeeping the heap-allocated task. + pub(crate) vtable: &'static TaskVTable, +} + +impl Header { + /// Cancels the task. + /// + /// This method will only mark the task as closed and will notify the awaiter, but it won't + /// reschedule the task if it's not completed. + pub(crate) fn cancel(&self) { + let mut state = self.state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be cancelled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // Mark the task as closed. + match self.state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + self.notify(); + } + + break; + } + Err(s) => state = s, + } + } + } + + /// Notifies the task blocked on the task. + /// + /// If there is a registered waker, it will be removed from the header and woken. + #[inline] + pub(crate) fn notify(&self) { + if let Some(waker) = self.swap_awaiter(None) { + // We need a safeguard against panics because waking can panic. + abort_on_panic(|| { + waker.wake(); + }); + } + } + + /// Notifies the task blocked on the task unless its waker matches `current`. + /// + /// If there is a registered waker, it will be removed from the header. + #[inline] + pub(crate) fn notify_unless(&self, current: &Waker) { + if let Some(waker) = self.swap_awaiter(None) { + if !waker.will_wake(current) { + // We need a safeguard against panics because waking can panic. + abort_on_panic(|| { + waker.wake(); + }); + } + } + } + + /// Swaps the awaiter and returns the previous value. + #[inline] + pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> { + let new_is_none = new.is_none(); + + // We're about to try acquiring the lock in a loop. If it's already being held by another + // thread, we'll have to spin for a while so it's best to employ a backoff strategy. + let backoff = Backoff::new(); + loop { + // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag. + let state = if new_is_none { + self.state.fetch_or(LOCKED, Ordering::Acquire) + } else { + self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire) + }; + + // If the lock was acquired, break from the loop. + if state & LOCKED == 0 { + break; + } + + // Snooze for a little while because the lock is held by another thread. + backoff.snooze(); + } + + // Replace the awaiter. + let old = self.awaiter.replace(new); + + // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag. + if new_is_none { + self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release); + } else { + self.state.fetch_and(!LOCKED, Ordering::Release); + } + + old + } + + /// Returns the offset at which the tag of type `T` is stored. + #[inline] + pub(crate) fn offset_tag<T>() -> usize { + let layout_header = Layout::new::<Header>(); + let layout_t = Layout::new::<T>(); + let (_, offset_t) = extend(layout_header, layout_t); + offset_t + } +} + +impl fmt::Debug for Header { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let state = self.state.load(Ordering::SeqCst); + + f.debug_struct("Header") + .field("scheduled", &(state & SCHEDULED != 0)) + .field("running", &(state & RUNNING != 0)) + .field("completed", &(state & COMPLETED != 0)) + .field("closed", &(state & CLOSED != 0)) + .field("awaiter", &(state & AWAITER != 0)) + .field("handle", &(state & HANDLE != 0)) + .field("ref_count", &(state / REFERENCE)) + .finish() + } +} diff --git a/src/join_handle.rs b/src/join_handle.rs new file mode 100644 index 0000000..fb5c275 --- /dev/null +++ b/src/join_handle.rs @@ -0,0 +1,333 @@ +use std::fmt; +use std::future::Future; +use std::marker::{PhantomData, Unpin}; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::atomic::Ordering; +use std::task::{Context, Poll}; + +use crate::header::Header; +use crate::state::*; +use crate::utils::abort_on_panic; + +/// A handle that awaits the result of a task. +/// +/// If the task has completed with `value`, the handle returns it as `Some(value)`. If the task was +/// cancelled or has panicked, the handle returns `None`. Otherwise, the handle has to wait until +/// the task completes, panics, or gets cancelled. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await)] +/// +/// use crossbeam::channel; +/// use futures::executor; +/// +/// // The future inside the task. +/// let future = async { 1 + 2 }; +/// +/// // If the task gets woken, it will be sent into this channel. +/// let (s, r) = channel::unbounded(); +/// let schedule = move |task| s.send(task).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (task, handle) = async_task::spawn(future, schedule, ()); +/// +/// // Run the task. In this example, it will complete after a single run. +/// task.run(); +/// assert!(r.is_empty()); +/// +/// // Await the result of the task. +/// let result = executor::block_on(handle); +/// assert_eq!(result, Some(3)); +/// ``` +pub struct JoinHandle<R, T> { + /// A raw task pointer. + pub(crate) raw_task: NonNull<()>, + + /// A marker capturing the generic type `R`. + pub(crate) _marker: PhantomData<(R, T)>, +} + +unsafe impl<R, T> Send for JoinHandle<R, T> {} +unsafe impl<R, T> Sync for JoinHandle<R, T> {} + +impl<R, T> Unpin for JoinHandle<R, T> {} + +impl<R, T> JoinHandle<R, T> { + /// Cancels the task. + /// + /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt + /// to run it won't do anything. And if it's completed, awaiting its result evaluates to + /// `None`. + /// + /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// use crossbeam::channel; + /// use futures::executor; + /// + /// // The future inside the task. + /// let future = async { 1 + 2 }; + /// + /// // If the task gets woken, it will be sent into this channel. + /// let (s, r) = channel::unbounded(); + /// let schedule = move |task| s.send(task).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (task, handle) = async_task::spawn(future, schedule, ()); + /// + /// // Cancel the task. + /// handle.cancel(); + /// + /// // Running a cancelled task does nothing. + /// task.run(); + /// + /// // Await the result of the task. + /// let result = executor::block_on(handle); + /// assert_eq!(result, None); + /// ``` + pub fn cancel(&self) { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be cancelled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // If the task is not scheduled nor running, we'll need to schedule it. + let new = if state & (SCHEDULED | RUNNING) == 0 { + (state | SCHEDULED | CLOSED) + REFERENCE + } else { + state | CLOSED + }; + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not scheduled nor running, schedule it so that its future + // gets dropped by the executor. + if state & (SCHEDULED | RUNNING) == 0 { + ((*header).vtable.schedule)(ptr); + } + + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*header).notify(); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Returns a reference to the tag stored inside the task. + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// use crossbeam::channel; + /// + /// // The future inside the task. + /// let future = async { 1 + 2 }; + /// + /// // If the task gets woken, it will be sent into this channel. + /// let (s, r) = channel::unbounded(); + /// let schedule = move |task| s.send(task).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (task, handle) = async_task::spawn(future, schedule, "a simple task"); + /// + /// // Access the tag. + /// assert_eq!(*handle.tag(), "a simple task"); + /// ``` + pub fn tag(&self) -> &T { + let offset = Header::offset_tag::<T>(); + let ptr = self.raw_task.as_ptr(); + + unsafe { + let raw = (ptr as *mut u8).add(offset) as *const T; + &*raw + } + } +} + +impl<R, T> Drop for JoinHandle<R, T> { + fn drop(&mut self) { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + unsafe { + // Optimistically assume the `JoinHandle` is being dropped just after creating the + // task. This is a common case so if the handle is not used, the overhead of it is only + // one compare-exchange operation. + if let Err(mut state) = (*header).state.compare_exchange_weak( + SCHEDULED | HANDLE | REFERENCE, + SCHEDULED | REFERENCE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + loop { + // If the task has been completed but not yet closed, that means its output + // must be dropped. + if state & COMPLETED != 0 && state & CLOSED == 0 { + // Mark the task as closed in order to grab its output. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Read the output. + output = + Some((((*header).vtable.get_output)(ptr) as *mut R).read()); + + // Update the state variable because we're continuing the loop. + state |= CLOSED; + } + Err(s) => state = s, + } + } else { + // If this is the last reference to task and it's not closed, then close + // it and schedule one more time so that its future gets dropped by the + // executor. + let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { + SCHEDULED | CLOSED | REFERENCE + } else { + state & !HANDLE + }; + + // Unset the handle flag. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If this is the last reference to the task, we need to either + // schedule dropping its future or destroy it. + if state & !(REFERENCE - 1) == 0 { + if state & CLOSED == 0 { + ((*header).vtable.schedule)(ptr); + } else { + ((*header).vtable.destroy)(ptr); + } + } + + break; + } + Err(s) => state = s, + } + } + } + } + } + + // Drop the output if it was taken out of the task. + drop(output); + } +} + +impl<R, T> Future for JoinHandle<R, T> { + type Output = Option<R>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // Even though the awaiter is most likely the current task, it could also be + // another task. + (*header).notify_unless(cx.waker()); + return Poll::Ready(None); + } + + // If the task is not completed, register the current task. + if state & COMPLETED == 0 { + // Replace the waker with one associated with the current task. We need a + // safeguard against panics because dropping the previous waker can panic. + abort_on_panic(|| { + (*header).swap_awaiter(Some(cx.waker().clone())); + }); + + // Reload the state after registering. It is possible that the task became + // completed or closed just before registration so we need to check for that. + state = (*header).state.load(Ordering::Acquire); + + // If the task has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // Even though the awaiter is most likely the current task, it could also + // be another task. + (*header).notify_unless(cx.waker()); + return Poll::Ready(None); + } + + // If the task is still not completed, we're blocked on it. + if state & COMPLETED == 0 { + return Poll::Pending; + } + } + + // Since the task is now completed, mark it as closed in order to grab its output. + match (*header).state.compare_exchange( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter. Even though the awaiter is most likely the current + // task, it could also be another task. + if state & AWAITER != 0 { + (*header).notify_unless(cx.waker()); + } + + // Take the output from the task. + let output = ((*header).vtable.get_output)(ptr) as *mut R; + return Poll::Ready(Some(output.read())); + } + Err(s) => state = s, + } + } + } + } +} + +impl<R, T> fmt::Debug for JoinHandle<R, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + f.debug_struct("JoinHandle") + .field("header", unsafe { &(*header) }) + .finish() + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..5518515 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,149 @@ +//! Task abstraction for building executors. +//! +//! # What is an executor? +//! +//! An async block creates a future and an async function returns one. But futures don't do +//! anything unless they are awaited inside other async blocks or async functions. So the question +//! arises: who or what awaits the main future that awaits others? +//! +//! One solution is to call [`block_on()`] on the main future, which will block +//! the current thread and keep polling the future until it completes. But sometimes we don't want +//! to block the current thread and would prefer to *spawn* the future to let a background thread +//! block on it instead. +//! +//! This is where executors step in - they create a number of threads (typically equal to the +//! number of CPU cores on the system) that are dedicated to polling spawned futures. Each executor +//! thread keeps polling spawned futures in a loop and only blocks when all spawned futures are +//! either sleeping or running. +//! +//! # What is a task? +//! +//! In order to spawn a future on an executor, one needs to allocate the future on the heap and +//! keep some state alongside it, like whether the future is ready for polling, waiting to be woken +//! up, or completed. This allocation is usually called a *task*. +//! +//! The executor then runs the spawned task by polling its future. If the future is pending on a +//! resource, a [`Waker`] associated with the task will be registered somewhere so that the task +//! can be woken up and run again at a later time. +//! +//! For example, if the future wants to read something from a TCP socket that is not ready yet, the +//! networking system will clone the task's waker and wake it up once the socket becomes ready. +//! +//! # Task construction +//! +//! A task is constructed with [`Task::create()`]: +//! +//! ``` +//! # #![feature(async_await)] +//! let future = async { 1 + 2 }; +//! let schedule = |task| unimplemented!(); +//! +//! let (task, handle) = async_task::spawn(future, schedule, ()); +//! ``` +//! +//! The first argument to the constructor, `()` in this example, is an arbitrary piece of data +//! called a *tag*. This can be a task identifier, a task name, task-local storage, or something +//! of similar nature. +//! +//! The second argument is the future that gets polled when the task is run. +//! +//! The third argument is the schedule function, which is called every time when the task gets +//! woken up. This function should push the received task into some kind of queue of runnable +//! tasks. +//! +//! The constructor returns a runnable [`Task`] and a [`JoinHandle`] that can await the result of +//! the future. +//! +//! # Task scheduling +//! +//! TODO +//! +//! # Join handles +//! +//! TODO +//! +//! # Cancellation +//! +//! TODO +//! +//! # Performance +//! +//! TODO: explain single allocation, etc. +//! +//! Task [construction] incurs a single allocation only. The [`Task`] can then be run and its +//! result awaited through the [`JoinHandle`]. When woken, the task gets automatically rescheduled. +//! It's also possible to cancel the task so that it stops running and can't be awaited anymore. +//! +//! [construction]: struct.Task.html#method.create +//! [`JoinHandle`]: struct.JoinHandle.html +//! [`Task`]: struct.Task.html +//! [`Future`]: https://doc.rust-lang.org/nightly/std/future/trait.Future.html +//! [`Waker`]: https://doc.rust-lang.org/nightly/std/task/struct.Waker.html +//! [`block_on()`]: https://docs.rs/futures-preview/*/futures/executor/fn.block_on.html +//! +//! # Examples +//! +//! A simple single-threaded executor: +//! +//! ``` +//! # #![feature(async_await)] +//! use std::future::Future; +//! use std::panic::catch_unwind; +//! use std::thread; +//! +//! use async_task::{JoinHandle, Task}; +//! use crossbeam::channel::{unbounded, Sender}; +//! use futures::executor; +//! use lazy_static::lazy_static; +//! +//! /// Spawns a future on the executor. +//! fn spawn<F, R>(future: F) -> JoinHandle<R, ()> +//! where +//! F: Future<Output = R> + Send + 'static, +//! R: Send + 'static, +//! { +//! lazy_static! { +//! // A channel that holds scheduled tasks. +//! static ref QUEUE: Sender<Task<()>> = { +//! let (sender, receiver) = unbounded::<Task<()>>(); +//! +//! // Start the executor thread. +//! thread::spawn(|| { +//! for task in receiver { +//! // Ignore panics for simplicity. +//! let _ignore_panic = catch_unwind(|| task.run()); +//! } +//! }); +//! +//! sender +//! }; +//! } +//! +//! // Create a task that is scheduled by sending itself into the channel. +//! let schedule = |t| QUEUE.send(t).unwrap(); +//! let (task, handle) = async_task::spawn(future, schedule, ()); +//! +//! // Schedule the task by sending it into the channel. +//! task.schedule(); +//! +//! handle +//! } +//! +//! // Spawn a future and await its result. +//! let handle = spawn(async { +//! println!("Hello, world!"); +//! }); +//! executor::block_on(handle); +//! ``` + +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +mod header; +mod join_handle; +mod raw; +mod state; +mod task; +mod utils; + +pub use crate::join_handle::JoinHandle; +pub use crate::task::{spawn, Task}; diff --git a/src/raw.rs b/src/raw.rs new file mode 100644 index 0000000..6928427 --- /dev/null +++ b/src/raw.rs @@ -0,0 +1,629 @@ +use std::alloc::{self, Layout}; +use std::cell::Cell; +use std::future::Future; +use std::marker::PhantomData; +use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use crate::header::Header; +use crate::state::*; +use crate::utils::{abort_on_panic, extend}; +use crate::Task; + +/// The vtable for a task. +pub(crate) struct TaskVTable { + /// The raw waker vtable. + pub(crate) raw_waker: RawWakerVTable, + + /// Schedules the task. + pub(crate) schedule: unsafe fn(*const ()), + + /// Drops the future inside the task. + pub(crate) drop_future: unsafe fn(*const ()), + + /// Returns a pointer to the output stored after completion. + pub(crate) get_output: unsafe fn(*const ()) -> *const (), + + /// Drops a waker or a task. + pub(crate) decrement: unsafe fn(ptr: *const ()), + + /// Destroys the task. + pub(crate) destroy: unsafe fn(*const ()), + + /// Runs the task. + pub(crate) run: unsafe fn(*const ()), +} + +/// Memory layout of a task. +/// +/// This struct contains the information on: +/// +/// 1. How to allocate and deallocate the task. +/// 2. How to access the fields inside the task. +#[derive(Clone, Copy)] +pub(crate) struct TaskLayout { + /// Memory layout of the whole task. + pub(crate) layout: Layout, + + /// Offset into the task at which the tag is stored. + pub(crate) offset_t: usize, + + /// Offset into the task at which the schedule function is stored. + pub(crate) offset_s: usize, + + /// Offset into the task at which the future is stored. + pub(crate) offset_f: usize, + + /// Offset into the task at which the output is stored. + pub(crate) offset_r: usize, +} + +/// Raw pointers to the fields of a task. +pub(crate) struct RawTask<F, R, S, T> { + /// The task header. + pub(crate) header: *const Header, + + /// The schedule function. + pub(crate) schedule: *const S, + + /// The tag inside the task. + pub(crate) tag: *mut T, + + /// The future. + pub(crate) future: *mut F, + + /// The output of the future. + pub(crate) output: *mut R, +} + +impl<F, R, S, T> Copy for RawTask<F, R, S, T> {} + +impl<F, R, S, T> Clone for RawTask<F, R, S, T> { + fn clone(&self) -> Self { + Self { + header: self.header, + schedule: self.schedule, + tag: self.tag, + future: self.future, + output: self.output, + } + } +} + +impl<F, R, S, T> RawTask<F, R, S, T> +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, + S: Fn(Task<T>) + Send + Sync + 'static, + T: Send + 'static, +{ + /// Allocates a task with the given `future` and `schedule` function. + /// + /// It is assumed there are initially only the `Task` reference and the `JoinHandle`. + pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> { + // Compute the layout of the task for allocation. Abort if the computation fails. + let task_layout = abort_on_panic(|| Self::task_layout()); + + unsafe { + // Allocate enough space for the entire task. + let raw_task = match NonNull::new(alloc::alloc(task_layout.layout) as *mut ()) { + None => std::process::abort(), + Some(p) => p, + }; + + let raw = Self::from_ptr(raw_task.as_ptr()); + + // Write the header as the first field of the task. + (raw.header as *mut Header).write(Header { + state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE), + awaiter: Cell::new(None), + vtable: &TaskVTable { + raw_waker: RawWakerVTable::new( + Self::clone_waker, + Self::wake, + Self::wake_by_ref, + Self::decrement, + ), + schedule: Self::schedule, + drop_future: Self::drop_future, + get_output: Self::get_output, + decrement: Self::decrement, + destroy: Self::destroy, + run: Self::run, + }, + }); + + // Write the tag as the second field of the task. + (raw.tag as *mut T).write(tag); + + // Write the schedule function as the third field of the task. + (raw.schedule as *mut S).write(schedule); + + // Write the future as the fourth field of the task. + raw.future.write(future); + + raw_task + } + } + + /// Creates a `RawTask` from a raw task pointer. + #[inline] + pub(crate) fn from_ptr(ptr: *const ()) -> Self { + let task_layout = Self::task_layout(); + let p = ptr as *const u8; + + unsafe { + Self { + header: p as *const Header, + tag: p.add(task_layout.offset_t) as *mut T, + schedule: p.add(task_layout.offset_s) as *const S, + future: p.add(task_layout.offset_f) as *mut F, + output: p.add(task_layout.offset_r) as *mut R, + } + } + } + + /// Returns the memory layout for a task. + #[inline] + fn task_layout() -> TaskLayout { + // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`. + let layout_header = Layout::new::<Header>(); + let layout_t = Layout::new::<T>(); + let layout_s = Layout::new::<S>(); + let layout_f = Layout::new::<F>(); + let layout_r = Layout::new::<R>(); + + // Compute the layout for `union { F, R }`. + let size_union = layout_f.size().max(layout_r.size()); + let align_union = layout_f.align().max(layout_r.align()); + let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) }; + + // Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`. + let layout = layout_header; + let (layout, offset_t) = extend(layout, layout_t); + let (layout, offset_s) = extend(layout, layout_s); + let (layout, offset_union) = extend(layout, layout_union); + let offset_f = offset_union; + let offset_r = offset_union; + + TaskLayout { + layout, + offset_t, + offset_s, + offset_f, + offset_r, + } + } + + /// Wakes a waker. + unsafe fn wake(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken. + if state & (COMPLETED | CLOSED) != 0 { + // Drop the waker. + Self::decrement(ptr); + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*raw.header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Drop the waker. + Self::decrement(ptr); + break; + } + Err(s) => state = s, + } + } else { + // Mark the task as scheduled. + match (*raw.header).state.compare_exchange_weak( + state, + state | SCHEDULED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not yet scheduled and isn't currently running, now is the + // time to schedule it. + if state & (SCHEDULED | RUNNING) == 0 { + // Schedule the task. + let task = Task { + raw_task: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, + }; + (*raw.schedule)(task); + } else { + // Drop the waker. + Self::decrement(ptr); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Wakes a waker by reference. + unsafe fn wake_by_ref(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*raw.header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } else { + // If the task is not scheduled nor running, we'll need to schedule after waking. + let new = if state & (SCHEDULED | RUNNING) == 0 { + (state | SCHEDULED) + REFERENCE + } else { + state | SCHEDULED + }; + + // Mark the task as scheduled. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not scheduled nor running, now is the time to schedule. + if state & (SCHEDULED | RUNNING) == 0 { + // If the reference count overflowed, abort. + if state > isize::max_value() as usize { + std::process::abort(); + } + + // Schedule the task. + let task = Task { + raw_task: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, + }; + (*raw.schedule)(task); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Clones a waker. + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + let raw = Self::from_ptr(ptr); + let raw_waker = &(*raw.header).vtable.raw_waker; + + // Increment the reference count. With any kind of reference-counted data structure, + // relaxed ordering is fine when the reference is being cloned. + let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); + + // If the reference count overflowed, abort. + if state > isize::max_value() as usize { + std::process::abort(); + } + + RawWaker::new(ptr, raw_waker) + } + + /// Drops a waker or a task. + /// + /// This function will decrement the reference count. If it drops down to zero and the + /// associated join handle has been dropped too, then the task gets destroyed. + #[inline] + unsafe fn decrement(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `JoinHandle` has been dropped as + // well, then destroy task. + if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 { + Self::destroy(ptr); + } + } + + /// Schedules a task for running. + /// + /// This function doesn't modify the state of the task. It only passes the task reference to + /// its schedule function. + unsafe fn schedule(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + (*raw.schedule)(Task { + raw_task: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, + }); + } + + /// Drops the future inside a task. + #[inline] + unsafe fn drop_future(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // We need a safeguard against panics because the destructor can panic. + abort_on_panic(|| { + raw.future.drop_in_place(); + }) + } + + /// Returns a pointer to the output inside a task. + unsafe fn get_output(ptr: *const ()) -> *const () { + let raw = Self::from_ptr(ptr); + raw.output as *const () + } + + /// Cleans up task's resources and deallocates it. + /// + /// If the task has not been closed, then its future or the output will be dropped. The + /// schedule function and the tag get dropped too. + #[inline] + unsafe fn destroy(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + let task_layout = Self::task_layout(); + + // We need a safeguard against panics because destructors can panic. + abort_on_panic(|| { + // Drop the schedule function. + (raw.schedule as *mut S).drop_in_place(); + + // Drop the tag. + (raw.tag as *mut T).drop_in_place(); + }); + + // Finally, deallocate the memory reserved by the task. + alloc::dealloc(ptr as *mut u8, task_layout.layout); + } + + /// Runs a task. + /// + /// If polling its future panics, the task will be closed and the panic propagated into the + /// caller. + unsafe fn run(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Create a context from the raw task pointer and the vtable inside the its header. + let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new( + ptr, + &(*raw.header).vtable.raw_waker, + ))); + let cx = &mut Context::from_waker(&waker); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + // Update the task's state before polling its future. + loop { + // If the task has been closed, drop the task reference and return. + if state & CLOSED != 0 { + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*raw.header).notify(); + } + + // Drop the future. + Self::drop_future(ptr); + + // Drop the task reference. + Self::decrement(ptr); + return; + } + + // Mark the task as unscheduled and running. + match (*raw.header).state.compare_exchange_weak( + state, + (state & !SCHEDULED) | RUNNING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Update the state because we're continuing with polling the future. + state = (state & !SCHEDULED) | RUNNING; + break; + } + Err(s) => state = s, + } + } + + // Poll the inner future, but surround it with a guard that closes the task in case polling + // panics. + let guard = Guard(raw); + let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); + mem::forget(guard); + + match poll { + Poll::Ready(out) => { + // Replace the future with its output. + Self::drop_future(ptr); + raw.output.write(out); + + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + // The task is now completed. + loop { + // If the handle is dropped, we'll need to close it and drop the output. + let new = if state & HANDLE == 0 { + (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED + } else { + (state & !RUNNING & !SCHEDULED) | COMPLETED + }; + + // Mark the task as not running and completed. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the handle is dropped or if the task was closed while running, + // now it's time to drop the output. + if state & HANDLE == 0 || state & CLOSED != 0 { + // Read the output. + output = Some(raw.output.read()); + } + + // Notify the awaiter that the task has been completed. + if state & AWAITER != 0 { + (*raw.header).notify(); + } + + // Drop the task reference. + Self::decrement(ptr); + break; + } + Err(s) => state = s, + } + } + + // Drop the output if it was taken out of the task. + drop(output); + } + Poll::Pending => { + // The task is still not completed. + loop { + // If the task was closed while running, we'll need to unschedule in case it + // was woken and then clean up its resources. + let new = if state & CLOSED != 0 { + state & !RUNNING & !SCHEDULED + } else { + state & !RUNNING + }; + + // Mark the task as not running. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // If the task was closed while running, we need to drop its future. + // If the task was woken while running, we need to schedule it. + // Otherwise, we just drop the task reference. + if state & CLOSED != 0 { + // The thread that closed the task didn't drop the future because + // it was running so now it's our responsibility to do so. + Self::drop_future(ptr); + + // Drop the task reference. + Self::decrement(ptr); + } else if state & SCHEDULED != 0 { + // The thread that has woken the task didn't reschedule it because + // it was running so now it's our responsibility to do so. + Self::schedule(ptr); + } else { + // Drop the task reference. + Self::decrement(ptr); + } + break; + } + Err(s) => state = s, + } + } + } + } + + /// A guard that closes the task if polling its future panics. + struct Guard<F, R, S, T>(RawTask<F, R, S, T>) + where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, + S: Fn(Task<T>) + Send + Sync + 'static, + T: Send + 'static; + + impl<F, R, S, T> Drop for Guard<F, R, S, T> + where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, + S: Fn(Task<T>) + Send + Sync + 'static, + T: Send + 'static, + { + fn drop(&mut self) { + let raw = self.0; + let ptr = raw.header as *const (); + + unsafe { + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task was closed while running, then unschedule it, drop its + // future, and drop the task reference. + if state & CLOSED != 0 { + // We still need to unschedule the task because it is possible it was + // woken while running. + (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // The thread that closed the task didn't drop the future because it + // was running so now it's our responsibility to do so. + RawTask::<F, R, S, T>::drop_future(ptr); + + // Drop the task reference. + RawTask::<F, R, S, T>::decrement(ptr); + break; + } + + // Mark the task as not running, not scheduled, and closed. + match (*raw.header).state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the task is now closed. + RawTask::<F, R, S, T>::drop_future(ptr); + + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*raw.header).notify(); + } + + // Drop the task reference. + RawTask::<F, R, S, T>::decrement(ptr); + break; + } + Err(s) => state = s, + } + } + } + } + } + } +} diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..d6ce34f --- /dev/null +++ b/src/state.rs @@ -0,0 +1,65 @@ +/// Set if the task is scheduled for running. +/// +/// A task is considered to be scheduled whenever its `Task` reference exists. It is in scheduled +/// state at the moment of creation and when it gets unapused either by its `JoinHandle` or woken +/// by a `Waker`. +/// +/// This flag can't be set when the task is completed. However, it can be set while the task is +/// running, in which case it will be rescheduled as soon as polling finishes. +pub(crate) const SCHEDULED: usize = 1 << 0; + +/// Set if the task is running. +/// +/// A task is running state while its future is being polled. +/// +/// This flag can't be set when the task is completed. However, it can be in scheduled state while +/// it is running, in which case it will be rescheduled when it stops being polled. +pub(crate) const RUNNING: usize = 1 << 1; + +/// Set if the task has been completed. +/// +/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored +/// inside the task until it becomes stopped. In fact, `JoinHandle` picks the output up by marking +/// the task as stopped. +/// +/// This flag can't be set when the task is scheduled or completed. +pub(crate) const COMPLETED: usize = 1 << 2; + +/// Set if the task is closed. +/// +/// If a task is closed, that means its either cancelled or its output has been consumed by the +/// `JoinHandle`. A task becomes closed when: +/// +/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`. +/// 2. Its output is awaited by the `JoinHandle`. +/// 3. It panics while polling the future. +/// 4. It is completed and the `JoinHandle` is dropped. +pub(crate) const CLOSED: usize = 1 << 3; + +/// Set if the `JoinHandle` still exists. +/// +/// The `JoinHandle` is a special case in that it is only tracked by this flag, while all other +/// task references (`Task` and `Waker`s) are tracked by the reference count. +pub(crate) const HANDLE: usize = 1 << 4; + +/// Set if the `JoinHandle` is awaiting the output. +/// +/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the +/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast +/// check that tells us if we need to wake anyone without acquiring the lock inside the task. +pub(crate) const AWAITER: usize = 1 << 5; + +/// Set if the awaiter is locked. +/// +/// This lock is acquired before a new awaiter is registered or the existing one is woken. +pub(crate) const LOCKED: usize = 1 << 6; + +/// A single reference. +/// +/// The lower bits in the state contain various flags representing the task state, while the upper +/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the +/// total reference count. +/// +/// Note that the reference counter only tracks the `Task` and `Waker`s. The `JoinHandle` is +/// tracked separately by the `HANDLE` flag. +pub(crate) const REFERENCE: usize = 1 << 7; diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..8bfc164 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,390 @@ +use std::fmt; +use std::future::Future; +use std::marker::PhantomData; +use std::mem; +use std::ptr::NonNull; + +use crate::header::Header; +use crate::raw::RawTask; +use crate::JoinHandle; + +/// Creates a new task. +/// +/// This constructor returns a `Task` reference that runs the future and a [`JoinHandle`] that +/// awaits its result. +/// +/// The `tag` is stored inside the allocated task. +/// +/// When run, the task polls `future`. When woken, it gets scheduled for running by the +/// `schedule` function. +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// use crossbeam::channel; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken, it will be sent into this channel. +/// let (s, r) = channel::unbounded(); +/// let schedule = move |task| s.send(task).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (task, handle) = async_task::spawn(future, schedule, ()); +/// ``` +/// +/// [`JoinHandle`]: struct.JoinHandle.html +pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) +where + F: Future<Output = R> + Send + 'static, + R: Send + 'static, + S: Fn(Task<T>) + Send + Sync + 'static, + T: Send + Sync + 'static, +{ + let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule); + let task = Task { + raw_task, + _marker: PhantomData, + }; + let handle = JoinHandle { + raw_task, + _marker: PhantomData, + }; + (task, handle) +} + +/// A task that runs a future. +/// +/// # Construction +/// +/// A task is a heap-allocated structure containing: +/// +/// * A reference counter. +/// * The state of the task. +/// * Arbitrary piece of data called a *tag*. +/// * A function that schedules the task when woken. +/// * A future or its result if polling has completed. +/// +/// Constructor [`Task::create()`] returns a [`Task`] and a [`JoinHandle`]. Those two references +/// are like two sides of the task: one runs the future and the other awaits its result. +/// +/// # Behavior +/// +/// The [`Task`] reference "owns" the task itself and is used to [run] it. Running consumes the +/// [`Task`] reference and polls its internal future. If the future is still pending after being +/// polled, the [`Task`] reference will be recreated when woken by a [`Waker`]. If the future +/// completes, its result becomes available to the [`JoinHandle`]. +/// +/// The [`JoinHandle`] is a [`Future`] that awaits the result of the task. +/// +/// When the task is woken, its [`Task`] reference is recreated and passed to the schedule function +/// provided during construction. In most executors, scheduling simply pushes the [`Task`] into a +/// queue of runnable tasks. +/// +/// If the [`Task`] reference is dropped without being run, the task is cancelled. +/// +/// Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task +/// won't be scheduled again even if a [`Waker`] wakes it or the [`JoinHandle`] is polled. An +/// attempt to run a cancelled task won't do anything. And if the cancelled task has already +/// completed, awaiting its result through [`JoinHandle`] will return `None`. +/// +/// If polling the task's future panics, it gets cancelled automatically. +/// +/// # Task states +/// +/// A task can be in the following states: +/// +/// * Sleeping: The [`Task`] reference doesn't exist and is waiting to be scheduled by a [`Waker`]. +/// * Scheduled: The [`Task`] reference exists and is waiting to be [run]. +/// * Completed: The [`Task`] reference doesn't exist anymore and can't be rescheduled, but its +/// result is available to the [`JoinHandle`]. +/// * Cancelled: The [`Task`] reference may or may not exist, but running it does nothing and +/// awaiting the [`JoinHandle`] returns `None`. +/// +/// When constructed, the task is initially in the scheduled state. +/// +/// # Destruction +/// +/// The future inside the task gets dropped in the following cases: +/// +/// * When [`Task`] is dropped. +/// * When [`Task`] is run to completion. +/// +/// If the future hasn't been dropped and the last [`Waker`] or [`JoinHandle`] is dropped, or if +/// a [`JoinHandle`] cancels the task, then the task will be scheduled one last time so that its +/// future gets dropped by the executor. In other words, the task's future can be dropped only by +/// [`Task`]. +/// +/// When the task completes, the result of its future is stored inside the allocation. This result +/// is taken out when the [`JoinHandle`] awaits it. When the task is cancelled or the +/// [`JoinHandle`] is dropped without being awaited, the result gets dropped too. +/// +/// The task gets deallocated when all references to it are dropped, which includes the [`Task`], +/// the [`JoinHandle`], and any associated [`Waker`]s. +/// +/// The tag inside the task and the schedule function get dropped at the time of deallocation. +/// +/// # Panics +/// +/// If polling the inner future inside [`run()`] panics, the panic will be propagated into +/// the caller. Likewise, a panic inside the task result's destructor will be propagated. All other +/// panics result in the process being aborted. +/// +/// More precisely, the process is aborted if a panic occurs: +/// +/// * Inside the schedule function. +/// * While dropping the tag. +/// * While dropping the future. +/// * While dropping the schedule function. +/// * While waking the task awaiting the [`JoinHandle`]. +/// +/// [`run()`]: struct.Task.html#method.run +/// [run]: struct.Task.html#method.run +/// [`JoinHandle`]: struct.JoinHandle.html +/// [`Task`]: struct.Task.html +/// [`Task::create()`]: struct.Task.html#method.create +/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html +/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// use async_task::Task; +/// use crossbeam::channel; +/// use futures::executor; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken, it will be sent into this channel. +/// let (s, r) = channel::unbounded(); +/// let schedule = move |task| s.send(task).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (task, handle) = async_task::spawn(future, schedule, ()); +/// +/// // Run the task. In this example, it will complete after a single run. +/// task.run(); +/// assert!(r.is_empty()); +/// +/// // Await its result. +/// executor::block_on(handle); +/// ``` +pub struct Task<T> { + /// A pointer to the heap-allocated task. + pub(crate) raw_task: NonNull<()>, + + /// A marker capturing the generic type `T`. + pub(crate) _marker: PhantomData<T>, +} + +unsafe impl<T> Send for Task<T> {} +unsafe impl<T> Sync for Task<T> {} + +impl<T> Task<T> { + /// Schedules the task. + /// + /// This is a convenience method that simply reschedules the task by passing it to its schedule + /// function. + /// + /// If the task is cancelled, this method won't do anything. + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// use crossbeam::channel; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // If the task gets woken, it will be sent into this channel. + /// let (s, r) = channel::unbounded(); + /// let schedule = move |task| s.send(task).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (task, handle) = async_task::spawn(future, schedule, ()); + /// + /// // Send the task into the channel. + /// task.schedule(); + /// + /// // Retrieve the task back from the channel. + /// let task = r.recv().unwrap(); + /// ``` + pub fn schedule(self) { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + mem::forget(self); + + unsafe { + ((*header).vtable.schedule)(ptr); + } + } + + /// Runs the task. + /// + /// This method polls the task's future. If the future completes, its result will become + /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to + /// be woken in order to be rescheduled and then run again. + /// + /// If the task is cancelled, running it won't do anything. + /// + /// # Panics + /// + /// It is possible that polling the future panics, in which case the panic will be propagated + /// into the caller. It is advised that invocations of this method are wrapped inside + /// [`catch_unwind`]. + /// + /// If a panic occurs, the task is automatically cancelled. + /// + /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// use crossbeam::channel; + /// use futures::executor; + /// + /// // The future inside the task. + /// let future = async { 1 + 2 }; + /// + /// // If the task gets woken, it will be sent into this channel. + /// let (s, r) = channel::unbounded(); + /// let schedule = move |task| s.send(task).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (task, handle) = async_task::spawn(future, schedule, ()); + /// + /// // Run the task. In this example, it will complete after a single run. + /// task.run(); + /// assert!(r.is_empty()); + /// + /// // Await the result of the task. + /// let result = executor::block_on(handle); + /// assert_eq!(result, Some(3)); + /// ``` + pub fn run(self) { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + mem::forget(self); + + unsafe { + ((*header).vtable.run)(ptr); + } + } + + /// Cancels the task. + /// + /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt + /// to run it won't do anything. And if it's completed, awaiting its result evaluates to + /// `None`. + /// + /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// use crossbeam::channel; + /// use futures::executor; + /// + /// // The future inside the task. + /// let future = async { 1 + 2 }; + /// + /// // If the task gets woken, it will be sent into this channel. + /// let (s, r) = channel::unbounded(); + /// let schedule = move |task| s.send(task).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (task, handle) = async_task::spawn(future, schedule, ()); + /// + /// // Cancel the task. + /// task.cancel(); + /// + /// // Running a cancelled task does nothing. + /// task.run(); + /// + /// // Await the result of the task. + /// let result = executor::block_on(handle); + /// assert_eq!(result, None); + /// ``` + pub fn cancel(&self) { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + unsafe { + (*header).cancel(); + } + } + + /// Returns a reference to the tag stored inside the task. + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// use crossbeam::channel; + /// + /// // The future inside the task. + /// let future = async { 1 + 2 }; + /// + /// // If the task gets woken, it will be sent into this channel. + /// let (s, r) = channel::unbounded(); + /// let schedule = move |task| s.send(task).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (task, handle) = async_task::spawn(future, schedule, "a simple task"); + /// + /// // Access the tag. + /// assert_eq!(*task.tag(), "a simple task"); + /// ``` + pub fn tag(&self) -> &T { + let offset = Header::offset_tag::<T>(); + let ptr = self.raw_task.as_ptr(); + + unsafe { + let raw = (ptr as *mut u8).add(offset) as *const T; + &*raw + } + } +} + +impl<T> Drop for Task<T> { + fn drop(&mut self) { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + unsafe { + // Cancel the task. + (*header).cancel(); + + // Drop the future. + ((*header).vtable.drop_future)(ptr); + + // Drop the task reference. + ((*header).vtable.decrement)(ptr); + } + } +} + +impl<T: fmt::Debug> fmt::Debug for Task<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ptr = self.raw_task.as_ptr(); + let header = ptr as *const Header; + + f.debug_struct("Task") + .field("header", unsafe { &(*header) }) + .field("tag", self.tag()) + .finish() + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..441ead1 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,48 @@ +use std::alloc::Layout; +use std::mem; + +/// Calls a function and aborts if it panics. +/// +/// This is useful in unsafe code where we can't recover from panics. +#[inline] +pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T { + struct Bomb; + + impl Drop for Bomb { + fn drop(&mut self) { + std::process::abort(); + } + } + + let bomb = Bomb; + let t = f(); + mem::forget(bomb); + t +} + +/// Returns the layout for `a` followed by `b` and the offset of `b`. +/// +/// This function was adapted from the currently unstable `Layout::extend()`: +/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend +#[inline] +pub(crate) fn extend(a: Layout, b: Layout) -> (Layout, usize) { + let new_align = a.align().max(b.align()); + let pad = padding_needed_for(a, b.align()); + + let offset = a.size().checked_add(pad).unwrap(); + let new_size = offset.checked_add(b.size()).unwrap(); + + let layout = Layout::from_size_align(new_size, new_align).unwrap(); + (layout, offset) +} + +/// Returns the padding after `layout` that aligns the following address to `align`. +/// +/// This function was adapted from the currently unstable `Layout::padding_needed_for()`: +/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for +#[inline] +pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize { + let len = layout.size(); + let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1); + len_rounded_up.wrapping_sub(len) +} diff --git a/tests/basic.rs b/tests/basic.rs new file mode 100644 index 0000000..b9e181b --- /dev/null +++ b/tests/basic.rs @@ -0,0 +1,314 @@ +#![feature(async_await)] + +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use crossbeam::channel; +use futures::future; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP)` +// +// The future `f` always returns `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Box<i32>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1); + Poll::Ready(Box::new(0)) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + move |_task| { + &guard; + $sched.fetch_add(1); + } + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +#[test] +fn cancel_and_drop_handle() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + task.cancel(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + drop(task); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn run_and_drop_handle() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + drop(handle); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn drop_handle_and_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + drop(handle); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn cancel_and_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + handle.cancel(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + task.run(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn run_and_cancel() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + handle.cancel(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn schedule() { + let (s, r) = channel::unbounded(); + let schedule = move |t| s.send(t).unwrap(); + let (task, _handle) = async_task::spawn( + future::poll_fn(|_| Poll::<()>::Pending), + schedule, + Box::new(0), + ); + + assert!(r.is_empty()); + task.schedule(); + + let task = r.recv().unwrap(); + assert!(r.is_empty()); + task.schedule(); + + let task = r.recv().unwrap(); + assert!(r.is_empty()); + task.schedule(); + + r.recv().unwrap(); +} + +#[test] +fn tag() { + let (s, r) = channel::unbounded(); + let schedule = move |t| s.send(t).unwrap(); + let (task, handle) = async_task::spawn( + future::poll_fn(|_| Poll::<()>::Pending), + schedule, + AtomicUsize::new(7), + ); + + assert!(r.is_empty()); + task.schedule(); + + let task = r.recv().unwrap(); + assert!(r.is_empty()); + handle.tag().fetch_add(1, Ordering::SeqCst); + task.schedule(); + + let task = r.recv().unwrap(); + assert_eq!(task.tag().load(Ordering::SeqCst), 8); + assert!(r.is_empty()); + task.schedule(); + + r.recv().unwrap(); +} + +#[test] +fn schedule_counter() { + let (s, r) = channel::unbounded(); + let schedule = move |t: Task<AtomicUsize>| { + t.tag().fetch_add(1, Ordering::SeqCst); + s.send(t).unwrap(); + }; + let (task, handle) = async_task::spawn( + future::poll_fn(|_| Poll::<()>::Pending), + schedule, + AtomicUsize::new(0), + ); + task.schedule(); + + assert_eq!(handle.tag().load(Ordering::SeqCst), 1); + r.recv().unwrap().schedule(); + + assert_eq!(handle.tag().load(Ordering::SeqCst), 2); + r.recv().unwrap().schedule(); + + assert_eq!(handle.tag().load(Ordering::SeqCst), 3); + r.recv().unwrap(); +} diff --git a/tests/join.rs b/tests/join.rs new file mode 100644 index 0000000..e082939 --- /dev/null +++ b/tests/join.rs @@ -0,0 +1,454 @@ +#![feature(async_await)] + +use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use futures::executor::block_on; +use futures::future; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP_F, DROP_O)` +// +// The future `f` outputs `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP_F` is incremented. +// When the output gets dropped, `DROP_O` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop_f: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop_o: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Out; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1); + Poll::Ready(Out(Box::new(0))) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop_f.fetch_add(1); + } + } + + struct Out(Box<i32>); + + impl Drop for Out { + fn drop(&mut self) { + $drop_o.fetch_add(1); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + move |task: Task<_>| { + &guard; + task.schedule(); + $sched.fetch_add(1); + } + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn cancel_and_join() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + assert_eq!(DROP_O.load(), 0); + + task.cancel(); + drop(task); + assert_eq!(DROP_O.load(), 0); + + assert!(block_on(handle).is_none()); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 0); +} + +#[test] +fn run_and_join() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + assert_eq!(DROP_O.load(), 0); + + task.run(); + assert_eq!(DROP_O.load(), 0); + + assert!(block_on(handle).is_some()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 1); +} + +#[test] +fn drop_handle_and_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + assert_eq!(DROP_O.load(), 0); + + drop(handle); + assert_eq!(DROP_O.load(), 0); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 1); +} + +#[test] +fn join_twice() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + assert_eq!(DROP_O.load(), 0); + + task.run(); + assert_eq!(DROP_O.load(), 0); + + assert!(block_on(&mut handle).is_some()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 1); + + assert!(block_on(&mut handle).is_none()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 1); + + drop(handle); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn join_and_cancel() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(100)); + + task.cancel(); + drop(task); + + thread::sleep(ms(200)); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_O.load(), 0); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + assert!(block_on(handle).is_none()); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + + thread::sleep(ms(100)); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_O.load(), 0); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }) + .unwrap(); +} + +#[test] +fn join_and_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(200)); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + assert!(block_on(handle).is_some()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_O.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }) + .unwrap(); +} + +#[test] +fn try_join_and_run_and_join() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(200)); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + block_on(future::select(&mut handle, future::ready(()))); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + + assert!(block_on(handle).is_some()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_O.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }) + .unwrap(); +} + +#[test] +fn try_join_and_cancel_and_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(200)); + + task.run(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + block_on(future::select(&mut handle, future::ready(()))); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + + handle.cancel(); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + }) + .unwrap(); +} + +#[test] +fn try_join_and_run_and_cancel() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(200)); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + }); + + block_on(future::select(&mut handle, future::ready(()))); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + + thread::sleep(ms(400)); + + handle.cancel(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 1); + }) + .unwrap(); +} + +#[test] +fn await_output() { + struct Fut<T>(Cell<Option<T>>); + + impl<T> Fut<T> { + fn new(t: T) -> Fut<T> { + Fut(Cell::new(Some(t))) + } + } + + impl<T> Future for Fut<T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(self.0.take().unwrap()) + } + } + + for i in 0..10 { + let (task, handle) = async_task::spawn(Fut::new(i), drop, Box::new(0)); + task.run(); + assert_eq!(block_on(handle), Some(i)); + } + + for i in 0..10 { + let (task, handle) = async_task::spawn(Fut::new(vec![7; i]), drop, Box::new(0)); + task.run(); + assert_eq!(block_on(handle), Some(vec![7; i])); + } + + let (task, handle) = async_task::spawn(Fut::new("foo".to_string()), drop, Box::new(0)); + task.run(); + assert_eq!(block_on(handle), Some("foo".to_string())); +} diff --git a/tests/panic.rs b/tests/panic.rs new file mode 100644 index 0000000..68058a2 --- /dev/null +++ b/tests/panic.rs @@ -0,0 +1,288 @@ +#![feature(async_await)] + +use std::future::Future; +use std::panic::catch_unwind; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use futures::executor::block_on; +use futures::future; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP)` +// +// The future `f` sleeps for 200 ms and then panics. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1); + thread::sleep(ms(200)); + panic!() + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + move |_task: Task<_>| { + &guard; + $sched.fetch_add(1); + } + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn cancel_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + thread::sleep(ms(100)); + + handle.cancel(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + drop(handle); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + }) + .unwrap(); +} + +#[test] +fn run_and_join() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + assert!(block_on(handle).is_none()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn try_join_and_run_and_join() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + block_on(future::select(&mut handle, future::ready(()))); + assert_eq!(POLL.load(), 0); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + + assert!(block_on(handle).is_none()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn join_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + thread::sleep(ms(100)); + + assert!(block_on(handle).is_none()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }) + .unwrap(); +} + +#[test] +fn try_join_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + thread::sleep(ms(100)); + + block_on(future::select(&mut handle, future::ready(()))); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + drop(handle); + }) + .unwrap(); +} + +#[test] +fn drop_handle_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + thread::sleep(ms(100)); + + drop(handle); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + }) + .unwrap(); +} diff --git a/tests/ready.rs b/tests/ready.rs new file mode 100644 index 0000000..ecca328 --- /dev/null +++ b/tests/ready.rs @@ -0,0 +1,265 @@ +#![feature(async_await)] + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use futures::executor::block_on; +use futures::future; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP_F, DROP_O)` +// +// The future `f` sleeps for 200 ms and outputs `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP_F` is incremented. +// When the output gets dropped, `DROP_O` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop_f: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop_o: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Out; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1); + thread::sleep(ms(200)); + Poll::Ready(Out(Box::new(0))) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop_f.fetch_add(1); + } + } + + struct Out(Box<i32>); + + impl Drop for Out { + fn drop(&mut self) { + $drop_o.fetch_add(1); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + move |_task: Task<_>| { + &guard; + $sched.fetch_add(1); + } + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn cancel_during_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 1); + }); + + thread::sleep(ms(100)); + + handle.cancel(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 1); + + drop(handle); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 1); + }) + .unwrap(); +} + +#[test] +fn join_during_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }); + + thread::sleep(ms(100)); + + assert!(block_on(handle).is_some()); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_O.load(), 1); + + thread::sleep(ms(100)); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + }) + .unwrap(); +} + +#[test] +fn try_join_during_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, mut handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 1); + }); + + thread::sleep(ms(100)); + + block_on(future::select(&mut handle, future::ready(()))); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + drop(handle); + }) + .unwrap(); +} + +#[test] +fn drop_handle_during_run() { + future!(f, POLL, DROP_F, DROP_O); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(DROP_O.load(), 1); + }); + + thread::sleep(ms(100)); + + drop(handle); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(DROP_O.load(), 0); + }) + .unwrap(); +} diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs new file mode 100644 index 0000000..a683f26 --- /dev/null +++ b/tests/waker_panic.rs @@ -0,0 +1,357 @@ +#![feature(async_await)] + +use std::cell::Cell; +use std::future::Future; +use std::panic::catch_unwind; +use std::pin::Pin; +use std::task::Waker; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use crossbeam::channel; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms, and panics the second time it is polled. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `waker` function. +macro_rules! future { + ($name:pat, $waker:pat, $poll:ident, $drop:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None); + } + + let ($name, $waker) = { + struct Fut(Cell<bool>, Box<i32>); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + WAKER.store(Some(cx.waker().clone())); + $poll.fetch_add(1); + thread::sleep(ms(200)); + + if self.0.get() { + panic!() + } else { + self.0.set(true); + Poll::Pending + } + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + (Fut(Cell::new(false), Box::new(0)), || { + WAKER.swap(None).unwrap() + }) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, chan, SCHED, DROP)` +// +// The schedule function `s` pushes the task into `chan`. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Receiver `chan` extracts the task when it is scheduled. +macro_rules! schedule { + ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($name, $chan) = { + let (s, r) = channel::unbounded(); + + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + let sched = move |task: Task<_>| { + &guard; + $sched.fetch_add(1); + s.send(task).unwrap(); + }; + + (sched, r) + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn wake_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake_by_ref(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + w.wake(); + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} + +#[test] +fn cancel_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + handle.cancel(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} + +#[test] +fn wake_and_cancel_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake_by_ref(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + w.wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + handle.cancel(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} + +#[test] +fn cancel_and_wake_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake_by_ref(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + assert!(catch_unwind(|| task.run()).is_err()); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + handle.cancel(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + w.wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} diff --git a/tests/waker_pending.rs b/tests/waker_pending.rs new file mode 100644 index 0000000..547ff7a --- /dev/null +++ b/tests/waker_pending.rs @@ -0,0 +1,348 @@ +#![feature(async_await)] + +use std::future::Future; +use std::pin::Pin; +use std::task::Waker; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use crossbeam::channel; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms and returns `Poll::Pending`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `waker` function. +macro_rules! future { + ($name:pat, $waker:pat, $poll:ident, $drop:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None); + } + + let ($name, $waker) = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + WAKER.store(Some(cx.waker().clone())); + $poll.fetch_add(1); + thread::sleep(ms(200)); + Poll::Pending + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + (Fut(Box::new(0)), || WAKER.swap(None).unwrap()) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, chan, SCHED, DROP)` +// +// The schedule function `s` pushes the task into `chan`. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Receiver `chan` extracts the task when it is scheduled. +macro_rules! schedule { + ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($name, $chan) = { + let (s, r) = channel::unbounded(); + + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + let sched = move |task: Task<_>| { + &guard; + $sched.fetch_add(1); + s.send(task).unwrap(); + }; + + (sched, r) + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn wake_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, _handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake_by_ref(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 2); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 1); + }); + + thread::sleep(ms(100)); + + w.wake_by_ref(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 2); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 1); + }) + .unwrap(); + + chan.recv().unwrap(); + drop(waker()); +} + +#[test] +fn cancel_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + handle.cancel(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} + +#[test] +fn wake_and_cancel_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake_by_ref(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + w.wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + handle.cancel(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} + +#[test] +fn cancel_and_wake_during_run() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + task.run(); + let w = waker(); + w.wake_by_ref(); + let task = chan.recv().unwrap(); + + crossbeam::scope(|scope| { + scope.spawn(|_| { + task.run(); + drop(waker()); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }); + + thread::sleep(ms(100)); + + handle.cancel(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(handle); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + w.wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); + }) + .unwrap(); +} diff --git a/tests/waker_ready.rs b/tests/waker_ready.rs new file mode 100644 index 0000000..e64cc55 --- /dev/null +++ b/tests/waker_ready.rs @@ -0,0 +1,328 @@ +#![feature(async_await)] + +use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; +use std::task::Waker; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use crossbeam::channel; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `waker` function. +macro_rules! future { + ($name:pat, $waker:pat, $poll:ident, $drop:ident) => { + lazy_static! { + static ref $poll: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None); + } + + let ($name, $waker) = { + struct Fut(Cell<bool>, Box<i32>); + + impl Future for Fut { + type Output = Box<i32>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + WAKER.store(Some(cx.waker().clone())); + $poll.fetch_add(1); + thread::sleep(ms(200)); + + if self.0.get() { + Poll::Ready(Box::new(0)) + } else { + self.0.set(true); + Poll::Pending + } + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + (Fut(Cell::new(false), Box::new(0)), || { + WAKER.swap(None).unwrap() + }) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, chan, SCHED, DROP)` +// +// The schedule function `s` pushes the task into `chan`. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Receiver `chan` extracts the task when it is scheduled. +macro_rules! schedule { + ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell<usize> = AtomicCell::new(0); + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($name, $chan) = { + let (s, r) = channel::unbounded(); + + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + let sched = move |task: Task<_>| { + &guard; + $sched.fetch_add(1); + s.send(task).unwrap(); + }; + + (sched, r) + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell<usize> = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box<i32>); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn wake() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(mut task, _, f, s, DROP_D); + + assert!(chan.is_empty()); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + waker().wake(); + task = chan.recv().unwrap(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + task.run(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + waker().wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn wake_by_ref() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(mut task, _, f, s, DROP_D); + + assert!(chan.is_empty()); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + waker().wake_by_ref(); + task = chan.recv().unwrap(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + task.run(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + waker().wake_by_ref(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn clone() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(mut task, _, f, s, DROP_D); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + let w2 = waker().clone(); + let w3 = w2.clone(); + let w4 = w3.clone(); + w4.wake(); + + task = chan.recv().unwrap(); + task.run(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + w3.wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + drop(w2); + drop(waker()); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); +} + +#[test] +fn wake_cancelled() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, _, f, s, DROP_D); + + task.run(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + let w = waker(); + + w.wake_by_ref(); + chan.recv().unwrap().cancel(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + w.wake(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn wake_completed() { + future!(f, waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + task!(task, _, f, s, DROP_D); + + task.run(); + let w = waker(); + assert_eq!(POLL.load(), 1); + assert_eq!(SCHEDULE.load(), 0); + assert_eq!(DROP_F.load(), 0); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + w.wake(); + chan.recv().unwrap().run(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 0); + assert_eq!(DROP_D.load(), 0); + assert_eq!(chan.len(), 0); + + waker().wake(); + assert_eq!(POLL.load(), 2); + assert_eq!(SCHEDULE.load(), 1); + assert_eq!(DROP_F.load(), 1); + assert_eq!(DROP_S.load(), 1); + assert_eq!(DROP_D.load(), 1); + assert_eq!(chan.len(), 0); +} |