diff options
author | Joel Galenson <jgalenson@google.com> | 2021-06-08 17:53:00 -0700 |
---|---|---|
committer | Bernie Innocenti <codewiz@google.com> | 2021-06-24 06:33:32 +0000 |
commit | f41107f41f7f32aa9fed11b54c4b08229f4ac43e (patch) | |
tree | f322aa3a0b30f78d0f83e74509d4af2e92f0f9e0 /src/h3/mod.rs | |
parent | 7339ce145673e0923017350e679ada9c27069cb7 (diff) | |
download | quiche-f41107f41f7f32aa9fed11b54c4b08229f4ac43e.tar.gz |
Upgrade rust/crates/quiche to 0.9.0
Test: make
Change-Id: I438d6a167e6e0bbfe38785ba13c33a285d1c510b
Merged-In: Ide718a0186b4d1d9700b05623a7ee3b692062ea6
Diffstat (limited to 'src/h3/mod.rs')
-rw-r--r-- | src/h3/mod.rs | 1633 |
1 files changed, 1480 insertions, 153 deletions
diff --git a/src/h3/mod.rs b/src/h3/mod.rs index 6248688..dd2a51a 100644 --- a/src/h3/mod.rs +++ b/src/h3/mod.rs @@ -59,8 +59,9 @@ //! //! ```no_run //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); -//! # let scid = [0xba; 16]; -//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap(); +//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); +//! # let from = "127.0.0.1:1234".parse().unwrap(); +//! # let mut conn = quiche::accept(&scid, None, from, &mut config).unwrap(); //! # let h3_config = quiche::h3::Config::new()?; //! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?; //! # Ok::<(), quiche::h3::Error>(()) @@ -74,16 +75,17 @@ //! //! ```no_run //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); -//! # let scid = [0xba; 16]; -//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap(); +//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); +//! # let to = "127.0.0.1:1234".parse().unwrap(); +//! # let mut conn = quiche::connect(None, &scid, to, &mut config).unwrap(); //! # let h3_config = quiche::h3::Config::new()?; //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?; //! let req = vec![ -//! quiche::h3::Header::new(":method", "GET"), -//! quiche::h3::Header::new(":scheme", "https"), -//! quiche::h3::Header::new(":authority", "quic.tech"), -//! quiche::h3::Header::new(":path", "/"), -//! quiche::h3::Header::new("user-agent", "quiche"), +//! quiche::h3::Header::new(b":method", b"GET"), +//! quiche::h3::Header::new(b":scheme", b"https"), +//! quiche::h3::Header::new(b":authority", b"quic.tech"), +//! quiche::h3::Header::new(b":path", b"/"), +//! quiche::h3::Header::new(b"user-agent", b"quiche"), //! ]; //! //! h3_conn.send_request(&mut conn, &req, true)?; @@ -95,16 +97,17 @@ //! //! ```no_run //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); -//! # let scid = [0xba; 16]; -//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap(); +//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); +//! # let to = "127.0.0.1:1234".parse().unwrap(); +//! # let mut conn = quiche::connect(None, &scid, to, &mut config).unwrap(); //! # let h3_config = quiche::h3::Config::new()?; //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?; //! let req = vec![ -//! quiche::h3::Header::new(":method", "GET"), -//! quiche::h3::Header::new(":scheme", "https"), -//! quiche::h3::Header::new(":authority", "quic.tech"), -//! quiche::h3::Header::new(":path", "/"), -//! quiche::h3::Header::new("user-agent", "quiche"), +//! quiche::h3::Header::new(b":method", b"GET"), +//! quiche::h3::Header::new(b":scheme", b"https"), +//! quiche::h3::Header::new(b":authority", b"quic.tech"), +//! quiche::h3::Header::new(b":path", b"/"), +//! quiche::h3::Header::new(b"user-agent", b"quiche"), //! ]; //! //! let stream_id = h3_conn.send_request(&mut conn, &req, false)?; @@ -125,8 +128,9 @@ //! use quiche::h3::NameValue; //! //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); -//! # let scid = [0xba; 16]; -//! # let mut conn = quiche::accept(&scid, None, &mut config).unwrap(); +//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); +//! # let from = "127.0.0.1:1234".parse().unwrap(); +//! # let mut conn = quiche::accept(&scid, None, from, &mut config).unwrap(); //! # let h3_config = quiche::h3::Config::new()?; //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?; //! loop { @@ -135,15 +139,15 @@ //! let mut headers = list.into_iter(); //! //! // Look for the request's method. -//! let method = headers.find(|h| h.name() == ":method").unwrap(); +//! let method = headers.find(|h| h.name() == b":method").unwrap(); //! //! // Look for the request's path. -//! let path = headers.find(|h| h.name() == ":path").unwrap(); +//! let path = headers.find(|h| h.name() == b":path").unwrap(); //! -//! if method.value() == "GET" && path.value() == "/" { +//! if method.value() == b"GET" && path.value() == b"/" { //! let resp = vec![ -//! quiche::h3::Header::new(":status", &200.to_string()), -//! quiche::h3::Header::new("server", "quiche"), +//! quiche::h3::Header::new(b":status", 200.to_string().as_bytes()), +//! quiche::h3::Header::new(b"server", b"quiche"), //! ]; //! //! h3_conn.send_response(&mut conn, stream_id, &resp, false)?; @@ -186,22 +190,25 @@ //! use quiche::h3::NameValue; //! //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); -//! # let scid = [0xba; 16]; -//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap(); +//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); +//! # let to = "127.0.0.1:1234".parse().unwrap(); +//! # let mut conn = quiche::connect(None, &scid, to, &mut config).unwrap(); //! # let h3_config = quiche::h3::Config::new()?; //! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?; //! loop { //! match h3_conn.poll(&mut conn) { //! Ok((stream_id, quiche::h3::Event::Headers{list, has_body})) => { -//! let status = list.iter().find(|h| h.name() == ":status").unwrap(); +//! let status = list.iter().find(|h| h.name() == b":status").unwrap(); //! println!("Received {} response on stream {}", -//! status.value(), stream_id); +//! std::str::from_utf8(status.value()).unwrap(), +//! stream_id); //! }, //! //! Ok((stream_id, quiche::h3::Event::Data)) => { //! let mut body = vec![0; 4096]; //! -//! if let Ok(read) = +//! // Consume all body data received on the stream. +//! while let Ok(read) = //! h3_conn.recv_body(&mut conn, stream_id, &mut body) //! { //! println!("Received {} bytes of payload on stream {}", @@ -274,7 +281,7 @@ use crate::octets; /// /// [`Config::set_application_protos()`]: /// ../struct.Config.html#method.set_application_protos -pub const APPLICATION_PROTOCOL: &[u8] = b"\x05h3-29\x05h3-28\x05h3-27"; +pub const APPLICATION_PROTOCOL: &[u8] = b"\x02h3\x05h3-29\x05h3-28\x05h3-27"; // The offset used when converting HTTP/3 urgency to quiche urgency. const PRIORITY_URGENCY_OFFSET: u8 = 124; @@ -333,6 +340,30 @@ pub enum Error { /// The underlying QUIC stream (or connection) doesn't have enough capacity /// for the operation to complete. The application should retry later on. StreamBlocked, + + /// Error in the payload of a SETTINGS frame. + SettingsError, + + /// Server rejected request. + RequestRejected, + + /// Request or its response cancelled. + RequestCancelled, + + /// Client's request stream terminated without containing a full-formed + /// request. + RequestIncomplete, + + /// An HTTP message was malformed and cannot be processed. + MessageError, + + /// The TCP connection established in response to a CONNECT request was + /// reset or abnormally closed. + ConnectError, + + /// The requested operation cannot be served over HTTP/3. Peer should retry + /// over HTTP/1.1. + VersionFallback, } impl Error { @@ -351,9 +382,17 @@ impl Error { Error::BufferTooShort => 0x999, Error::TransportError { .. } => 0xFF, Error::StreamBlocked => 0xFF, + Error::SettingsError => 0x109, + Error::RequestRejected => 0x10B, + Error::RequestCancelled => 0x10C, + Error::RequestIncomplete => 0x10D, + Error::MessageError => 0x10E, + Error::ConnectError => 0x10F, + Error::VersionFallback => 0x110, } } + #[cfg(feature = "ffi")] fn to_c(self) -> libc::ssize_t { match self { Error::Done => -1, @@ -369,6 +408,13 @@ impl Error { Error::QpackDecompressionFailed => -11, Error::TransportError { .. } => -12, Error::StreamBlocked => -13, + Error::SettingsError => -14, + Error::RequestRejected => -15, + Error::RequestCancelled => -16, + Error::RequestIncomplete => -17, + Error::MessageError => -18, + Error::ConnectError => -19, + Error::VersionFallback => -20, } } } @@ -420,7 +466,13 @@ impl Config { /// Sets the `SETTINGS_MAX_HEADER_LIST_SIZE` setting. /// - /// By default no limit is enforced. + /// By default no limit is enforced. When a request whose headers exceed + /// the limit set by the application is received, the call to the [`poll()`] + /// method will return the [`Error::ExcessiveLoad`] error, and the + /// connection will be closed. + /// + /// [`poll()`]: struct.Connection.html#method.poll + /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad pub fn set_max_header_list_size(&mut self, v: u64) { self.max_header_list_size = Some(v); } @@ -443,52 +495,52 @@ impl Config { /// A trait for types with associated string name and value. pub trait NameValue { /// Returns the object's name. - fn name(&self) -> &str; + fn name(&self) -> &[u8]; /// Returns the object's value. - fn value(&self) -> &str; + fn value(&self) -> &[u8]; } /// An owned name-value pair representing a raw HTTP header. #[derive(Clone, Debug, PartialEq)] -pub struct Header(String, String); +pub struct Header(Vec<u8>, Vec<u8>); impl Header { /// Creates a new header. /// /// Both `name` and `value` will be cloned. - pub fn new(name: &str, value: &str) -> Self { - Self(String::from(name), String::from(value)) + pub fn new(name: &[u8], value: &[u8]) -> Self { + Self(name.to_vec(), value.to_vec()) } } impl NameValue for Header { - fn name(&self) -> &str { + fn name(&self) -> &[u8] { &self.0 } - fn value(&self) -> &str { + fn value(&self) -> &[u8] { &self.1 } } /// A non-owned name-value pair representing a raw HTTP header. #[derive(Clone, Debug, PartialEq)] -pub struct HeaderRef<'a>(&'a str, &'a str); +pub struct HeaderRef<'a>(&'a [u8], &'a [u8]); impl<'a> HeaderRef<'a> { /// Creates a new header. - pub fn new(name: &'a str, value: &'a str) -> Self { + pub fn new(name: &'a [u8], value: &'a [u8]) -> Self { Self(name, value) } } impl<'a> NameValue for HeaderRef<'a> { - fn name(&self) -> &str { + fn name(&self) -> &[u8] { self.0 } - fn value(&self) -> &str { + fn value(&self) -> &[u8] { self.1 } } @@ -511,16 +563,28 @@ pub enum Event { /// This indicates that the application can use the [`recv_body()`] method /// to retrieve the data from the stream. /// - /// This event will keep being reported until all the available data is - /// retrieved by the application. + /// Note that [`recv_body()`] will need to be called repeatedly until the + /// [`Done`] value is returned, as the event will not be re-armed until all + /// buffered data is read. /// /// [`recv_body()`]: struct.Connection.html#method.recv_body + /// [`Done`]: enum.Error.html#variant.Done Data, /// Stream was closed, Finished, /// DATAGRAM was received. + /// + /// This indicates that the application can use the [`recv_dgram()`] method + /// to retrieve the HTTP/3 DATAGRAM. + /// + /// Note that [`recv_dgram()`] will need to be called repeatedly until the + /// [`Done`] value is returned, as the event will not be re-armed until all + /// buffered DATAGRAMs with the same flow ID are read. + /// + /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram + /// [`Done`]: enum.Error.html#variant.Done Datagram, /// GOAWAY was received. @@ -531,6 +595,7 @@ struct ConnectionSettings { pub max_header_list_size: Option<u64>, pub qpack_max_table_capacity: Option<u64>, pub qpack_blocked_streams: Option<u64>, + pub h3_datagram: Option<u64>, } struct QpackStreams { @@ -556,6 +621,7 @@ pub struct Connection { qpack_encoder: qpack::Encoder, qpack_decoder: qpack::Decoder, + #[allow(dead_code)] local_qpack_streams: QpackStreams, peer_qpack_streams: QpackStreams, @@ -567,11 +633,17 @@ pub struct Connection { local_goaway_id: Option<u64>, peer_goaway_id: Option<u64>, + + dgram_event_triggered: bool, } impl Connection { - fn new(config: &Config, is_server: bool) -> Result<Connection> { + #[allow(clippy::unnecessary_wraps)] + fn new( + config: &Config, is_server: bool, enable_dgram: bool, + ) -> Result<Connection> { let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 }; + let h3_datagram = if enable_dgram { Some(1) } else { None }; Ok(Connection { is_server, @@ -586,12 +658,14 @@ impl Connection { max_header_list_size: config.max_header_list_size, qpack_max_table_capacity: config.qpack_max_table_capacity, qpack_blocked_streams: config.qpack_blocked_streams, + h3_datagram, }, peer_settings: ConnectionSettings { max_header_list_size: None, qpack_max_table_capacity: None, qpack_blocked_streams: None, + h3_datagram: None, }, control_stream_id: None, @@ -618,6 +692,8 @@ impl Connection { local_goaway_id: None, peer_goaway_id: None, + + dgram_event_triggered: false, }) } @@ -625,12 +701,27 @@ impl Connection { /// /// This will also initiate the HTTP/3 handshake with the peer by opening /// all control streams (including QPACK) and sending the local settings. + /// + /// On success the new connection is returned. + /// + /// The [`StreamLimit`] error is returned when the HTTP/3 control stream + /// cannot be created. + /// + /// [`StreamLimit`]: ../enum.Error.html#variant.InvalidState pub fn with_transport( conn: &mut super::Connection, config: &Config, ) -> Result<Connection> { - let mut http3_conn = Connection::new(config, conn.is_server)?; + let mut http3_conn = + Connection::new(config, conn.is_server, conn.dgram_enabled())?; - http3_conn.send_settings(conn)?; + match http3_conn.send_settings(conn) { + Ok(_) => (), + + Err(e) => { + conn.close(true, e.to_wire(), b"Error opening control stream")?; + return Err(e); + }, + }; // Try opening QPACK streams, but ignore errors if it fails since we // don't need them right now. @@ -680,7 +771,11 @@ impl Connection { // stream_capacity() will fail. By writing a 0-length buffer, we force // the creation of the QUIC stream state, without actually writing // anything. - conn.stream_send(stream_id, b"", false)?; + if let Err(e) = conn.stream_send(stream_id, b"", false) { + self.streams.remove(&stream_id); + + return Err(e.into()); + }; self.send_headers(conn, stream_id, headers, fin)?; @@ -737,7 +832,7 @@ impl Connection { return Err(Error::FrameUnexpected); } - let mut urgency = 3; + let mut urgency = 3u8.saturating_add(PRIORITY_URGENCY_OFFSET); let mut incremental = false; for param in priority.split(',') { @@ -755,11 +850,14 @@ impl Connection { // TODO: this also detects when u is not an sh-integer and // clamps it in the same way. A real structured header parser // would actually fail to parse. - let mut u = - i64::from_str_radix(param.rsplit('=').next().unwrap(), 10) - .unwrap_or(7); - - if u < 0 || u > 7 { + let mut u = param + .rsplit('=') + .next() + .unwrap() + .parse::<i64>() + .unwrap_or(7); + + if !(0..=7).contains(&u) { u = 7; } @@ -806,7 +904,17 @@ impl Connection { self.frames_greased = true; } - let stream_cap = conn.stream_capacity(stream_id)?; + let stream_cap = match conn.stream_capacity(stream_id) { + Ok(v) => v, + + Err(e) => { + if conn.stream_finished(stream_id) { + self.streams.remove(&stream_id); + } + + return Err(e.into()); + }, + }; let header_block = self.encode_header_block(headers)?; @@ -881,15 +989,28 @@ impl Connection { }, }; + // Avoid sending 0-length DATA frames when the fin flag is false. + if body.is_empty() && !fin { + return Err(Error::Done); + } + let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) + octets::varint_len(body.len() as u64); - let stream_cap = conn.stream_capacity(stream_id)?; + let stream_cap = match conn.stream_capacity(stream_id) { + Ok(v) => v, + + Err(e) => { + if conn.stream_finished(stream_id) { + self.streams.remove(&stream_id); + } + + return Err(e.into()); + }, + }; - // Make sure there is enough capacity to send the frame header and at - // least one byte of frame payload (this to avoid sending 0-length DATA - // frames). - if stream_cap <= overhead { + // Make sure there is enough capacity to send the DATA frame header. + if stream_cap < overhead { return Err(Error::Done); } @@ -900,6 +1021,11 @@ impl Connection { // application can try again later. let fin = if body_len != body.len() { false } else { fin }; + // Again, avoid sending 0-length DATA frames when the fin flag is false. + if body_len == 0 && !fin { + return Err(Error::Done); + } + trace!( "{} tx frm DATA stream={} len={} fin={}", conn.trace_id(), @@ -924,6 +1050,18 @@ impl Connection { Ok(written) } + /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support. + /// + /// Support is signalled by the peer's SETTINGS, so this method always + /// returns false until they have been processed using the [`poll()`] + /// method. + /// + /// [`poll()`]: struct.Connection.html#method.poll + pub fn dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool { + self.peer_settings.h3_datagram == Some(1) && + conn.dgram_max_writable_len().is_some() + } + /// Sends an HTTP/3 DATAGRAM with the specified flow ID. pub fn send_dgram( &mut self, conn: &mut super::Connection, flow_id: u64, buf: &[u8], @@ -977,6 +1115,35 @@ impl Connection { } } + // A helper function for determining if there is a DATAGRAM event. + fn process_dgrams( + &mut self, conn: &mut super::Connection, + ) -> Result<(u64, Event)> { + let mut d = [0; 8]; + + match conn.dgram_recv_peek(&mut d, 8) { + Ok(_) => { + if self.dgram_event_triggered { + return Err(Error::Done); + } + + self.dgram_event_triggered = true; + + Ok((0, Event::Datagram)) + }, + + Err(crate::Error::Done) => { + // The dgram recv queue is empty, so re-arm the Datagram event + // so it is issued next time a DATAGRAM is received. + self.dgram_event_triggered = false; + + Err(Error::Done) + }, + + Err(e) => Err(Error::TransportError(e)), + } + } + /// Reads request or response body data into the provided buffer. /// /// Applications should call this method whenever the [`poll()`] method @@ -991,32 +1158,78 @@ impl Connection { pub fn recv_body( &mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8], ) -> Result<usize> { - let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?; + let mut total = 0; - if stream.state() != stream::State::Data { - return Err(Error::Done); - } + // Try to consume all buffered data for the stream, even across multiple + // DATA frames. + while total < out.len() { + let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?; + + if stream.state() != stream::State::Data { + break; + } + + let (read, fin) = + match stream.try_consume_data(conn, &mut out[total..]) { + Ok(v) => v, + + Err(Error::Done) => break, + + Err(e) => return Err(e), + }; + + total += read; + + // No more data to read, we are done. + if read == 0 || fin { + break; + } + + // Process incoming data from the stream. For example, if a whole + // DATA frame was consumed, and another one is queued behind it, + // this will ensure the additional data will also be returned to + // the application. + match self.process_readable_stream(conn, stream_id, false) { + Ok(_) => unreachable!(), + + Err(Error::Done) => (), - let read = stream.try_consume_data(conn, out)?; + Err(e) => return Err(e), + }; + + if conn.stream_finished(stream_id) { + break; + } + } // While body is being received, the stream is marked as finished only // when all data is read by the application. if conn.stream_finished(stream_id) { - self.finished_streams.push_back(stream_id); + self.process_finished_stream(stream_id); + } + + if total == 0 { + return Err(Error::Done); } - Ok(read) + Ok(total) } /// Processes HTTP/3 data received from the peer. /// - /// On success it returns an [`Event`] and an ID. + /// On success it returns an [`Event`] and an ID, or [`Done`] when there are + /// no events to report. + /// + /// Note that all events are edge-triggered, meaning that once reported they + /// will not be reported again by calling this method again, until the event + /// is re-armed. /// /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID, /// which is used in methods [`recv_body()`], [`send_response()`] or /// [`send_body()`]. /// - /// The event [`Datagram`] returns a flow ID. + /// The event [`Datagram`] returns a dummy value of `0`, this should be + /// ignored by the application. /// /// The event [`GoAway`] returns an ID that depends on the connection role. /// A client receives the largest processed stream ID. A server receives the @@ -1026,6 +1239,7 @@ impl Connection { /// the appropriate error code, using the transport's [`close()`] method. /// /// [`Event`]: enum.Event.html + /// [`Done`]: enum.Error.html#variant.Done /// [`Headers`]: enum.Event.html#variant.Headers /// [`Data`]: enum.Event.html#variant.Data /// [`Finished`]: enum.Event.html#variant.Finished @@ -1040,7 +1254,7 @@ impl Connection { // When connection close is initiated by the local application (e.g. due // to a protocol error), the connection itself might be in a broken // state, so return early. - if conn.error.is_some() || conn.app_error.is_some() { + if conn.local_error.is_some() { return Err(Error::Done); } @@ -1080,26 +1294,20 @@ impl Connection { return Ok((finished, Event::Finished)); } - // Process DATAGRAMs - let mut d = [0; 8]; + // Process queued DATAGRAMs if the poll threshold allows it. + match self.process_dgrams(conn) { + Ok(v) => return Ok(v), - match conn.dgram_recv_peek(&mut d, 8) { - Ok(_) => { - let mut b = octets::Octets::with_slice(&d); - let flow_id = b.get_varint()?; - return Ok((flow_id, Event::Datagram)); - }, - - Err(crate::Error::Done) => (), + Err(Error::Done) => (), - Err(e) => return Err(Error::TransportError(e)), + Err(e) => return Err(e), }; // Process HTTP/3 data from readable streams. for s in conn.readable() { trace!("{} stream id {} is readable", conn.trace_id(), s); - let ev = match self.process_readable_stream(conn, s) { + let ev = match self.process_readable_stream(conn, s, true) { Ok(v) => Some(v), Err(Error::Done) => None, @@ -1108,16 +1316,22 @@ impl Connection { }; if conn.stream_finished(s) { - self.finished_streams.push_back(s); + self.process_finished_stream(s); } // TODO: check if stream is completed so it can be freed - if let Some(ev) = ev { return Ok(ev); } } + // Process finished streams list once again, to make sure `Finished` + // events are returned when receiving empty stream frames with the fin + // flag set. + if let Some(finished) = self.finished_streams.pop_front() { + return Ok((finished, Event::Finished)); + } + Err(Error::Done) } @@ -1135,9 +1349,13 @@ impl Connection { pub fn send_goaway( &mut self, conn: &mut super::Connection, id: u64, ) -> Result<()> { + let mut id = id; + + // TODO: server push + // + // In the meantime always send 0 from client. if !self.is_server { - // TODO: server push - return Ok(()); + id = 0; } if self.is_server && id % 4 != 0 { @@ -1237,7 +1455,17 @@ impl Connection { ) -> Result<()> { let mut d = [0; 8]; - let stream_cap = conn.stream_capacity(stream_id)?; + let stream_cap = match conn.stream_capacity(stream_id) { + Ok(v) => v, + + Err(e) => { + if conn.stream_finished(stream_id) { + self.streams.remove(&stream_id); + } + + return Err(e.into()); + }, + }; let grease_frame1 = grease_value(); let grease_frame2 = grease_value(); @@ -1283,7 +1511,7 @@ impl Connection { Ok(stream_id) => { trace!("{} open GREASE stream {}", conn.trace_id(), stream_id); - conn.stream_send(stream_id, b"GREASE is the word", false)?; + conn.stream_send(stream_id, b"GREASE is the word", true)?; }, Err(Error::IdError) => { @@ -1316,6 +1544,7 @@ impl Connection { .local_settings .qpack_max_table_capacity, qpack_blocked_streams: self.local_settings.qpack_blocked_streams, + h3_datagram: self.local_settings.h3_datagram, grease, }; @@ -1346,7 +1575,7 @@ impl Connection { return Err(Error::ClosedCriticalStream); } - match self.process_readable_stream(conn, stream_id) { + match self.process_readable_stream(conn, stream_id, true) { Ok(ev) => return Ok(ev), Err(Error::Done) => (), @@ -1368,7 +1597,7 @@ impl Connection { } fn process_readable_stream( - &mut self, conn: &mut super::Connection, stream_id: u64, + &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool, ) -> Result<(u64, Event)> { self.streams .entry(stream_id) @@ -1541,6 +1770,11 @@ impl Connection { }, stream::State::FramePayload => { + // Do not emit events when not polling. + if !polling { + break; + } + stream.try_fill_buffer(conn)?; let frame = match stream.try_consume_frame() { @@ -1569,6 +1803,15 @@ impl Connection { }, stream::State::Data => { + // Do not emit events when not polling. + if !polling { + break; + } + + if !stream.try_trigger_data_event() { + break; + } + return Ok((stream_id, Event::Data)); }, @@ -1583,16 +1826,44 @@ impl Connection { stream::State::Drain => { // Discard incoming data on the stream. - conn.stream_shutdown(stream_id, crate::Shutdown::Read, 0)?; + conn.stream_shutdown( + stream_id, + crate::Shutdown::Read, + 0x100, + )?; break; }, + + stream::State::Finished => break, } } Err(Error::Done) } + fn process_finished_stream(&mut self, stream_id: u64) { + let stream = match self.streams.get_mut(&stream_id) { + Some(v) => v, + + None => return, + }; + + if stream.state() == stream::State::Finished { + return; + } + + match stream.ty() { + Some(stream::Type::Request) | Some(stream::Type::Push) => { + stream.finished(); + + self.finished_streams.push_back(stream_id); + }, + + _ => (), + }; + } + fn process_frame( &mut self, conn: &mut super::Connection, stream_id: u64, frame: frame::Frame, @@ -1609,13 +1880,28 @@ impl Connection { max_header_list_size, qpack_max_table_capacity, qpack_blocked_streams, + h3_datagram, .. } => { self.peer_settings = ConnectionSettings { max_header_list_size, qpack_max_table_capacity, qpack_blocked_streams, + h3_datagram, }; + + if let Some(1) = h3_datagram { + // The peer MUST have also enabled DATAGRAM with a TP + if conn.dgram_max_writable_len().is_none() { + conn.close( + true, + Error::SettingsError.to_wire(), + b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.", + )?; + + return Err(Error::SettingsError); + } + } }, frame::Frame::Headers { header_block } => { @@ -1636,14 +1922,25 @@ impl Connection { .max_header_list_size .unwrap_or(std::u64::MAX); - let headers = self + let headers = match self .qpack_decoder .decode(&header_block[..], max_size) - .map_err(|e| match e { - qpack::Error::HeaderListTooLarge => Error::ExcessiveLoad, + { + Ok(v) => v, + + Err(e) => { + let e = match e { + qpack::Error::HeaderListTooLarge => + Error::ExcessiveLoad, - _ => Error::QpackDecompressionFailed, - })?; + _ => Error::QpackDecompressionFailed, + }; + + conn.close(true, e.to_wire(), b"Error parsing headers.")?; + + return Err(e); + }, + }; let has_body = !conn.stream_finished(stream_id); @@ -1814,8 +2111,6 @@ pub mod testing { pub pipe: testing::Pipe, pub client: Connection, pub server: Connection, - - buf: [u8; 65535], } impl Session { @@ -1831,6 +2126,7 @@ pub mod testing { config.set_initial_max_streams_bidi(5); config.set_initial_max_streams_uni(5); config.verify_peer(false); + config.enable_dgram(true, 3, 3); let h3_config = Config::new()?; Session::with_configs(&mut config, &h3_config) @@ -1839,47 +2135,49 @@ pub mod testing { pub fn with_configs( config: &mut crate::Config, h3_config: &Config, ) -> Result<Session> { + let pipe = testing::Pipe::with_config(config)?; + let client_dgram = pipe.client.dgram_enabled(); + let server_dgram = pipe.server.dgram_enabled(); Ok(Session { - pipe: testing::Pipe::with_config(config)?, - client: Connection::new(&h3_config, false)?, - server: Connection::new(&h3_config, true)?, - buf: [0; 65535], + pipe, + client: Connection::new(&h3_config, false, client_dgram)?, + server: Connection::new(&h3_config, true, server_dgram)?, }) } /// Do the HTTP/3 handshake so both ends are in sane initial state. pub fn handshake(&mut self) -> Result<()> { - self.pipe.handshake(&mut self.buf)?; + self.pipe.handshake()?; // Client streams. self.client.send_settings(&mut self.pipe.client)?; - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); self.client .open_qpack_encoder_stream(&mut self.pipe.client)?; - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); self.client .open_qpack_decoder_stream(&mut self.pipe.client)?; - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); if self.pipe.client.grease { self.client.open_grease_stream(&mut self.pipe.client)?; } - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); // Server streams. self.server.send_settings(&mut self.pipe.server)?; - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); self.server .open_qpack_encoder_stream(&mut self.pipe.server)?; - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); self.server .open_qpack_decoder_stream(&mut self.pipe.server)?; - self.pipe.advance(&mut self.buf).ok(); + self.pipe.advance().ok(); if self.pipe.server.grease { self.server.open_grease_stream(&mut self.pipe.server)?; @@ -1900,7 +2198,7 @@ pub mod testing { /// Advances the session pipe over the buffer. pub fn advance(&mut self) -> crate::Result<()> { - self.pipe.advance(&mut self.buf) + self.pipe.advance() } /// Polls the client for events. @@ -1918,11 +2216,11 @@ pub mod testing { /// On success it returns the newly allocated stream and the headers. pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> { let req = vec![ - Header::new(":method", "GET"), - Header::new(":scheme", "https"), - Header::new(":authority", "quic.tech"), - Header::new(":path", "/test"), - Header::new("user-agent", "quiche-test"), + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), + Header::new(b"user-agent", b"quiche-test"), ]; let stream = @@ -1940,8 +2238,8 @@ pub mod testing { &mut self, stream: u64, fin: bool, ) -> Result<Vec<Header>> { let resp = vec![ - Header::new(":status", "200"), - Header::new("server", "quiche-test"), + Header::new(b":status", b"200"), + Header::new(b"server", b"quiche-test"), ]; self.server.send_response( @@ -2024,6 +2322,54 @@ pub mod testing { Ok(()) } + /// Send an HTTP/3 DATAGRAM with default data from the client. + /// + /// On success it returns the data. + pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> { + let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + self.client + .send_dgram(&mut self.pipe.client, flow_id, &bytes)?; + + self.advance().ok(); + + Ok(bytes) + } + + /// Receives an HTTP/3 DATAGRAM from the server. + /// + /// On success it returns the DATAGRAM length, flow ID and flow ID + /// length. + pub fn recv_dgram_client( + &mut self, buf: &mut [u8], + ) -> Result<(usize, u64, usize)> { + self.client.recv_dgram(&mut self.pipe.client, buf) + } + + /// Send an HTTP/3 DATAGRAM with default data from the server + /// + /// On success it returns the data. + pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> { + let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + self.server + .send_dgram(&mut self.pipe.server, flow_id, &bytes)?; + + self.advance().ok(); + + Ok(bytes) + } + + /// Receives an HTTP/3 DATAGRAM from the client. + /// + /// On success it returns the DATAGRAM length, flow ID and flow ID + /// length. + pub fn recv_dgram_server( + &mut self, buf: &mut [u8], + ) -> Result<(usize, u64, usize)> { + self.server.recv_dgram(&mut self.pipe.server, buf) + } + /// Sends a single HTTP/3 frame from the server. pub fn send_frame_server( &mut self, frame: frame::Frame, stream_id: u64, fin: bool, @@ -2041,6 +2387,28 @@ pub mod testing { Ok(()) } + + /// Sends an arbitrary buffer of HTTP/3 stream data from the client. + pub fn send_arbitrary_stream_data_client( + &mut self, data: &[u8], stream_id: u64, fin: bool, + ) -> Result<()> { + self.pipe.client.stream_send(stream_id, data, fin)?; + + self.advance().ok(); + + Ok(()) + } + + /// Sends an arbitrary buffer of HTTP/3 stream data from the server. + pub fn send_arbitrary_stream_data_server( + &mut self, data: &[u8], stream_id: u64, fin: bool, + ) -> Result<()> { + self.pipe.server.stream_send(stream_id, data, fin)?; + + self.advance().ok(); + + Ok(()) + } } } @@ -2158,9 +2526,10 @@ mod tests { }; assert_eq!(s.poll_client(), Ok((stream, ev_headers))); + assert_eq!(s.poll_client(), Ok((stream, Event::Data))); + assert_eq!(s.poll_client(), Err(Error::Done)); for _ in 0..total_data_frames { - assert_eq!(s.poll_client(), Ok((stream, Event::Data))); assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len())); } @@ -2227,9 +2596,10 @@ mod tests { }; assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); for _ in 0..total_data_frames { - assert_eq!(s.poll_server(), Ok((stream, Event::Data))); assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); } @@ -2288,7 +2658,8 @@ mod tests { assert_eq!(ev, ev_headers); assert_eq!(s.poll_server(), Ok((stream, Event::Data))); assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); - assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); } @@ -2320,6 +2691,46 @@ mod tests { } #[test] + /// Send a request with no body, get a response with one DATA frame and an + /// empty FIN after reception from the client. + fn request_no_body_response_one_chunk_empty_fin() { + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + let (stream, req) = s.send_request(true).unwrap(); + + let ev_headers = Event::Headers { + list: req, + has_body: false, + }; + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); + + let resp = s.send_response(stream, false).unwrap(); + + let body = s.send_body_server(stream, false).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: resp, + has_body: true, + }; + + assert_eq!(s.poll_client(), Ok((stream, ev_headers))); + + assert_eq!(s.poll_client(), Ok((stream, Event::Data))); + assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len())); + + assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0)); + s.advance().ok(); + + assert_eq!(s.poll_client(), Ok((stream, Event::Finished))); + assert_eq!(s.poll_client(), Err(Error::Done)); + } + + #[test] /// Try to send DATA frames before HEADERS. fn body_response_before_headers() { let mut s = Session::default().unwrap(); @@ -2620,12 +3031,12 @@ mod tests { let mut s = Session::default().unwrap(); s.handshake().unwrap(); - s.client.send_goaway(&mut s.pipe.client, 1).unwrap(); + s.client.send_goaway(&mut s.pipe.client, 100).unwrap(); s.advance().ok(); // TODO: server push - assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.poll_server(), Ok((0, Event::GoAway))); } #[test] @@ -2703,10 +3114,10 @@ mod tests { fn uni_stream_local_counting() { let config = Config::new().unwrap(); - let h3_cln = Connection::new(&config, false).unwrap(); + let h3_cln = Connection::new(&config, false, false).unwrap(); assert_eq!(h3_cln.next_uni_stream_id, 2); - let h3_srv = Connection::new(&config, true).unwrap(); + let h3_srv = Connection::new(&config, true, false).unwrap(); assert_eq!(h3_srv.next_uni_stream_id, 3); } @@ -2842,10 +3253,32 @@ mod tests { #[test] /// Tests limits for the stream state buffer maximum size. fn max_state_buf_size() { - // DATA frames don't consume the state buffer, so can be of any size. let mut s = Session::default().unwrap(); s.handshake().unwrap(); + let req = vec![ + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), + Header::new(b"user-agent", b"quiche-test"), + ]; + + assert_eq!( + s.client.send_request(&mut s.pipe.client, &req, false), + Ok(0) + ); + + s.advance().ok(); + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers))); + + // DATA frames don't consume the state buffer, so can be of any size. let mut d = [42; 128]; let mut b = octets::OctetsMut::with_slice(&mut d); @@ -2919,16 +3352,16 @@ mod tests { }; assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); for _ in 0..total_data_frames { - assert_eq!(s.poll_server(), Ok((stream, Event::Data))); assert_eq!( s.recv_body_server(stream, &mut recv_buf), Ok(bytes.len()) ); } - assert_eq!(s.poll_server(), Ok((stream, Event::Data))); assert_eq!( s.recv_body_server(stream, &mut recv_buf), Ok(bytes.len() - 2) @@ -2966,11 +3399,11 @@ mod tests { s.handshake().unwrap(); let req = vec![ - Header::new(":method", "GET"), - Header::new(":scheme", "https"), - Header::new(":authority", "quic.tech"), - Header::new(":path", "/test"), - Header::new("aaaaaaa", "aaaaaaaa"), + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), + Header::new(b"aaaaaaa", b"aaaaaaaa"), ]; let stream = s @@ -2983,6 +3416,11 @@ mod tests { assert_eq!(stream, 0); assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad)); + + assert_eq!( + s.pipe.server.local_error.as_ref().unwrap().error_code, + Error::to_wire(Error::ExcessiveLoad) + ); } #[test] @@ -2992,11 +3430,11 @@ mod tests { s.handshake().unwrap(); let req = vec![ - Header::new(":method", "GET"), - Header::new(":scheme", "https"), - Header::new(":authority", "quic.tech"), - Header::new(":path", "/test"), - Header::new("user-agent", "quiche-test"), + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), + Header::new(b"user-agent", b"quiche-test"), ]; // We need to open all streams in the same flight, so we can't use the @@ -3022,9 +3460,8 @@ mod tests { } #[test] - /// Tests that calling poll() after an error occured does nothing. - fn poll_after_error() { - // DATA frames don't consume the state buffer, so can be of any size. + /// Tests that sending DATA before HEADERS causes an error. + fn data_before_headers() { let mut s = Session::default().unwrap(); s.handshake().unwrap(); @@ -3034,16 +3471,22 @@ mod tests { let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap(); s.pipe.client.stream_send(0, frame_type, false).unwrap(); - let frame_len = b.put_varint(1 << 24).unwrap(); + let frame_len = b.put_varint(5).unwrap(); s.pipe.client.stream_send(0, frame_len, false).unwrap(); - s.pipe.client.stream_send(0, &d, false).unwrap(); + s.pipe.client.stream_send(0, b"hello", false).unwrap(); s.advance().ok(); - assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data))); + assert_eq!( + s.server.poll(&mut s.pipe.server), + Err(Error::FrameUnexpected) + ); + } - // GREASE frames consume the state buffer, so need to be limited. + #[test] + /// Tests that calling poll() after an error occured does nothing. + fn poll_after_error() { let mut s = Session::default().unwrap(); s.handshake().unwrap(); @@ -3092,10 +3535,10 @@ mod tests { s.handshake().unwrap(); let req = vec![ - Header::new(":method", "GET"), - Header::new(":scheme", "https"), - Header::new(":authority", "quic.tech"), - Header::new(":path", "/test"), + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), ]; assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0)); @@ -3113,6 +3556,61 @@ mod tests { } #[test] + /// Test handling of 0-length DATA writes with and without fin. + fn zero_length_data() { + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + let (stream, req) = s.send_request(false).unwrap(); + + assert_eq!( + s.client.send_body(&mut s.pipe.client, 0, b"", false), + Err(Error::Done) + ); + assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0)); + + s.advance().ok(); + + let mut recv_buf = vec![0; 100]; + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done)); + + assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + let resp = s.send_response(stream, false).unwrap(); + + assert_eq!( + s.server.send_body(&mut s.pipe.server, 0, b"", false), + Err(Error::Done) + ); + assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0)); + + s.advance().ok(); + + let ev_headers = Event::Headers { + list: resp, + has_body: true, + }; + + assert_eq!(s.poll_client(), Ok((stream, ev_headers))); + + assert_eq!(s.poll_client(), Ok((stream, Event::Data))); + assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done)); + + assert_eq!(s.poll_client(), Ok((stream, Event::Finished))); + assert_eq!(s.poll_client(), Err(Error::Done)); + } + + #[test] /// Tests that blocked 0-length DATA writes are reported correctly. fn zero_length_data_blocked() { let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); @@ -3123,7 +3621,7 @@ mod tests { .load_priv_key_from_pem_file("examples/cert.key") .unwrap(); config.set_application_protos(b"\x02h3").unwrap(); - config.set_initial_max_data(70); + config.set_initial_max_data(69); config.set_initial_max_stream_data_bidi_local(150); config.set_initial_max_stream_data_bidi_remote(150); config.set_initial_max_stream_data_uni(150); @@ -3138,10 +3636,10 @@ mod tests { s.handshake().unwrap(); let req = vec![ - Header::new(":method", "GET"), - Header::new(":scheme", "https"), - Header::new(":authority", "quic.tech"), - Header::new(":path", "/test"), + Header::new(b":method", b"GET"), + Header::new(b":scheme", b"https"), + Header::new(b":authority", b"quic.tech"), + Header::new(b":path", b"/test"), ]; assert_eq!( @@ -3159,8 +3657,837 @@ mod tests { // Once the server gives flow control credits back, we can send the body. assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0)); } + + #[test] + /// Tests that receiving a H3_DATAGRAM setting is ok. + fn dgram_setting() { + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(70); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.enable_dgram(true, 1000, 1000); + config.verify_peer(false); + + let h3_config = Config::new().unwrap(); + + let mut s = Session::with_configs(&mut config, &h3_config).unwrap(); + assert_eq!(s.pipe.handshake(), Ok(())); + + s.client.send_settings(&mut s.pipe.client).unwrap(); + assert_eq!(s.pipe.advance(), Ok(())); + + // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not + // enabled. + assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server)); + + // When everything is ok, poll returns Done and DATAGRAM is enabled. + assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done)); + assert!(s.server.dgram_enabled_by_peer(&s.pipe.server)); + + // Now detect things on the client + s.server.send_settings(&mut s.pipe.server).unwrap(); + assert_eq!(s.pipe.advance(), Ok(())); + assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client)); + assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done)); + assert!(s.client.dgram_enabled_by_peer(&s.pipe.client)); + } + + #[test] + /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates + /// an error. + fn dgram_setting_no_tp() { + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(70); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + + let h3_config = Config::new().unwrap(); + + let mut s = Session::with_configs(&mut config, &h3_config).unwrap(); + assert_eq!(s.pipe.handshake(), Ok(())); + + s.client.control_stream_id = Some( + s.client + .open_uni_stream( + &mut s.pipe.client, + stream::HTTP3_CONTROL_STREAM_TYPE_ID, + ) + .unwrap(), + ); + + let settings = frame::Frame::Settings { + max_header_list_size: None, + qpack_max_table_capacity: None, + qpack_blocked_streams: None, + h3_datagram: Some(1), + grease: None, + }; + + s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false) + .unwrap(); + + assert_eq!(s.pipe.advance(), Ok(())); + + assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError)); + } + + #[test] + /// Tests that receiving SETTINGS with prohibited values generates an error. + fn settings_h2_prohibited() { + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(70); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + + let h3_config = Config::new().unwrap(); + + let mut s = Session::with_configs(&mut config, &h3_config).unwrap(); + assert_eq!(s.pipe.handshake(), Ok(())); + + s.client.control_stream_id = Some( + s.client + .open_uni_stream( + &mut s.pipe.client, + stream::HTTP3_CONTROL_STREAM_TYPE_ID, + ) + .unwrap(), + ); + + s.server.control_stream_id = Some( + s.server + .open_uni_stream( + &mut s.pipe.server, + stream::HTTP3_CONTROL_STREAM_TYPE_ID, + ) + .unwrap(), + ); + + let frame_payload_len = 2u64; + let settings = [ + frame::SETTINGS_FRAME_TYPE_ID as u8, + frame_payload_len as u8, + 0x2, // 0x2 is a reserved setting type + 1, + ]; + + s.send_arbitrary_stream_data_client( + &settings, + s.client.control_stream_id.unwrap(), + false, + ) + .unwrap(); + + s.send_arbitrary_stream_data_server( + &settings, + s.server.control_stream_id.unwrap(), + false, + ) + .unwrap(); + + assert_eq!(s.pipe.advance(), Ok(())); + + assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError)); + + assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError)); + } + + #[test] + /// Send a single DATAGRAM. + fn single_dgram() { + let mut buf = [0; 65535]; + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + // We'll send default data of 10 bytes on flow ID 0. + let result = (11, 0, 1); + + s.send_dgram_client(0).unwrap(); + + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + s.send_dgram_server(0).unwrap(); + assert_eq!(s.poll_client(), Ok((0, Event::Datagram))); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(result)); + } + + #[test] + /// Send multiple DATAGRAMs. + fn multiple_dgram() { + let mut buf = [0; 65535]; + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + // We'll send default data of 10 bytes on flow ID 0. + let result = (11, 0, 1); + + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(0).unwrap(); + + assert_eq!(s.poll_client(), Ok((0, Event::Datagram))); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done)); + assert_eq!(s.poll_client(), Err(Error::Done)); + } + + #[test] + /// Send more DATAGRAMs than the send queue allows. + fn multiple_dgram_overflow() { + let mut buf = [0; 65535]; + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + // We'll send default data of 10 bytes on flow ID 0. + let result = (11, 0, 1); + + // Five DATAGRAMs + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + + // Only 3 independent DATAGRAM events will fire. + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done)); + assert_eq!(s.poll_server(), Err(Error::Done)); + } + + #[test] + /// Send a single DATAGRAM and request. Ensure that poll continuously cycles + /// between the two types if the data is not read. + fn poll_yield_cycling() { + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(1500); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + config.enable_dgram(true, 100, 100); + + let mut h3_config = Config::new().unwrap(); + let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap(); + s.handshake().unwrap(); + + // Send request followed by DATAGRAM on client side. + let (stream, req) = s.send_request(false).unwrap(); + + s.send_body_client(stream, true).unwrap(); + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + s.send_dgram_client(0).unwrap(); + + // Now let's test the poll counts and yielding. + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + + assert_eq!(s.poll_server(), Err(Error::Done)); + } + + #[test] + /// Send a single DATAGRAM and request. Ensure that poll + /// yield cycles and cleanly exits if data is read. + fn poll_yield_single_read() { + let mut buf = [0; 65535]; + + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(1500); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + config.enable_dgram(true, 100, 100); + + let mut h3_config = Config::new().unwrap(); + let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap(); + s.handshake().unwrap(); + + // We'll send default data of 10 bytes on flow ID 0. + let result = (11, 0, 1); + + // Send request followed by DATAGRAM on client side. + let (stream, req) = s.send_request(false).unwrap(); + + let body = s.send_body_client(stream, true).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + s.send_dgram_client(0).unwrap(); + + // Now let's test the poll counts and yielding. + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_dgram_server(&mut buf), Ok(result)); + + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Send response followed by DATAGRAM on server side + let resp = s.send_response(stream, false).unwrap(); + + let body = s.send_body_server(stream, true).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: resp, + has_body: true, + }; + + s.send_dgram_server(0).unwrap(); + + // Now let's test the poll counts and yielding. + assert_eq!(s.poll_client(), Ok((0, Event::Datagram))); + + assert_eq!(s.poll_client(), Ok((stream, ev_headers))); + assert_eq!(s.poll_client(), Ok((stream, Event::Data))); + + assert_eq!(s.poll_client(), Err(Error::Done)); + + assert_eq!(s.recv_dgram_client(&mut buf), Ok(result)); + + assert_eq!(s.poll_client(), Err(Error::Done)); + + assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len())); + + assert_eq!(s.poll_client(), Ok((stream, Event::Finished))); + assert_eq!(s.poll_client(), Err(Error::Done)); + } + + #[test] + /// Send a multiple DATAGRAMs and requests. Ensure that poll + /// yield cycles and cleanly exits if data is read. + fn poll_yield_multi_read() { + let mut buf = [0; 65535]; + + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(1500); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + config.enable_dgram(true, 100, 100); + + let mut h3_config = Config::new().unwrap(); + let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap(); + s.handshake().unwrap(); + + // 10 bytes on flow ID 0 and 2. + let flow_0_result = (11, 0, 1); + let flow_2_result = (11, 2, 1); + + // Send requests followed by DATAGRAMs on client side. + let (stream, req) = s.send_request(false).unwrap(); + + let body = s.send_body_client(stream, true).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(2).unwrap(); + s.send_dgram_client(2).unwrap(); + s.send_dgram_client(2).unwrap(); + s.send_dgram_client(2).unwrap(); + s.send_dgram_client(2).unwrap(); + + // Now let's test the poll counts and yielding. + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Second cycle, start to read + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); + + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Third cycle. + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Send response followed by DATAGRAM on server side + let resp = s.send_response(stream, false).unwrap(); + + let body = s.send_body_server(stream, true).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: resp, + has_body: true, + }; + + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(0).unwrap(); + s.send_dgram_server(2).unwrap(); + s.send_dgram_server(2).unwrap(); + s.send_dgram_server(2).unwrap(); + s.send_dgram_server(2).unwrap(); + s.send_dgram_server(2).unwrap(); + + assert_eq!(s.poll_client(), Ok((0, Event::Datagram))); + + assert_eq!(s.poll_client(), Ok((stream, ev_headers))); + assert_eq!(s.poll_client(), Ok((stream, Event::Data))); + + assert_eq!(s.poll_client(), Err(Error::Done)); + + // Second cycle, start to read + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + + assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len())); + assert_eq!(s.poll_client(), Ok((stream, Event::Finished))); + + assert_eq!(s.poll_client(), Err(Error::Done)); + + // Third cycle. + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_client(), Err(Error::Done)); + } + + #[test] + /// Tests that the Finished event is not issued for streams of unknown type + /// (e.g. GREASE). + fn finished_is_for_requests() { + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(())); + assert_eq!(s.pipe.advance(), Ok(())); + + assert_eq!(s.poll_client(), Err(Error::Done)); + assert_eq!(s.poll_server(), Err(Error::Done)); + } + + #[test] + /// Tests that streams are marked as finished only once. + fn finished_once() { + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + let (stream, req) = s.send_request(false).unwrap(); + let body = s.send_body_client(stream, true).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done)); + assert_eq!(s.poll_server(), Err(Error::Done)); + } + + #[test] + /// Tests that the Data event is properly re-armed. + fn data_event_rearm() { + let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + let mut s = Session::default().unwrap(); + s.handshake().unwrap(); + + let (stream, req) = s.send_request(false).unwrap(); + + let mut recv_buf = vec![0; bytes.len()]; + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + // Manually send an incomplete DATA frame (i.e. the frame size is longer + // than the actual data sent). + { + let mut d = [42; 10]; + let mut b = octets::OctetsMut::with_slice(&mut d); + + b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap(); + b.put_varint(bytes.len() as u64).unwrap(); + let off = b.off(); + s.pipe.client.stream_send(stream, &d[..off], false).unwrap(); + + assert_eq!( + s.pipe.client.stream_send(stream, &bytes[..5], false), + Ok(5) + ); + + s.advance().ok(); + } + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Read the available body data. + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5)); + + // Send the remaining DATA payload. + assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5)); + s.advance().ok(); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Read the rest of the body data. + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + // Send more data. + let body = s.send_body_client(stream, false).unwrap(); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + + // Send more data, then HEADERS, then more data. + let body = s.send_body_client(stream, false).unwrap(); + + let trailers = vec![Header::new(b"hello", b"world")]; + + s.client + .send_headers(&mut s.pipe.client, stream, &trailers, false) + .unwrap(); + + let ev_trailers = Event::Headers { + list: trailers, + has_body: true, + }; + + s.advance().ok(); + + s.send_body_client(stream, false).unwrap(); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + + assert_eq!(s.poll_server(), Ok((stream, ev_trailers))); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + + let (stream, req) = s.send_request(false).unwrap(); + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + // Manually send an incomplete DATA frame (i.e. only the header is sent). + { + let mut d = [42; 10]; + let mut b = octets::OctetsMut::with_slice(&mut d); + + b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap(); + b.put_varint(bytes.len() as u64).unwrap(); + let off = b.off(); + s.pipe.client.stream_send(stream, &d[..off], false).unwrap(); + + s.advance().ok(); + } + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done)); + + assert_eq!(s.pipe.client.stream_send(stream, &bytes[..5], false), Ok(5)); + + s.advance().ok(); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5)); + + assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5)); + s.advance().ok(); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5)); + + // Buffer multiple data frames. + let body = s.send_body_client(stream, false).unwrap(); + s.send_body_client(stream, false).unwrap(); + s.send_body_client(stream, false).unwrap(); + + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + { + let mut d = [42; 10]; + let mut b = octets::OctetsMut::with_slice(&mut d); + + b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap(); + b.put_varint(0).unwrap(); + let off = b.off(); + s.pipe.client.stream_send(stream, &d[..off], true).unwrap(); + + s.advance().ok(); + } + + let mut recv_buf = vec![0; bytes.len() * 3]; + + assert_eq!( + s.recv_body_server(stream, &mut recv_buf), + Ok(body.len() * 3) + ); + } + + #[test] + /// Tests that the Datagram event is properly re-armed. + fn dgram_event_rearm() { + let mut buf = [0; 65535]; + + let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + config + .load_cert_chain_from_pem_file("examples/cert.crt") + .unwrap(); + config + .load_priv_key_from_pem_file("examples/cert.key") + .unwrap(); + config.set_application_protos(b"\x02h3").unwrap(); + config.set_initial_max_data(1500); + config.set_initial_max_stream_data_bidi_local(150); + config.set_initial_max_stream_data_bidi_remote(150); + config.set_initial_max_stream_data_uni(150); + config.set_initial_max_streams_bidi(100); + config.set_initial_max_streams_uni(5); + config.verify_peer(false); + config.enable_dgram(true, 100, 100); + + let mut h3_config = Config::new().unwrap(); + let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap(); + s.handshake().unwrap(); + + // 10 bytes on flow ID 0 and 2. + let flow_0_result = (11, 0, 1); + let flow_2_result = (11, 2, 1); + + // Send requests followed by DATAGRAMs on client side. + let (stream, req) = s.send_request(false).unwrap(); + + let body = s.send_body_client(stream, true).unwrap(); + + let mut recv_buf = vec![0; body.len()]; + + let ev_headers = Event::Headers { + list: req, + has_body: true, + }; + + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(2).unwrap(); + s.send_dgram_client(2).unwrap(); + + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + + assert_eq!(s.poll_server(), Ok((stream, ev_headers))); + assert_eq!(s.poll_server(), Ok((stream, Event::Data))); + + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + + assert_eq!(s.poll_server(), Err(Error::Done)); + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + + assert_eq!(s.poll_server(), Err(Error::Done)); + + s.send_dgram_client(0).unwrap(); + s.send_dgram_client(2).unwrap(); + + assert_eq!(s.poll_server(), Ok((0, Event::Datagram))); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result)); + assert_eq!(s.poll_server(), Err(Error::Done)); + + assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len())); + assert_eq!(s.poll_server(), Ok((stream, Event::Finished))); + } } +#[cfg(feature = "ffi")] mod ffi; mod frame; #[doc(hidden)] |