diff options
-rw-r--r-- | doh/config.rs | 239 | ||||
-rw-r--r-- | doh/doh.rs | 131 |
2 files changed, 268 insertions, 102 deletions
diff --git a/doh/config.rs b/doh/config.rs new file mode 100644 index 00000000..04d07c50 --- /dev/null +++ b/doh/config.rs @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Quiche Config support +//! +//! Quiche config objects are needed mutably for constructing a Quiche +//! connection object, but not when they are actually being used. As these +//! objects include a `SSL_CTX` which can be somewhat expensive and large when +//! using a certificate path, it can be beneficial to cache them. +//! +//! This module provides a caching layer for loading and constructing +//! these configurations. + +use quiche::{h3, Result}; +use std::collections::HashMap; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex, RwLock, Weak}; + +type WeakConfig = Weak<Mutex<quiche::Config>>; + +/// A cheaply clonable `quiche::Config` +#[derive(Clone)] +pub struct Config(Arc<Mutex<quiche::Config>>); + +const MAX_INCOMING_BUFFER_SIZE_WHOLE: u64 = 10000000; +const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000; +const MAX_CONCURRENT_STREAM_SIZE: u64 = 100; +/// Maximum datagram size we will accept. +pub const MAX_DATAGRAM_SIZE: usize = 1350; +/// How long with no packets before we assume a connection is dead, in milliseconds. +pub const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000; + +impl Config { + fn from_weak(weak: &WeakConfig) -> Option<Self> { + weak.upgrade().map(Self) + } + + fn to_weak(&self) -> WeakConfig { + Arc::downgrade(&self.0) + } + + /// Construct a `Config` object from certificate path. If no path + /// is provided, peers will not be verified. + pub fn from_cert_path(cert_path: Option<&str>) -> Result<Self> { + let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?; + config.set_application_protos(h3::APPLICATION_PROTOCOL)?; + match cert_path { + Some(path) => { + config.verify_peer(true); + config.load_verify_locations_from_directory(path)?; + } + None => config.verify_peer(false), + } + + // Some of these configs are necessary, or the server can't respond the HTTP/3 request. + config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS); + config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); + config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE); + config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH); + config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH); + config.set_initial_max_stream_data_uni(MAX_INCOMING_BUFFER_SIZE_EACH); + config.set_initial_max_streams_bidi(MAX_CONCURRENT_STREAM_SIZE); + config.set_initial_max_streams_uni(MAX_CONCURRENT_STREAM_SIZE); + config.set_disable_active_migration(true); + Ok(Self(Arc::new(Mutex::new(config)))) + } + + /// Take the underlying config, usable as `&mut quiche::Config` for use + /// with `quiche::connect`. + pub fn take(&mut self) -> impl DerefMut<Target = quiche::Config> + '_ { + self.0.lock().unwrap() + } +} + +#[derive(Clone, Default)] +struct State { + // Mapping from cert_path to configs + path_to_config: HashMap<Option<String>, WeakConfig>, + // Keep latest config alive to minimize reparsing when flapping + // If more keep-alive is needed, replace with a LRU LinkedList + latest: Option<Config>, +} + +impl State { + fn get_config(&self, cert_path: &Option<String>) -> Option<Config> { + self.path_to_config.get(cert_path).and_then(Config::from_weak) + } + + fn keep_alive(&mut self, config: Config) { + self.latest = Some(config); + } + + fn garbage_collect(&mut self) { + self.path_to_config.retain(|_, config| config.strong_count() != 0) + } +} + +/// Cache of Quiche Config objects +/// +/// Cloning this cache will create another handle to the same cache. +/// +/// Loading a config object through this caching layer will only keep the +/// latest config loaded alive directly, but will still act as a cache +/// for any configurations still in use - if the returned `Config` is still +/// live, queries to `Cache` will not reconstruct it. +#[derive(Clone, Default)] +pub struct Cache { + // Shared state amongst cache handles + state: Arc<RwLock<State>>, +} + +impl Cache { + /// Creates a fresh empty cache + pub fn new() -> Self { + Default::default() + } + + /// Behaves as `Config::from_cert_path`, but with a cache. + /// If any object previously given out by this cache is still live, + /// a duplicate will not be made. + pub fn from_cert_path(&self, cert_path: &Option<String>) -> Result<Config> { + // Fast path - read-only access to state retrieves config + if let Some(config) = self.state.read().unwrap().get_config(cert_path) { + return Ok(config); + } + + // Unlocked, calculate config. If we have two racing attempts to load + // the cert path, we'll arbitrate that in the next step, but this + // makes sure loading a new cert path doesn't block other loads to + // refresh connections. + let config = Config::from_cert_path(cert_path.as_deref())?; + + let mut state = self.state.write().unwrap(); + // We now have exclusive access to the state. + // If someone else calculated a config at the same time as us, we + // want to discard ours and use theirs, since it will result in + // less total memory used. + if let Some(config) = state.get_config(cert_path) { + return Ok(config); + } + + // We have exclusive access and a fresh config. Install it into + // the cache. + state.keep_alive(config.clone()); + state.path_to_config.insert(cert_path.to_owned(), config.to_weak()); + Ok(config) + } + + /// Purges any config paths which no longer point to a config entry. + pub fn garbage_collect(&self) { + self.state.write().unwrap().garbage_collect(); + } +} + +#[test] +fn create_quiche_config() { + assert!(Config::from_cert_path(None).is_ok(), "quiche config without cert creating failed"); + assert!( + Config::from_cert_path(Some("data/local/tmp/")).is_ok(), + "quiche config with cert creating failed" + ); +} + +#[test] +fn shared_cache() { + let cache_a = Cache::new(); + let cache_b = cache_a.clone(); + let config_a = cache_a.from_cert_path(&None).unwrap(); + assert_eq!(Arc::strong_count(&config_a.0), 2); + let _config_b = cache_b.from_cert_path(&None).unwrap(); + assert_eq!(Arc::strong_count(&config_a.0), 3); +} + +#[test] +fn lifetimes() { + let cache = Cache::new(); + let config_none = cache.from_cert_path(&None).unwrap(); + let config_a = cache.from_cert_path(&Some("a".to_string())).unwrap(); + let config_b = cache.from_cert_path(&Some("b".to_string())).unwrap(); + // The first two we created should have a strong count of one - those handles are the only + // thing keeping them alive. + assert_eq!(Arc::strong_count(&config_none.0), 1); + assert_eq!(Arc::strong_count(&config_a.0), 1); + + // If we try to get another handle we already have, it should be the same one. + let _config_a2 = cache.from_cert_path(&Some("a".to_string())).unwrap(); + assert_eq!(Arc::strong_count(&config_a.0), 2); + + // config_b was most recently created, so it should have a keep-alive + // inside the cache. + assert_eq!(Arc::strong_count(&config_b.0), 2); + + // If we weaken one of the first handles, then drop it, the weak handle should break + let config_none_weak = Config::to_weak(&config_none); + assert_eq!(config_none_weak.strong_count(), 1); + drop(config_none); + assert_eq!(config_none_weak.strong_count(), 0); + assert!(Config::from_weak(&config_none_weak).is_none()); + + // If we weaken the most *recent* handle, it should keep working + let config_b_weak = Config::to_weak(&config_b); + assert_eq!(config_b_weak.strong_count(), 2); + drop(config_b); + assert_eq!(config_b_weak.strong_count(), 1); + assert!(Config::from_weak(&config_b_weak).is_some()); + assert_eq!(config_b_weak.strong_count(), 1); + + // If we try to get a config which is still kept alive by the cache, we should get the same + // one. + let _config_b2 = cache.from_cert_path(&Some("b".to_string())).unwrap(); + assert_eq!(config_b_weak.strong_count(), 2); + + // We broke None, but "a" and "b" should still both be alive. Check that + // this is still the case in the mapping after garbage collection. + cache.garbage_collect(); + assert_eq!(cache.state.read().unwrap().path_to_config.len(), 2); +} + +#[test] +fn quiche_connect() { + use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + let mut config = Config::from_cert_path(None).unwrap(); + let socket_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 42)); + let conn_id = quiche::ConnectionId::from_ref(&[]); + quiche::connect(None, &conn_id, socket_addr, &mut config.take()).unwrap(); +} @@ -27,10 +27,8 @@ use ring::rand::SecureRandom; use std::collections::HashMap; use std::ffi::CString; use std::net::SocketAddr; -use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::runtime::{Builder, Runtime}; use tokio::sync::{mpsc, oneshot}; @@ -38,17 +36,14 @@ use tokio::task; use url::Url; pub mod boot_time; +mod config; mod ffi; use boot_time::{timeout, BootTime, Duration}; +use config::Config; const MAX_BUFFERED_CMD_SIZE: usize = 400; -const MAX_INCOMING_BUFFER_SIZE_WHOLE: u64 = 10000000; -const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000; -const MAX_CONCURRENT_STREAM_SIZE: u64 = 100; -const MAX_DATAGRAM_SIZE: usize = 1350; const DOH_PORT: u16 = 443; -const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000; const NS_T_AAAA: u8 = 28; const NS_C_IN: u8 = 1; // Used to randomly generate query prefix and query id. @@ -124,16 +119,6 @@ enum H3Result { Ignore, } -trait OptionDeref<T: Deref> { - fn as_deref(&self) -> Option<&T::Target>; -} - -impl<T: Deref> OptionDeref<T> for Option<T> { - fn as_deref(&self) -> Option<&T::Target> { - self.as_ref().map(Deref::deref) - } -} - /// Context for a running DoH engine. pub struct DohDispatcher { /// Used to submit cmds to the I/O task. @@ -174,7 +159,7 @@ impl DohDispatcher { struct DohConnection { info: ServerInfo, - shared_config: Arc<Mutex<QuicheConfigCache>>, + config: Config, scid: SCID, state: ConnectionState, pending_queries: Vec<(DnsRequest, QueryResponder, BootTime)>, @@ -185,14 +170,14 @@ struct DohConnection { impl DohConnection { fn new( info: &ServerInfo, - shared_config: Arc<Mutex<QuicheConfigCache>>, + config: Config, tag_socket_fn: TagSocketCallback, ) -> Result<DohConnection> { let mut scid = [0; quiche::MAX_CONN_ID_LEN]; ring::rand::SystemRandom::new().fill(&mut scid).context("failed to generate scid")?; Ok(DohConnection { info: info.clone(), - shared_config, + config, scid, state: ConnectionState::Idle, pending_queries: Vec::new(), @@ -208,15 +193,12 @@ impl DohConnection { (self.tag_socket_fn)(udp_sk_std.as_raw_fd()); let udp_sk = UdpSocket::from_std(udp_sk_std)?; let connid = quiche::ConnectionId::from_ref(&self.scid); - let mut cache = self.shared_config.lock().unwrap(); - let config = - cache.get(&self.info.cert_path)?.ok_or_else(|| anyhow!("no quiche config"))?; debug!("init the connection for Network {}", self.info.net_id); let mut quic_conn = quiche::connect( self.info.domain.as_deref(), &connid, self.info.peer_addr, - config, + &mut self.config.take(), )?; if let Some(session) = &self.cached_session { if quic_conn.set_session(session).is_err() { @@ -516,7 +498,7 @@ fn recv_h3( // Process HTTP/3 events. Ok((stream_id, quiche::h3::Event::Data)) => { debug!("quiche::h3::Event::Data"); - let mut buf = vec![0; MAX_DATAGRAM_SIZE]; + let mut buf = vec![0; config::MAX_DATAGRAM_SIZE]; match h3_conn.recv_body(quic_conn, stream_id, &mut buf) { Ok(read) => { trace!( @@ -603,7 +585,7 @@ async fn recv_rx( ) -> Result<()> { // TODO: Evaluate if we could make the buffer smaller. let mut buf = [0; 65535]; - let quic_idle_timeout_ms = Duration::from_millis(QUICHE_IDLE_TIMEOUT_MS); + let quic_idle_timeout_ms = Duration::from_millis(config::QUICHE_IDLE_TIMEOUT_MS); let ts = quic_conn.timeout().unwrap_or(quic_idle_timeout_ms); if let Some(next_expired) = BootTime::now().checked_add(quic_idle_timeout_ms) { @@ -640,7 +622,7 @@ async fn flush_tx( quic_conn: &mut Pin<Box<quiche::Connection>>, udp_sk: &mut UdpSocket, ) -> Result<()> { - let mut out = [0; MAX_DATAGRAM_SIZE]; + let mut out = [0; config::MAX_DATAGRAM_SIZE]; loop { let (write, _) = match quic_conn.send(&mut out) { Ok(v) => v, @@ -734,7 +716,7 @@ async fn probe_task( fn make_connection_if_needed( info: &ServerInfo, doh_conn_map: &mut HashMap<u32, (ServerInfo, Option<DohConnection>)>, - shared_config: Arc<Mutex<QuicheConfigCache>>, + config_cache: &config::Cache, tag_socket_fn: TagSocketCallback, ) -> Result<Option<DohConnection>> { // Check if connection exists. @@ -751,28 +733,12 @@ fn make_connection_if_needed( // TODO: change the inner connection instead of removing? _ => doh_conn_map.remove(&info.net_id), }; - let doh = DohConnection::new(info, shared_config, tag_socket_fn)?; + let config = config_cache.from_cert_path(&info.cert_path)?; + let doh = DohConnection::new(info, config, tag_socket_fn)?; doh_conn_map.insert(info.net_id, (info.clone(), None)); Ok(Some(doh)) } -struct QuicheConfigCache { - cert_path: Option<String>, - config: Option<quiche::Config>, -} - -impl QuicheConfigCache { - fn get(&mut self, cert_path: &Option<String>) -> Result<Option<&mut quiche::Config>> { - // No config is cached or the cached config isn't matched with the input cert_path - // Create it with the input cert_path. - if self.config.is_none() || self.cert_path != *cert_path { - self.config = Some(create_quiche_config(cert_path.as_deref())?); - self.cert_path = cert_path.clone(); - } - return Ok(self.config.as_mut()); - } -} - async fn handle_query_cmd( net_id: u32, base64_query: Base64Query, @@ -824,7 +790,7 @@ async fn doh_handler( tag_socket_fn: TagSocketCallback, ) -> Result<()> { info!("doh_dispatcher entry"); - let config_cache = Arc::new(Mutex::new(QuicheConfigCache { cert_path: None, config: None })); + let config_cache = config::Cache::new(); // Currently, only support 1 server per network. let mut doh_conn_map: HashMap<u32, (ServerInfo, Option<DohConnection>)> = HashMap::new(); @@ -848,7 +814,7 @@ async fn doh_handler( trace!("recv {:?}", cmd); match cmd { DohCommand::Probe { info, timeout: t } => { - match make_connection_if_needed(&info, &mut doh_conn_map, config_cache.clone(), tag_socket_fn) { + match make_connection_if_needed(&info, &mut doh_conn_map, &config_cache, tag_socket_fn) { Ok(Some(doh)) => { // Create a new async task associated to the DoH connection. probe_futures.push(probe_task(info, doh, t)); @@ -871,6 +837,7 @@ async fn doh_handler( DohCommand::Clear { net_id } => { doh_conn_map.remove(&net_id); info!("Doh Clear server for netid: {}", net_id); + config_cache.garbage_collect(); }, DohCommand::Exit => return Ok(()), } @@ -915,30 +882,6 @@ fn make_doh_udp_socket(peer_addr: SocketAddr, mark: u32) -> Result<std::net::Udp Ok(udp_sk) } -fn create_quiche_config(cert_path: Option<&str>) -> Result<quiche::Config> { - let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?; - config.set_application_protos(h3::APPLICATION_PROTOCOL)?; - match cert_path { - Some(path) => { - config.verify_peer(true); - config.load_verify_locations_from_directory(path)?; - } - None => config.verify_peer(false), - } - - // Some of these configs are necessary, or the server can't respond the HTTP/3 request. - config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS); - config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); - config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE); - config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH); - config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH); - config.set_initial_max_stream_data_uni(MAX_INCOMING_BUFFER_SIZE_EACH); - config.set_initial_max_streams_bidi(MAX_CONCURRENT_STREAM_SIZE); - config.set_initial_max_streams_uni(MAX_CONCURRENT_STREAM_SIZE); - config.set_disable_active_migration(true); - Ok(config) -} - fn mark_socket(fd: RawFd, mark: u32) -> Result<()> { // libc::setsockopt is a wrapper function calling into bionic setsockopt. // Both fd and mark are valid, which makes the function call mostly safe. @@ -994,14 +937,11 @@ mod tests { const LOOPBACK_ADDR: &str = "127.0.0.1:443"; const LOCALHOST_URL: &str = "https://mylocal.com/dns-query"; - // TODO: Make some tests for DohConnection and QuicheConfigCache. + // TODO: Make some tests for DohConnection. - fn make_testing_variables() -> ( - ServerInfo, - HashMap<u32, (ServerInfo, Option<DohConnection>)>, - Arc<Mutex<QuicheConfigCache>>, - Runtime, - ) { + fn make_testing_variables( + ) -> (ServerInfo, HashMap<u32, (ServerInfo, Option<DohConnection>)>, config::Cache, Runtime) + { let test_map: HashMap<u32, (ServerInfo, Option<DohConnection>)> = HashMap::new(); let info = ServerInfo { net_id: TEST_NET_ID, @@ -1011,9 +951,7 @@ mod tests { sk_mark: 0, cert_path: None, }; - let config_cache = - Arc::new(Mutex::new(QuicheConfigCache { cert_path: None, config: None })); - + let config_cache = config::Cache::new(); let rt = Builder::new_current_thread() .thread_name("test-runtime") .enable_all() @@ -1028,13 +966,13 @@ mod tests { #[test] fn make_connection_if_needed() { - let (info, mut test_map, config, rt) = make_testing_variables(); + let (info, mut test_map, config_cache, rt) = make_testing_variables(); rt.block_on(async { // Expect to make a new connection. let mut doh = super::make_connection_if_needed( &info, &mut test_map, - config.clone(), + &config_cache, tag_socket_cb, ) .unwrap() @@ -1047,7 +985,7 @@ mod tests { let mut doh = super::make_connection_if_needed( &info, &mut test_map, - config.clone(), + &config_cache, tag_socket_cb, ) .unwrap() @@ -1060,7 +998,7 @@ mod tests { assert!(super::make_connection_if_needed( &info, &mut test_map, - config.clone(), + &config_cache, tag_socket_cb ) .unwrap() @@ -1070,7 +1008,7 @@ mod tests { #[test] fn handle_query_cmd() { - let (info, mut test_map, config, rt) = make_testing_variables(); + let (info, mut test_map, config_cache, rt) = make_testing_variables(); let t = Duration::from_millis(100); rt.block_on(async { @@ -1111,7 +1049,7 @@ mod tests { let mut doh = super::make_connection_if_needed( &info, &mut test_map, - config.clone(), + &config_cache, tag_socket_cb, ) .unwrap() @@ -1178,9 +1116,10 @@ mod tests { let mut scid = [0; quiche::MAX_CONN_ID_LEN]; ring::rand::SystemRandom::new().fill(&mut scid).context("failed to generate scid").unwrap(); let connid = quiche::ConnectionId::from_ref(&scid); - let mut config = super::create_quiche_config(None).unwrap(); + let mut config = Config::from_cert_path(None).unwrap(); let quic_conn = - quiche::connect(None, &connid, LOOPBACK_ADDR.parse().unwrap(), &mut config).unwrap(); + quiche::connect(None, &connid, LOOPBACK_ADDR.parse().unwrap(), &mut config.take()) + .unwrap(); (quic_conn, udp_sk) } @@ -1263,18 +1202,6 @@ mod tests { } #[test] - fn create_quiche_config() { - assert!( - super::create_quiche_config(None).is_ok(), - "quiche config without cert creating failed" - ); - assert!( - super::create_quiche_config(Some("data/local/tmp/")).is_ok(), - "quiche config with cert creating failed" - ); - } - - #[test] fn make_doh_udp_socket() { // Make a socket connecting to loopback with a test mark. let sk = super::make_doh_udp_socket(LOOPBACK_ADDR.parse().unwrap(), TEST_MARK).unwrap(); |