aboutsummaryrefslogtreecommitdiff
path: root/src/compat
diff options
context:
space:
mode:
authorJason Macnak <natsu@google.com>2020-04-06 10:30:28 -0700
committerJason Macnak <natsu@google.com>2020-04-06 10:30:28 -0700
commitc417d3b8e54162ffaebbe968ea49428da5198558 (patch)
tree3cba9568bd9debde72a475fd0f367a87a3a7fb27 /src/compat
parent0effc9e03e8c57f0f38d51e7c74a1e3f21f66447 (diff)
downloadfutures-util-c417d3b8e54162ffaebbe968ea49428da5198558.tar.gz
Import 'futures-util' rust crate version 0.3.4
Bug: b/151760391 Test: m crosvm.experimental Change-Id: I03ce20612b0c746bbff5053e98a1ec0310c75fdd
Diffstat (limited to 'src/compat')
-rw-r--r--src/compat/compat01as03.rs472
-rw-r--r--src/compat/compat03as01.rs299
-rw-r--r--src/compat/executor.rs87
-rw-r--r--src/compat/mod.rs19
4 files changed, 877 insertions, 0 deletions
diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs
new file mode 100644
index 0000000..9bb00bf
--- /dev/null
+++ b/src/compat/compat01as03.rs
@@ -0,0 +1,472 @@
+use futures_01::executor::{
+ spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01,
+ Spawn as Spawn01, UnsafeNotify as UnsafeNotify01,
+};
+use futures_01::{
+ Async as Async01, Future as Future01,
+ Stream as Stream01,
+};
+#[cfg(feature = "sink")]
+use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
+use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03};
+use std::pin::Pin;
+use std::task::Context;
+#[cfg(feature = "sink")]
+use futures_sink::Sink as Sink03;
+
+#[cfg(feature = "io-compat")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+
+/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
+/// object to a futures 0.3-compatible version,
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Compat01As03<T> {
+ pub(crate) inner: Spawn01<T>,
+}
+
+impl<T> Unpin for Compat01As03<T> {}
+
+impl<T> Compat01As03<T> {
+ /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
+ /// object in a futures 0.3-compatible wrapper.
+ pub fn new(object: T) -> Compat01As03<T> {
+ Compat01As03 {
+ inner: spawn01(object),
+ }
+ }
+
+ fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
+ let notify = &WakerToHandle(cx.waker());
+ self.inner.poll_fn_notify(notify, 0, f)
+ }
+
+ /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
+ pub fn get_ref(&self) -> &T {
+ self.inner.get_ref()
+ }
+
+ /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
+ /// within.
+ pub fn get_mut(&mut self) -> &mut T {
+ self.inner.get_mut()
+ }
+
+ /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
+ /// AsyncWrite object.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner()
+ }
+}
+
+/// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
+pub trait Future01CompatExt: Future01 {
+ /// Converts a futures 0.1
+ /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
+ /// into a futures 0.3
+ /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
+ /// # // feature issues
+ /// use futures_util::compat::Future01CompatExt;
+ ///
+ /// let future = futures_01::future::ok::<u32, ()>(1);
+ /// assert_eq!(future.compat().await, Ok(1));
+ /// # });
+ /// ```
+ fn compat(self) -> Compat01As03<Self>
+ where
+ Self: Sized,
+ {
+ Compat01As03::new(self)
+ }
+}
+impl<Fut: Future01> Future01CompatExt for Fut {}
+
+/// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
+pub trait Stream01CompatExt: Stream01 {
+ /// Converts a futures 0.1
+ /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
+ /// into a futures 0.3
+ /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::StreamExt;
+ /// use futures_util::compat::Stream01CompatExt;
+ ///
+ /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
+ /// let mut stream = stream.compat();
+ /// assert_eq!(stream.next().await, Some(Ok(1)));
+ /// assert_eq!(stream.next().await, None);
+ /// # });
+ /// ```
+ fn compat(self) -> Compat01As03<Self>
+ where
+ Self: Sized,
+ {
+ Compat01As03::new(self)
+ }
+}
+impl<St: Stream01> Stream01CompatExt for St {}
+
+/// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
+#[cfg(feature = "sink")]
+pub trait Sink01CompatExt: Sink01 {
+ /// Converts a futures 0.1
+ /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
+ /// into a futures 0.3
+ /// [`Sink<T, Error = E>`](futures_sink::Sink).
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::{sink::SinkExt, stream::StreamExt};
+ /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
+ ///
+ /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
+ /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
+ ///
+ /// tx.send(1).await.unwrap();
+ /// drop(tx);
+ /// assert_eq!(rx.next().await, Some(Ok(1)));
+ /// assert_eq!(rx.next().await, None);
+ /// # });
+ /// ```
+ fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
+ where
+ Self: Sized,
+ {
+ Compat01As03Sink::new(self)
+ }
+}
+#[cfg(feature = "sink")]
+impl<Si: Sink01> Sink01CompatExt for Si {}
+
+fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
+ match x? {
+ Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
+ Async01::NotReady => task03::Poll::Pending,
+ }
+}
+
+impl<Fut: Future01> Future03 for Compat01As03<Fut> {
+ type Output = Result<Fut::Item, Fut::Error>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> task03::Poll<Self::Output> {
+ poll_01_to_03(self.in_notify(cx, Future01::poll))
+ }
+}
+
+impl<St: Stream01> Stream03 for Compat01As03<St> {
+ type Item = Result<St::Item, St::Error>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> task03::Poll<Option<Self::Item>> {
+ match self.in_notify(cx, Stream01::poll)? {
+ Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
+ Async01::Ready(None) => task03::Poll::Ready(None),
+ Async01::NotReady => task03::Poll::Pending,
+ }
+ }
+}
+
+/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
+#[cfg(feature = "sink")]
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct Compat01As03Sink<S, SinkItem> {
+ pub(crate) inner: Spawn01<S>,
+ pub(crate) buffer: Option<SinkItem>,
+ pub(crate) close_started: bool,
+}
+
+#[cfg(feature = "sink")]
+impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
+
+#[cfg(feature = "sink")]
+impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
+ /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
+ pub fn new(inner: S) -> Compat01As03Sink<S, SinkItem> {
+ Compat01As03Sink {
+ inner: spawn01(inner),
+ buffer: None,
+ close_started: false
+ }
+ }
+
+ fn in_notify<R>(
+ &mut self,
+ cx: &mut Context<'_>,
+ f: impl FnOnce(&mut S) -> R,
+ ) -> R {
+ let notify = &WakerToHandle(cx.waker());
+ self.inner.poll_fn_notify(notify, 0, f)
+ }
+
+ /// Get a reference to 0.1 Sink object contained within.
+ pub fn get_ref(&self) -> &S {
+ self.inner.get_ref()
+ }
+
+ /// Get a mutable reference to 0.1 Sink contained within.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.inner.get_mut()
+ }
+
+ /// Consume this wrapper to return the underlying 0.1 Sink.
+ pub fn into_inner(self) -> S {
+ self.inner.into_inner()
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
+where
+ S: Stream01,
+{
+ type Item = Result<S::Item, S::Error>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> task03::Poll<Option<Self::Item>> {
+ match self.in_notify(cx, Stream01::poll)? {
+ Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
+ Async01::Ready(None) => task03::Poll::Ready(None),
+ Async01::NotReady => task03::Poll::Pending,
+ }
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
+where
+ S: Sink01<SinkItem = SinkItem>,
+{
+ type Error = S::SinkError;
+
+ fn start_send(
+ mut self: Pin<&mut Self>,
+ item: SinkItem,
+ ) -> Result<(), Self::Error> {
+ debug_assert!(self.buffer.is_none());
+ self.buffer = Some(item);
+ Ok(())
+ }
+
+ fn poll_ready(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> task03::Poll<Result<(), Self::Error>> {
+ match self.buffer.take() {
+ Some(item) => match self.in_notify(cx, |f| f.start_send(item))? {
+ AsyncSink01::Ready => task03::Poll::Ready(Ok(())),
+ AsyncSink01::NotReady(i) => {
+ self.buffer = Some(i);
+ task03::Poll::Pending
+ }
+ },
+ None => task03::Poll::Ready(Ok(())),
+ }
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> task03::Poll<Result<(), Self::Error>> {
+ let item = self.buffer.take();
+ match self.in_notify(cx, |f| match item {
+ Some(i) => match f.start_send(i)? {
+ AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)),
+ AsyncSink01::NotReady(t) => {
+ Ok((Async01::NotReady, Some(t)))
+ }
+ },
+ None => f.poll_complete().map(|i| (i, None)),
+ })? {
+ (Async01::Ready(_), _) => task03::Poll::Ready(Ok(())),
+ (Async01::NotReady, item) => {
+ self.buffer = item;
+ task03::Poll::Pending
+ }
+ }
+ }
+
+ fn poll_close(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> task03::Poll<Result<(), Self::Error>> {
+ let item = self.buffer.take();
+ let close_started = self.close_started;
+
+ let result = self.in_notify(cx, |f| {
+ if !close_started {
+ if let Some(item) = item {
+ if let AsyncSink01::NotReady(item) = f.start_send(item)? {
+ return Ok((Async01::NotReady, Some(item), false));
+ }
+ }
+
+ if let Async01::NotReady = f.poll_complete()? {
+ return Ok((Async01::NotReady, None, false));
+ }
+ }
+
+ Ok((<S as Sink01>::close(f)?, None, true))
+ });
+
+ match result? {
+ (Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())),
+ (Async01::NotReady, item, close_started) => {
+ self.buffer = item;
+ self.close_started = close_started;
+ task03::Poll::Pending
+ }
+ }
+ }
+}
+
+struct NotifyWaker(task03::Waker);
+
+#[allow(missing_debug_implementations)] // false positive: this is private type
+#[derive(Clone)]
+struct WakerToHandle<'a>(&'a task03::Waker);
+
+impl From<WakerToHandle<'_>> for NotifyHandle01 {
+ fn from(handle: WakerToHandle<'_>) -> NotifyHandle01 {
+ let ptr = Box::new(NotifyWaker(handle.0.clone()));
+
+ unsafe { NotifyHandle01::new(Box::into_raw(ptr)) }
+ }
+}
+
+impl Notify01 for NotifyWaker {
+ fn notify(&self, _: usize) {
+ self.0.wake_by_ref();
+ }
+}
+
+unsafe impl UnsafeNotify01 for NotifyWaker {
+ unsafe fn clone_raw(&self) -> NotifyHandle01 {
+ WakerToHandle(&self.0).into()
+ }
+
+ unsafe fn drop_raw(&self) {
+ let ptr: *const dyn UnsafeNotify01 = self;
+ drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
+ }
+}
+
+#[cfg(feature = "io-compat")]
+mod io {
+ use super::*;
+ #[cfg(feature = "read-initializer")]
+ use futures_io::Initializer;
+ use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
+ use std::io::Error;
+ use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
+
+ /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
+ pub trait AsyncRead01CompatExt: AsyncRead01 {
+ /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
+ /// [`AsyncRead`](futures_io::AsyncRead).
+ ///
+ /// ```
+ /// #![feature(impl_trait_in_bindings)]
+ /// # #![allow(incomplete_features)]
+ /// # futures::executor::block_on(async {
+ /// use futures::io::AsyncReadExt;
+ /// use futures_util::compat::AsyncRead01CompatExt;
+ ///
+ /// let input = b"Hello World!";
+ /// let reader: impl tokio_io::AsyncRead = std::io::Cursor::new(input);
+ /// let mut reader: impl futures::io::AsyncRead + Unpin = reader.compat();
+ ///
+ /// let mut output = Vec::with_capacity(12);
+ /// reader.read_to_end(&mut output).await.unwrap();
+ /// assert_eq!(output, input);
+ /// # });
+ /// ```
+ fn compat(self) -> Compat01As03<Self>
+ where
+ Self: Sized,
+ {
+ Compat01As03::new(self)
+ }
+ }
+ impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
+
+ /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
+ pub trait AsyncWrite01CompatExt: AsyncWrite01 {
+ /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
+ /// [`AsyncWrite`](futures_io::AsyncWrite).
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::AsyncWriteExt;
+ /// use futures_util::compat::AsyncWrite01CompatExt;
+ ///
+ /// let input = b"Hello World!";
+ /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
+ ///
+ /// let mut writer = (&mut cursor).compat();
+ /// writer.write_all(input).await.unwrap();
+ ///
+ /// assert_eq!(cursor.into_inner(), input);
+ /// # });
+ /// ```
+ fn compat(self) -> Compat01As03<Self>
+ where
+ Self: Sized,
+ {
+ Compat01As03::new(self)
+ }
+ }
+ impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
+
+ impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ // check if `prepare_uninitialized_buffer` needs zeroing
+ if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
+ Initializer::zeroing()
+ } else {
+ Initializer::nop()
+ }
+ }
+
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> task03::Poll<Result<usize, Error>>
+ {
+ poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf)))
+ }
+ }
+
+ impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
+ fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
+ -> task03::Poll<Result<usize, Error>>
+ {
+ poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf)))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> task03::Poll<Result<(), Error>>
+ {
+ poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> task03::Poll<Result<(), Error>>
+ {
+ poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown))
+ }
+ }
+}
diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs
new file mode 100644
index 0000000..3fd2ae0
--- /dev/null
+++ b/src/compat/compat03as01.rs
@@ -0,0 +1,299 @@
+use futures_01::{
+ task as task01, Async as Async01, Future as Future01, Poll as Poll01,
+ Stream as Stream01,
+};
+#[cfg(feature = "sink")]
+use futures_01::{
+ AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01,
+};
+use futures_core::{
+ task::{RawWaker, RawWakerVTable},
+ future::TryFuture as TryFuture03,
+ stream::TryStream as TryStream03,
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink as Sink03;
+use crate::task::{
+ self as task03,
+ ArcWake as ArcWake03,
+ WakerRef,
+};
+#[cfg(feature = "sink")]
+use std::marker::PhantomData;
+use std::{
+ mem,
+ pin::Pin,
+ sync::Arc,
+ task::Context,
+};
+
+/// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or
+/// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
+/// [`Future`](futures_01::future::Future) or
+/// [`Stream`](futures_01::stream::Stream).
+#[derive(Debug, Clone, Copy)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Compat<T> {
+ pub(crate) inner: T,
+}
+
+/// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
+/// [`Sink`](futures_01::sink::Sink).
+#[cfg(feature = "sink")]
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct CompatSink<T, Item> {
+ inner: T,
+ _phantom: PhantomData<fn(Item)>,
+}
+
+impl<T> Compat<T> {
+ /// Creates a new [`Compat`].
+ ///
+ /// For types which implement appropriate futures `0.3`
+ /// traits, the result will be a type which implements
+ /// the corresponding futures 0.1 type.
+ pub fn new(inner: T) -> Compat<T> {
+ Compat { inner }
+ }
+
+ /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
+ /// contained within.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
+ /// contained within.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Returns the inner item.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<T, Item> CompatSink<T, Item> {
+ /// Creates a new [`CompatSink`].
+ pub fn new(inner: T) -> Self {
+ CompatSink {
+ inner,
+ _phantom: PhantomData,
+ }
+ }
+
+ /// Get a reference to 0.3 Sink contained within.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Get a mutable reference to 0.3 Sink contained within.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Returns the inner item.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>)
+ -> Result<Async01<T>, E>
+{
+ match x? {
+ task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
+ task03::Poll::Pending => Ok(Async01::NotReady),
+ }
+}
+
+impl<Fut> Future01 for Compat<Fut>
+where
+ Fut: TryFuture03 + Unpin,
+{
+ type Item = Fut::Ok;
+ type Error = Fut::Error;
+
+ fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
+ with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
+ }
+}
+
+impl<St> Stream01 for Compat<St>
+where
+ St: TryStream03 + Unpin,
+{
+ type Item = St::Ok;
+ type Error = St::Error;
+
+ fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
+ with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
+ task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
+ task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
+ task03::Poll::Pending => Ok(Async01::NotReady),
+ })
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<T, Item> Sink01 for CompatSink<T, Item>
+where
+ T: Sink03<Item> + Unpin,
+{
+ type SinkItem = Item;
+ type SinkError = T::Error;
+
+ fn start_send(
+ &mut self,
+ item: Self::SinkItem,
+ ) -> StartSend01<Self::SinkItem, Self::SinkError> {
+ with_sink_context(self, |mut inner, cx| {
+ match inner.as_mut().poll_ready(cx)? {
+ task03::Poll::Ready(()) => {
+ inner.start_send(item).map(|()| AsyncSink01::Ready)
+ }
+ task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
+ }
+ })
+ }
+
+ fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
+ with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_flush(cx)))
+ }
+
+ fn close(&mut self) -> Poll01<(), Self::SinkError> {
+ with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_close(cx)))
+ }
+}
+
+#[derive(Clone)]
+struct Current(task01::Task);
+
+impl Current {
+ fn new() -> Current {
+ Current(task01::current())
+ }
+
+ fn as_waker(&self) -> WakerRef<'_> {
+ unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
+ &*(ptr as *const Current)
+ }
+ fn current_to_ptr(current: &Current) -> *const () {
+ current as *const Current as *const ()
+ }
+
+ unsafe fn clone(ptr: *const ()) -> RawWaker {
+ // Lazily create the `Arc` only when the waker is actually cloned.
+ // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
+ // function is landed in `core`.
+ mem::transmute::<task03::Waker, RawWaker>(
+ task03::waker(Arc::new(ptr_to_current(ptr).clone()))
+ )
+ }
+ unsafe fn drop(_: *const ()) {}
+ unsafe fn wake(ptr: *const ()) {
+ ptr_to_current(ptr).0.notify()
+ }
+
+ let ptr = current_to_ptr(self);
+ let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
+ WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
+ task03::Waker::from_raw(RawWaker::new(ptr, vtable))
+ }))
+ }
+}
+
+impl ArcWake03 for Current {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.0.notify();
+ }
+}
+
+fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
+where
+ T: Unpin,
+ F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
+{
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ f(Pin::new(&mut compat.inner), &mut cx)
+}
+
+#[cfg(feature = "sink")]
+fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
+where
+ T: Unpin,
+ F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
+{
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ f(Pin::new(&mut compat.inner), &mut cx)
+}
+
+#[cfg(feature = "io-compat")]
+mod io {
+ use super::*;
+ use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
+ use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
+
+ fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>)
+ -> Result<T, std::io::Error>
+ {
+ match x {
+ task03::Poll::Ready(Ok(t)) => Ok(t),
+ task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
+ task03::Poll::Ready(Err(e)) => Err(e),
+ }
+ }
+
+ impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_io(Pin::new(&mut self.inner).poll_read(&mut cx, buf))
+ }
+ }
+
+ impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
+ #[cfg(feature = "read-initializer")]
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ let initializer = self.inner.initializer();
+ let does_init = initializer.should_initialize();
+ if does_init {
+ initializer.initialize(buf);
+ }
+ does_init
+ }
+ }
+
+ impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_io(Pin::new(&mut self.inner).poll_write(&mut cx, buf))
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_io(Pin::new(&mut self.inner).poll_flush(&mut cx))
+ }
+ }
+
+ impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
+ fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_01(Pin::new(&mut self.inner).poll_close(&mut cx))
+ }
+ }
+}
diff --git a/src/compat/executor.rs b/src/compat/executor.rs
new file mode 100644
index 0000000..82cb496
--- /dev/null
+++ b/src/compat/executor.rs
@@ -0,0 +1,87 @@
+use super::{Compat, Future01CompatExt};
+use crate::{
+ future::{FutureExt, TryFutureExt, UnitError},
+ task::SpawnExt,
+};
+use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
+use futures_01::Future as Future01;
+use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};
+
+/// A future that can run on a futures 0.1
+/// [`Executor`](futures_01::future::Executor).
+pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;
+
+/// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
+pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
+ /// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
+ /// futures 0.3 [`Spawn`](futures_task::Spawn).
+ ///
+ /// ```
+ /// use futures::task::SpawnExt;
+ /// use futures::future::{FutureExt, TryFutureExt};
+ /// use futures_util::compat::Executor01CompatExt;
+ /// use tokio::executor::DefaultExecutor;
+ ///
+ /// # let (tx, rx) = futures::channel::oneshot::channel();
+ ///
+ /// let spawner = DefaultExecutor::current().compat();
+ /// let future03 = async move {
+ /// println!("Running on the pool");
+ /// spawner.spawn(async {
+ /// println!("Spawned!");
+ /// # tx.send(42).unwrap();
+ /// }).unwrap();
+ /// };
+ ///
+ /// let future01 = future03.unit_error().boxed().compat();
+ ///
+ /// tokio::run(future01);
+ /// # futures::executor::block_on(rx).unwrap();
+ /// ```
+ fn compat(self) -> Executor01As03<Self>
+ where
+ Self: Sized;
+}
+
+impl<Ex> Executor01CompatExt for Ex
+where
+ Ex: Executor01<Executor01Future> + Clone + Send + 'static,
+{
+ fn compat(self) -> Executor01As03<Self> {
+ Executor01As03 { executor01: self }
+ }
+}
+
+/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
+/// futures 0.3 [`Spawn`](futures_task::Spawn).
+#[derive(Debug, Clone)]
+pub struct Executor01As03<Ex> {
+ executor01: Ex,
+}
+
+impl<Ex> Spawn03 for Executor01As03<Ex>
+where
+ Ex: Executor01<Executor01Future> + Clone + Send + 'static,
+{
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
+ let future = future.unit_error().compat();
+
+ self.executor01
+ .execute(future)
+ .map_err(|_| SpawnError03::shutdown())
+ }
+}
+
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
+impl<Sp, Fut> Executor01<Fut> for Compat<Sp>
+where
+ for<'a> &'a Sp: Spawn03,
+ Fut: Future01<Item = (), Error = ()> + Send + 'static,
+{
+ fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
+ (&self.inner)
+ .spawn(future.compat().map(|_| ()))
+ .expect("unable to spawn future from Compat executor");
+ Ok(())
+ }
+}
diff --git a/src/compat/mod.rs b/src/compat/mod.rs
new file mode 100644
index 0000000..1826836
--- /dev/null
+++ b/src/compat/mod.rs
@@ -0,0 +1,19 @@
+//! Futures 0.1 / 0.3 shims
+//!
+//! This module is only available when the `compat` feature of this
+//! library is activated.
+
+mod executor;
+pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03};
+
+mod compat01as03;
+pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt};
+#[cfg(feature = "sink")]
+pub use self::compat01as03::{Compat01As03Sink, Sink01CompatExt};
+#[cfg(feature = "io-compat")]
+pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+
+mod compat03as01;
+pub use self::compat03as01::Compat;
+#[cfg(feature = "sink")]
+pub use self::compat03as01::CompatSink;