diff options
Diffstat (limited to 'tests/sink.rs')
-rw-r--r-- | tests/sink.rs | 594 |
1 files changed, 374 insertions, 220 deletions
diff --git a/tests/sink.rs b/tests/sink.rs index f967e1b..f6ce28c 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -1,43 +1,258 @@ -use futures::channel::{mpsc, oneshot}; -use futures::executor::block_on; -use futures::future::{self, Future, FutureExt, TryFutureExt}; -use futures::never::Never; -use futures::ready; -use futures::sink::{Sink, SinkErrInto, SinkExt}; -use futures::stream::{self, Stream, StreamExt}; -use futures::task::{self, ArcWake, Context, Poll, Waker}; -use futures_test::task::panic_context; -use std::cell::{Cell, RefCell}; -use std::collections::VecDeque; -use std::fmt; -use std::mem; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; - -fn sassert_next<S>(s: &mut S, item: S::Item) -where - S: Stream + Unpin, - S::Item: Eq + fmt::Debug, -{ - match s.poll_next_unpin(&mut panic_context()) { - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Ready(Some(e)) => assert_eq!(e, item), - Poll::Pending => panic!("stream wasn't ready"), +#[allow(dead_code)] +mod sassert_next { + use futures::stream::{Stream, StreamExt}; + use futures::task::Poll; + use futures_test::task::panic_context; + use std::fmt; + + pub fn sassert_next<S>(s: &mut S, item: S::Item) + where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, + { + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), + } + } +} + +#[allow(dead_code)] +mod unwrap { + use futures::task::Poll; + use std::fmt; + + pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), + } + } +} + +#[allow(dead_code)] +#[cfg(feature = "alloc")] // ArcWake +mod flag_cx { + use futures::task::{self, ArcWake, Context}; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + + // An Unpark struct that records unpark events for inspection + pub struct Flag(AtomicBool); + + impl Flag { + pub fn new() -> Arc<Self> { + Arc::new(Self(AtomicBool::new(false))) + } + + pub fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) + } + + pub fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } + } + + impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.set(true) + } + } + + pub fn flag_cx<F, R>(f: F) -> R + where + F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, + { + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) + } +} + +#[allow(dead_code)] +mod start_send_fut { + use futures::future::Future; + use futures::ready; + use futures::sink::Sink; + use futures::task::{Context, Poll}; + use std::pin::Pin; + + // Sends a value on an i32 channel sink + pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); + + impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { + pub fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) + } + } + + impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { + type Output = Result<S, S::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; + } + Poll::Ready(Ok(inner.take().unwrap())) + } + } +} + +#[allow(dead_code)] +mod manual_flush { + use futures::sink::Sink; + use futures::task::{Context, Poll, Waker}; + use std::mem; + use std::pin::Pin; + + // Immediately accepts all requests to start pushing, but completion is managed + // by manually flushing + pub struct ManualFlush<T: Unpin> { + data: Vec<T>, + waiting_tasks: Vec<Waker>, + } + + impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) + } + } + + impl<T: Unpin> ManualFlush<T> { + pub fn new() -> Self { + Self { + data: Vec::new(), + waiting_tasks: Vec::new(), + } + } + + pub fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.wake() + } + mem::replace(&mut self.data, Vec::new()) + } } } -fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { - match x { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), - Poll::Pending => panic!("Poll::Pending"), +#[allow(dead_code)] +mod allowance { + use futures::sink::Sink; + use futures::task::{Context, Poll, Waker}; + use std::cell::{Cell, RefCell}; + use std::pin::Pin; + use std::rc::Rc; + + pub struct ManualAllow<T: Unpin> { + pub data: Vec<T>, + allow: Rc<Allow>, + } + + pub struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Waker>>, + } + + impl Allow { + pub fn new() -> Self { + Self { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + pub fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false + } + } + + pub fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); + } + } + } + + impl<T: Unpin> Sink<T> for ManualAllow<T> { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.allow.check(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + } + + pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) } } + +#[cfg(feature = "alloc")] // futures_sink::Sink satisfying for .left/right_sink #[test] fn either_sink() { + use futures::sink::{Sink, SinkExt}; + use std::collections::VecDeque; + use std::pin::Pin; + let mut s = if true { Vec::<i32>::new().left_sink() } else { @@ -47,8 +262,13 @@ fn either_sink() { Pin::new(&mut s).start_send(0).unwrap(); } +#[cfg(feature = "executor")] // executor:: #[test] fn vec_sink() { + use futures::executor::block_on; + use futures::sink::{Sink, SinkExt}; + use std::pin::Pin; + let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); @@ -57,8 +277,13 @@ fn vec_sink() { assert_eq!(v, vec![0, 1]); } +#[cfg(feature = "alloc")] // futures_sink::Sink satisfying for .start_send() #[test] fn vecdeque_sink() { + use futures::sink::Sink; + use std::collections::VecDeque; + use std::pin::Pin; + let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); @@ -68,8 +293,12 @@ fn vecdeque_sink() { assert_eq!(deque.pop_front(), None); } +#[cfg(feature = "executor")] // executor:: #[test] fn send() { + use futures::executor::block_on; + use futures::sink::SinkExt; + let mut v = Vec::new(); block_on(v.send(0)).unwrap(); @@ -82,8 +311,13 @@ fn send() { assert_eq!(v, vec![0, 1, 2]); } +#[cfg(feature = "executor")] // executor:: #[test] fn send_all() { + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); @@ -96,66 +330,20 @@ fn send_all() { assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); } -// An Unpark struct that records unpark events for inspection -struct Flag(AtomicBool); - -impl Flag { - fn new() -> Arc<Self> { - Arc::new(Self(AtomicBool::new(false))) - } - - fn take(&self) -> bool { - self.0.swap(false, Ordering::SeqCst) - } - - fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } -} - -impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc<Self>) { - arc_self.set(true) - } -} - -fn flag_cx<F, R>(f: F) -> R -where - F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, -{ - let flag = Flag::new(); - let waker = task::waker_ref(&flag); - let cx = &mut Context::from_waker(&waker); - f(flag.clone(), cx) -} - -// Sends a value on an i32 channel sink -struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); - -impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { - fn new(sink: S, item: Item) -> Self { - Self(Some(sink), Some(item)) - } -} - -impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { - type Output = Result<S, S::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let Self(inner, item) = self.get_mut(); - { - let mut inner = inner.as_mut().unwrap(); - ready!(Pin::new(&mut inner).poll_ready(cx))?; - Pin::new(&mut inner).start_send(item.take().unwrap())?; - } - Poll::Ready(Ok(inner.take().unwrap())) - } -} - // Test that `start_send` on an `mpsc` channel does indeed block when the // channel is full +#[cfg(feature = "executor")] // executor:: #[test] fn mpsc_blocking_start_send() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::{self, FutureExt}; + + use start_send_fut::StartSendFut; + use flag_cx::flag_cx; + use sassert_next::sassert_next; + use unwrap::unwrap; + let (mut tx, mut rx) = mpsc::channel::<i32>(0); block_on(future::lazy(|_| { @@ -177,8 +365,20 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed +#[cfg(feature = "executor")] // executor:: #[test] fn with_flush() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::{self, FutureExt, TryFutureExt}; + use futures::never::Never; + use futures::sink::{Sink, SinkExt}; + use std::mem; + use std::pin::Pin; + + use flag_cx::flag_cx; + use unwrap::unwrap; + let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { @@ -203,8 +403,14 @@ fn with_flush() { } // test simple use of with to change data +#[cfg(feature = "executor")] // executor:: #[test] fn with_as_map() { + use futures::executor::block_on; + use futures::future; + use futures::never::Never; + use futures::sink::SinkExt; + let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -213,8 +419,13 @@ fn with_as_map() { } // test simple use of with_flat_map +#[cfg(feature = "executor")] // executor:: #[test] fn with_flat_map() { + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -225,8 +436,19 @@ fn with_flat_map() { // Check that `with` propagates `poll_ready` to the inner sink. // Regression test for the issue #1834. +#[cfg(feature = "executor")] // executor:: #[test] fn with_propagates_poll_ready() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future; + use futures::sink::{Sink, SinkExt}; + use futures::task::Poll; + use std::pin::Pin; + + use flag_cx::flag_cx; + use sassert_next::sassert_next; + let (tx, mut rx) = mpsc::channel::<i32>(0); let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10)); @@ -249,63 +471,19 @@ fn with_propagates_poll_ready() { })); } -// Immediately accepts all requests to start pushing, but completion is managed -// by manually flushing -struct ManualFlush<T: Unpin> { - data: Vec<T>, - waiting_tasks: Vec<Waker>, -} - -impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { - if let Some(item) = item { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.data.is_empty() { - Poll::Ready(Ok(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Poll::Pending - } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.poll_flush(cx) - } -} - -impl<T: Unpin> ManualFlush<T> { - fn new() -> Self { - Self { - data: Vec::new(), - waiting_tasks: Vec::new(), - } - } - - fn force_flush(&mut self) -> Vec<T> { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) - } -} - // test that the `with` sink doesn't require the underlying sink to flush, // but doesn't claim to be flushed until the underlying sink is +#[cfg(feature = "alloc")] // flag_cx #[test] fn with_flush_propagate() { + use futures::future::{self, FutureExt}; + use futures::sink::{Sink, SinkExt}; + use std::pin::Pin; + + use manual_flush::ManualFlush; + use flag_cx::flag_cx; + use unwrap::unwrap; + let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); @@ -325,8 +503,12 @@ fn with_flush_propagate() { } // test that a buffer is a no-nop around a sink that always accepts sends +#[cfg(feature = "executor")] // executor:: #[test] fn buffer_noop() { + use futures::executor::block_on; + use futures::sink::SinkExt; + let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -338,80 +520,21 @@ fn buffer_noop() { assert_eq!(sink.get_ref(), &[0, 1]); } -struct ManualAllow<T: Unpin> { - data: Vec<T>, - allow: Rc<Allow>, -} - -struct Allow { - flag: Cell<bool>, - tasks: RefCell<Vec<Waker>>, -} - -impl Allow { - fn new() -> Self { - Self { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } - } -} - -impl<T: Unpin> Sink<T> for ManualAllow<T> { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.allow.check(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.data.push(item); - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } -} - -fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) -} - // test basic buffer functionality, including both filling up to capacity, // and writing out when the underlying sink is ready +#[cfg(feature = "executor")] // executor:: +#[cfg(feature = "alloc")] // flag_cx #[test] fn buffer() { + use futures::executor::block_on; + use futures::future::FutureExt; + use futures::sink::SinkExt; + + use start_send_fut::StartSendFut; + use flag_cx::flag_cx; + use unwrap::unwrap; + use allowance::manual_allow; + let (sink, allow) = manual_allow::<i32>(); let sink = sink.buffer(2); @@ -429,8 +552,13 @@ fn buffer() { }) } +#[cfg(feature = "executor")] // executor:: #[test] fn fanout_smoke() { + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); @@ -440,8 +568,20 @@ fn fanout_smoke() { assert_eq!(sink2, vec![1, 2, 3]); } +#[cfg(feature = "executor")] // executor:: +#[cfg(feature = "alloc")] // flag_cx #[test] fn fanout_backpressure() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::FutureExt; + use futures::sink::SinkExt; + use futures::stream::StreamExt; + + use start_send_fut::StartSendFut; + use flag_cx::flag_cx; + use unwrap::unwrap; + let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); @@ -472,8 +612,15 @@ fn fanout_backpressure() { }) } +#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc #[test] fn sink_map_err() { + use futures::channel::mpsc; + use futures::sink::{Sink, SinkExt}; + use futures::task::Poll; + use futures_test::task::panic_context; + use std::pin::Pin; + { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); @@ -489,17 +636,24 @@ fn sink_map_err() { ); } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -struct ErrIntoTest; - -impl From<mpsc::SendError> for ErrIntoTest { - fn from(_: mpsc::SendError) -> Self { - Self - } -} - +#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc #[test] fn err_into() { + use futures::channel::mpsc; + use futures::sink::{Sink, SinkErrInto, SinkExt}; + use futures::task::Poll; + use futures_test::task::panic_context; + use std::pin::Pin; + + #[derive(Copy, Clone, Debug, PartialEq, Eq)] + pub struct ErrIntoTest; + + impl From<mpsc::SendError> for ErrIntoTest { + fn from(_: mpsc::SendError) -> Self { + Self + } + } + { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); |