author    Matthew Maurer <mmaurer@google.com>  2021-10-26 16:01:17 -0700
committer Mike Yu <yumike@google.com>          2021-11-09 21:01:18 +0800
commit    f974124eae008ef23e58d9a287eedac1d12fe905 (patch)
tree      a9ab833b1064252b592bbe1f6a3c0d559c687505
parent    ed78fdaf9b08551599bc79736729f7cf3fc6eb06 (diff)
DoH: Automatically reconnect closed connections
Previously, I assumed a reprobe would be issued after a network died to
re-establish it. As this is not the case, this change automatically
re-establishes the connection to survive the stress test.

Bug: 202081046
Bug: 203314532
Test: resolv_integration_test
Test: resolv_stress_test + I682678b84b35c575a3eb88c2c1c67aefd195616c
Change-Id: I4c934429f2eb827382eccc10dea82c1cc8a78d4a
-rw-r--r--  doh/connection/driver.rs   59
-rw-r--r--  doh/connection/mod.rs      47
-rw-r--r--  doh/ffi.rs                  5
-rw-r--r--  doh/network/driver.rs      23
4 files changed, 107 insertions(+), 27 deletions(-)
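The heart of the change is a tokio watch channel that broadcasts the connection's lifecycle (QUIC handshake, HTTP/3 established, dead) from the driver task to the Connection handle. Below is a minimal, self-contained sketch of that pattern; the Status enum mirrors the one added in doh/connection/mod.rs, but the wiring around it is hypothetical illustration only, not the actual driver code.

```rust
use tokio::sync::watch;

// Mirrors the Status enum added in doh/connection/mod.rs below.
#[derive(Debug, Copy, Clone)]
enum Status {
    QUIC, // transport handshake in progress
    H3,   // HTTP/3 layer up, ready for requests
    Dead, // connection closed; callers should rebuild
}

#[tokio::main]
async fn main() {
    // The driver task owns the sender; the Connection handle keeps a receiver.
    let (status_tx, mut status_rx) = watch::channel(Status::QUIC);

    tokio::spawn(async move {
        // Ignore the result: we don't care if every receiver hung up.
        let _ = status_tx.send(Status::H3);
    });

    // Client side: wait until the status changes from its initial value.
    if status_rx.changed().await.is_ok() {
        println!("connection is now {:?}", *status_rx.borrow());
    }
}
```

A watch channel fits here because late subscribers and slow readers only ever observe the latest state, which is all a liveness check needs.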
diff --git a/doh/connection/driver.rs b/doh/connection/driver.rs
index 4fd1e266..34138e19 100644
--- a/doh/connection/driver.rs
+++ b/doh/connection/driver.rs
@@ -17,7 +17,7 @@
use crate::boot_time;
use crate::boot_time::BootTime;
-use log::warn;
+use log::{debug, warn};
use quiche::h3;
use std::collections::HashMap;
use std::default::Default;
@@ -27,7 +27,9 @@ use std::pin::Pin;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, watch};
+
+use super::Status;
#[derive(Error, Debug)]
pub enum Error {
@@ -77,6 +79,7 @@ const MAX_UDP_PACKET_SIZE: usize = 65536;
struct Driver {
request_rx: mpsc::Receiver<Request>,
+ status_tx: watch::Sender<Status>,
quiche_conn: Pin<Box<quiche::Connection>>,
socket: UdpSocket,
// This buffer is large, boxing it will keep it
@@ -111,19 +114,27 @@ async fn optional_timeout(timeout: Option<boot_time::Duration>) {
/// The returned error code will explain why the connection terminated.
pub async fn drive(
request_rx: mpsc::Receiver<Request>,
+ status_tx: watch::Sender<Status>,
quiche_conn: Pin<Box<quiche::Connection>>,
socket: UdpSocket,
) -> Result<()> {
- Driver::new(request_rx, quiche_conn, socket).drive().await
+ Driver::new(request_rx, status_tx, quiche_conn, socket).drive().await
}
impl Driver {
fn new(
request_rx: mpsc::Receiver<Request>,
+ status_tx: watch::Sender<Status>,
quiche_conn: Pin<Box<quiche::Connection>>,
socket: UdpSocket,
) -> Self {
- Self { request_rx, quiche_conn, socket, buffer: Box::new([0; MAX_UDP_PACKET_SIZE]) }
+ Self {
+ request_rx,
+ status_tx,
+ quiche_conn,
+ socket,
+ buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
+ }
}
async fn drive(mut self) -> Result<()> {
@@ -136,6 +147,8 @@ impl Driver {
fn handle_closed(&self) -> Result<()> {
if self.quiche_conn.is_closed() {
+ // We don't care if the receiver has hung up
+ let _ = self.status_tx.send(Status::Dead);
Err(Error::Closed)
} else {
Ok(())
@@ -146,7 +159,9 @@ impl Driver {
let timer = optional_timeout(self.quiche_conn.timeout());
select! {
// If a quiche timer would fire, call their callback
- _ = timer => self.quiche_conn.on_timeout(),
+ _ = timer => {
+ self.quiche_conn.on_timeout()
+ }
// If we got packets from our peer, pass them to quiche
Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?;
@@ -159,7 +174,8 @@ impl Driver {
if self.quiche_conn.is_established() {
let h3_config = h3::Config::new()?;
let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
- return H3Driver::new(self, h3_conn).drive().await;
+ self = H3Driver::new(self, h3_conn).drive().await?;
+ let _ = self.status_tx.send(Status::QUIC);
}
// If the connection has closed, tear down
@@ -195,8 +211,15 @@ impl H3Driver {
}
async fn drive(mut self) -> Result<Driver> {
+ let _ = self.driver.status_tx.send(Status::H3);
loop {
- self.drive_once().await?;
+ match self.drive_once().await {
+ Err(e) => {
+ let _ = self.driver.status_tx.send(Status::Dead);
+ return Err(e);
+ }
+ Ok(()) => (),
+ }
}
}
@@ -212,16 +235,19 @@ impl H3Driver {
select! {
// Only attempt to enqueue new requests if we have no buffered request and aren't
// closing
- msg = self.driver.request_rx.recv(), if !self.closing && self.buffered_request.is_none() => match msg {
- Some(request) => self.handle_request(request)?,
- None => self.shutdown(true, b"DONE").await?,
+ msg = self.driver.request_rx.recv(), if !self.closing && self.buffered_request.is_none() => {
+ match msg {
+ Some(request) => self.handle_request(request)?,
+ None => self.shutdown(true, b"DONE").await?,
+ }
},
// If a quiche timer would fire, call their callback
- _ = timer => self.driver.quiche_conn.on_timeout(),
- // If we got packets from our peer, pass them to quiche
- Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
- self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from })?;
+ _ = timer => {
+ self.driver.quiche_conn.on_timeout()
}
+ // If we got packets from our peer, pass them to quiche
+ Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) =>
+ self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?,
};
// Any of the actions in the select could require us to send packets to the peer
@@ -250,6 +276,7 @@ impl H3Driver {
// buffered_request, or when buffered_request is empty. This assert just
// validates that we don't break that assumption later, as it could result in
// requests being dropped on the floor under high load.
+ debug!("Stream has become blocked, buffering one request.");
assert!(self.buffered_request.is_none());
self.buffered_request = Some(request);
return Ok(())
@@ -323,9 +350,7 @@ impl H3Driver {
self.respond(stream_id);
}
}
- h3::Event::Data => {
- self.recv_body(stream_id).await?;
- }
+ h3::Event::Data => self.recv_body(stream_id).await?,
h3::Event::Finished => self.respond(stream_id),
// This clause is for quiche 0.10.x, we're still on 0.9.x
//h3::Event::Reset(e) => {
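Both drive loops above race quiche's connection timeout against socket reads and incoming requests via tokio's select!. Since quiche_conn.timeout() returns an Option, the code leans on the optional_timeout helper referenced in the @@ -111,19 +114,27 @@ hunk. A plausible shape for such a helper, sketched here under the assumption that it wraps tokio's sleep (the real code uses a boot_time clock instead of std::time):

```rust
use std::future::pending;
use std::time::Duration;
use tokio::time::sleep;

// Resolves after `timeout` when one is given; otherwise stays pending
// forever, so the select! arm guarded by it simply never fires.
async fn optional_timeout(timeout: Option<Duration>) {
    match timeout {
        Some(duration) => sleep(duration).await,
        None => pending::<()>().await,
    }
}

#[tokio::main]
async fn main() {
    // With Some(..) this resolves after 10ms; with None it would hang.
    optional_timeout(Some(Duration::from_millis(10))).await;
}
```

With a None timeout the select! degrades to a plain receive loop, which is exactly the behavior quiche expects when it has no timer armed.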
diff --git a/doh/connection/mod.rs b/doh/connection/mod.rs
index bc5b75c5..f6691010 100644
--- a/doh/connection/mod.rs
+++ b/doh/connection/mod.rs
@@ -24,8 +24,7 @@ use std::io;
use std::net::SocketAddr;
use thiserror::Error;
use tokio::net::UdpSocket;
-use tokio::sync::mpsc;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot, watch};
use tokio::task;
mod driver;
@@ -33,9 +32,17 @@ mod driver;
pub use driver::Stream;
use driver::{drive, Request};
+#[derive(Debug, Copy, Clone)]
+pub enum Status {
+ QUIC,
+ H3,
+ Dead,
+}
+
/// Quiche HTTP/3 connection
pub struct Connection {
request_tx: mpsc::Sender<Request>,
+ status_rx: watch::Receiver<Status>,
}
fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] {
@@ -126,13 +133,45 @@ impl Connection {
config: &mut quiche::Config,
) -> Result<Self> {
let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS);
+ let (status_tx, status_rx) = watch::channel(Status::QUIC);
let scid = new_scid();
let quiche_conn =
quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?;
let socket = build_socket(to, socket_mark, tag_socket).await?;
- let driver = drive(request_rx, quiche_conn, socket);
+ let driver = async {
+ let result = drive(request_rx, status_tx, quiche_conn, socket).await;
+ if let Err(ref e) = result {
+ error!("Connection driver failed: {:?}", e);
+ }
+ result
+ };
task::spawn(driver);
- Ok(Self { request_tx })
+ Ok(Self { request_tx, status_rx })
+ }
+
+ /// Waits until we're either fully alive or dead
+ pub async fn wait_for_live(&mut self) -> bool {
+ // Once sc-mainline-prod updates to modern tokio, use
+ // borrow_and_update here.
+ match *self.status_rx.borrow() {
+ Status::H3 => return true,
+ Status::Dead => return false,
+ Status::QUIC => (),
+ }
+ if self.status_rx.changed().await.is_err() {
+ // status_tx is gone, we're dead
+ return false;
+ }
+ if matches!(*self.status_rx.borrow(), Status::H3) {
+ return true;
+ }
+ // Since we're stuck on legacy tokio due to mainline, we need to try one
+ // more time in case there was an outstanding change notification.
+ // Using borrow_and_update avoids this.
+ match self.status_rx.changed().await {
+ // status_tx is gone, we're dead
+ Err(_) => false,
+ // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead
+ _ => matches!(*self.status_rx.borrow(), Status::H3),
+ }
}
/// Send a query, produce a future which will provide a response.
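wait_for_live has to work around the older tokio bundled in mainline: borrow() does not mark the current value as seen, so a change notification may already be pending when changed() is first awaited, and the method retries once to absorb it. Assuming a tokio whose watch::Receiver provides borrow_and_update(), as the in-code comment anticipates, the whole method collapses to a hypothetical loop like this (sketch only, not the shipped code):

```rust
use tokio::sync::watch;

#[derive(Debug, Copy, Clone)]
enum Status { QUIC, H3, Dead }

// Hypothetical reformulation: borrow_and_update() marks the value as
// seen, closing the race window, so no second changed() retry is needed.
async fn wait_for_live(status_rx: &mut watch::Receiver<Status>) -> bool {
    loop {
        // Copy the value out so the borrow ends before we await.
        let status = *status_rx.borrow_and_update();
        match status {
            Status::H3 => return true,
            Status::Dead => return false,
            // Still handshaking: wait for the next transition. A closed
            // channel means the driver task is gone, so report dead.
            Status::QUIC => {
                if status_rx.changed().await.is_err() {
                    return false;
                }
            }
        }
    }
}
```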
diff --git a/doh/ffi.rs b/doh/ffi.rs
index 2df5be68..e3b38fca 100644
--- a/doh/ffi.rs
+++ b/doh/ffi.rs
@@ -286,7 +286,10 @@ pub unsafe extern "C" fn doh_query(
response.copy_from_slice(&answer);
answer.len() as ssize_t
}
- _ => DOH_RESULT_CAN_NOT_SEND,
+ rsp => {
+ error!("Non-successful response: {:?}", rsp);
+ DOH_RESULT_CAN_NOT_SEND
+ }
},
Err(e) => {
error!("no result {}", e);
diff --git a/doh/network/driver.rs b/doh/network/driver.rs
index 6c17f35e..81c3c0c3 100644
--- a/doh/network/driver.rs
+++ b/doh/network/driver.rs
@@ -28,6 +28,8 @@ use tokio::task;
use super::{Query, ServerInfo, SocketTagger, ValidationReporter};
+use log::debug;
+
pub struct Driver {
info: ServerInfo,
config: Config,
@@ -107,11 +109,15 @@ impl Driver {
pub async fn drive(mut self) -> Result<()> {
while let Some(cmd) = self.command_rx.recv().await {
- if let Err(e) = match cmd {
- Command::Probe(duration) => self.probe(duration).await,
- Command::Query(query) => self.send_query(query).await,
- } {
- self.status_tx.send(Status::Failed(Arc::new(e)))?
+ match cmd {
+ Command::Probe(duration) => match self.probe(duration).await {
+ Err(e) => self.status_tx.send(Status::Failed(Arc::new(e)))?,
+ Ok(()) => (),
+ },
+ Command::Query(query) => match self.send_query(query).await {
+ Err(e) => debug!("Unable to send query: {:?}", e),
+ Ok(()) => (),
+ },
};
}
Ok(())
@@ -119,6 +125,7 @@ impl Driver {
async fn probe(&mut self, probe_timeout: Duration) -> Result<()> {
if self.status_tx.borrow().is_failed() {
+ debug!("Network is currently failed, reconnecting");
// If our network is currently failed, it may be due to issues with the connection.
// Re-establish before re-probing
self.connection =
@@ -166,12 +173,18 @@ impl Driver {
}
async fn send_query(&mut self, query: Query) -> Result<()> {
+ if !self.connection.wait_for_live().await {
+ // Try reconnecting
+ self.connection =
+ build_connection(&self.info, &self.tag_socket, &mut self.config).await?;
+ }
let request = encoding::dns_request(&query.query, &self.info.url)?;
let stream_fut = self.connection.query(request, Some(query.expiry)).await?;
task::spawn(async move {
let stream = match stream_fut.await {
Some(stream) => stream,
None => {
+ debug!("Connection died while processing request");
// We don't care if the response is gone
let _ =
query.response.send(Response::Error { error: QueryError::ConnectionError });