From ed78fdaf9b08551599bc79736729f7cf3fc6eb06 Mon Sep 17 00:00:00 2001 From: Matthew Maurer Date: Mon, 25 Oct 2021 13:04:44 -0700 Subject: DoH: Modularize main event loop * Connection now provides HTTP/3. * Network has the logic for resolving DNS and maintaining a Connection. * Dispatcher routes requests to the appropriate Network or creates one if needed. * IO and maintenance is performed via tasks rather than manually pushing the futures in the main event loop. Bug: 202081046 Test: resolv_integration_test Test: resolv_stress_test + I682678b84b35c575a3eb88c2c1c67aefd195616c Change-Id: I4296d0c7a7852951f41418b18686794d8df781bd --- doh/network/driver.rs | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 doh/network/driver.rs (limited to 'doh/network/driver.rs') diff --git a/doh/network/driver.rs b/doh/network/driver.rs new file mode 100644 index 00000000..6c17f35e --- /dev/null +++ b/doh/network/driver.rs @@ -0,0 +1,190 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Provides a backing task to implement a network + +use crate::boot_time::{timeout, BootTime, Duration}; +use crate::config::Config; +use crate::connection::Connection; +use crate::dispatcher::{QueryError, Response}; +use crate::encoding; +use anyhow::{anyhow, Result}; +use std::sync::Arc; +use tokio::sync::{mpsc, watch}; +use tokio::task; + +use super::{Query, ServerInfo, SocketTagger, ValidationReporter}; + +pub struct Driver { + info: ServerInfo, + config: Config, + connection: Connection, + command_rx: mpsc::Receiver, + status_tx: watch::Sender, + validation: ValidationReporter, + tag_socket: SocketTagger, +} + +#[derive(Debug)] +/// Requests the network can handle +pub enum Command { + /// Send a DNS query to the network + Query(Query), + /// Run a probe to check the health of the network. Argument is timeout. + Probe(Duration), +} + +#[derive(Clone, Debug)] +/// Current Network Status +/// +/// (Unprobed or Failed) can go to (Live or Failed) via Probe. +/// Currently, there is no way to go from Live to Failed - probing a live network will short-circuit to returning valid, and query failures do not declare the network failed. +pub enum Status { + /// Network has not been probed, it may or may not work + Unprobed, + /// Network is believed to be working + Live, + /// Network is broken, reason as argument + Failed(Arc), +} + +impl Status { + pub fn is_live(&self) -> bool { + matches!(self, Self::Live) + } + pub fn is_failed(&self) -> bool { + matches!(self, Self::Failed(_)) + } +} + +async fn build_connection( + info: &ServerInfo, + tag_socket: &SocketTagger, + config: &mut Config, +) -> Result { + use std::ops::DerefMut; + Ok(Connection::new( + info.domain.as_deref(), + info.peer_addr, + info.sk_mark, + tag_socket, + config.take().await.deref_mut(), + ) + .await?) +} + +impl Driver { + const MAX_BUFFERED_COMMANDS: usize = 10; + + pub async fn new( + info: ServerInfo, + mut config: Config, + validation: ValidationReporter, + tag_socket: SocketTagger, + ) -> Result<(Self, mpsc::Sender, watch::Receiver)> { + let (command_tx, command_rx) = mpsc::channel(Self::MAX_BUFFERED_COMMANDS); + let (status_tx, status_rx) = watch::channel(Status::Unprobed); + let connection = build_connection(&info, &tag_socket, &mut config).await?; + Ok(( + Self { info, config, connection, status_tx, command_rx, validation, tag_socket }, + command_tx, + status_rx, + )) + } + + pub async fn drive(mut self) -> Result<()> { + while let Some(cmd) = self.command_rx.recv().await { + if let Err(e) = match cmd { + Command::Probe(duration) => self.probe(duration).await, + Command::Query(query) => self.send_query(query).await, + } { + self.status_tx.send(Status::Failed(Arc::new(e)))? + }; + } + Ok(()) + } + + async fn probe(&mut self, probe_timeout: Duration) -> Result<()> { + if self.status_tx.borrow().is_failed() { + // If our network is currently failed, it may be due to issues with the connection. + // Re-establish before re-probing + self.connection = + build_connection(&self.info, &self.tag_socket, &mut self.config).await?; + self.status_tx.send(Status::Unprobed)?; + } + if self.status_tx.borrow().is_live() { + // If we're already validated, short circuit + (self.validation)(&self.info, true).await; + return Ok(()); + } + self.force_probe(probe_timeout).await + } + + async fn force_probe(&mut self, probe_timeout: Duration) -> Result<()> { + let probe = encoding::probe_query()?; + let dns_request = encoding::dns_request(&probe, &self.info.url)?; + let expiry = BootTime::now().checked_add(probe_timeout); + let request = async { + match self.connection.query(dns_request, expiry).await { + Err(e) => self.status_tx.send(Status::Failed(Arc::new(anyhow!(e)))), + Ok(rsp) => { + if let Some(_stream) = rsp.await { + // TODO verify stream contents + self.status_tx.send(Status::Live) + } else { + self.status_tx.send(Status::Failed(Arc::new(anyhow!("Empty response")))) + } + } + } + }; + match timeout(probe_timeout, request).await { + // Timed out + Err(time) => self.status_tx.send(Status::Failed(Arc::new(anyhow!( + "Probe timed out after {:?} (timeout={:?})", + time, + probe_timeout + )))), + // Query completed + Ok(r) => r, + }?; + let valid = self.status_tx.borrow().is_live(); + (self.validation)(&self.info, valid).await; + Ok(()) + } + + async fn send_query(&mut self, query: Query) -> Result<()> { + let request = encoding::dns_request(&query.query, &self.info.url)?; + let stream_fut = self.connection.query(request, Some(query.expiry)).await?; + task::spawn(async move { + let stream = match stream_fut.await { + Some(stream) => stream, + None => { + // We don't care if the response is gone + let _ = + query.response.send(Response::Error { error: QueryError::ConnectionError }); + return; + } + }; + // We don't care if the response is gone. + let _ = if let Some(err) = stream.error { + query.response.send(Response::Error { error: QueryError::Reset(err) }) + } else { + query.response.send(Response::Success { answer: stream.data }) + }; + }); + Ok(()) + } +} -- cgit v1.2.3