diff options
author | Matthew Maurer <mmaurer@google.com> | 2021-10-26 16:01:17 -0700 |
---|---|---|
committer | Mike Yu <yumike@google.com> | 2021-11-09 21:01:18 +0800 |
commit | f974124eae008ef23e58d9a287eedac1d12fe905 (patch) | |
tree | a9ab833b1064252b592bbe1f6a3c0d559c687505 | |
parent | ed78fdaf9b08551599bc79736729f7cf3fc6eb06 (diff) | |
download | DnsResolver-f974124eae008ef23e58d9a287eedac1d12fe905.tar.gz |
DoH: Automatically reconnect closed connections
Previously, I assumed a reprobe would be issued after a network died to
re-establish its connection. As this is not the case, this change
automatically re-establishes a dead connection before sending a query, so
that the resolver survives the stress test.
Bug: 202081046
Bug: 203314532
Test: resolv_integration_test
Test: resolv_stress_test + I682678b84b35c575a3eb88c2c1c67aefd195616c
Change-Id: I4c934429f2eb827382eccc10dea82c1cc8a78d4a
-rw-r--r-- | doh/connection/driver.rs | 59 | ||||
-rw-r--r-- | doh/connection/mod.rs | 47 | ||||
-rw-r--r-- | doh/ffi.rs | 5 | ||||
-rw-r--r-- | doh/network/driver.rs | 23 |
4 files changed, 107 insertions, 27 deletions
diff --git a/doh/connection/driver.rs b/doh/connection/driver.rs index 4fd1e266..34138e19 100644 --- a/doh/connection/driver.rs +++ b/doh/connection/driver.rs @@ -17,7 +17,7 @@ use crate::boot_time; use crate::boot_time::BootTime; -use log::warn; +use log::{debug, warn}; use quiche::h3; use std::collections::HashMap; use std::default::Default; @@ -27,7 +27,9 @@ use std::pin::Pin; use thiserror::Error; use tokio::net::UdpSocket; use tokio::select; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; + +use super::Status; #[derive(Error, Debug)] pub enum Error { @@ -77,6 +79,7 @@ const MAX_UDP_PACKET_SIZE: usize = 65536; struct Driver { request_rx: mpsc::Receiver<Request>, + status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, // This buffer is large, boxing it will keep it @@ -111,19 +114,27 @@ async fn optional_timeout(timeout: Option<boot_time::Duration>) { /// The returned error code will explain why the connection terminated. 
pub async fn drive( request_rx: mpsc::Receiver<Request>, + status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, ) -> Result<()> { - Driver::new(request_rx, quiche_conn, socket).drive().await + Driver::new(request_rx, status_tx, quiche_conn, socket).drive().await } impl Driver { fn new( request_rx: mpsc::Receiver<Request>, + status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, ) -> Self { - Self { request_rx, quiche_conn, socket, buffer: Box::new([0; MAX_UDP_PACKET_SIZE]) } + Self { + request_rx, + status_tx, + quiche_conn, + socket, + buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), + } } async fn drive(mut self) -> Result<()> { @@ -136,6 +147,8 @@ impl Driver { fn handle_closed(&self) -> Result<()> { if self.quiche_conn.is_closed() { + // We don't care if the receiver has hung up + let _ = self.status_tx.send(Status::Dead); Err(Error::Closed) } else { Ok(()) @@ -146,7 +159,9 @@ impl Driver { let timer = optional_timeout(self.quiche_conn.timeout()); select! 
{ // If a quiche timer would fire, call their callback - _ = timer => self.quiche_conn.on_timeout(), + _ = timer => { + self.quiche_conn.on_timeout() + } // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?; @@ -159,7 +174,8 @@ impl Driver { if self.quiche_conn.is_established() { let h3_config = h3::Config::new()?; let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?; - return H3Driver::new(self, h3_conn).drive().await; + self = H3Driver::new(self, h3_conn).drive().await?; + let _ = self.status_tx.send(Status::QUIC); } // If the connection has closed, tear down @@ -195,8 +211,15 @@ impl H3Driver { } async fn drive(mut self) -> Result<Driver> { + let _ = self.driver.status_tx.send(Status::H3); loop { - self.drive_once().await?; + match self.drive_once().await { + Err(e) => { + let _ = self.driver.status_tx.send(Status::Dead); + return Err(e); + } + Ok(()) => (), + } } } @@ -212,16 +235,19 @@ impl H3Driver { select! 
{ // Only attempt to enqueue new requests if we have no buffered request and aren't // closing - msg = self.driver.request_rx.recv(), if !self.closing && self.buffered_request.is_none() => match msg { - Some(request) => self.handle_request(request)?, - None => self.shutdown(true, b"DONE").await?, + msg = self.driver.request_rx.recv(), if !self.closing && self.buffered_request.is_none() => { + match msg { + Some(request) => self.handle_request(request)?, + None => self.shutdown(true, b"DONE").await?, + } }, // If a quiche timer would fire, call their callback - _ = timer => self.driver.quiche_conn.on_timeout(), - // If we got packets from our peer, pass them to quiche - Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => { - self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from })?; + _ = timer => { + self.driver.quiche_conn.on_timeout() } + // If we got packets from our peer, pass them to quiche + Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => + self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?, }; // Any of the actions in the select could require us to send packets to the peer @@ -250,6 +276,7 @@ impl H3Driver { // buffered_request, or when buffered_request is empty. This assert just // validates that we don't break that assumption later, as it could result in // requests being dropped on the floor under high load. 
+ debug!("Stream has become blocked, buffering one request."); assert!(self.buffered_request.is_none()); self.buffered_request = Some(request); return Ok(()) @@ -323,9 +350,7 @@ impl H3Driver { self.respond(stream_id); } } - h3::Event::Data => { - self.recv_body(stream_id).await?; - } + h3::Event::Data => self.recv_body(stream_id).await?, h3::Event::Finished => self.respond(stream_id), // This clause is for quiche 0.10.x, we're still on 0.9.x //h3::Event::Reset(e) => { diff --git a/doh/connection/mod.rs b/doh/connection/mod.rs index bc5b75c5..f6691010 100644 --- a/doh/connection/mod.rs +++ b/doh/connection/mod.rs @@ -24,8 +24,7 @@ use std::io; use std::net::SocketAddr; use thiserror::Error; use tokio::net::UdpSocket; -use tokio::sync::mpsc; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::task; mod driver; @@ -33,9 +32,17 @@ mod driver; pub use driver::Stream; use driver::{drive, Request}; +#[derive(Debug, Copy, Clone)] +pub enum Status { + QUIC, + H3, + Dead, +} + /// Quiche HTTP/3 connection pub struct Connection { request_tx: mpsc::Sender<Request>, + status_rx: watch::Receiver<Status>, } fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] { @@ -126,13 +133,45 @@ impl Connection { config: &mut quiche::Config, ) -> Result<Self> { let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); + let (status_tx, status_rx) = watch::channel(Status::QUIC); let scid = new_scid(); let quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; let socket = build_socket(to, socket_mark, tag_socket).await?; - let driver = drive(request_rx, quiche_conn, socket); + let driver = async { + let result = drive(request_rx, status_tx, quiche_conn, socket).await; + if let Err(ref e) = result { + error!("Connection driver failed: {:?}", e); + } + result + }; task::spawn(driver); - Ok(Self { request_tx }) + Ok(Self { request_tx, status_rx }) + } + + /// Waits until we're either fully alive or dead + pub async 
fn wait_for_live(&mut self) -> bool { + // Once sc-mainline-prod updates to modern tokio, use + // borrow_and_update here. + match *self.status_rx.borrow() { + Status::H3 => return true, + Status::Dead => return false, + Status::QUIC => (), + } + if self.status_rx.changed().await.is_err() { + // status_tx is gone, we're dead + return false; + } + if matches!(*self.status_rx.borrow(), Status::H3) { + return true; + } + // Since we're stuck on legacy tokio due to mainline, we need to try one more time in case there was an outstanding change notification. Using borrow_and_update avoids this. + match self.status_rx.changed().await { + // status_tx is gone, we're dead + Err(_) => false, + // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead + _ => matches!(*self.status_rx.borrow(), Status::H3), + } } /// Send a query, produce a future which will provide a response. @@ -286,7 +286,10 @@ pub unsafe extern "C" fn doh_query( response.copy_from_slice(&answer); answer.len() as ssize_t } - _ => DOH_RESULT_CAN_NOT_SEND, + rsp => { + error!("Non-successful response: {:?}", rsp); + DOH_RESULT_CAN_NOT_SEND + } }, Err(e) => { error!("no result {}", e); diff --git a/doh/network/driver.rs b/doh/network/driver.rs index 6c17f35e..81c3c0c3 100644 --- a/doh/network/driver.rs +++ b/doh/network/driver.rs @@ -28,6 +28,8 @@ use tokio::task; use super::{Query, ServerInfo, SocketTagger, ValidationReporter}; +use log::debug; + pub struct Driver { info: ServerInfo, config: Config, @@ -107,11 +109,15 @@ impl Driver { pub async fn drive(mut self) -> Result<()> { while let Some(cmd) = self.command_rx.recv().await { - if let Err(e) = match cmd { - Command::Probe(duration) => self.probe(duration).await, - Command::Query(query) => self.send_query(query).await, - } { - self.status_tx.send(Status::Failed(Arc::new(e)))? 
+ match cmd { + Command::Probe(duration) => match self.probe(duration).await { + Err(e) => self.status_tx.send(Status::Failed(Arc::new(e)))?, + Ok(()) => (), + }, + Command::Query(query) => match self.send_query(query).await { + Err(e) => debug!("Unable to send query: {:?}", e), + Ok(()) => (), + }, }; } Ok(()) @@ -119,6 +125,7 @@ impl Driver { async fn probe(&mut self, probe_timeout: Duration) -> Result<()> { if self.status_tx.borrow().is_failed() { + debug!("Network is currently failed, reconnecting"); // If our network is currently failed, it may be due to issues with the connection. // Re-establish before re-probing self.connection = @@ -166,12 +173,18 @@ impl Driver { } async fn send_query(&mut self, query: Query) -> Result<()> { + if !self.connection.wait_for_live().await { + // Try reconnecting + self.connection = + build_connection(&self.info, &self.tag_socket, &mut self.config).await?; + } let request = encoding::dns_request(&query.query, &self.info.url)?; let stream_fut = self.connection.query(request, Some(query.expiry)).await?; task::spawn(async move { let stream = match stream_fut.await { Some(stream) => stream, None => { + debug!("Connection died while processing request"); // We don't care if the response is gone let _ = query.response.send(Response::Error { error: QueryError::ConnectionError }); |