diff options
author | Gabriel White-Vega <gwhitevega@google.com> | 2023-08-30 11:28:43 -0400 |
---|---|---|
committer | Gabriel White-Vega <gwhitevega@google.com> | 2023-09-06 09:47:08 -0400 |
commit | 9732eb8836f7e10ce4fa37a899a0222e9e9f5eb2 (patch) | |
tree | 09db2c96a11cbeea87157762ae44f04acb829c26 /rust | |
parent | 5ae668bc709dad09ff3c9ce550d69d05f78435bb (diff) | |
download | bumble-9732eb8836f7e10ce4fa37a899a0222e9e9f5eb2.tar.gz |
Address PR feedback
Diffstat (limited to 'rust')
-rw-r--r-- | rust/Cargo.toml | 2 | ||||
-rw-r--r-- | rust/examples/l2cap_bridge.rs | 645 | ||||
-rw-r--r-- | rust/src/cli/l2cap/client_bridge.rs | 191 | ||||
-rw-r--r-- | rust/src/cli/l2cap/mod.rs | 190 | ||||
-rw-r--r-- | rust/src/cli/l2cap/server_bridge.rs | 205 | ||||
-rw-r--r-- | rust/src/cli/mod.rs | 2 | ||||
-rw-r--r-- | rust/src/main.rs | 92 | ||||
-rw-r--r-- | rust/src/wrapper/device.rs | 8 | ||||
-rw-r--r-- | rust/src/wrapper/l2cap.rs | 4 |
9 files changed, 687 insertions, 652 deletions
diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 58ae964..6c38c82 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -15,7 +15,7 @@ rust-version = "1.70.0" [dependencies] pyo3 = { version = "0.18.3", features = ["macros"] } pyo3-asyncio = { version = "0.18.0", features = ["tokio-runtime"] } -tokio = { version = "1.28.2" } +tokio = { version = "1.28.2", features = ["macros", "signal"] } nom = "7.1.3" strum = "0.25.0" strum_macros = "0.25.0" diff --git a/rust/examples/l2cap_bridge.rs b/rust/examples/l2cap_bridge.rs deleted file mode 100644 index 57aebe5..0000000 --- a/rust/examples/l2cap_bridge.rs +++ /dev/null @@ -1,645 +0,0 @@ -// Copyright 2023 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Rust version of the Python `l2cap_bridge.py` found under the `apps` folder. - -use anyhow::anyhow; -use bumble::wrapper::{ - device::Device, - l2cap::LeConnectionOrientedChannel, - logging::{bumble_env_logging_level, py_logging_basic_config}, - transport::Transport, -}; -use clap::Parser as _; -use owo_colors::OwoColorize; -use pyo3::{PyObject, PyResult, Python}; -use std::{future::Future, path, sync::Arc}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::tcp::{OwnedReadHalf, OwnedWriteHalf}, - sync::{mpsc::Receiver, Mutex}, -}; - -#[pyo3_asyncio::tokio::main] -async fn main() -> PyResult<()> { - env_logger::builder() - .filter_level(log::LevelFilter::Info) - .init(); - - py_logging_basic_config(bumble_env_logging_level("WARNING"))?; - - let cli = Cli::parse(); - - println!("<<< connecting to HCI..."); - let transport = Transport::open(cli.hci_transport).await?; - println!("<<< connected"); - - let mut device = Device::from_config_file_with_hci( - &cli.device_config, - transport.source()?, - transport.sink()?, - )?; - - device.power_on().await?; - - match cli.subcommand { - Subcommand::Server { tcp_host, tcp_port } => { - let args = server_bridge::Args { - psm: cli.psm, - max_credits: cli.l2cap_coc_max_credits, - mtu: cli.l2cap_coc_mtu, - mps: cli.l2cap_coc_mps, - tcp_host, - tcp_port, - }; - - server_bridge::start(&args, &mut device).await? - } - Subcommand::Client { - bluetooth_address, - tcp_host, - tcp_port, - } => { - let args = client_bridge::Args { - psm: cli.psm, - max_credits: cli.l2cap_coc_max_credits, - mtu: cli.l2cap_coc_mtu, - mps: cli.l2cap_coc_mps, - bluetooth_address, - tcp_host, - tcp_port, - }; - - client_bridge::start(&args, &mut device).await? - } - }; - - // wait until user kills the process - tokio::signal::ctrl_c().await?; - - Ok(()) -} - -/// L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel -/// on a specified PSM. When the connection is made, the bridge connects a TCP -/// socket to a remote host and bridges the data in both directions, with flow -/// control. -/// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket -/// and waits for a new L2CAP CoC channel to be connected. -/// When the TCP connection is closed by the TCP server, XXXX -mod server_bridge { - use crate::{ - proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, - BridgeData, - }; - use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel}; - use futures::executor::block_on; - use owo_colors::OwoColorize; - use pyo3::{PyResult, Python}; - use std::{sync::Arc, time::Duration}; - use tokio::{ - join, - net::TcpStream, - select, - sync::{mpsc, Mutex}, - }; - - pub struct Args { - pub psm: u16, - pub max_credits: u16, - pub mtu: u16, - pub mps: u16, - pub tcp_host: String, - pub tcp_port: u16, - } - - pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { - let host = args.tcp_host.clone(); - let port = args.tcp_port; - device.register_l2cap_channel_server( - args.psm, - move |_py, l2cap_channel| { - let channel_info = match l2cap_channel.debug_string() { - Ok(info_string) => info_string, - Err(py_err) => format!("failed to get l2cap channel info ({})", py_err), - }; - println!("{} {channel_info}", "*** L2CAP channel:".cyan()); - - let host = host.clone(); - // Ensure Python event loop is available to l2cap `disconnect` - let _ = run_future_with_current_task_locals(handle_connection_oriented_channel( - l2cap_channel, - host, - port, - )); - Ok(()) - }, - Some(args.max_credits), - Some(args.mtu), - Some(args.mps), - )?; - - println!( - "{}", - format!("### Listening for CoC connection on PSM {}", args.psm).yellow() - ); - - device.on_connection(|_py, mut connection| { - let connection_info = match connection.debug_string() { - Ok(info_string) => info_string, - Err(py_err) => format!("failed to get connection info ({})", py_err), - }; - println!( - "{} {}", - "@@@ Bluetooth connection: ".green(), - connection_info, - ); - connection.on_disconnection(|_py, reason| { - let disconnection_info = match HciConstant::error_name(reason) { - Ok(info_string) => info_string, - Err(py_err) => format!("failed to get disconnection error name ({})", py_err), - }; - println!( - "{} {}", - "@@@ Bluetooth disconnection: ".red(), - disconnection_info, - ); - Ok(()) - })?; - Ok(()) - })?; - - device.start_advertising(false).await?; - - Ok(()) - } - - async fn handle_connection_oriented_channel( - mut l2cap_channel: LeConnectionOrientedChannel, - tcp_host: String, - tcp_port: u16, - ) -> PyResult<()> { - let (l2cap_to_tcp_tx, mut l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10); - - // Set callback (`set_sink`) for when l2cap data is received. - let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); - l2cap_channel - .set_sink(move |_py, sdu| { - block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) - .expect("failed to channel data to tcp"); - Ok(()) - }) - .expect("failed to set sink for l2cap connection"); - - // Set l2cap callback for when the channel is closed. - l2cap_channel - .on_close(move |_py| { - println!("{}", "*** L2CAP channel closed".red()); - block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) - .expect("failed to channel close signal to tcp"); - Ok(()) - }) - .expect("failed to set on_close callback for l2cap channel"); - - println!( - "{}", - format!("### Connecting to TCP {tcp_host}:{tcp_port}...").yellow() - ); - - let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); - let tcp_stream = match TcpStream::connect(format!("{tcp_host}:{tcp_port}")).await { - Ok(stream) => { - println!("{}", "### Connected".green()); - Some(stream) - } - Err(err) => { - println!("{}", format!("!!! Connection failed: {err}").red()); - if let Some(channel) = l2cap_channel.lock().await.take() { - // Bumble might enter an invalid state if disconnection request is received from - // l2cap client before receiving a disconnection response from the same client, - // blocking this async call from returning. - // See: https://github.com/google/bumble/issues/257 - select! { - res = channel.disconnect() => { - let _ = res.map_err(|e| eprintln!("Failed to call disconnect on l2cap channel: {e}")); - }, - _ = tokio::time::sleep(Duration::from_secs(1)) => eprintln!("Timed out while calling disconnect on l2cap channel."), - } - } - None - } - }; - - match tcp_stream { - None => { - while let Some(bridge_data) = l2cap_to_tcp_rx.recv().await { - match bridge_data { - BridgeData::Data(sdu) => { - println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); - println!("{}", "!!! TCP socket not open, dropping".red()) - } - BridgeData::CloseSignal => break, - } - } - } - Some(tcp_stream) => { - let (tcp_reader, tcp_writer) = tcp_stream.into_split(); - - // Do tcp stuff when something happens on the l2cap channel. - let handle_l2cap_data_future = - proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); - - // Do l2cap stuff when something happens on tcp. - let handle_tcp_data_future = - proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), false); - - let (handle_l2cap_result, handle_tcp_result) = - join!(handle_l2cap_data_future, handle_tcp_data_future); - - if let Err(e) = handle_l2cap_result { - println!("!!! Error: {e}"); - } - - if let Err(e) = handle_tcp_result { - println!("!!! Error: {e}"); - } - } - }; - - Python::with_gil(|_| { - // Must hold GIL at least once while/after dropping for Python heap object to ensure - // de-allocation. - drop(l2cap_channel); - }); - - Ok(()) - } -} - -/// L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound -/// TCP connection on a specified port number. When a TCP client connects, an -/// L2CAP CoC channel connection to the BLE device is established, and the data -/// is bridged in both directions, with flow control. -/// When the TCP connection is closed by the client, the L2CAP CoC channel is -/// disconnected, but the connection to the BLE device remains, ready for a new -/// TCP client to connect. -/// When the L2CAP CoC channel is closed, XXXX -mod client_bridge { - use crate::{ - proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, - BridgeData, - }; - use bumble::wrapper::{ - device::{Connection, Device}, - hci::HciConstant, - }; - use futures::executor::block_on; - use owo_colors::OwoColorize; - use pyo3::{PyResult, Python}; - use std::{net::SocketAddr, sync::Arc}; - use tokio::{ - join, - net::{TcpListener, TcpStream}, - sync::{mpsc, Mutex}, - }; - - pub struct Args { - pub psm: u16, - pub max_credits: u16, - pub mtu: u16, - pub mps: u16, - pub bluetooth_address: String, - pub tcp_host: String, - pub tcp_port: u16, - } - - pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { - println!( - "{}", - format!("### Connecting to {}...", args.bluetooth_address).yellow() - ); - let mut ble_connection = device.connect(&args.bluetooth_address).await?; - ble_connection.on_disconnection(|_py, reason| { - let disconnection_info = match HciConstant::error_name(reason) { - Ok(info_string) => info_string, - Err(py_err) => format!("failed to get disconnection error name ({})", py_err), - }; - println!( - "{} {}", - "@@@ Bluetooth disconnection: ".red(), - disconnection_info, - ); - Ok(()) - })?; - - // Start the TCP server. - let listener = TcpListener::bind(format!("{}:{}", args.tcp_host, args.tcp_port)) - .await - .expect("failed to bind tcp to address"); - println!( - "{}", - format!( - "### Listening for TCP connections on port {}", - args.tcp_port - ) - .magenta() - ); - - let psm = args.psm; - let max_credits = args.max_credits; - let mtu = args.mtu; - let mps = args.mps; - let ble_connection = Arc::new(ble_connection); - // Ensure Python event loop is available to l2cap `disconnect` - let _ = run_future_with_current_task_locals(async move { - while let Ok((tcp_stream, addr)) = listener.accept().await { - let ble_connection = ble_connection.clone(); - let _ = run_future_with_current_task_locals(handle_tcp_connection( - ble_connection, - tcp_stream, - addr, - psm, - max_credits, - mtu, - mps, - )); - } - Ok(()) - }); - Ok(()) - } - - async fn handle_tcp_connection( - ble_connection: Arc<Connection>, - tcp_stream: TcpStream, - addr: SocketAddr, - psm: u16, - max_credits: u16, - mtu: u16, - mps: u16, - ) -> PyResult<()> { - println!("{}", format!("<<< TCP connection from {}", addr).magenta()); - println!( - "{}", - format!(">>> Opening L2CAP channel on PSM = {}", psm).yellow() - ); - - let mut l2cap_channel = match ble_connection - .open_l2cap_channel(psm, Some(max_credits), Some(mtu), Some(mps)) - .await - { - Ok(channel) => channel, - Err(e) => { - println!("{}", format!("!!! Connection failed: {e}").red()); - // TCP stream will get dropped after returning, automatically shutting it down. - return Err(e); - } - }; - let channel_info = match l2cap_channel.debug_string() { - Ok(info_string) => info_string, - Err(py_err) => format!("failed to get l2cap channel info ({})", py_err), - }; - - println!("{}{}", "*** L2CAP channel: ".cyan(), channel_info); - - let (l2cap_to_tcp_tx, l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10); - - // Set l2cap callback (`set_sink`) for when data is received. - let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); - l2cap_channel - .set_sink(move |_py, sdu| { - block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) - .expect("failed to channel data to tcp"); - Ok(()) - }) - .expect("failed to set sink for l2cap connection"); - - // Set l2cap callback for when the channel is closed. - l2cap_channel - .on_close(move |_py| { - println!("{}", "*** L2CAP channel closed".red()); - block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) - .expect("failed to channel close signal to tcp"); - Ok(()) - }) - .expect("failed to set on_close callback for l2cap channel"); - - let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); - let (tcp_reader, tcp_writer) = tcp_stream.into_split(); - - // Do tcp stuff when something happens on the l2cap channel. - let handle_l2cap_data_future = - proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); - - // Do l2cap stuff when something happens on tcp. - let handle_tcp_data_future = - proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), true); - - let (handle_l2cap_result, handle_tcp_result) = - join!(handle_l2cap_data_future, handle_tcp_data_future); - - if let Err(e) = handle_l2cap_result { - println!("!!! Error: {e}"); - } - - if let Err(e) = handle_tcp_result { - println!("!!! Error: {e}"); - } - - Python::with_gil(|_| { - // Must hold GIL at least once while/after dropping for Python heap object to ensure - // de-allocation. - drop(l2cap_channel); - }); - - Ok(()) - } -} - -/// Used for channeling data from Python callbacks to a Rust consumer. -enum BridgeData { - Data(Vec<u8>), - CloseSignal, -} - -async fn proxy_l2cap_rx_to_tcp_tx( - mut l2cap_data_receiver: Receiver<BridgeData>, - mut tcp_writer: OwnedWriteHalf, - l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, -) -> anyhow::Result<()> { - while let Some(bridge_data) = l2cap_data_receiver.recv().await { - match bridge_data { - BridgeData::Data(sdu) => { - println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); - tcp_writer - .write_all(sdu.as_ref()) - .await - .map_err(|_| anyhow!("Failed to write to tcp stream"))?; - tcp_writer - .flush() - .await - .map_err(|_| anyhow!("Failed to flush tcp stream"))?; - } - BridgeData::CloseSignal => { - l2cap_channel.lock().await.take(); - tcp_writer - .shutdown() - .await - .map_err(|_| anyhow!("Failed to shut down write half of tcp stream"))?; - return Ok(()); - } - } - } - Ok(()) -} - -async fn proxy_tcp_rx_to_l2cap_tx( - mut tcp_reader: OwnedReadHalf, - l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, - drain_l2cap_after_write: bool, -) -> PyResult<()> { - let mut buf = [0; 4096]; - loop { - match tcp_reader.read(&mut buf).await { - Ok(len) => { - if len == 0 { - println!("{}", "!!! End of stream".yellow()); - - if let Some(channel) = l2cap_channel.lock().await.take() { - channel.disconnect().await.map_err(|e| { - eprintln!("Failed to call disconnect on l2cap channel: {e}"); - e - })?; - } - return Ok(()); - } - - println!("{}", format!("<<< [TCP DATA]: {len} bytes").blue()); - match l2cap_channel.lock().await.as_mut() { - None => { - println!("{}", "!!! L2CAP channel not connected, dropping".red()); - return Ok(()); - } - Some(channel) => { - channel.write(&buf[..len])?; - if drain_l2cap_after_write { - channel.drain().await?; - } - } - } - } - Err(e) => { - println!("{}", format!("!!! TCP connection lost: {}", e).red()); - if let Some(channel) = l2cap_channel.lock().await.take() { - let _ = channel.disconnect().await.map_err(|e| { - eprintln!("Failed to call disconnect on l2cap channel: {e}"); - }); - } - return Err(e.into()); - } - } - } -} - -/// Copies the current thread's task locals into a Python "awaitable" and encapsulates it in a Rust -/// future, running it as a Python Task. -/// -/// If the calling thread has a Python event loop, then the Python Task will too. -pub fn run_future_with_current_task_locals<F>( - fut: F, -) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> -where - F: Future<Output = PyResult<()>> + Send + 'static, -{ - Python::with_gil(|py| { - let locals = pyo3_asyncio::tokio::get_current_locals(py)?; - let future = pyo3_asyncio::tokio::scope(locals.clone(), fut); - pyo3_asyncio::tokio::future_into_py_with_locals(py, locals.clone(), future) - .and_then(pyo3_asyncio::tokio::into_future) - }) -} - -#[derive(clap::Parser)] -#[command(author, version, about, long_about = None)] -struct Cli { - #[command(subcommand)] - pub(crate) subcommand: Subcommand, - - /// Device configuration file. - /// - /// See, for instance, `examples/device1.json` in the Python project. - #[arg(long)] - device_config: path::PathBuf, - /// Bumble transport spec. - /// - /// <https://google.github.io/bumble/transports/index.html> - #[arg(long)] - hci_transport: String, - - /// PSM for L2CAP Connection-oriented Channel. - /// - /// Must be in the range [0, 65535]. - #[arg(long, default_value_t = 1234)] - psm: u16, - - /// Maximum L2CAP CoC Credits. - /// - /// Must be in the range [1, 65535]. - #[arg(long, default_value_t = 128, value_parser = clap::value_parser!(u16).range(1..))] - l2cap_coc_max_credits: u16, - - /// L2CAP CoC MTU - /// - /// Must be in the range [23, 65535]. - #[arg(long, default_value_t = 1022, value_parser = clap::value_parser!(u16).range(23..))] - l2cap_coc_mtu: u16, - - /// L2CAP CoC MPS - /// - /// Must be in the range [23, 65535]. - #[arg(long, default_value_t = 1024, value_parser = clap::value_parser!(u16).range(23..))] - l2cap_coc_mps: u16, -} - -#[derive(clap::Subcommand)] -enum Subcommand { - /// Starts an L2CAP server - Server { - /// TCP host that the l2cap server will connect to. - /// Data is bridged like so: - /// TCP server <-> (TCP client / **L2CAP server**) <-> (L2CAP client / TCP server) <-> TCP client - #[arg(long, default_value = "localhost")] - tcp_host: String, - /// TCP port that the server will connect to. - /// - /// Must be in the range [1, 65535]. - #[arg(long, default_value_t = 9544)] - tcp_port: u16, - }, - /// Starts an L2CAP client - Client { - /// L2cap server address that this l2cap client will connect to. - bluetooth_address: String, - /// TCP host that the l2cap client will bind to and listen for incoming TCP connections. - /// Data is bridged like so: - /// TCP client <-> (TCP server / **L2CAP client**) <-> (L2CAP server / TCP client) <-> TCP Client - #[arg(long, default_value = "localhost")] - tcp_host: String, - /// TCP port that the client will connect to. - /// - /// Must be in the range [1, 65535]. - #[arg(long, default_value_t = 9543)] - tcp_port: u16, - }, -} diff --git a/rust/src/cli/l2cap/client_bridge.rs b/rust/src/cli/l2cap/client_bridge.rs new file mode 100644 index 0000000..37606fc --- /dev/null +++ b/rust/src/cli/l2cap/client_bridge.rs @@ -0,0 +1,191 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound +/// TCP connection on a specified port number. When a TCP client connects, an +/// L2CAP CoC channel connection to the BLE device is established, and the data +/// is bridged in both directions, with flow control. +/// When the TCP connection is closed by the client, the L2CAP CoC channel is +/// disconnected, but the connection to the BLE device remains, ready for a new +/// TCP client to connect. +/// When the L2CAP CoC channel is closed, the TCP connection is closed as well. +use crate::cli::l2cap::{ + proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, + BridgeData, +}; +use bumble::wrapper::{ + device::{Connection, Device}, + hci::HciConstant, +}; +use futures::executor::block_on; +use owo_colors::OwoColorize; +use pyo3::{PyResult, Python}; +use std::{net::SocketAddr, sync::Arc}; +use tokio::{ + join, + net::{TcpListener, TcpStream}, + sync::{mpsc, Mutex}, +}; + +pub struct Args { + pub psm: u16, + pub max_credits: Option<u16>, + pub mtu: Option<u16>, + pub mps: Option<u16>, + pub bluetooth_address: String, + pub tcp_host: String, + pub tcp_port: u16, +} + +pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { + println!( + "{}", + format!("### Connecting to {}...", args.bluetooth_address).yellow() + ); + let mut ble_connection = device.connect(&args.bluetooth_address).await?; + ble_connection.on_disconnection(|_py, reason| { + let disconnection_info = match HciConstant::error_name(reason) { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get disconnection error name ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth disconnection: ".red(), + disconnection_info, + ); + Ok(()) + })?; + + // Start the TCP server. + let listener = TcpListener::bind(format!("{}:{}", args.tcp_host, args.tcp_port)) + .await + .expect("failed to bind tcp to address"); + println!( + "{}", + format!( + "### Listening for TCP connections on port {}", + args.tcp_port + ) + .magenta() + ); + + let psm = args.psm; + let max_credits = args.max_credits; + let mtu = args.mtu; + let mps = args.mps; + let ble_connection = Arc::new(Mutex::new(ble_connection)); + // Ensure Python event loop is available to l2cap `disconnect` + let _ = run_future_with_current_task_locals(async move { + while let Ok((tcp_stream, addr)) = listener.accept().await { + let ble_connection = ble_connection.clone(); + let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap( + ble_connection, + tcp_stream, + addr, + psm, + max_credits, + mtu, + mps, + )); + } + Ok(()) + }); + Ok(()) +} + +async fn proxy_data_between_tcp_and_l2cap( + ble_connection: Arc<Mutex<Connection>>, + tcp_stream: TcpStream, + addr: SocketAddr, + psm: u16, + max_credits: Option<u16>, + mtu: Option<u16>, + mps: Option<u16>, +) -> PyResult<()> { + println!("{}", format!("<<< TCP connection from {}", addr).magenta()); + println!( + "{}", + format!(">>> Opening L2CAP channel on PSM = {}", psm).yellow() + ); + + let mut l2cap_channel = match ble_connection + .lock() + .await + .open_l2cap_channel(psm, max_credits, mtu, mps) + .await + { + Ok(channel) => channel, + Err(e) => { + println!("{}", format!("!!! Connection failed: {e}").red()); + // TCP stream will get dropped after returning, automatically shutting it down. + return Err(e); + } + }; + let channel_info = l2cap_channel + .debug_string() + .unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})")); + + println!("{}{}", "*** L2CAP channel: ".cyan(), channel_info); + + let (l2cap_to_tcp_tx, l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10); + + // Set l2cap callback (`set_sink`) for when data is received. + let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); + l2cap_channel + .set_sink(move |_py, sdu| { + block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) + .expect("failed to channel data to tcp"); + Ok(()) + }) + .expect("failed to set sink for l2cap connection"); + + // Set l2cap callback for when the channel is closed. + l2cap_channel + .on_close(move |_py| { + println!("{}", "*** L2CAP channel closed".red()); + block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) + .expect("failed to channel close signal to tcp"); + Ok(()) + }) + .expect("failed to set on_close callback for l2cap channel"); + + let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); + let (tcp_reader, tcp_writer) = tcp_stream.into_split(); + + // Do tcp stuff when something happens on the l2cap channel. + let handle_l2cap_data_future = + proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); + + // Do l2cap stuff when something happens on tcp. + let handle_tcp_data_future = proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), true); + + let (handle_l2cap_result, handle_tcp_result) = + join!(handle_l2cap_data_future, handle_tcp_data_future); + + if let Err(e) = handle_l2cap_result { + println!("!!! Error: {e}"); + } + + if let Err(e) = handle_tcp_result { + println!("!!! Error: {e}"); + } + + Python::with_gil(|_| { + // Must hold GIL at least once while/after dropping for Python heap object to ensure + // de-allocation. + drop(l2cap_channel); + }); + + Ok(()) +} diff --git a/rust/src/cli/l2cap/mod.rs b/rust/src/cli/l2cap/mod.rs new file mode 100644 index 0000000..31097ed --- /dev/null +++ b/rust/src/cli/l2cap/mod.rs @@ -0,0 +1,190 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rust version of the Python `l2cap_bridge.py` found under the `apps` folder. + +use crate::L2cap; +use anyhow::anyhow; +use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport}; +use owo_colors::{colors::css::Orange, OwoColorize}; +use pyo3::{PyObject, PyResult, Python}; +use std::{future::Future, path::PathBuf, sync::Arc}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::tcp::{OwnedReadHalf, OwnedWriteHalf}, + sync::{mpsc::Receiver, Mutex}, +}; + +mod client_bridge; +mod server_bridge; + +pub(crate) async fn run( + command: L2cap, + device_config: PathBuf, + transport: String, + psm: u16, + max_credits: Option<u16>, + mtu: Option<u16>, + mps: Option<u16>, +) -> PyResult<()> { + println!("<<< connecting to HCI..."); + let transport = Transport::open(transport).await?; + println!("<<< connected"); + + let mut device = + Device::from_config_file_with_hci(&device_config, transport.source()?, transport.sink()?)?; + + device.power_on().await?; + + match command { + L2cap::Server { tcp_host, tcp_port } => { + let args = server_bridge::Args { + psm, + max_credits, + mtu, + mps, + tcp_host, + tcp_port, + }; + + server_bridge::start(&args, &mut device).await? + } + L2cap::Client { + bluetooth_address, + tcp_host, + tcp_port, + } => { + let args = client_bridge::Args { + psm, + max_credits, + mtu, + mps, + bluetooth_address, + tcp_host, + tcp_port, + }; + + client_bridge::start(&args, &mut device).await? + } + }; + + // wait until user kills the process + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +/// Used for channeling data from Python callbacks to a Rust consumer. +enum BridgeData { + Data(Vec<u8>), + CloseSignal, +} + +async fn proxy_l2cap_rx_to_tcp_tx( + mut l2cap_data_receiver: Receiver<BridgeData>, + mut tcp_writer: OwnedWriteHalf, + l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, +) -> anyhow::Result<()> { + while let Some(bridge_data) = l2cap_data_receiver.recv().await { + match bridge_data { + BridgeData::Data(sdu) => { + println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); + tcp_writer + .write_all(sdu.as_ref()) + .await + .map_err(|_| anyhow!("Failed to write to tcp stream"))?; + tcp_writer + .flush() + .await + .map_err(|_| anyhow!("Failed to flush tcp stream"))?; + } + BridgeData::CloseSignal => { + l2cap_channel.lock().await.take(); + tcp_writer + .shutdown() + .await + .map_err(|_| anyhow!("Failed to shut down write half of tcp stream"))?; + return Ok(()); + } + } + } + Ok(()) +} + +async fn proxy_tcp_rx_to_l2cap_tx( + mut tcp_reader: OwnedReadHalf, + l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, + drain_l2cap_after_write: bool, +) -> PyResult<()> { + let mut buf = [0; 4096]; + loop { + match tcp_reader.read(&mut buf).await { + Ok(len) => { + if len == 0 { + println!("{}", "!!! End of stream".fg::<Orange>()); + + if let Some(mut channel) = l2cap_channel.lock().await.take() { + channel.disconnect().await.map_err(|e| { + eprintln!("Failed to call disconnect on l2cap channel: {e}"); + e + })?; + } + return Ok(()); + } + + println!("{}", format!("<<< [TCP DATA]: {len} bytes").blue()); + match l2cap_channel.lock().await.as_mut() { + None => { + println!("{}", "!!! L2CAP channel not connected, dropping".red()); + return Ok(()); + } + Some(channel) => { + channel.write(&buf[..len])?; + if drain_l2cap_after_write { + channel.drain().await?; + } + } + } + } + Err(e) => { + println!("{}", format!("!!! TCP connection lost: {}", e).red()); + if let Some(mut channel) = l2cap_channel.lock().await.take() { + let _ = channel.disconnect().await.map_err(|e| { + eprintln!("Failed to call disconnect on l2cap channel: {e}"); + }); + } + return Err(e.into()); + } + } + } +} + +/// Copies the current thread's TaskLocals into a Python "awaitable" and encapsulates it in a Rust +/// future, running it as a Python Task. +/// `TaskLocals` stores the current event loop, and allows the user to copy the current Python +/// context if necessary. In this case, the python event loop is used when calling `disconnect` on +/// an l2cap connection, or else the call will fail. +pub fn run_future_with_current_task_locals<F>( + fut: F, +) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> +where + F: Future<Output = PyResult<()>> + Send + 'static, +{ + Python::with_gil(|py| { + let locals = pyo3_asyncio::tokio::get_current_locals(py)?; + let future = pyo3_asyncio::tokio::scope(locals.clone(), fut); + pyo3_asyncio::tokio::future_into_py_with_locals(py, locals, future) + .and_then(pyo3_asyncio::tokio::into_future) + }) +} diff --git a/rust/src/cli/l2cap/server_bridge.rs b/rust/src/cli/l2cap/server_bridge.rs new file mode 100644 index 0000000..3a32db9 --- /dev/null +++ b/rust/src/cli/l2cap/server_bridge.rs @@ -0,0 +1,205 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel +/// on a specified PSM. When the connection is made, the bridge connects a TCP +/// socket to a remote host and bridges the data in both directions, with flow +/// control. +/// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket +/// and waits for a new L2CAP CoC channel to be connected. +/// When the TCP connection is closed by the TCP server, the L2CAP connection is closed as well. +use crate::cli::l2cap::{ + proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, + BridgeData, +}; +use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel}; +use futures::executor::block_on; +use owo_colors::OwoColorize; +use pyo3::{PyResult, Python}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + join, + net::TcpStream, + select, + sync::{mpsc, Mutex}, +}; + +pub struct Args { + pub psm: u16, + pub max_credits: Option<u16>, + pub mtu: Option<u16>, + pub mps: Option<u16>, + pub tcp_host: String, + pub tcp_port: u16, +} + +pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { + let host = args.tcp_host.clone(); + let port = args.tcp_port; + device.register_l2cap_channel_server( + args.psm, + move |_py, l2cap_channel| { + let channel_info = l2cap_channel + .debug_string() + .unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})")); + println!("{} {channel_info}", "*** L2CAP channel:".cyan()); + + let host = host.clone(); + // Ensure Python event loop is available to l2cap `disconnect` + let _ = run_future_with_current_task_locals(proxy_data_between_l2cap_and_tcp( + l2cap_channel, + host, + port, + )); + Ok(()) + }, + args.max_credits, + args.mtu, + args.mps, + )?; + + println!( + "{}", + format!("### Listening for CoC connection on PSM {}", args.psm).yellow() + ); + + device.on_connection(|_py, mut connection| { + let connection_info = connection + .debug_string() + .unwrap_or_else(|e| format!("failed to get connection info ({e})")); + println!( + "{} {}", + "@@@ Bluetooth connection: ".green(), + connection_info, + ); + connection.on_disconnection(|_py, reason| { + let disconnection_info = match HciConstant::error_name(reason) { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get disconnection error name ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth disconnection: ".red(), + disconnection_info, + ); + Ok(()) + })?; + Ok(()) + })?; + + device.start_advertising(false).await?; + + Ok(()) +} + +async fn proxy_data_between_l2cap_and_tcp( + mut l2cap_channel: LeConnectionOrientedChannel, + tcp_host: String, + tcp_port: u16, +) -> PyResult<()> { + let (l2cap_to_tcp_tx, mut l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10); + + // Set callback (`set_sink`) for when l2cap data is received. + let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); + l2cap_channel + .set_sink(move |_py, sdu| { + block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) + .expect("failed to channel data to tcp"); + Ok(()) + }) + .expect("failed to set sink for l2cap connection"); + + // Set l2cap callback for when the channel is closed. + l2cap_channel + .on_close(move |_py| { + println!("{}", "*** L2CAP channel closed".red()); + block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) + .expect("failed to channel close signal to tcp"); + Ok(()) + }) + .expect("failed to set on_close callback for l2cap channel"); + + println!( + "{}", + format!("### Connecting to TCP {tcp_host}:{tcp_port}...").yellow() + ); + + let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); + let tcp_stream = match TcpStream::connect(format!("{tcp_host}:{tcp_port}")).await { + Ok(stream) => { + println!("{}", "### Connected".green()); + Some(stream) + } + Err(err) => { + println!("{}", format!("!!! Connection failed: {err}").red()); + if let Some(mut channel) = l2cap_channel.lock().await.take() { + // Bumble might enter an invalid state if disconnection request is received from + // l2cap client before receiving a disconnection response from the same client, + // blocking this async call from returning. + // See: https://github.com/google/bumble/issues/257 + select! { + res = channel.disconnect() => { + let _ = res.map_err(|e| eprintln!("Failed to call disconnect on l2cap channel: {e}")); + }, + _ = tokio::time::sleep(Duration::from_secs(1)) => eprintln!("Timed out while calling disconnect on l2cap channel."), + } + } + None + } + }; + + match tcp_stream { + None => { + while let Some(bridge_data) = l2cap_to_tcp_rx.recv().await { + match bridge_data { + BridgeData::Data(sdu) => { + println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); + println!("{}", "!!! TCP socket not open, dropping".red()) + } + BridgeData::CloseSignal => break, + } + } + } + Some(tcp_stream) => { + let (tcp_reader, tcp_writer) = tcp_stream.into_split(); + + // Do tcp stuff when something happens on the l2cap channel. + let handle_l2cap_data_future = + proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); + + // Do l2cap stuff when something happens on tcp. + let handle_tcp_data_future = + proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), false); + + let (handle_l2cap_result, handle_tcp_result) = + join!(handle_l2cap_data_future, handle_tcp_data_future); + + if let Err(e) = handle_l2cap_result { + println!("!!! Error: {e}"); + } + + if let Err(e) = handle_tcp_result { + println!("!!! Error: {e}"); + } + } + }; + + Python::with_gil(|_| { + // Must hold GIL at least once while/after dropping for Python heap object to ensure + // de-allocation. + drop(l2cap_channel); + }); + + Ok(()) +} diff --git a/rust/src/cli/mod.rs b/rust/src/cli/mod.rs index 2648e12..e58f88c 100644 --- a/rust/src/cli/mod.rs +++ b/rust/src/cli/mod.rs @@ -15,3 +15,5 @@ pub(crate) mod firmware; pub(crate) mod usb; + +pub(crate) mod l2cap; diff --git a/rust/src/main.rs b/rust/src/main.rs index f8401e9..c21f4c8 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -49,6 +49,26 @@ async fn main() -> PyResult<()> { Realtek::Parse { firmware_path } => cli::firmware::rtk::parse(&firmware_path)?, }, }, + Subcommand::L2cap { + subcommand, + device_config, + transport, + psm, + l2cap_coc_max_credits, + l2cap_coc_mtu, + l2cap_coc_mps, + } => { + cli::l2cap::run( + subcommand, + device_config, + transport, + psm, + l2cap_coc_max_credits, + l2cap_coc_mtu, + l2cap_coc_mps, + ) + .await? + } Subcommand::Usb { subcommand } => match subcommand { Usb::Probe(probe) => cli::usb::probe(probe.verbose)?, }, @@ -70,6 +90,46 @@ enum Subcommand { #[clap(subcommand)] subcommand: Firmware, }, + /// L2cap client/server operations + L2cap { + #[command(subcommand)] + subcommand: L2cap, + + /// Device configuration file. + /// + /// See, for instance, `examples/device1.json` in the Python project. + #[arg(long)] + device_config: path::PathBuf, + /// Bumble transport spec. + /// + /// <https://google.github.io/bumble/transports/index.html> + #[arg(long)] + transport: String, + + /// PSM for L2CAP Connection-oriented Channel. + /// + /// Must be in the range [0, 65535]. + #[arg(long)] + psm: u16, + + /// Maximum L2CAP CoC Credits. When not specified, lets Bumble set the default. + /// + /// Must be in the range [1, 65535]. + #[arg(long, value_parser = clap::value_parser!(u16).range(1..))] + l2cap_coc_max_credits: Option<u16>, + + /// L2CAP CoC MTU. When not specified, lets Bumble set the default. + /// + /// Must be in the range [23, 65535]. + #[arg(long, value_parser = clap::value_parser!(u16).range(23..))] + l2cap_coc_mtu: Option<u16>, + + /// L2CAP CoC MPS. When not specified, lets Bumble set the default. + /// + /// Must be in the range [23, 65535]. + #[arg(long, value_parser = clap::value_parser!(u16).range(23..))] + l2cap_coc_mps: Option<u16>, + }, /// USB operations Usb { #[clap(subcommand)] @@ -166,6 +226,38 @@ impl fmt::Display for Source { } #[derive(clap::Subcommand, Debug, Clone)] +enum L2cap { + /// Starts an L2CAP server + Server { + /// TCP host that the l2cap server will connect to. + /// Data is bridged like so: + /// TCP server <-> (TCP client / **L2CAP server**) <-> (L2CAP client / TCP server) <-> TCP client + #[arg(long, default_value = "localhost")] + tcp_host: String, + /// TCP port that the server will connect to. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 9544)] + tcp_port: u16, + }, + /// Starts an L2CAP client + Client { + /// L2cap server address that this l2cap client will connect to. + bluetooth_address: String, + /// TCP host that the l2cap client will bind to and listen for incoming TCP connections. + /// Data is bridged like so: + /// TCP client <-> (TCP server / **L2CAP client**) <-> (L2CAP server / TCP client) <-> TCP server + #[arg(long, default_value = "localhost")] + tcp_host: String, + /// TCP port that the client will connect to. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 9543)] + tcp_port: u16, + }, +} + +#[derive(clap::Subcommand, Debug, Clone)] enum Usb { /// Probe the USB bus for Bluetooth devices Probe(Probe), diff --git a/rust/src/wrapper/device.rs b/rust/src/wrapper/device.rs index 824658d..be5e4fa 100644 --- a/rust/src/wrapper/device.rs +++ b/rust/src/wrapper/device.rs @@ -180,10 +180,10 @@ impl Device { } /// Registers an L2CAP connection oriented channel server. When a client connects to the server, - /// the `server` callback returns a handle to the established channel. When optional arguments + /// the `server` callback is passed a handle to the established channel. When optional arguments /// are not specified, the Python module specifies the defaults. pub fn register_l2cap_channel_server( - &self, + &mut self, psm: u16, server: impl Fn(Python, LeConnectionOrientedChannel) -> PyResult<()> + Send + 'static, max_credits: Option<u16>, @@ -222,7 +222,7 @@ impl Connection { /// Open an L2CAP channel using this connection. When optional arguments are not specified, the /// Python module specifies the defaults. pub async fn open_l2cap_channel( - &self, + &mut self, psm: u16, max_credits: Option<u16>, mtu: Option<u16>, @@ -244,7 +244,7 @@ impl Connection { /// Disconnect from device with provided reason. When optional arguments are not specified, the /// Python module specifies the defaults. - pub async fn disconnect(self, reason: Option<HciErrorCode>) -> PyResult<()> { + pub async fn disconnect(&mut self, reason: Option<HciErrorCode>) -> PyResult<()> { Python::with_gil(|py| { let kwargs = PyDict::new(py); kwargs.set_opt_item("reason", reason)?; diff --git a/rust/src/wrapper/l2cap.rs b/rust/src/wrapper/l2cap.rs index 694f0c4..5e0752e 100644 --- a/rust/src/wrapper/l2cap.rs +++ b/rust/src/wrapper/l2cap.rs @@ -32,7 +32,7 @@ impl LeConnectionOrientedChannel { } /// Wait for queued data to be sent on this channel. - pub async fn drain(&self) -> PyResult<()> { + pub async fn drain(&mut self) -> PyResult<()> { Python::with_gil(|py| { self.0 .call_method0(py, intern!(py, "drain")) @@ -72,7 +72,7 @@ impl LeConnectionOrientedChannel { /// `tokio::main` and `async_std::main`. /// /// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars. - pub async fn disconnect(self) -> PyResult<()> { + pub async fn disconnect(&mut self) -> PyResult<()> { Python::with_gil(|py| { self.0 .call_method0(py, intern!(py, "disconnect")) |