aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYoshua Wuyts <yoshuawuyts+github@gmail.com>2019-12-02 07:15:04 +0900
committerGitHub <noreply@github.com>2019-12-02 07:15:04 +0900
commit5977202fb0d4ad93ace19cc0a52890747f20695a (patch)
tree1406a6df8b2bf2e2144dc8429e13c1fc34eb2d5b
parent2b0427a6cf49f14faf675d023bbcdec590cde1af (diff)
parent5d80be610a66654530bb0a80f613b62e67af8d3b (diff)
downloadasync-task-5977202fb0d4ad93ace19cc0a52890747f20695a.tar.gz
Merge pull request #10 from async-rs/spawn-local
Add spawn_local and clarify what the schedule function can do
-rw-r--r--examples/panic-propagation.rs6
-rw-r--r--examples/panic-result.rs9
-rw-r--r--examples/spawn-local.rs76
-rw-r--r--examples/spawn-on-thread.rs4
-rw-r--r--examples/spawn.rs9
-rw-r--r--examples/task-id.rs9
-rw-r--r--src/join_handle.rs2
-rw-r--r--src/lib.rs5
-rw-r--r--src/raw.rs16
-rw-r--r--src/task.rs106
10 files changed, 214 insertions, 28 deletions
diff --git a/examples/panic-propagation.rs b/examples/panic-propagation.rs
index 8a5339f..05ec85a 100644
--- a/examples/panic-propagation.rs
+++ b/examples/panic-propagation.rs
@@ -11,6 +11,8 @@ use futures::executor;
use futures::future::FutureExt;
use lazy_static::lazy_static;
+type Task = async_task::Task<()>;
+
/// Spawns a future on the executor.
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
@@ -19,8 +21,8 @@ where
{
lazy_static! {
// A channel that holds scheduled tasks.
- static ref QUEUE: Sender<async_task::Task<()>> = {
- let (sender, receiver) = unbounded::<async_task::Task<()>>();
+ static ref QUEUE: Sender<Task> = {
+ let (sender, receiver) = unbounded::<Task>();
// Start the executor thread.
thread::spawn(|| {
diff --git a/examples/panic-result.rs b/examples/panic-result.rs
index 7cf5a14..6308240 100644
--- a/examples/panic-result.rs
+++ b/examples/panic-result.rs
@@ -9,16 +9,19 @@ use futures::executor;
use futures::future::FutureExt;
use lazy_static::lazy_static;
+type Task = async_task::Task<()>;
+type JoinHandle<T> = async_task::JoinHandle<T, ()>;
+
/// Spawns a future on the executor.
-fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()>
+fn spawn<F, R>(future: F) -> JoinHandle<thread::Result<R>>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
- static ref QUEUE: Sender<async_task::Task<()>> = {
- let (sender, receiver) = unbounded::<async_task::Task<()>>();
+ static ref QUEUE: Sender<Task> = {
+ let (sender, receiver) = unbounded::<Task>();
// Start the executor thread.
thread::spawn(|| {
diff --git a/examples/spawn-local.rs b/examples/spawn-local.rs
new file mode 100644
index 0000000..4e66c32
--- /dev/null
+++ b/examples/spawn-local.rs
@@ -0,0 +1,76 @@
+//! A simple single-threaded executor that can spawn non-`Send` futures.
+
+use std::cell::Cell;
+use std::future::Future;
+use std::rc::Rc;
+
+use crossbeam::channel::{unbounded, Receiver, Sender};
+
+type Task = async_task::Task<()>;
+type JoinHandle<T> = async_task::JoinHandle<T, ()>;
+
+thread_local! {
+ // A channel that holds scheduled tasks.
+ static QUEUE: (Sender<Task>, Receiver<Task>) = unbounded();
+}
+
+/// Spawns a future on the executor.
+fn spawn<F, R>(future: F) -> JoinHandle<R>
+where
+ F: Future<Output = R> + 'static,
+ R: 'static,
+{
+ // Create a task that is scheduled by sending itself into the channel.
+ let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap());
+ let (task, handle) = async_task::spawn_local(future, schedule, ());
+
+ // Schedule the task by sending it into the queue.
+ task.schedule();
+
+ handle
+}
+
+/// Runs a future to completion.
+fn run<F, R>(future: F) -> R
+where
+ F: Future<Output = R> + 'static,
+ R: 'static,
+{
+ // Spawn a task that sends its result through a channel.
+ let (s, r) = unbounded();
+ spawn(async move { s.send(future.await).unwrap() });
+
+ loop {
+ // If the original task has completed, return its result.
+ if let Ok(val) = r.try_recv() {
+ return val;
+ }
+
+ // Otherwise, take a task from the queue and run it.
+ QUEUE.with(|(_, r)| r.recv().unwrap().run());
+ }
+}
+
+fn main() {
+ let val = Rc::new(Cell::new(0));
+
+ // Run a future that increments a non-`Send` value.
+ run({
+ let val = val.clone();
+ async move {
+ // Spawn a future that increments the value.
+ let handle = spawn({
+ let val = val.clone();
+ async move {
+ val.set(dbg!(val.get()) + 1);
+ }
+ });
+
+ val.set(dbg!(val.get()) + 1);
+ handle.await;
+ }
+ });
+
+ // The value should be 2 at the end of the program.
+ dbg!(val.get());
+}
diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs
index 22da0c5..95214ed 100644
--- a/examples/spawn-on-thread.rs
+++ b/examples/spawn-on-thread.rs
@@ -7,10 +7,12 @@ use std::thread;
use crossbeam::channel;
use futures::executor;
+type JoinHandle<T> = async_task::JoinHandle<T, ()>;
+
/// Spawns a future on a new dedicated thread.
///
/// The returned handle can be used to await the output of the future.
-fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()>
+fn spawn_on_thread<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
diff --git a/examples/spawn.rs b/examples/spawn.rs
index 4af5a02..9db7215 100644
--- a/examples/spawn.rs
+++ b/examples/spawn.rs
@@ -8,16 +8,19 @@ use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use lazy_static::lazy_static;
+type Task = async_task::Task<()>;
+type JoinHandle<T> = async_task::JoinHandle<T, ()>;
+
/// Spawns a future on the executor.
-fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
+fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
- static ref QUEUE: Sender<async_task::Task<()>> = {
- let (sender, receiver) = unbounded::<async_task::Task<()>>();
+ static ref QUEUE: Sender<Task> = {
+ let (sender, receiver) = unbounded::<Task>();
// Start the executor thread.
thread::spawn(|| {
diff --git a/examples/task-id.rs b/examples/task-id.rs
index 66b7aec..2a7bcf7 100644
--- a/examples/task-id.rs
+++ b/examples/task-id.rs
@@ -13,6 +13,9 @@ use lazy_static::lazy_static;
#[derive(Clone, Copy, Debug)]
struct TaskId(usize);
+type Task = async_task::Task<TaskId>;
+type JoinHandle<T> = async_task::JoinHandle<T, TaskId>;
+
thread_local! {
/// The ID of the current task.
static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
@@ -26,15 +29,15 @@ fn task_id() -> Option<TaskId> {
}
/// Spawns a future on the executor.
-fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
+fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
- static ref QUEUE: Sender<async_task::Task<TaskId>> = {
- let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
+ static ref QUEUE: Sender<Task> = {
+ let (sender, receiver) = unbounded::<Task>();
// Start the executor thread.
thread::spawn(|| {
diff --git a/src/join_handle.rs b/src/join_handle.rs
index bd6f8c7..9357d32 100644
--- a/src/join_handle.rs
+++ b/src/join_handle.rs
@@ -24,7 +24,7 @@ pub struct JoinHandle<R, T> {
pub(crate) _marker: PhantomData<(R, T)>,
}
-unsafe impl<R, T> Send for JoinHandle<R, T> {}
+unsafe impl<R: Send, T> Send for JoinHandle<R, T> {}
unsafe impl<R, T> Sync for JoinHandle<R, T> {}
impl<R, T> Unpin for JoinHandle<R, T> {}
diff --git a/src/lib.rs b/src/lib.rs
index 3f61ea4..a265679 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,7 +23,7 @@
//! # let (task, handle) = async_task::spawn(future, schedule, ());
//! ```
//!
-//! A task is constructed using the [`spawn`] function:
+//! A task is constructed using either [`spawn`] or [`spawn_local`]:
//!
//! ```
//! # let (sender, receiver) = crossbeam::channel::unbounded();
@@ -93,6 +93,7 @@
//! union of the future and its output.
//!
//! [`spawn`]: fn.spawn.html
+//! [`spawn_local`]: fn.spawn_local.html
//! [`Task`]: struct.Task.html
//! [`JoinHandle`]: struct.JoinHandle.html
@@ -108,4 +109,4 @@ mod task;
mod utils;
pub use crate::join_handle::JoinHandle;
-pub use crate::task::{spawn, Task};
+pub use crate::task::{spawn, spawn_local, Task};
diff --git a/src/raw.rs b/src/raw.rs
index 3b993a3..2c47f0c 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -95,15 +95,13 @@ impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
impl<F, R, S, T> RawTask<F, R, S, T>
where
- F: Future<Output = R> + Send + 'static,
- R: Send + 'static,
+ F: Future<Output = R> + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
- T: Send + 'static,
{
/// Allocates a task with the given `future` and `schedule` function.
///
/// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
- pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
+ pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
// Compute the layout of the task for allocation. Abort if the computation fails.
let task_layout = abort_on_panic(|| Self::task_layout());
@@ -592,17 +590,13 @@ where
/// A guard that closes the task if polling its future panics.
struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
where
- F: Future<Output = R> + Send + 'static,
- R: Send + 'static,
- S: Fn(Task<T>) + Send + Sync + 'static,
- T: Send + 'static;
+ F: Future<Output = R> + 'static,
+ S: Fn(Task<T>) + Send + Sync + 'static;
impl<F, R, S, T> Drop for Guard<F, R, S, T>
where
- F: Future<Output = R> + Send + 'static,
- R: Send + 'static,
+ F: Future<Output = R> + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
- T: Send + 'static,
{
fn drop(&mut self) {
let raw = self.0;
diff --git a/src/task.rs b/src/task.rs
index 42a4024..83cdf79 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -1,8 +1,11 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
-use std::mem;
+use std::mem::{self, ManuallyDrop};
+use std::pin::Pin;
use std::ptr::NonNull;
+use std::task::{Context, Poll};
+use std::thread::{self, ThreadId};
use crate::header::Header;
use crate::raw::RawTask;
@@ -16,8 +19,16 @@ use crate::JoinHandle;
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
///
+/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
+/// push the task into some kind of queue so that it can be processed later.
+///
+/// If you need to spawn a future that does not implement [`Send`], consider using the
+/// [`spawn_local`] function instead.
+///
/// [`Task`]: struct.Task.html
/// [`JoinHandle`]: struct.JoinHandle.html
+/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
+/// [`spawn_local`]: fn.spawn_local.html
///
/// # Examples
///
@@ -43,7 +54,98 @@ where
S: Fn(Task<T>) + Send + Sync + 'static,
T: Send + Sync + 'static,
{
- let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
+ let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
+ let task = Task {
+ raw_task,
+ _marker: PhantomData,
+ };
+ let handle = JoinHandle {
+ raw_task,
+ _marker: PhantomData,
+ };
+ (task, handle)
+}
+
+/// Creates a new local task.
+///
+/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
+/// awaits its result.
+///
+/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
+/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
+///
+/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
+/// push the task into some kind of queue so that it can be processed later.
+///
+/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
+/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
+///
+/// [`Task`]: struct.Task.html
+/// [`JoinHandle`]: struct.JoinHandle.html
+/// [`spawn`]: fn.spawn.html
+/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
+///
+/// # Examples
+///
+/// ```
+/// use crossbeam::channel;
+///
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken up, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn_local(future, schedule, ());
+/// ```
+pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
+where
+ F: Future<Output = R> + 'static,
+ R: 'static,
+ S: Fn(Task<T>) + Send + Sync + 'static,
+ T: Send + Sync + 'static,
+{
+ thread_local! {
+ static ID: ThreadId = thread::current().id();
+ }
+
+ struct Checked<F> {
+ id: ThreadId,
+ inner: ManuallyDrop<F>,
+ }
+
+ impl<F> Drop for Checked<F> {
+ fn drop(&mut self) {
+ if ID.with(|id| *id) != self.id {
+ panic!("local task dropped by a thread that didn't spawn it");
+ }
+ unsafe {
+ ManuallyDrop::drop(&mut self.inner);
+ }
+ }
+ }
+
+ impl<F: Future> Future for Checked<F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if ID.with(|id| *id) != self.id {
+ panic!("local task polled by a thread that didn't spawn it");
+ }
+ unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
+ }
+ }
+
+ let future = Checked {
+ id: ID.with(|id| *id),
+ inner: ManuallyDrop::new(future),
+ };
+
+ let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
let task = Task {
raw_task,
_marker: PhantomData,