diff options
author | Yoshua Wuyts <yoshuawuyts+github@gmail.com> | 2019-12-02 07:15:04 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-02 07:15:04 +0900 |
commit | 5977202fb0d4ad93ace19cc0a52890747f20695a (patch) | |
tree | 1406a6df8b2bf2e2144dc8429e13c1fc34eb2d5b | |
parent | 2b0427a6cf49f14faf675d023bbcdec590cde1af (diff) | |
parent | 5d80be610a66654530bb0a80f613b62e67af8d3b (diff) | |
download | async-task-5977202fb0d4ad93ace19cc0a52890747f20695a.tar.gz |
Merge pull request #10 from async-rs/spawn-local
Add spawn_local and clarify what the schedule function can do
-rw-r--r-- | examples/panic-propagation.rs | 6 | ||||
-rw-r--r-- | examples/panic-result.rs | 9 | ||||
-rw-r--r-- | examples/spawn-local.rs | 76 | ||||
-rw-r--r-- | examples/spawn-on-thread.rs | 4 | ||||
-rw-r--r-- | examples/spawn.rs | 9 | ||||
-rw-r--r-- | examples/task-id.rs | 9 | ||||
-rw-r--r-- | src/join_handle.rs | 2 | ||||
-rw-r--r-- | src/lib.rs | 5 | ||||
-rw-r--r-- | src/raw.rs | 16 | ||||
-rw-r--r-- | src/task.rs | 106 |
10 files changed, 214 insertions, 28 deletions
diff --git a/examples/panic-propagation.rs b/examples/panic-propagation.rs index 8a5339f..05ec85a 100644 --- a/examples/panic-propagation.rs +++ b/examples/panic-propagation.rs @@ -11,6 +11,8 @@ use futures::executor; use futures::future::FutureExt; use lazy_static::lazy_static; +type Task = async_task::Task<()>; + /// Spawns a future on the executor. fn spawn<F, R>(future: F) -> JoinHandle<R> where @@ -19,8 +21,8 @@ where { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender<async_task::Task<()>> = { - let (sender, receiver) = unbounded::<async_task::Task<()>>(); + static ref QUEUE: Sender<Task> = { + let (sender, receiver) = unbounded::<Task>(); // Start the executor thread. thread::spawn(|| { diff --git a/examples/panic-result.rs b/examples/panic-result.rs index 7cf5a14..6308240 100644 --- a/examples/panic-result.rs +++ b/examples/panic-result.rs @@ -9,16 +9,19 @@ use futures::executor; use futures::future::FutureExt; use lazy_static::lazy_static; +type Task = async_task::Task<()>; +type JoinHandle<T> = async_task::JoinHandle<T, ()>; + /// Spawns a future on the executor. -fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()> +fn spawn<F, R>(future: F) -> JoinHandle<thread::Result<R>> where F: Future<Output = R> + Send + 'static, R: Send + 'static, { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender<async_task::Task<()>> = { - let (sender, receiver) = unbounded::<async_task::Task<()>>(); + static ref QUEUE: Sender<Task> = { + let (sender, receiver) = unbounded::<Task>(); // Start the executor thread. thread::spawn(|| { diff --git a/examples/spawn-local.rs b/examples/spawn-local.rs new file mode 100644 index 0000000..4e66c32 --- /dev/null +++ b/examples/spawn-local.rs @@ -0,0 +1,76 @@ +//! A simple single-threaded executor that can spawn non-`Send` futures. + +use std::cell::Cell; +use std::future::Future; +use std::rc::Rc; + +use crossbeam::channel::{unbounded, Receiver, Sender}; + +type Task = async_task::Task<()>; +type JoinHandle<T> = async_task::JoinHandle<T, ()>; + +thread_local! { + // A channel that holds scheduled tasks. + static QUEUE: (Sender<Task>, Receiver<Task>) = unbounded(); +} + +/// Spawns a future on the executor. +fn spawn<F, R>(future: F) -> JoinHandle<R> +where + F: Future<Output = R> + 'static, + R: 'static, +{ + // Create a task that is scheduled by sending itself into the channel. + let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap()); + let (task, handle) = async_task::spawn_local(future, schedule, ()); + + // Schedule the task by sending it into the queue. + task.schedule(); + + handle +} + +/// Runs a future to completion. +fn run<F, R>(future: F) -> R +where + F: Future<Output = R> + 'static, + R: 'static, +{ + // Spawn a task that sends its result through a channel. + let (s, r) = unbounded(); + spawn(async move { s.send(future.await).unwrap() }); + + loop { + // If the original task has completed, return its result. + if let Ok(val) = r.try_recv() { + return val; + } + + // Otherwise, take a task from the queue and run it. + QUEUE.with(|(_, r)| r.recv().unwrap().run()); + } +} + +fn main() { + let val = Rc::new(Cell::new(0)); + + // Run a future that increments a non-`Send` value. + run({ + let val = val.clone(); + async move { + // Spawn a future that increments the value. + let handle = spawn({ + let val = val.clone(); + async move { + val.set(dbg!(val.get()) + 1); + } + }); + + val.set(dbg!(val.get()) + 1); + handle.await; + } + }); + + // The value should be 2 at the end of the program. + dbg!(val.get()); +} diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs index 22da0c5..95214ed 100644 --- a/examples/spawn-on-thread.rs +++ b/examples/spawn-on-thread.rs @@ -7,10 +7,12 @@ use std::thread; use crossbeam::channel; use futures::executor; +type JoinHandle<T> = async_task::JoinHandle<T, ()>; + /// Spawns a future on a new dedicated thread. /// /// The returned handle can be used to await the output of the future. -fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()> +fn spawn_on_thread<F, R>(future: F) -> JoinHandle<R> where F: Future<Output = R> + Send + 'static, R: Send + 'static, diff --git a/examples/spawn.rs b/examples/spawn.rs index 4af5a02..9db7215 100644 --- a/examples/spawn.rs +++ b/examples/spawn.rs @@ -8,16 +8,19 @@ use crossbeam::channel::{unbounded, Sender}; use futures::executor; use lazy_static::lazy_static; +type Task = async_task::Task<()>; +type JoinHandle<T> = async_task::JoinHandle<T, ()>; + /// Spawns a future on the executor. -fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> +fn spawn<F, R>(future: F) -> JoinHandle<R> where F: Future<Output = R> + Send + 'static, R: Send + 'static, { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender<async_task::Task<()>> = { - let (sender, receiver) = unbounded::<async_task::Task<()>>(); + static ref QUEUE: Sender<Task> = { + let (sender, receiver) = unbounded::<Task>(); // Start the executor thread. thread::spawn(|| { diff --git a/examples/task-id.rs b/examples/task-id.rs index 66b7aec..2a7bcf7 100644 --- a/examples/task-id.rs +++ b/examples/task-id.rs @@ -13,6 +13,9 @@ use lazy_static::lazy_static; #[derive(Clone, Copy, Debug)] struct TaskId(usize); +type Task = async_task::Task<TaskId>; +type JoinHandle<T> = async_task::JoinHandle<T, TaskId>; + thread_local! { /// The ID of the current task. static TASK_ID: Cell<Option<TaskId>> = Cell::new(None); @@ -26,15 +29,15 @@ fn task_id() -> Option<TaskId> { } /// Spawns a future on the executor. -fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId> +fn spawn<F, R>(future: F) -> JoinHandle<R> where F: Future<Output = R> + Send + 'static, R: Send + 'static, { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender<async_task::Task<TaskId>> = { - let (sender, receiver) = unbounded::<async_task::Task<TaskId>>(); + static ref QUEUE: Sender<Task> = { + let (sender, receiver) = unbounded::<Task>(); // Start the executor thread. thread::spawn(|| { diff --git a/src/join_handle.rs b/src/join_handle.rs index bd6f8c7..9357d32 100644 --- a/src/join_handle.rs +++ b/src/join_handle.rs @@ -24,7 +24,7 @@ pub struct JoinHandle<R, T> { pub(crate) _marker: PhantomData<(R, T)>, } -unsafe impl<R, T> Send for JoinHandle<R, T> {} +unsafe impl<R: Send, T> Send for JoinHandle<R, T> {} unsafe impl<R, T> Sync for JoinHandle<R, T> {} impl<R, T> Unpin for JoinHandle<R, T> {} @@ -23,7 +23,7 @@ //! # let (task, handle) = async_task::spawn(future, schedule, ()); //! ``` //! -//! A task is constructed using the [`spawn`] function: +//! A task is constructed using either [`spawn`] or [`spawn_local`]: //! //! ``` //! # let (sender, receiver) = crossbeam::channel::unbounded(); @@ -93,6 +93,7 @@ //! union of the future and its output. //! //! [`spawn`]: fn.spawn.html +//! [`spawn_local`]: fn.spawn_local.html //! [`Task`]: struct.Task.html //! [`JoinHandle`]: struct.JoinHandle.html @@ -108,4 +109,4 @@ mod task; mod utils; pub use crate::join_handle::JoinHandle; -pub use crate::task::{spawn, Task}; +pub use crate::task::{spawn, spawn_local, Task}; @@ -95,15 +95,13 @@ impl<F, R, S, T> Clone for RawTask<F, R, S, T> { impl<F, R, S, T> RawTask<F, R, S, T> where - F: Future<Output = R> + Send + 'static, - R: Send + 'static, + F: Future<Output = R> + 'static, S: Fn(Task<T>) + Send + Sync + 'static, - T: Send + 'static, { /// Allocates a task with the given `future` and `schedule` function. /// /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist. - pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> { + pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> { // Compute the layout of the task for allocation. Abort if the computation fails. let task_layout = abort_on_panic(|| Self::task_layout()); @@ -592,17 +590,13 @@ where /// A guard that closes the task if polling its future panics. struct Guard<F, R, S, T>(RawTask<F, R, S, T>) where - F: Future<Output = R> + Send + 'static, - R: Send + 'static, - S: Fn(Task<T>) + Send + Sync + 'static, - T: Send + 'static; + F: Future<Output = R> + 'static, + S: Fn(Task<T>) + Send + Sync + 'static; impl<F, R, S, T> Drop for Guard<F, R, S, T> where - F: Future<Output = R> + Send + 'static, - R: Send + 'static, + F: Future<Output = R> + 'static, S: Fn(Task<T>) + Send + Sync + 'static, - T: Send + 'static, { fn drop(&mut self) { let raw = self.0; diff --git a/src/task.rs b/src/task.rs index 42a4024..83cdf79 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,8 +1,11 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; -use std::mem; +use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; use std::ptr::NonNull; +use std::task::{Context, Poll}; +use std::thread::{self, ThreadId}; use crate::header::Header; use crate::raw::RawTask; @@ -16,8 +19,16 @@ use crate::JoinHandle; /// When run, the task polls `future`. When woken up, it gets scheduled for running by the /// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. /// +/// The schedule function should not attempt to run the task nor to drop it. Instead, it should +/// push the task into some kind of queue so that it can be processed later. +/// +/// If you need to spawn a future that does not implement [`Send`], consider using the +/// [`spawn_local`] function instead. +/// /// [`Task`]: struct.Task.html /// [`JoinHandle`]: struct.JoinHandle.html +/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html +/// [`spawn_local`]: fn.spawn_local.html /// /// # Examples /// @@ -43,7 +54,98 @@ where S: Fn(Task<T>) + Send + Sync + 'static, T: Send + Sync + 'static, { - let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule); + let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag); + let task = Task { + raw_task, + _marker: PhantomData, + }; + let handle = JoinHandle { + raw_task, + _marker: PhantomData, + }; + (task, handle) +} + +/// Creates a new local task. +/// +/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that +/// awaits its result. +/// +/// When run, the task polls `future`. When woken up, it gets scheduled for running by the +/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. +/// +/// The schedule function should not attempt to run the task nor to drop it. Instead, it should +/// push the task into some kind of queue so that it can be processed later. +/// +/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the +/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur. +/// +/// [`Task`]: struct.Task.html +/// [`JoinHandle`]: struct.JoinHandle.html +/// [`spawn`]: fn.spawn.html +/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html +/// +/// # Examples +/// +/// ``` +/// use crossbeam::channel; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up, it will be sent into this channel. +/// let (s, r) = channel::unbounded(); +/// let schedule = move |task| s.send(task).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (task, handle) = async_task::spawn_local(future, schedule, ()); +/// ``` +pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) +where + F: Future<Output = R> + 'static, + R: 'static, + S: Fn(Task<T>) + Send + Sync + 'static, + T: Send + Sync + 'static, +{ + thread_local! { + static ID: ThreadId = thread::current().id(); + } + + struct Checked<F> { + id: ThreadId, + inner: ManuallyDrop<F>, + } + + impl<F> Drop for Checked<F> { + fn drop(&mut self) { + if ID.with(|id| *id) != self.id { + panic!("local task dropped by a thread that didn't spawn it"); + } + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl<F: Future> Future for Checked<F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if ID.with(|id| *id) != self.id { + panic!("local task polled by a thread that didn't spawn it"); + } + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + let future = Checked { + id: ID.with(|id| *id), + inner: ManuallyDrop::new(future), + }; + + let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag); let task = Task { raw_task, _marker: PhantomData, |