aboutsummaryrefslogtreecommitdiff
path: root/tests/framed_read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/framed_read.rs')
-rw-r--r--tests/framed_read.rs339
1 files changed, 339 insertions, 0 deletions
diff --git a/tests/framed_read.rs b/tests/framed_read.rs
new file mode 100644
index 0000000..2a9e27e
--- /dev/null
+++ b/tests/framed_read.rs
@@ -0,0 +1,339 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::{AsyncRead, ReadBuf};
+use tokio_test::assert_ready;
+use tokio_test::task;
+use tokio_util::codec::{Decoder, FramedRead};
+
+use bytes::{Buf, BytesMut};
+use futures::Stream;
+use std::collections::VecDeque;
+use std::io;
+use std::pin::Pin;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+macro_rules! assert_read {
+ ($e:expr, $n:expr) => {{
+ let val = assert_ready!($e);
+ assert_eq!(val.unwrap().unwrap(), $n);
+ }};
+}
+
+macro_rules! pin {
+ ($id:ident) => {
+ Pin::new(&mut $id)
+ };
+}
+
+struct U32Decoder;
+
+impl Decoder for U32Decoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).get_u32();
+ Ok(Some(n))
+ }
+}
+
+struct U64Decoder;
+
+impl Decoder for U64Decoder {
+ type Item = u64;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> {
+ if buf.len() < 8 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(8).get_u64();
+ Ok(Some(n))
+ }
+}
+
+#[test]
+fn read_multi_frame_in_packet() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_multi_frame_across_packets() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ Ok(b"\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_multi_frame_in_packet_after_codec_changed() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0x04);
+
+ let mut framed = framed.map_decoder(|_| U64Decoder);
+ assert_read!(pin!(framed).poll_next(cx), 0x08);
+
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_not_ready() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert!(pin!(framed).poll_next(cx).is_pending());
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_partial_then_not_ready() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert!(pin!(framed).poll_next(cx).is_pending());
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_err() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_eq!(
+ io::ErrorKind::Other,
+ assert_ready!(pin!(framed).poll_next(cx))
+ .unwrap()
+ .unwrap_err()
+ .kind()
+ )
+ });
+}
+
+#[test]
+fn read_partial_then_err() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_eq!(
+ io::ErrorKind::Other,
+ assert_ready!(pin!(framed).poll_next(cx))
+ .unwrap()
+ .unwrap_err()
+ .kind()
+ )
+ });
+}
+
+#[test]
+fn read_partial_would_block_then_err() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert!(pin!(framed).poll_next(cx).is_pending());
+ assert_eq!(
+ io::ErrorKind::Other,
+ assert_ready!(pin!(framed).poll_next(cx))
+ .unwrap()
+ .unwrap_err()
+ .kind()
+ )
+ });
+}
+
+#[test]
+fn huge_size() {
+ let mut task = task::spawn(());
+ let data = &[0; 32 * 1024][..];
+ let mut framed = FramedRead::new(data, BigDecoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+
+ struct BigDecoder;
+
+ impl Decoder for BigDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 32 * 1024 {
+ return Ok(None);
+ }
+ buf.advance(32 * 1024);
+ Ok(Some(0))
+ }
+ }
+}
+
+#[test]
+fn data_remaining_is_error() {
+ let mut task = task::spawn(());
+ let slice = &[0; 5][..];
+ let mut framed = FramedRead::new(slice, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
+ });
+}
+
+#[test]
+fn multi_frames_on_eof() {
+ let mut task = task::spawn(());
+ struct MyDecoder(Vec<u32>);
+
+ impl Decoder for MyDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ unreachable!();
+ }
+
+ fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if self.0.is_empty() {
+ return Ok(None);
+ }
+
+ Ok(Some(self.0.remove(0)))
+ }
+ }
+
+ let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert_read!(pin!(framed).poll_next(cx), 3);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_eof_then_resume() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ Ok(b"".to_vec()),
+ Ok(b"\x00\x00\x00\x02".to_vec()),
+ Ok(b"".to_vec()),
+ Ok(b"\x00\x00\x00\x03".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ assert_read!(pin!(framed).poll_next(cx), 3);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+// ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl AsyncRead for Mock {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ use io::ErrorKind::WouldBlock;
+
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ debug_assert!(buf.remaining() >= data.len());
+ buf.put_slice(&data);
+ Ready(Ok(()))
+ }
+ Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
+ Some(Err(e)) => Ready(Err(e)),
+ None => Ready(Ok(())),
+ }
+ }
+}