aboutsummaryrefslogtreecommitdiff
path: root/src/h3/mod.rs
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-06-08 17:53:00 -0700
committerBernie Innocenti <codewiz@google.com>2021-06-24 06:33:32 +0000
commitf41107f41f7f32aa9fed11b54c4b08229f4ac43e (patch)
treef322aa3a0b30f78d0f83e74509d4af2e92f0f9e0 /src/h3/mod.rs
parent7339ce145673e0923017350e679ada9c27069cb7 (diff)
downloadquiche-f41107f41f7f32aa9fed11b54c4b08229f4ac43e.tar.gz
Upgrade rust/crates/quiche to 0.9.0
Test: make Change-Id: I438d6a167e6e0bbfe38785ba13c33a285d1c510b Merged-In: Ide718a0186b4d1d9700b05623a7ee3b692062ea6
Diffstat (limited to 'src/h3/mod.rs')
-rw-r--r--src/h3/mod.rs1633
1 files changed, 1480 insertions, 153 deletions
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index 6248688..dd2a51a 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -59,8 +59,9 @@
//!
//! ```no_run
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
-//! # let scid = [0xba; 16];
-//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap();
+//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
+//! # let from = "127.0.0.1:1234".parse().unwrap();
+//! # let mut conn = quiche::accept(&scid, None, from, &mut config).unwrap();
//! # let h3_config = quiche::h3::Config::new()?;
//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
//! # Ok::<(), quiche::h3::Error>(())
@@ -74,16 +75,17 @@
//!
//! ```no_run
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
-//! # let scid = [0xba; 16];
-//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap();
+//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
+//! # let to = "127.0.0.1:1234".parse().unwrap();
+//! # let mut conn = quiche::connect(None, &scid, to, &mut config).unwrap();
//! # let h3_config = quiche::h3::Config::new()?;
//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
//! let req = vec![
-//! quiche::h3::Header::new(":method", "GET"),
-//! quiche::h3::Header::new(":scheme", "https"),
-//! quiche::h3::Header::new(":authority", "quic.tech"),
-//! quiche::h3::Header::new(":path", "/"),
-//! quiche::h3::Header::new("user-agent", "quiche"),
+//! quiche::h3::Header::new(b":method", b"GET"),
+//! quiche::h3::Header::new(b":scheme", b"https"),
+//! quiche::h3::Header::new(b":authority", b"quic.tech"),
+//! quiche::h3::Header::new(b":path", b"/"),
+//! quiche::h3::Header::new(b"user-agent", b"quiche"),
//! ];
//!
//! h3_conn.send_request(&mut conn, &req, true)?;
@@ -95,16 +97,17 @@
//!
//! ```no_run
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
-//! # let scid = [0xba; 16];
-//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap();
+//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
+//! # let to = "127.0.0.1:1234".parse().unwrap();
+//! # let mut conn = quiche::connect(None, &scid, to, &mut config).unwrap();
//! # let h3_config = quiche::h3::Config::new()?;
//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
//! let req = vec![
-//! quiche::h3::Header::new(":method", "GET"),
-//! quiche::h3::Header::new(":scheme", "https"),
-//! quiche::h3::Header::new(":authority", "quic.tech"),
-//! quiche::h3::Header::new(":path", "/"),
-//! quiche::h3::Header::new("user-agent", "quiche"),
+//! quiche::h3::Header::new(b":method", b"GET"),
+//! quiche::h3::Header::new(b":scheme", b"https"),
+//! quiche::h3::Header::new(b":authority", b"quic.tech"),
+//! quiche::h3::Header::new(b":path", b"/"),
+//! quiche::h3::Header::new(b"user-agent", b"quiche"),
//! ];
//!
//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
@@ -125,8 +128,9 @@
//! use quiche::h3::NameValue;
//!
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
-//! # let scid = [0xba; 16];
-//! # let mut conn = quiche::accept(&scid, None, &mut config).unwrap();
+//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
+//! # let from = "127.0.0.1:1234".parse().unwrap();
+//! # let mut conn = quiche::accept(&scid, None, from, &mut config).unwrap();
//! # let h3_config = quiche::h3::Config::new()?;
//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
//! loop {
@@ -135,15 +139,15 @@
//! let mut headers = list.into_iter();
//!
//! // Look for the request's method.
-//! let method = headers.find(|h| h.name() == ":method").unwrap();
+//! let method = headers.find(|h| h.name() == b":method").unwrap();
//!
//! // Look for the request's path.
-//! let path = headers.find(|h| h.name() == ":path").unwrap();
+//! let path = headers.find(|h| h.name() == b":path").unwrap();
//!
-//! if method.value() == "GET" && path.value() == "/" {
+//! if method.value() == b"GET" && path.value() == b"/" {
//! let resp = vec![
-//! quiche::h3::Header::new(":status", &200.to_string()),
-//! quiche::h3::Header::new("server", "quiche"),
+//! quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
+//! quiche::h3::Header::new(b"server", b"quiche"),
//! ];
//!
//! h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
@@ -186,22 +190,25 @@
//! use quiche::h3::NameValue;
//!
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
-//! # let scid = [0xba; 16];
-//! # let mut conn = quiche::connect(None, &scid, &mut config).unwrap();
+//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
+//! # let to = "127.0.0.1:1234".parse().unwrap();
+//! # let mut conn = quiche::connect(None, &scid, to, &mut config).unwrap();
//! # let h3_config = quiche::h3::Config::new()?;
//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
//! loop {
//! match h3_conn.poll(&mut conn) {
//! Ok((stream_id, quiche::h3::Event::Headers{list, has_body})) => {
-//! let status = list.iter().find(|h| h.name() == ":status").unwrap();
+//! let status = list.iter().find(|h| h.name() == b":status").unwrap();
//! println!("Received {} response on stream {}",
-//! status.value(), stream_id);
+//! std::str::from_utf8(status.value()).unwrap(),
+//! stream_id);
//! },
//!
//! Ok((stream_id, quiche::h3::Event::Data)) => {
//! let mut body = vec![0; 4096];
//!
-//! if let Ok(read) =
+//! // Consume all body data received on the stream.
+//! while let Ok(read) =
//! h3_conn.recv_body(&mut conn, stream_id, &mut body)
//! {
//! println!("Received {} bytes of payload on stream {}",
@@ -274,7 +281,7 @@ use crate::octets;
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
-pub const APPLICATION_PROTOCOL: &[u8] = b"\x05h3-29\x05h3-28\x05h3-27";
+pub const APPLICATION_PROTOCOL: &[u8] = b"\x02h3\x05h3-29\x05h3-28\x05h3-27";
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;
@@ -333,6 +340,30 @@ pub enum Error {
/// The underlying QUIC stream (or connection) doesn't have enough capacity
/// for the operation to complete. The application should retry later on.
StreamBlocked,
+
+ /// Error in the payload of a SETTINGS frame.
+ SettingsError,
+
+ /// Server rejected request.
+ RequestRejected,
+
+ /// Request or its response cancelled.
+ RequestCancelled,
+
+ /// Client's request stream terminated without containing a full-formed
+ /// request.
+ RequestIncomplete,
+
+ /// An HTTP message was malformed and cannot be processed.
+ MessageError,
+
+ /// The TCP connection established in response to a CONNECT request was
+ /// reset or abnormally closed.
+ ConnectError,
+
+ /// The requested operation cannot be served over HTTP/3. Peer should retry
+ /// over HTTP/1.1.
+ VersionFallback,
}
impl Error {
@@ -351,9 +382,17 @@ impl Error {
Error::BufferTooShort => 0x999,
Error::TransportError { .. } => 0xFF,
Error::StreamBlocked => 0xFF,
+ Error::SettingsError => 0x109,
+ Error::RequestRejected => 0x10B,
+ Error::RequestCancelled => 0x10C,
+ Error::RequestIncomplete => 0x10D,
+ Error::MessageError => 0x10E,
+ Error::ConnectError => 0x10F,
+ Error::VersionFallback => 0x110,
}
}
+ #[cfg(feature = "ffi")]
fn to_c(self) -> libc::ssize_t {
match self {
Error::Done => -1,
@@ -369,6 +408,13 @@ impl Error {
Error::QpackDecompressionFailed => -11,
Error::TransportError { .. } => -12,
Error::StreamBlocked => -13,
+ Error::SettingsError => -14,
+ Error::RequestRejected => -15,
+ Error::RequestCancelled => -16,
+ Error::RequestIncomplete => -17,
+ Error::MessageError => -18,
+ Error::ConnectError => -19,
+ Error::VersionFallback => -20,
}
}
}
@@ -420,7 +466,13 @@ impl Config {
/// Sets the `SETTINGS_MAX_HEADER_LIST_SIZE` setting.
///
- /// By default no limit is enforced.
+ /// By default no limit is enforced. When a request whose headers exceed
+ /// the limit set by the application is received, the call to the [`poll()`]
+ /// method will return the [`Error::ExcessiveLoad`] error, and the
+ /// connection will be closed.
+ ///
+ /// [`poll()`]: struct.Connection.html#method.poll
+ /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
pub fn set_max_header_list_size(&mut self, v: u64) {
self.max_header_list_size = Some(v);
}
@@ -443,52 +495,52 @@ impl Config {
/// A trait for types with associated string name and value.
pub trait NameValue {
/// Returns the object's name.
- fn name(&self) -> &str;
+ fn name(&self) -> &[u8];
/// Returns the object's value.
- fn value(&self) -> &str;
+ fn value(&self) -> &[u8];
}
/// An owned name-value pair representing a raw HTTP header.
#[derive(Clone, Debug, PartialEq)]
-pub struct Header(String, String);
+pub struct Header(Vec<u8>, Vec<u8>);
impl Header {
/// Creates a new header.
///
/// Both `name` and `value` will be cloned.
- pub fn new(name: &str, value: &str) -> Self {
- Self(String::from(name), String::from(value))
+ pub fn new(name: &[u8], value: &[u8]) -> Self {
+ Self(name.to_vec(), value.to_vec())
}
}
impl NameValue for Header {
- fn name(&self) -> &str {
+ fn name(&self) -> &[u8] {
&self.0
}
- fn value(&self) -> &str {
+ fn value(&self) -> &[u8] {
&self.1
}
}
/// A non-owned name-value pair representing a raw HTTP header.
#[derive(Clone, Debug, PartialEq)]
-pub struct HeaderRef<'a>(&'a str, &'a str);
+pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
impl<'a> HeaderRef<'a> {
/// Creates a new header.
- pub fn new(name: &'a str, value: &'a str) -> Self {
+ pub fn new(name: &'a [u8], value: &'a [u8]) -> Self {
Self(name, value)
}
}
impl<'a> NameValue for HeaderRef<'a> {
- fn name(&self) -> &str {
+ fn name(&self) -> &[u8] {
self.0
}
- fn value(&self) -> &str {
+ fn value(&self) -> &[u8] {
self.1
}
}
@@ -511,16 +563,28 @@ pub enum Event {
/// This indicates that the application can use the [`recv_body()`] method
/// to retrieve the data from the stream.
///
- /// This event will keep being reported until all the available data is
- /// retrieved by the application.
+ /// Note that [`recv_body()`] will need to be called repeatedly until the
+ /// [`Done`] value is returned, as the event will not be re-armed until all
+ /// buffered data is read.
///
/// [`recv_body()`]: struct.Connection.html#method.recv_body
+ /// [`Done`]: enum.Error.html#variant.Done
Data,
/// Stream was closed,
Finished,
/// DATAGRAM was received.
+ ///
+ /// This indicates that the application can use the [`recv_dgram()`] method
+ /// to retrieve the HTTP/3 DATAGRAM.
+ ///
+ /// Note that [`recv_dgram()`] will need to be called repeatedly until the
+ /// [`Done`] value is returned, as the event will not be re-armed until all
+ /// buffered DATAGRAMs with the same flow ID are read.
+ ///
+ /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
+ /// [`Done`]: enum.Error.html#variant.Done
Datagram,
/// GOAWAY was received.
@@ -531,6 +595,7 @@ struct ConnectionSettings {
pub max_header_list_size: Option<u64>,
pub qpack_max_table_capacity: Option<u64>,
pub qpack_blocked_streams: Option<u64>,
+ pub h3_datagram: Option<u64>,
}
struct QpackStreams {
@@ -556,6 +621,7 @@ pub struct Connection {
qpack_encoder: qpack::Encoder,
qpack_decoder: qpack::Decoder,
+ #[allow(dead_code)]
local_qpack_streams: QpackStreams,
peer_qpack_streams: QpackStreams,
@@ -567,11 +633,17 @@ pub struct Connection {
local_goaway_id: Option<u64>,
peer_goaway_id: Option<u64>,
+
+ dgram_event_triggered: bool,
}
impl Connection {
- fn new(config: &Config, is_server: bool) -> Result<Connection> {
+ #[allow(clippy::unnecessary_wraps)]
+ fn new(
+ config: &Config, is_server: bool, enable_dgram: bool,
+ ) -> Result<Connection> {
let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
+ let h3_datagram = if enable_dgram { Some(1) } else { None };
Ok(Connection {
is_server,
@@ -586,12 +658,14 @@ impl Connection {
max_header_list_size: config.max_header_list_size,
qpack_max_table_capacity: config.qpack_max_table_capacity,
qpack_blocked_streams: config.qpack_blocked_streams,
+ h3_datagram,
},
peer_settings: ConnectionSettings {
max_header_list_size: None,
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
+ h3_datagram: None,
},
control_stream_id: None,
@@ -618,6 +692,8 @@ impl Connection {
local_goaway_id: None,
peer_goaway_id: None,
+
+ dgram_event_triggered: false,
})
}
@@ -625,12 +701,27 @@ impl Connection {
///
/// This will also initiate the HTTP/3 handshake with the peer by opening
/// all control streams (including QPACK) and sending the local settings.
+ ///
+ /// On success the new connection is returned.
+ ///
+ /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
+ /// cannot be created.
+ ///
+ /// [`StreamLimit`]: ../enum.Error.html#variant.InvalidState
pub fn with_transport(
conn: &mut super::Connection, config: &Config,
) -> Result<Connection> {
- let mut http3_conn = Connection::new(config, conn.is_server)?;
+ let mut http3_conn =
+ Connection::new(config, conn.is_server, conn.dgram_enabled())?;
- http3_conn.send_settings(conn)?;
+ match http3_conn.send_settings(conn) {
+ Ok(_) => (),
+
+ Err(e) => {
+ conn.close(true, e.to_wire(), b"Error opening control stream")?;
+ return Err(e);
+ },
+ };
// Try opening QPACK streams, but ignore errors if it fails since we
// don't need them right now.
@@ -680,7 +771,11 @@ impl Connection {
// stream_capacity() will fail. By writing a 0-length buffer, we force
// the creation of the QUIC stream state, without actually writing
// anything.
- conn.stream_send(stream_id, b"", false)?;
+ if let Err(e) = conn.stream_send(stream_id, b"", false) {
+ self.streams.remove(&stream_id);
+
+ return Err(e.into());
+ };
self.send_headers(conn, stream_id, headers, fin)?;
@@ -737,7 +832,7 @@ impl Connection {
return Err(Error::FrameUnexpected);
}
- let mut urgency = 3;
+ let mut urgency = 3u8.saturating_add(PRIORITY_URGENCY_OFFSET);
let mut incremental = false;
for param in priority.split(',') {
@@ -755,11 +850,14 @@ impl Connection {
// TODO: this also detects when u is not an sh-integer and
// clamps it in the same way. A real structured header parser
// would actually fail to parse.
- let mut u =
- i64::from_str_radix(param.rsplit('=').next().unwrap(), 10)
- .unwrap_or(7);
-
- if u < 0 || u > 7 {
+ let mut u = param
+ .rsplit('=')
+ .next()
+ .unwrap()
+ .parse::<i64>()
+ .unwrap_or(7);
+
+ if !(0..=7).contains(&u) {
u = 7;
}
@@ -806,7 +904,17 @@ impl Connection {
self.frames_greased = true;
}
- let stream_cap = conn.stream_capacity(stream_id)?;
+ let stream_cap = match conn.stream_capacity(stream_id) {
+ Ok(v) => v,
+
+ Err(e) => {
+ if conn.stream_finished(stream_id) {
+ self.streams.remove(&stream_id);
+ }
+
+ return Err(e.into());
+ },
+ };
let header_block = self.encode_header_block(headers)?;
@@ -881,15 +989,28 @@ impl Connection {
},
};
+ // Avoid sending 0-length DATA frames when the fin flag is false.
+ if body.is_empty() && !fin {
+ return Err(Error::Done);
+ }
+
let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
octets::varint_len(body.len() as u64);
- let stream_cap = conn.stream_capacity(stream_id)?;
+ let stream_cap = match conn.stream_capacity(stream_id) {
+ Ok(v) => v,
+
+ Err(e) => {
+ if conn.stream_finished(stream_id) {
+ self.streams.remove(&stream_id);
+ }
+
+ return Err(e.into());
+ },
+ };
- // Make sure there is enough capacity to send the frame header and at
- // least one byte of frame payload (this to avoid sending 0-length DATA
- // frames).
- if stream_cap <= overhead {
+ // Make sure there is enough capacity to send the DATA frame header.
+ if stream_cap < overhead {
return Err(Error::Done);
}
@@ -900,6 +1021,11 @@ impl Connection {
// application can try again later.
let fin = if body_len != body.len() { false } else { fin };
+ // Again, avoid sending 0-length DATA frames when the fin flag is false.
+ if body_len == 0 && !fin {
+ return Err(Error::Done);
+ }
+
trace!(
"{} tx frm DATA stream={} len={} fin={}",
conn.trace_id(),
@@ -924,6 +1050,18 @@ impl Connection {
Ok(written)
}
+ /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
+ ///
+ /// Support is signalled by the peer's SETTINGS, so this method always
+ /// returns false until they have been processed using the [`poll()`]
+ /// method.
+ ///
+ /// [`poll()`]: struct.Connection.html#method.poll
+ pub fn dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool {
+ self.peer_settings.h3_datagram == Some(1) &&
+ conn.dgram_max_writable_len().is_some()
+ }
+
/// Sends an HTTP/3 DATAGRAM with the specified flow ID.
pub fn send_dgram(
&mut self, conn: &mut super::Connection, flow_id: u64, buf: &[u8],
@@ -977,6 +1115,35 @@ impl Connection {
}
}
+ // A helper function for determining if there is a DATAGRAM event.
+ fn process_dgrams(
+ &mut self, conn: &mut super::Connection,
+ ) -> Result<(u64, Event)> {
+ let mut d = [0; 8];
+
+ match conn.dgram_recv_peek(&mut d, 8) {
+ Ok(_) => {
+ if self.dgram_event_triggered {
+ return Err(Error::Done);
+ }
+
+ self.dgram_event_triggered = true;
+
+ Ok((0, Event::Datagram))
+ },
+
+ Err(crate::Error::Done) => {
+ // The dgram recv queue is empty, so re-arm the Datagram event
+ // so it is issued next time a DATAGRAM is received.
+ self.dgram_event_triggered = false;
+
+ Err(Error::Done)
+ },
+
+ Err(e) => Err(Error::TransportError(e)),
+ }
+ }
+
/// Reads request or response body data into the provided buffer.
///
/// Applications should call this method whenever the [`poll()`] method
@@ -991,32 +1158,78 @@ impl Connection {
pub fn recv_body(
&mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8],
) -> Result<usize> {
- let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
+ let mut total = 0;
- if stream.state() != stream::State::Data {
- return Err(Error::Done);
- }
+ // Try to consume all buffered data for the stream, even across multiple
+ // DATA frames.
+ while total < out.len() {
+ let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
+
+ if stream.state() != stream::State::Data {
+ break;
+ }
+
+ let (read, fin) =
+ match stream.try_consume_data(conn, &mut out[total..]) {
+ Ok(v) => v,
+
+ Err(Error::Done) => break,
+
+ Err(e) => return Err(e),
+ };
+
+ total += read;
+
+ // No more data to read, we are done.
+ if read == 0 || fin {
+ break;
+ }
+
+ // Process incoming data from the stream. For example, if a whole
+ // DATA frame was consumed, and another one is queued behind it,
+ // this will ensure the additional data will also be returned to
+ // the application.
+ match self.process_readable_stream(conn, stream_id, false) {
+ Ok(_) => unreachable!(),
+
+ Err(Error::Done) => (),
- let read = stream.try_consume_data(conn, out)?;
+ Err(e) => return Err(e),
+ };
+
+ if conn.stream_finished(stream_id) {
+ break;
+ }
+ }
// While body is being received, the stream is marked as finished only
// when all data is read by the application.
if conn.stream_finished(stream_id) {
- self.finished_streams.push_back(stream_id);
+ self.process_finished_stream(stream_id);
+ }
+
+ if total == 0 {
+ return Err(Error::Done);
}
- Ok(read)
+ Ok(total)
}
/// Processes HTTP/3 data received from the peer.
///
- /// On success it returns an [`Event`] and an ID.
+ /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
+ /// no events to report.
+ ///
+ /// Note that all events are edge-triggered, meaning that once reported they
+ /// will not be reported again by calling this method again, until the event
+ /// is re-armed.
///
/// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
/// which is used in methods [`recv_body()`], [`send_response()`] or
/// [`send_body()`].
///
- /// The event [`Datagram`] returns a flow ID.
+ /// The event [`Datagram`] returns a dummy value of `0`, this should be
+ /// ignored by the application.
///
/// The event [`GoAway`] returns an ID that depends on the connection role.
/// A client receives the largest processed stream ID. A server receives the
@@ -1026,6 +1239,7 @@ impl Connection {
/// the appropriate error code, using the transport's [`close()`] method.
///
/// [`Event`]: enum.Event.html
+ /// [`Done`]: enum.Error.html#variant.Done
/// [`Headers`]: enum.Event.html#variant.Headers
/// [`Data`]: enum.Event.html#variant.Data
/// [`Finished`]: enum.Event.html#variant.Finished
@@ -1040,7 +1254,7 @@ impl Connection {
// When connection close is initiated by the local application (e.g. due
// to a protocol error), the connection itself might be in a broken
// state, so return early.
- if conn.error.is_some() || conn.app_error.is_some() {
+ if conn.local_error.is_some() {
return Err(Error::Done);
}
@@ -1080,26 +1294,20 @@ impl Connection {
return Ok((finished, Event::Finished));
}
- // Process DATAGRAMs
- let mut d = [0; 8];
+ // Process queued DATAGRAMs if the poll threshold allows it.
+ match self.process_dgrams(conn) {
+ Ok(v) => return Ok(v),
- match conn.dgram_recv_peek(&mut d, 8) {
- Ok(_) => {
- let mut b = octets::Octets::with_slice(&d);
- let flow_id = b.get_varint()?;
- return Ok((flow_id, Event::Datagram));
- },
-
- Err(crate::Error::Done) => (),
+ Err(Error::Done) => (),
- Err(e) => return Err(Error::TransportError(e)),
+ Err(e) => return Err(e),
};
// Process HTTP/3 data from readable streams.
for s in conn.readable() {
trace!("{} stream id {} is readable", conn.trace_id(), s);
- let ev = match self.process_readable_stream(conn, s) {
+ let ev = match self.process_readable_stream(conn, s, true) {
Ok(v) => Some(v),
Err(Error::Done) => None,
@@ -1108,16 +1316,22 @@ impl Connection {
};
if conn.stream_finished(s) {
- self.finished_streams.push_back(s);
+ self.process_finished_stream(s);
}
// TODO: check if stream is completed so it can be freed
-
if let Some(ev) = ev {
return Ok(ev);
}
}
+ // Process finished streams list once again, to make sure `Finished`
+ // events are returned when receiving empty stream frames with the fin
+ // flag set.
+ if let Some(finished) = self.finished_streams.pop_front() {
+ return Ok((finished, Event::Finished));
+ }
+
Err(Error::Done)
}
@@ -1135,9 +1349,13 @@ impl Connection {
pub fn send_goaway(
&mut self, conn: &mut super::Connection, id: u64,
) -> Result<()> {
+ let mut id = id;
+
+ // TODO: server push
+ //
+ // In the meantime always send 0 from client.
if !self.is_server {
- // TODO: server push
- return Ok(());
+ id = 0;
}
if self.is_server && id % 4 != 0 {
@@ -1237,7 +1455,17 @@ impl Connection {
) -> Result<()> {
let mut d = [0; 8];
- let stream_cap = conn.stream_capacity(stream_id)?;
+ let stream_cap = match conn.stream_capacity(stream_id) {
+ Ok(v) => v,
+
+ Err(e) => {
+ if conn.stream_finished(stream_id) {
+ self.streams.remove(&stream_id);
+ }
+
+ return Err(e.into());
+ },
+ };
let grease_frame1 = grease_value();
let grease_frame2 = grease_value();
@@ -1283,7 +1511,7 @@ impl Connection {
Ok(stream_id) => {
trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
- conn.stream_send(stream_id, b"GREASE is the word", false)?;
+ conn.stream_send(stream_id, b"GREASE is the word", true)?;
},
Err(Error::IdError) => {
@@ -1316,6 +1544,7 @@ impl Connection {
.local_settings
.qpack_max_table_capacity,
qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
+ h3_datagram: self.local_settings.h3_datagram,
grease,
};
@@ -1346,7 +1575,7 @@ impl Connection {
return Err(Error::ClosedCriticalStream);
}
- match self.process_readable_stream(conn, stream_id) {
+ match self.process_readable_stream(conn, stream_id, true) {
Ok(ev) => return Ok(ev),
Err(Error::Done) => (),
@@ -1368,7 +1597,7 @@ impl Connection {
}
fn process_readable_stream(
- &mut self, conn: &mut super::Connection, stream_id: u64,
+ &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool,
) -> Result<(u64, Event)> {
self.streams
.entry(stream_id)
@@ -1541,6 +1770,11 @@ impl Connection {
},
stream::State::FramePayload => {
+ // Do not emit events when not polling.
+ if !polling {
+ break;
+ }
+
stream.try_fill_buffer(conn)?;
let frame = match stream.try_consume_frame() {
@@ -1569,6 +1803,15 @@ impl Connection {
},
stream::State::Data => {
+ // Do not emit events when not polling.
+ if !polling {
+ break;
+ }
+
+ if !stream.try_trigger_data_event() {
+ break;
+ }
+
return Ok((stream_id, Event::Data));
},
@@ -1583,16 +1826,44 @@ impl Connection {
stream::State::Drain => {
// Discard incoming data on the stream.
- conn.stream_shutdown(stream_id, crate::Shutdown::Read, 0)?;
+ conn.stream_shutdown(
+ stream_id,
+ crate::Shutdown::Read,
+ 0x100,
+ )?;
break;
},
+
+ stream::State::Finished => break,
}
}
Err(Error::Done)
}
+ fn process_finished_stream(&mut self, stream_id: u64) {
+ let stream = match self.streams.get_mut(&stream_id) {
+ Some(v) => v,
+
+ None => return,
+ };
+
+ if stream.state() == stream::State::Finished {
+ return;
+ }
+
+ match stream.ty() {
+ Some(stream::Type::Request) | Some(stream::Type::Push) => {
+ stream.finished();
+
+ self.finished_streams.push_back(stream_id);
+ },
+
+ _ => (),
+ };
+ }
+
fn process_frame(
&mut self, conn: &mut super::Connection, stream_id: u64,
frame: frame::Frame,
@@ -1609,13 +1880,28 @@ impl Connection {
max_header_list_size,
qpack_max_table_capacity,
qpack_blocked_streams,
+ h3_datagram,
..
} => {
self.peer_settings = ConnectionSettings {
max_header_list_size,
qpack_max_table_capacity,
qpack_blocked_streams,
+ h3_datagram,
};
+
+ if let Some(1) = h3_datagram {
+ // The peer MUST have also enabled DATAGRAM with a TP
+ if conn.dgram_max_writable_len().is_none() {
+ conn.close(
+ true,
+ Error::SettingsError.to_wire(),
+ b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
+ )?;
+
+ return Err(Error::SettingsError);
+ }
+ }
},
frame::Frame::Headers { header_block } => {
@@ -1636,14 +1922,25 @@ impl Connection {
.max_header_list_size
.unwrap_or(std::u64::MAX);
- let headers = self
+ let headers = match self
.qpack_decoder
.decode(&header_block[..], max_size)
- .map_err(|e| match e {
- qpack::Error::HeaderListTooLarge => Error::ExcessiveLoad,
+ {
+ Ok(v) => v,
+
+ Err(e) => {
+ let e = match e {
+ qpack::Error::HeaderListTooLarge =>
+ Error::ExcessiveLoad,
- _ => Error::QpackDecompressionFailed,
- })?;
+ _ => Error::QpackDecompressionFailed,
+ };
+
+ conn.close(true, e.to_wire(), b"Error parsing headers.")?;
+
+ return Err(e);
+ },
+ };
let has_body = !conn.stream_finished(stream_id);
@@ -1814,8 +2111,6 @@ pub mod testing {
pub pipe: testing::Pipe,
pub client: Connection,
pub server: Connection,
-
- buf: [u8; 65535],
}
impl Session {
@@ -1831,6 +2126,7 @@ pub mod testing {
config.set_initial_max_streams_bidi(5);
config.set_initial_max_streams_uni(5);
config.verify_peer(false);
+ config.enable_dgram(true, 3, 3);
let h3_config = Config::new()?;
Session::with_configs(&mut config, &h3_config)
@@ -1839,47 +2135,49 @@ pub mod testing {
pub fn with_configs(
config: &mut crate::Config, h3_config: &Config,
) -> Result<Session> {
+ let pipe = testing::Pipe::with_config(config)?;
+ let client_dgram = pipe.client.dgram_enabled();
+ let server_dgram = pipe.server.dgram_enabled();
Ok(Session {
- pipe: testing::Pipe::with_config(config)?,
- client: Connection::new(&h3_config, false)?,
- server: Connection::new(&h3_config, true)?,
- buf: [0; 65535],
+ pipe,
+ client: Connection::new(&h3_config, false, client_dgram)?,
+ server: Connection::new(&h3_config, true, server_dgram)?,
})
}
/// Do the HTTP/3 handshake so both ends are in sane initial state.
pub fn handshake(&mut self) -> Result<()> {
- self.pipe.handshake(&mut self.buf)?;
+ self.pipe.handshake()?;
// Client streams.
self.client.send_settings(&mut self.pipe.client)?;
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
self.client
.open_qpack_encoder_stream(&mut self.pipe.client)?;
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
self.client
.open_qpack_decoder_stream(&mut self.pipe.client)?;
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
if self.pipe.client.grease {
self.client.open_grease_stream(&mut self.pipe.client)?;
}
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
// Server streams.
self.server.send_settings(&mut self.pipe.server)?;
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
self.server
.open_qpack_encoder_stream(&mut self.pipe.server)?;
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
self.server
.open_qpack_decoder_stream(&mut self.pipe.server)?;
- self.pipe.advance(&mut self.buf).ok();
+ self.pipe.advance().ok();
if self.pipe.server.grease {
self.server.open_grease_stream(&mut self.pipe.server)?;
@@ -1900,7 +2198,7 @@ pub mod testing {
/// Advances the session pipe over the buffer.
pub fn advance(&mut self) -> crate::Result<()> {
- self.pipe.advance(&mut self.buf)
+ self.pipe.advance()
}
/// Polls the client for events.
@@ -1918,11 +2216,11 @@ pub mod testing {
/// On success it returns the newly allocated stream and the headers.
pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
let req = vec![
- Header::new(":method", "GET"),
- Header::new(":scheme", "https"),
- Header::new(":authority", "quic.tech"),
- Header::new(":path", "/test"),
- Header::new("user-agent", "quiche-test"),
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
+ Header::new(b"user-agent", b"quiche-test"),
];
let stream =
@@ -1940,8 +2238,8 @@ pub mod testing {
&mut self, stream: u64, fin: bool,
) -> Result<Vec<Header>> {
let resp = vec![
- Header::new(":status", "200"),
- Header::new("server", "quiche-test"),
+ Header::new(b":status", b"200"),
+ Header::new(b"server", b"quiche-test"),
];
self.server.send_response(
@@ -2024,6 +2322,54 @@ pub mod testing {
Ok(())
}
+ /// Send an HTTP/3 DATAGRAM with default data from the client.
+ ///
+ /// On success it returns the data.
+ pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
+ let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+
+ self.client
+ .send_dgram(&mut self.pipe.client, flow_id, &bytes)?;
+
+ self.advance().ok();
+
+ Ok(bytes)
+ }
+
+ /// Receives an HTTP/3 DATAGRAM from the server.
+ ///
+ /// On success it returns the DATAGRAM length, flow ID and flow ID
+ /// length.
+ pub fn recv_dgram_client(
+ &mut self, buf: &mut [u8],
+ ) -> Result<(usize, u64, usize)> {
+ self.client.recv_dgram(&mut self.pipe.client, buf)
+ }
+
+ /// Send an HTTP/3 DATAGRAM with default data from the server
+ ///
+ /// On success it returns the data.
+ pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
+ let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+
+ self.server
+ .send_dgram(&mut self.pipe.server, flow_id, &bytes)?;
+
+ self.advance().ok();
+
+ Ok(bytes)
+ }
+
+ /// Receives an HTTP/3 DATAGRAM from the client.
+ ///
+ /// On success it returns the DATAGRAM length, flow ID and flow ID
+ /// length.
+ pub fn recv_dgram_server(
+ &mut self, buf: &mut [u8],
+ ) -> Result<(usize, u64, usize)> {
+ self.server.recv_dgram(&mut self.pipe.server, buf)
+ }
+
/// Sends a single HTTP/3 frame from the server.
pub fn send_frame_server(
&mut self, frame: frame::Frame, stream_id: u64, fin: bool,
@@ -2041,6 +2387,28 @@ pub mod testing {
Ok(())
}
+
+ /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
+ pub fn send_arbitrary_stream_data_client(
+ &mut self, data: &[u8], stream_id: u64, fin: bool,
+ ) -> Result<()> {
+ self.pipe.client.stream_send(stream_id, data, fin)?;
+
+ self.advance().ok();
+
+ Ok(())
+ }
+
+ /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
+ pub fn send_arbitrary_stream_data_server(
+ &mut self, data: &[u8], stream_id: u64, fin: bool,
+ ) -> Result<()> {
+ self.pipe.server.stream_send(stream_id, data, fin)?;
+
+ self.advance().ok();
+
+ Ok(())
+ }
}
}
@@ -2158,9 +2526,10 @@ mod tests {
};
assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_client(), Err(Error::Done));
for _ in 0..total_data_frames {
- assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
}
@@ -2227,9 +2596,10 @@ mod tests {
};
assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
for _ in 0..total_data_frames {
- assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
}
@@ -2288,7 +2658,8 @@ mod tests {
assert_eq!(ev, ev_headers);
assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
- assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
}
@@ -2320,6 +2691,46 @@ mod tests {
}
#[test]
+ /// Send a request with no body, get a response with one DATA frame and an
+ /// empty FIN after reception from the client.
+ fn request_no_body_response_one_chunk_empty_fin() {
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ let (stream, req) = s.send_request(true).unwrap();
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: false,
+ };
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+
+ let resp = s.send_response(stream, false).unwrap();
+
+ let body = s.send_body_server(stream, false).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: resp,
+ has_body: true,
+ };
+
+ assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
+
+ assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
+ assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
+
+ assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
+ s.advance().ok();
+
+ assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ }
+
+ #[test]
/// Try to send DATA frames before HEADERS.
fn body_response_before_headers() {
let mut s = Session::default().unwrap();
@@ -2620,12 +3031,12 @@ mod tests {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
- s.client.send_goaway(&mut s.pipe.client, 1).unwrap();
+ s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
s.advance().ok();
// TODO: server push
- assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
}
#[test]
@@ -2703,10 +3114,10 @@ mod tests {
fn uni_stream_local_counting() {
let config = Config::new().unwrap();
- let h3_cln = Connection::new(&config, false).unwrap();
+ let h3_cln = Connection::new(&config, false, false).unwrap();
assert_eq!(h3_cln.next_uni_stream_id, 2);
- let h3_srv = Connection::new(&config, true).unwrap();
+ let h3_srv = Connection::new(&config, true, false).unwrap();
assert_eq!(h3_srv.next_uni_stream_id, 3);
}
@@ -2842,10 +3253,32 @@ mod tests {
#[test]
/// Tests limits for the stream state buffer maximum size.
fn max_state_buf_size() {
- // DATA frames don't consume the state buffer, so can be of any size.
let mut s = Session::default().unwrap();
s.handshake().unwrap();
+ let req = vec![
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
+ Header::new(b"user-agent", b"quiche-test"),
+ ];
+
+ assert_eq!(
+ s.client.send_request(&mut s.pipe.client, &req, false),
+ Ok(0)
+ );
+
+ s.advance().ok();
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
+
+ // DATA frames don't consume the state buffer, so can be of any size.
let mut d = [42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
@@ -2919,16 +3352,16 @@ mod tests {
};
assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
for _ in 0..total_data_frames {
- assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(
s.recv_body_server(stream, &mut recv_buf),
Ok(bytes.len())
);
}
- assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(
s.recv_body_server(stream, &mut recv_buf),
Ok(bytes.len() - 2)
@@ -2966,11 +3399,11 @@ mod tests {
s.handshake().unwrap();
let req = vec![
- Header::new(":method", "GET"),
- Header::new(":scheme", "https"),
- Header::new(":authority", "quic.tech"),
- Header::new(":path", "/test"),
- Header::new("aaaaaaa", "aaaaaaaa"),
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
+ Header::new(b"aaaaaaa", b"aaaaaaaa"),
];
let stream = s
@@ -2983,6 +3416,11 @@ mod tests {
assert_eq!(stream, 0);
assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
+
+ assert_eq!(
+ s.pipe.server.local_error.as_ref().unwrap().error_code,
+ Error::to_wire(Error::ExcessiveLoad)
+ );
}
#[test]
@@ -2992,11 +3430,11 @@ mod tests {
s.handshake().unwrap();
let req = vec![
- Header::new(":method", "GET"),
- Header::new(":scheme", "https"),
- Header::new(":authority", "quic.tech"),
- Header::new(":path", "/test"),
- Header::new("user-agent", "quiche-test"),
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
+ Header::new(b"user-agent", b"quiche-test"),
];
// We need to open all streams in the same flight, so we can't use the
@@ -3022,9 +3460,8 @@ mod tests {
}
#[test]
- /// Tests that calling poll() after an error occured does nothing.
- fn poll_after_error() {
- // DATA frames don't consume the state buffer, so can be of any size.
+ /// Tests that sending DATA before HEADERS causes an error.
+ fn data_before_headers() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
@@ -3034,16 +3471,22 @@ mod tests {
let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
s.pipe.client.stream_send(0, frame_type, false).unwrap();
- let frame_len = b.put_varint(1 << 24).unwrap();
+ let frame_len = b.put_varint(5).unwrap();
s.pipe.client.stream_send(0, frame_len, false).unwrap();
- s.pipe.client.stream_send(0, &d, false).unwrap();
+ s.pipe.client.stream_send(0, b"hello", false).unwrap();
s.advance().ok();
- assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
+ assert_eq!(
+ s.server.poll(&mut s.pipe.server),
+ Err(Error::FrameUnexpected)
+ );
+ }
- // GREASE frames consume the state buffer, so need to be limited.
+ #[test]
+ /// Tests that calling poll() after an error occured does nothing.
+ fn poll_after_error() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
@@ -3092,10 +3535,10 @@ mod tests {
s.handshake().unwrap();
let req = vec![
- Header::new(":method", "GET"),
- Header::new(":scheme", "https"),
- Header::new(":authority", "quic.tech"),
- Header::new(":path", "/test"),
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
];
assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
@@ -3113,6 +3556,61 @@ mod tests {
}
#[test]
+ /// Test handling of 0-length DATA writes with and without fin.
+ fn zero_length_data() {
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ let (stream, req) = s.send_request(false).unwrap();
+
+ assert_eq!(
+ s.client.send_body(&mut s.pipe.client, 0, b"", false),
+ Err(Error::Done)
+ );
+ assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
+
+ s.advance().ok();
+
+ let mut recv_buf = vec![0; 100];
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ let resp = s.send_response(stream, false).unwrap();
+
+ assert_eq!(
+ s.server.send_body(&mut s.pipe.server, 0, b"", false),
+ Err(Error::Done)
+ );
+ assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
+
+ s.advance().ok();
+
+ let ev_headers = Event::Headers {
+ list: resp,
+ has_body: true,
+ };
+
+ assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
+
+ assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
+ assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
+
+ assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ }
+
+ #[test]
/// Tests that blocked 0-length DATA writes are reported correctly.
fn zero_length_data_blocked() {
let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
@@ -3123,7 +3621,7 @@ mod tests {
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config.set_application_protos(b"\x02h3").unwrap();
- config.set_initial_max_data(70);
+ config.set_initial_max_data(69);
config.set_initial_max_stream_data_bidi_local(150);
config.set_initial_max_stream_data_bidi_remote(150);
config.set_initial_max_stream_data_uni(150);
@@ -3138,10 +3636,10 @@ mod tests {
s.handshake().unwrap();
let req = vec![
- Header::new(":method", "GET"),
- Header::new(":scheme", "https"),
- Header::new(":authority", "quic.tech"),
- Header::new(":path", "/test"),
+ Header::new(b":method", b"GET"),
+ Header::new(b":scheme", b"https"),
+ Header::new(b":authority", b"quic.tech"),
+ Header::new(b":path", b"/test"),
];
assert_eq!(
@@ -3159,8 +3657,837 @@ mod tests {
// Once the server gives flow control credits back, we can send the body.
assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
}
+
+ #[test]
+ /// Tests that receiving a H3_DATAGRAM setting is ok.
+ fn dgram_setting() {
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(70);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.enable_dgram(true, 1000, 1000);
+ config.verify_peer(false);
+
+ let h3_config = Config::new().unwrap();
+
+ let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
+ assert_eq!(s.pipe.handshake(), Ok(()));
+
+ s.client.send_settings(&mut s.pipe.client).unwrap();
+ assert_eq!(s.pipe.advance(), Ok(()));
+
+ // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
+ // enabled.
+ assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
+
+ // When everything is ok, poll returns Done and DATAGRAM is enabled.
+ assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
+ assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
+
+ // Now detect things on the client
+ s.server.send_settings(&mut s.pipe.server).unwrap();
+ assert_eq!(s.pipe.advance(), Ok(()));
+ assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
+ assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
+ assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
+ }
+
+ #[test]
+ /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
+ /// an error.
+ fn dgram_setting_no_tp() {
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(70);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+
+ let h3_config = Config::new().unwrap();
+
+ let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
+ assert_eq!(s.pipe.handshake(), Ok(()));
+
+ s.client.control_stream_id = Some(
+ s.client
+ .open_uni_stream(
+ &mut s.pipe.client,
+ stream::HTTP3_CONTROL_STREAM_TYPE_ID,
+ )
+ .unwrap(),
+ );
+
+ let settings = frame::Frame::Settings {
+ max_header_list_size: None,
+ qpack_max_table_capacity: None,
+ qpack_blocked_streams: None,
+ h3_datagram: Some(1),
+ grease: None,
+ };
+
+ s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
+ .unwrap();
+
+ assert_eq!(s.pipe.advance(), Ok(()));
+
+ assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
+ }
+
+ #[test]
+ /// Tests that receiving SETTINGS with prohibited values generates an error.
+ fn settings_h2_prohibited() {
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(70);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+
+ let h3_config = Config::new().unwrap();
+
+ let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
+ assert_eq!(s.pipe.handshake(), Ok(()));
+
+ s.client.control_stream_id = Some(
+ s.client
+ .open_uni_stream(
+ &mut s.pipe.client,
+ stream::HTTP3_CONTROL_STREAM_TYPE_ID,
+ )
+ .unwrap(),
+ );
+
+ s.server.control_stream_id = Some(
+ s.server
+ .open_uni_stream(
+ &mut s.pipe.server,
+ stream::HTTP3_CONTROL_STREAM_TYPE_ID,
+ )
+ .unwrap(),
+ );
+
+ let frame_payload_len = 2u64;
+ let settings = [
+ frame::SETTINGS_FRAME_TYPE_ID as u8,
+ frame_payload_len as u8,
+ 0x2, // 0x2 is a reserved setting type
+ 1,
+ ];
+
+ s.send_arbitrary_stream_data_client(
+ &settings,
+ s.client.control_stream_id.unwrap(),
+ false,
+ )
+ .unwrap();
+
+ s.send_arbitrary_stream_data_server(
+ &settings,
+ s.server.control_stream_id.unwrap(),
+ false,
+ )
+ .unwrap();
+
+ assert_eq!(s.pipe.advance(), Ok(()));
+
+ assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
+
+ assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
+ }
+
+ #[test]
+ /// Send a single DATAGRAM.
+ fn single_dgram() {
+ let mut buf = [0; 65535];
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ // We'll send default data of 10 bytes on flow ID 0.
+ let result = (11, 0, 1);
+
+ s.send_dgram_client(0).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ s.send_dgram_server(0).unwrap();
+ assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
+ }
+
+ #[test]
+ /// Send multiple DATAGRAMs.
+ fn multiple_dgram() {
+ let mut buf = [0; 65535];
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ // We'll send default data of 10 bytes on flow ID 0.
+ let result = (11, 0, 1);
+
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(0).unwrap();
+
+ assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Send more DATAGRAMs than the send queue allows.
+ fn multiple_dgram_overflow() {
+ let mut buf = [0; 65535];
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ // We'll send default data of 10 bytes on flow ID 0.
+ let result = (11, 0, 1);
+
+ // Five DATAGRAMs
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+
+ // Only 3 independent DATAGRAM events will fire.
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Send a single DATAGRAM and request. Ensure that poll continuously cycles
+ /// between the two types if the data is not read.
+ fn poll_yield_cycling() {
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(1500);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+ config.enable_dgram(true, 100, 100);
+
+ let mut h3_config = Config::new().unwrap();
+ let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
+ s.handshake().unwrap();
+
+ // Send request followed by DATAGRAM on client side.
+ let (stream, req) = s.send_request(false).unwrap();
+
+ s.send_body_client(stream, true).unwrap();
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ s.send_dgram_client(0).unwrap();
+
+ // Now let's test the poll counts and yielding.
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Send a single DATAGRAM and request. Ensure that poll
+ /// yield cycles and cleanly exits if data is read.
+ fn poll_yield_single_read() {
+ let mut buf = [0; 65535];
+
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(1500);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+ config.enable_dgram(true, 100, 100);
+
+ let mut h3_config = Config::new().unwrap();
+ let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
+ s.handshake().unwrap();
+
+ // We'll send default data of 10 bytes on flow ID 0.
+ let result = (11, 0, 1);
+
+ // Send request followed by DATAGRAM on client side.
+ let (stream, req) = s.send_request(false).unwrap();
+
+ let body = s.send_body_client(stream, true).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ s.send_dgram_client(0).unwrap();
+
+ // Now let's test the poll counts and yielding.
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Send response followed by DATAGRAM on server side
+ let resp = s.send_response(stream, false).unwrap();
+
+ let body = s.send_body_server(stream, true).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: resp,
+ has_body: true,
+ };
+
+ s.send_dgram_server(0).unwrap();
+
+ // Now let's test the poll counts and yielding.
+ assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
+
+ assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
+
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
+
+ assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Send a multiple DATAGRAMs and requests. Ensure that poll
+ /// yield cycles and cleanly exits if data is read.
+ fn poll_yield_multi_read() {
+ let mut buf = [0; 65535];
+
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(1500);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+ config.enable_dgram(true, 100, 100);
+
+ let mut h3_config = Config::new().unwrap();
+ let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
+ s.handshake().unwrap();
+
+ // 10 bytes on flow ID 0 and 2.
+ let flow_0_result = (11, 0, 1);
+ let flow_2_result = (11, 2, 1);
+
+ // Send requests followed by DATAGRAMs on client side.
+ let (stream, req) = s.send_request(false).unwrap();
+
+ let body = s.send_body_client(stream, true).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(2).unwrap();
+ s.send_dgram_client(2).unwrap();
+ s.send_dgram_client(2).unwrap();
+ s.send_dgram_client(2).unwrap();
+ s.send_dgram_client(2).unwrap();
+
+ // Now let's test the poll counts and yielding.
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Second cycle, start to read
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Third cycle.
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Send response followed by DATAGRAM on server side
+ let resp = s.send_response(stream, false).unwrap();
+
+ let body = s.send_body_server(stream, true).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: resp,
+ has_body: true,
+ };
+
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(0).unwrap();
+ s.send_dgram_server(2).unwrap();
+ s.send_dgram_server(2).unwrap();
+ s.send_dgram_server(2).unwrap();
+ s.send_dgram_server(2).unwrap();
+ s.send_dgram_server(2).unwrap();
+
+ assert_eq!(s.poll_client(), Ok((0, Event::Datagram)));
+
+ assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
+ // Second cycle, start to read
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
+ assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
+
+ assert_eq!(s.poll_client(), Err(Error::Done));
+
+ // Third cycle.
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Tests that the Finished event is not issued for streams of unknown type
+ /// (e.g. GREASE).
+ fn finished_is_for_requests() {
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
+ assert_eq!(s.pipe.advance(), Ok(()));
+
+ assert_eq!(s.poll_client(), Err(Error::Done));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Tests that streams are marked as finished only once.
+ fn finished_once() {
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ let (stream, req) = s.send_request(false).unwrap();
+ let body = s.send_body_client(stream, true).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ }
+
+ #[test]
+ /// Tests that the Data event is properly re-armed.
+ fn data_event_rearm() {
+ let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ let (stream, req) = s.send_request(false).unwrap();
+
+ let mut recv_buf = vec![0; bytes.len()];
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ // Manually send an incomplete DATA frame (i.e. the frame size is longer
+ // than the actual data sent).
+ {
+ let mut d = [42; 10];
+ let mut b = octets::OctetsMut::with_slice(&mut d);
+
+ b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
+ b.put_varint(bytes.len() as u64).unwrap();
+ let off = b.off();
+ s.pipe.client.stream_send(stream, &d[..off], false).unwrap();
+
+ assert_eq!(
+ s.pipe.client.stream_send(stream, &bytes[..5], false),
+ Ok(5)
+ );
+
+ s.advance().ok();
+ }
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Read the available body data.
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
+
+ // Send the remaining DATA payload.
+ assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5));
+ s.advance().ok();
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Read the rest of the body data.
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ // Send more data.
+ let body = s.send_body_client(stream, false).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+
+ // Send more data, then HEADERS, then more data.
+ let body = s.send_body_client(stream, false).unwrap();
+
+ let trailers = vec![Header::new(b"hello", b"world")];
+
+ s.client
+ .send_headers(&mut s.pipe.client, stream, &trailers, false)
+ .unwrap();
+
+ let ev_trailers = Event::Headers {
+ list: trailers,
+ has_body: true,
+ };
+
+ s.advance().ok();
+
+ s.send_body_client(stream, false).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+
+ let (stream, req) = s.send_request(false).unwrap();
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ // Manually send an incomplete DATA frame (i.e. only the header is sent).
+ {
+ let mut d = [42; 10];
+ let mut b = octets::OctetsMut::with_slice(&mut d);
+
+ b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
+ b.put_varint(bytes.len() as u64).unwrap();
+ let off = b.off();
+ s.pipe.client.stream_send(stream, &d[..off], false).unwrap();
+
+ s.advance().ok();
+ }
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
+
+ assert_eq!(s.pipe.client.stream_send(stream, &bytes[..5], false), Ok(5));
+
+ s.advance().ok();
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
+
+ assert_eq!(s.pipe.client.stream_send(stream, &bytes[5..], false), Ok(5));
+ s.advance().ok();
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(5));
+
+ // Buffer multiple data frames.
+ let body = s.send_body_client(stream, false).unwrap();
+ s.send_body_client(stream, false).unwrap();
+ s.send_body_client(stream, false).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ {
+ let mut d = [42; 10];
+ let mut b = octets::OctetsMut::with_slice(&mut d);
+
+ b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
+ b.put_varint(0).unwrap();
+ let off = b.off();
+ s.pipe.client.stream_send(stream, &d[..off], true).unwrap();
+
+ s.advance().ok();
+ }
+
+ let mut recv_buf = vec![0; bytes.len() * 3];
+
+ assert_eq!(
+ s.recv_body_server(stream, &mut recv_buf),
+ Ok(body.len() * 3)
+ );
+ }
+
+ #[test]
+ /// Tests that the Datagram event is properly re-armed.
+ fn dgram_event_rearm() {
+ let mut buf = [0; 65535];
+
+ let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
+ config
+ .load_cert_chain_from_pem_file("examples/cert.crt")
+ .unwrap();
+ config
+ .load_priv_key_from_pem_file("examples/cert.key")
+ .unwrap();
+ config.set_application_protos(b"\x02h3").unwrap();
+ config.set_initial_max_data(1500);
+ config.set_initial_max_stream_data_bidi_local(150);
+ config.set_initial_max_stream_data_bidi_remote(150);
+ config.set_initial_max_stream_data_uni(150);
+ config.set_initial_max_streams_bidi(100);
+ config.set_initial_max_streams_uni(5);
+ config.verify_peer(false);
+ config.enable_dgram(true, 100, 100);
+
+ let mut h3_config = Config::new().unwrap();
+ let mut s = Session::with_configs(&mut config, &mut h3_config).unwrap();
+ s.handshake().unwrap();
+
+ // 10 bytes on flow ID 0 and 2.
+ let flow_0_result = (11, 0, 1);
+ let flow_2_result = (11, 2, 1);
+
+ // Send requests followed by DATAGRAMs on client side.
+ let (stream, req) = s.send_request(false).unwrap();
+
+ let body = s.send_body_client(stream, true).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: true,
+ };
+
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(2).unwrap();
+ s.send_dgram_client(2).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+
+ assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ s.send_dgram_client(0).unwrap();
+ s.send_dgram_client(2).unwrap();
+
+ assert_eq!(s.poll_server(), Ok((0, Event::Datagram)));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
+ assert_eq!(s.poll_server(), Err(Error::Done));
+
+ assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
+ assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
+ }
}
+#[cfg(feature = "ffi")]
mod ffi;
mod frame;
#[doc(hidden)]