diff options
author | Gabriel White-Vega <gwhitevega@google.com> | 2023-08-21 09:54:18 -0400 |
---|---|---|
committer | Gabriel White-Vega <gwhitevega@google.com> | 2023-09-05 16:03:02 -0400 |
commit | 5ae668bc709dad09ff3c9ce550d69d05f78435bb (patch) | |
tree | 93c79b32fe0c6cb5d894471fb8a15548e166d620 /rust | |
parent | fd4d1bcca3382bf7ae23643cc3d5803e13eac18b (diff) | |
download | bumble-5ae668bc709dad09ff3c9ce550d69d05f78435bb.tar.gz |
Port l2cap_bridge sample to Rust
- Added Rust wrappers where relevant
- Edited a couple logs in python l2cap_bridge to be more symmetrical
- Created cli subcommand for running the rustified l2cap bridge
Diffstat (limited to 'rust')
-rw-r--r-- | rust/Cargo.lock | 1 | ||||
-rw-r--r-- | rust/Cargo.toml | 9 | ||||
-rw-r--r-- | rust/examples/l2cap_bridge.rs | 645 | ||||
-rw-r--r-- | rust/src/wrapper/device.rs | 121 | ||||
-rw-r--r-- | rust/src/wrapper/hci.rs | 35 | ||||
-rw-r--r-- | rust/src/wrapper/l2cap.rs | 92 | ||||
-rw-r--r-- | rust/src/wrapper/mod.rs | 17 |
7 files changed, 911 insertions, 9 deletions
diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 024a1a2..bd168dc 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -138,6 +138,7 @@ dependencies = [ "clap 4.4.1", "directories", "env_logger", + "futures", "hex", "itertools", "lazy_static", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c12709f..58ae964 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -28,11 +28,12 @@ thiserror = "1.0.41" anyhow = { version = "1.0.71", optional = true } clap = { version = "4.3.3", features = ["derive"], optional = true } directories = { version = "5.0.1", optional = true } +env_logger = { version = "0.10.0", optional = true } +futures = { version = "0.3.28", optional = true } +log = { version = "0.4.19", optional = true } owo-colors = { version = "3.5.0", optional = true } reqwest = { version = "0.11.20", features = ["blocking"], optional = true } rusb = { version = "0.9.2", optional = true } -log = { version = "0.4.19", optional = true } -env_logger = { version = "0.10.0", optional = true } [dev-dependencies] tokio = { version = "1.28.2", features = ["full"] } @@ -72,5 +73,5 @@ anyhow = ["pyo3/anyhow"] pyo3-asyncio-attributes = ["pyo3-asyncio/attributes"] bumble-codegen = ["dep:anyhow"] # separate feature for CLI so that dependencies don't spend time building these -bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger"] -default = []
\ No newline at end of file +bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger", "dep:futures"] +default = [] diff --git a/rust/examples/l2cap_bridge.rs b/rust/examples/l2cap_bridge.rs new file mode 100644 index 0000000..57aebe5 --- /dev/null +++ b/rust/examples/l2cap_bridge.rs @@ -0,0 +1,645 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rust version of the Python `l2cap_bridge.py` found under the `apps` folder. + +use anyhow::anyhow; +use bumble::wrapper::{ + device::Device, + l2cap::LeConnectionOrientedChannel, + logging::{bumble_env_logging_level, py_logging_basic_config}, + transport::Transport, +}; +use clap::Parser as _; +use owo_colors::OwoColorize; +use pyo3::{PyObject, PyResult, Python}; +use std::{future::Future, path, sync::Arc}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::tcp::{OwnedReadHalf, OwnedWriteHalf}, + sync::{mpsc::Receiver, Mutex}, +}; + +#[pyo3_asyncio::tokio::main] +async fn main() -> PyResult<()> { + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .init(); + + py_logging_basic_config(bumble_env_logging_level("WARNING"))?; + + let cli = Cli::parse(); + + println!("<<< connecting to HCI..."); + let transport = Transport::open(cli.hci_transport).await?; + println!("<<< connected"); + + let mut device = Device::from_config_file_with_hci( + &cli.device_config, + transport.source()?, + transport.sink()?, + )?; + + device.power_on().await?; + + match cli.subcommand { + Subcommand::Server { tcp_host, tcp_port } => { + let args = server_bridge::Args { + psm: cli.psm, + max_credits: cli.l2cap_coc_max_credits, + mtu: cli.l2cap_coc_mtu, + mps: cli.l2cap_coc_mps, + tcp_host, + tcp_port, + }; + + server_bridge::start(&args, &mut device).await? + } + Subcommand::Client { + bluetooth_address, + tcp_host, + tcp_port, + } => { + let args = client_bridge::Args { + psm: cli.psm, + max_credits: cli.l2cap_coc_max_credits, + mtu: cli.l2cap_coc_mtu, + mps: cli.l2cap_coc_mps, + bluetooth_address, + tcp_host, + tcp_port, + }; + + client_bridge::start(&args, &mut device).await? + } + }; + + // wait until user kills the process + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +/// L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel +/// on a specified PSM. When the connection is made, the bridge connects a TCP +/// socket to a remote host and bridges the data in both directions, with flow +/// control. +/// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket +/// and waits for a new L2CAP CoC channel to be connected. +/// When the TCP connection is closed by the TCP server, XXXX +mod server_bridge { + use crate::{ + proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, + BridgeData, + }; + use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel}; + use futures::executor::block_on; + use owo_colors::OwoColorize; + use pyo3::{PyResult, Python}; + use std::{sync::Arc, time::Duration}; + use tokio::{ + join, + net::TcpStream, + select, + sync::{mpsc, Mutex}, + }; + + pub struct Args { + pub psm: u16, + pub max_credits: u16, + pub mtu: u16, + pub mps: u16, + pub tcp_host: String, + pub tcp_port: u16, + } + + pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { + let host = args.tcp_host.clone(); + let port = args.tcp_port; + device.register_l2cap_channel_server( + args.psm, + move |_py, l2cap_channel| { + let channel_info = match l2cap_channel.debug_string() { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get l2cap channel info ({})", py_err), + }; + println!("{} {channel_info}", "*** L2CAP channel:".cyan()); + + let host = host.clone(); + // Ensure Python event loop is available to l2cap `disconnect` + let _ = run_future_with_current_task_locals(handle_connection_oriented_channel( + l2cap_channel, + host, + port, + )); + Ok(()) + }, + Some(args.max_credits), + Some(args.mtu), + Some(args.mps), + )?; + + println!( + "{}", + format!("### Listening for CoC connection on PSM {}", args.psm).yellow() + ); + + device.on_connection(|_py, mut connection| { + let connection_info = match connection.debug_string() { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get connection info ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth connection: ".green(), + connection_info, + ); + connection.on_disconnection(|_py, reason| { + let disconnection_info = match HciConstant::error_name(reason) { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get disconnection error name ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth disconnection: ".red(), + disconnection_info, + ); + Ok(()) + })?; + Ok(()) + })?; + + device.start_advertising(false).await?; + + Ok(()) + } + + async fn handle_connection_oriented_channel( + mut l2cap_channel: LeConnectionOrientedChannel, + tcp_host: String, + tcp_port: u16, + ) -> PyResult<()> { + let (l2cap_to_tcp_tx, mut l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10); + + // Set callback (`set_sink`) for when l2cap data is received. + let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); + l2cap_channel + .set_sink(move |_py, sdu| { + block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) + .expect("failed to channel data to tcp"); + Ok(()) + }) + .expect("failed to set sink for l2cap connection"); + + // Set l2cap callback for when the channel is closed. + l2cap_channel + .on_close(move |_py| { + println!("{}", "*** L2CAP channel closed".red()); + block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) + .expect("failed to channel close signal to tcp"); + Ok(()) + }) + .expect("failed to set on_close callback for l2cap channel"); + + println!( + "{}", + format!("### Connecting to TCP {tcp_host}:{tcp_port}...").yellow() + ); + + let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); + let tcp_stream = match TcpStream::connect(format!("{tcp_host}:{tcp_port}")).await { + Ok(stream) => { + println!("{}", "### Connected".green()); + Some(stream) + } + Err(err) => { + println!("{}", format!("!!! Connection failed: {err}").red()); + if let Some(channel) = l2cap_channel.lock().await.take() { + // Bumble might enter an invalid state if disconnection request is received from + // l2cap client before receiving a disconnection response from the same client, + // blocking this async call from returning. + // See: https://github.com/google/bumble/issues/257 + select! { + res = channel.disconnect() => { + let _ = res.map_err(|e| eprintln!("Failed to call disconnect on l2cap channel: {e}")); + }, + _ = tokio::time::sleep(Duration::from_secs(1)) => eprintln!("Timed out while calling disconnect on l2cap channel."), + } + } + None + } + }; + + match tcp_stream { + None => { + while let Some(bridge_data) = l2cap_to_tcp_rx.recv().await { + match bridge_data { + BridgeData::Data(sdu) => { + println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); + println!("{}", "!!! TCP socket not open, dropping".red()) + } + BridgeData::CloseSignal => break, + } + } + } + Some(tcp_stream) => { + let (tcp_reader, tcp_writer) = tcp_stream.into_split(); + + // Do tcp stuff when something happens on the l2cap channel. + let handle_l2cap_data_future = + proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); + + // Do l2cap stuff when something happens on tcp. + let handle_tcp_data_future = + proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), false); + + let (handle_l2cap_result, handle_tcp_result) = + join!(handle_l2cap_data_future, handle_tcp_data_future); + + if let Err(e) = handle_l2cap_result { + println!("!!! Error: {e}"); + } + + if let Err(e) = handle_tcp_result { + println!("!!! Error: {e}"); + } + } + }; + + Python::with_gil(|_| { + // Must hold GIL at least once while/after dropping for Python heap object to ensure + // de-allocation. + drop(l2cap_channel); + }); + + Ok(()) + } +} + +/// L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound +/// TCP connection on a specified port number. When a TCP client connects, an +/// L2CAP CoC channel connection to the BLE device is established, and the data +/// is bridged in both directions, with flow control. +/// When the TCP connection is closed by the client, the L2CAP CoC channel is +/// disconnected, but the connection to the BLE device remains, ready for a new +/// TCP client to connect. +/// When the L2CAP CoC channel is closed, XXXX +mod client_bridge { + use crate::{ + proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals, + BridgeData, + }; + use bumble::wrapper::{ + device::{Connection, Device}, + hci::HciConstant, + }; + use futures::executor::block_on; + use owo_colors::OwoColorize; + use pyo3::{PyResult, Python}; + use std::{net::SocketAddr, sync::Arc}; + use tokio::{ + join, + net::{TcpListener, TcpStream}, + sync::{mpsc, Mutex}, + }; + + pub struct Args { + pub psm: u16, + pub max_credits: u16, + pub mtu: u16, + pub mps: u16, + pub bluetooth_address: String, + pub tcp_host: String, + pub tcp_port: u16, + } + + pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> { + println!( + "{}", + format!("### Connecting to {}...", args.bluetooth_address).yellow() + ); + let mut ble_connection = device.connect(&args.bluetooth_address).await?; + ble_connection.on_disconnection(|_py, reason| { + let disconnection_info = match HciConstant::error_name(reason) { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get disconnection error name ({})", py_err), + }; + println!( + "{} {}", + "@@@ Bluetooth disconnection: ".red(), + disconnection_info, + ); + Ok(()) + })?; + + // Start the TCP server. + let listener = TcpListener::bind(format!("{}:{}", args.tcp_host, args.tcp_port)) + .await + .expect("failed to bind tcp to address"); + println!( + "{}", + format!( + "### Listening for TCP connections on port {}", + args.tcp_port + ) + .magenta() + ); + + let psm = args.psm; + let max_credits = args.max_credits; + let mtu = args.mtu; + let mps = args.mps; + let ble_connection = Arc::new(ble_connection); + // Ensure Python event loop is available to l2cap `disconnect` + let _ = run_future_with_current_task_locals(async move { + while let Ok((tcp_stream, addr)) = listener.accept().await { + let ble_connection = ble_connection.clone(); + let _ = run_future_with_current_task_locals(handle_tcp_connection( + ble_connection, + tcp_stream, + addr, + psm, + max_credits, + mtu, + mps, + )); + } + Ok(()) + }); + Ok(()) + } + + async fn handle_tcp_connection( + ble_connection: Arc<Connection>, + tcp_stream: TcpStream, + addr: SocketAddr, + psm: u16, + max_credits: u16, + mtu: u16, + mps: u16, + ) -> PyResult<()> { + println!("{}", format!("<<< TCP connection from {}", addr).magenta()); + println!( + "{}", + format!(">>> Opening L2CAP channel on PSM = {}", psm).yellow() + ); + + let mut l2cap_channel = match ble_connection + .open_l2cap_channel(psm, Some(max_credits), Some(mtu), Some(mps)) + .await + { + Ok(channel) => channel, + Err(e) => { + println!("{}", format!("!!! Connection failed: {e}").red()); + // TCP stream will get dropped after returning, automatically shutting it down. + return Err(e); + } + }; + let channel_info = match l2cap_channel.debug_string() { + Ok(info_string) => info_string, + Err(py_err) => format!("failed to get l2cap channel info ({})", py_err), + }; + + println!("{}{}", "*** L2CAP channel: ".cyan(), channel_info); + + let (l2cap_to_tcp_tx, l2cap_to_tcp_rx) = mpsc::channel::<BridgeData>(10); + + // Set l2cap callback (`set_sink`) for when data is received. + let l2cap_to_tcp_tx_clone = l2cap_to_tcp_tx.clone(); + l2cap_channel + .set_sink(move |_py, sdu| { + block_on(l2cap_to_tcp_tx_clone.send(BridgeData::Data(sdu.into()))) + .expect("failed to channel data to tcp"); + Ok(()) + }) + .expect("failed to set sink for l2cap connection"); + + // Set l2cap callback for when the channel is closed. + l2cap_channel + .on_close(move |_py| { + println!("{}", "*** L2CAP channel closed".red()); + block_on(l2cap_to_tcp_tx.send(BridgeData::CloseSignal)) + .expect("failed to channel close signal to tcp"); + Ok(()) + }) + .expect("failed to set on_close callback for l2cap channel"); + + let l2cap_channel = Arc::new(Mutex::new(Some(l2cap_channel))); + let (tcp_reader, tcp_writer) = tcp_stream.into_split(); + + // Do tcp stuff when something happens on the l2cap channel. + let handle_l2cap_data_future = + proxy_l2cap_rx_to_tcp_tx(l2cap_to_tcp_rx, tcp_writer, l2cap_channel.clone()); + + // Do l2cap stuff when something happens on tcp. + let handle_tcp_data_future = + proxy_tcp_rx_to_l2cap_tx(tcp_reader, l2cap_channel.clone(), true); + + let (handle_l2cap_result, handle_tcp_result) = + join!(handle_l2cap_data_future, handle_tcp_data_future); + + if let Err(e) = handle_l2cap_result { + println!("!!! Error: {e}"); + } + + if let Err(e) = handle_tcp_result { + println!("!!! Error: {e}"); + } + + Python::with_gil(|_| { + // Must hold GIL at least once while/after dropping for Python heap object to ensure + // de-allocation. + drop(l2cap_channel); + }); + + Ok(()) + } +} + +/// Used for channeling data from Python callbacks to a Rust consumer. +enum BridgeData { + Data(Vec<u8>), + CloseSignal, +} + +async fn proxy_l2cap_rx_to_tcp_tx( + mut l2cap_data_receiver: Receiver<BridgeData>, + mut tcp_writer: OwnedWriteHalf, + l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, +) -> anyhow::Result<()> { + while let Some(bridge_data) = l2cap_data_receiver.recv().await { + match bridge_data { + BridgeData::Data(sdu) => { + println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan()); + tcp_writer + .write_all(sdu.as_ref()) + .await + .map_err(|_| anyhow!("Failed to write to tcp stream"))?; + tcp_writer + .flush() + .await + .map_err(|_| anyhow!("Failed to flush tcp stream"))?; + } + BridgeData::CloseSignal => { + l2cap_channel.lock().await.take(); + tcp_writer + .shutdown() + .await + .map_err(|_| anyhow!("Failed to shut down write half of tcp stream"))?; + return Ok(()); + } + } + } + Ok(()) +} + +async fn proxy_tcp_rx_to_l2cap_tx( + mut tcp_reader: OwnedReadHalf, + l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, + drain_l2cap_after_write: bool, +) -> PyResult<()> { + let mut buf = [0; 4096]; + loop { + match tcp_reader.read(&mut buf).await { + Ok(len) => { + if len == 0 { + println!("{}", "!!! End of stream".yellow()); + + if let Some(channel) = l2cap_channel.lock().await.take() { + channel.disconnect().await.map_err(|e| { + eprintln!("Failed to call disconnect on l2cap channel: {e}"); + e + })?; + } + return Ok(()); + } + + println!("{}", format!("<<< [TCP DATA]: {len} bytes").blue()); + match l2cap_channel.lock().await.as_mut() { + None => { + println!("{}", "!!! L2CAP channel not connected, dropping".red()); + return Ok(()); + } + Some(channel) => { + channel.write(&buf[..len])?; + if drain_l2cap_after_write { + channel.drain().await?; + } + } + } + } + Err(e) => { + println!("{}", format!("!!! TCP connection lost: {}", e).red()); + if let Some(channel) = l2cap_channel.lock().await.take() { + let _ = channel.disconnect().await.map_err(|e| { + eprintln!("Failed to call disconnect on l2cap channel: {e}"); + }); + } + return Err(e.into()); + } + } + } +} + +/// Copies the current thread's task locals into a Python "awaitable" and encapsulates it in a Rust +/// future, running it as a Python Task. +/// +/// If the calling thread has a Python event loop, then the Python Task will too. +pub fn run_future_with_current_task_locals<F>( + fut: F, +) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send> +where + F: Future<Output = PyResult<()>> + Send + 'static, +{ + Python::with_gil(|py| { + let locals = pyo3_asyncio::tokio::get_current_locals(py)?; + let future = pyo3_asyncio::tokio::scope(locals.clone(), fut); + pyo3_asyncio::tokio::future_into_py_with_locals(py, locals.clone(), future) + .and_then(pyo3_asyncio::tokio::into_future) + }) +} + +#[derive(clap::Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + #[command(subcommand)] + pub(crate) subcommand: Subcommand, + + /// Device configuration file. + /// + /// See, for instance, `examples/device1.json` in the Python project. + #[arg(long)] + device_config: path::PathBuf, + /// Bumble transport spec. + /// + /// <https://google.github.io/bumble/transports/index.html> + #[arg(long)] + hci_transport: String, + + /// PSM for L2CAP Connection-oriented Channel. + /// + /// Must be in the range [0, 65535]. + #[arg(long, default_value_t = 1234)] + psm: u16, + + /// Maximum L2CAP CoC Credits. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 128, value_parser = clap::value_parser!(u16).range(1..))] + l2cap_coc_max_credits: u16, + + /// L2CAP CoC MTU + /// + /// Must be in the range [23, 65535]. + #[arg(long, default_value_t = 1022, value_parser = clap::value_parser!(u16).range(23..))] + l2cap_coc_mtu: u16, + + /// L2CAP CoC MPS + /// + /// Must be in the range [23, 65535]. + #[arg(long, default_value_t = 1024, value_parser = clap::value_parser!(u16).range(23..))] + l2cap_coc_mps: u16, +} + +#[derive(clap::Subcommand)] +enum Subcommand { + /// Starts an L2CAP server + Server { + /// TCP host that the l2cap server will connect to. + /// Data is bridged like so: + /// TCP server <-> (TCP client / **L2CAP server**) <-> (L2CAP client / TCP server) <-> TCP client + #[arg(long, default_value = "localhost")] + tcp_host: String, + /// TCP port that the server will connect to. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 9544)] + tcp_port: u16, + }, + /// Starts an L2CAP client + Client { + /// L2cap server address that this l2cap client will connect to. + bluetooth_address: String, + /// TCP host that the l2cap client will bind to and listen for incoming TCP connections. + /// Data is bridged like so: + /// TCP client <-> (TCP server / **L2CAP client**) <-> (L2CAP server / TCP client) <-> TCP Client + #[arg(long, default_value = "localhost")] + tcp_host: String, + /// TCP port that the client will connect to. + /// + /// Must be in the range [1, 65535]. + #[arg(long, default_value_t = 9543)] + tcp_port: u16, + }, +} diff --git a/rust/src/wrapper/device.rs b/rust/src/wrapper/device.rs index 11770a3..824658d 100644 --- a/rust/src/wrapper/device.rs +++ b/rust/src/wrapper/device.rs @@ -19,16 +19,17 @@ use crate::{ wrapper::{ core::AdvertisingData, gatt_client::{ProfileServiceProxy, ServiceProxy}, - hci::Address, + hci::{Address, HciErrorCode}, host::Host, + l2cap::LeConnectionOrientedChannel, transport::{Sink, Source}, - ClosureCallback, PyObjectExt, + ClosureCallback, PyDictExt, PyObjectExt, }, }; use pyo3::{ intern, types::{PyDict, PyModule}, - PyObject, PyResult, Python, ToPyObject, + IntoPy, PyObject, PyResult, Python, ToPyObject, }; use pyo3_asyncio::tokio::into_future; use std::path; @@ -87,6 +88,22 @@ impl Device { .map(Connection) } + /// Register a callback to be called for each incoming connection. + pub fn on_connection( + &mut self, + callback: impl Fn(Python, Connection) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + callback(py, Connection(args.get_item(0)?.into())) + }); + + Python::with_gil(|py| { + self.0 + .call_method1(py, intern!(py, "add_listener"), ("connection", boxed)) + }) + .map(|_| ()) + } + /// Start scanning pub async fn start_scanning(&self, filter_duplicates: bool) -> PyResult<()> { Python::with_gil(|py| { @@ -161,11 +178,109 @@ impl Device { .await .map(|_| ()) } + + /// Registers an L2CAP connection oriented channel server. When a client connects to the server, + /// the `server` callback returns a handle to the established channel. When optional arguments + /// are not specified, the Python module specifies the defaults. + pub fn register_l2cap_channel_server( + &self, + psm: u16, + server: impl Fn(Python, LeConnectionOrientedChannel) -> PyResult<()> + Send + 'static, + max_credits: Option<u16>, + mtu: Option<u16>, + mps: Option<u16>, + ) -> PyResult<()> { + Python::with_gil(|py| { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + server( + py, + LeConnectionOrientedChannel::from(args.get_item(0)?.into()), + ) + }); + + let kwargs = PyDict::new(py); + kwargs.set_item("psm", psm)?; + kwargs.set_item("server", boxed.into_py(py))?; + kwargs.set_opt_item("max_credits", max_credits)?; + kwargs.set_opt_item("mtu", mtu)?; + kwargs.set_opt_item("mps", mps)?; + self.0.call_method( + py, + intern!(py, "register_l2cap_channel_server"), + (), + Some(kwargs), + ) + })?; + Ok(()) + } } /// A connection to a remote device. pub struct Connection(PyObject); +impl Connection { + /// Open an L2CAP channel using this connection. When optional arguments are not specified, the + /// Python module specifies the defaults. + pub async fn open_l2cap_channel( + &self, + psm: u16, + max_credits: Option<u16>, + mtu: Option<u16>, + mps: Option<u16>, + ) -> PyResult<LeConnectionOrientedChannel> { + Python::with_gil(|py| { + let kwargs = PyDict::new(py); + kwargs.set_item("psm", psm)?; + kwargs.set_opt_item("max_credits", max_credits)?; + kwargs.set_opt_item("mtu", mtu)?; + kwargs.set_opt_item("mps", mps)?; + self.0 + .call_method(py, intern!(py, "open_l2cap_channel"), (), Some(kwargs)) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(LeConnectionOrientedChannel::from) + } + + /// Disconnect from device with provided reason. When optional arguments are not specified, the + /// Python module specifies the defaults. + pub async fn disconnect(self, reason: Option<HciErrorCode>) -> PyResult<()> { + Python::with_gil(|py| { + let kwargs = PyDict::new(py); + kwargs.set_opt_item("reason", reason)?; + self.0 + .call_method(py, intern!(py, "disconnect"), (), Some(kwargs)) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(|_| ()) + } + + /// Register a callback to be called on disconnection. + pub fn on_disconnection( + &mut self, + callback: impl Fn(Python, HciErrorCode) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + callback(py, args.get_item(0)?.extract()?) + }); + + Python::with_gil(|py| { + self.0 + .call_method1(py, intern!(py, "add_listener"), ("disconnection", boxed)) + }) + .map(|_| ()) + } + + /// Returns some information about the connection as a [String]. + pub fn debug_string(&self) -> PyResult<String> { + Python::with_gil(|py| { + let str_obj = self.0.call_method0(py, intern!(py, "__str__"))?; + str_obj.gil_ref(py).extract() + }) + } +} + /// The other end of a connection pub struct Peer(PyObject); diff --git a/rust/src/wrapper/hci.rs b/rust/src/wrapper/hci.rs index 48f7dc1..41dcbf3 100644 --- a/rust/src/wrapper/hci.rs +++ b/rust/src/wrapper/hci.rs @@ -15,7 +15,40 @@ //! HCI use itertools::Itertools as _; -use pyo3::{exceptions::PyException, intern, types::PyModule, PyErr, PyObject, PyResult, Python}; +use pyo3::{ + exceptions::PyException, intern, types::PyModule, FromPyObject, PyAny, PyErr, PyObject, + PyResult, Python, ToPyObject, +}; + +/// HCI error code. +pub struct HciErrorCode(u8); + +impl<'source> FromPyObject<'source> for HciErrorCode { + fn extract(ob: &'source PyAny) -> PyResult<Self> { + Ok(HciErrorCode(ob.extract()?)) + } +} + +impl ToPyObject for HciErrorCode { + fn to_object(&self, py: Python<'_>) -> PyObject { + self.0.to_object(py) + } +} + +/// Provides helpers for interacting with HCI +pub struct HciConstant; + +impl HciConstant { + /// Human-readable error name + pub fn error_name(status: HciErrorCode) -> PyResult<String> { + Python::with_gil(|py| { + PyModule::import(py, intern!(py, "bumble.hci"))? + .getattr(intern!(py, "HCI_Constant"))? + .call_method1(intern!(py, "error_name"), (status.0,))? + .extract() + }) + } +} /// A Bluetooth address pub struct Address(pub(crate) PyObject); diff --git a/rust/src/wrapper/l2cap.rs b/rust/src/wrapper/l2cap.rs new file mode 100644 index 0000000..694f0c4 --- /dev/null +++ b/rust/src/wrapper/l2cap.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! L2CAP + +use crate::wrapper::{ClosureCallback, PyObjectExt}; +use pyo3::{intern, PyObject, PyResult, Python}; + +/// L2CAP connection-oriented channel +pub struct LeConnectionOrientedChannel(PyObject); + +impl LeConnectionOrientedChannel { + /// Create a LeConnectionOrientedChannel that wraps the provided obj. + pub(crate) fn from(obj: PyObject) -> Self { + Self(obj) + } + + /// Queues data to be automatically sent across this channel. + pub fn write(&mut self, data: &[u8]) -> PyResult<()> { + Python::with_gil(|py| self.0.call_method1(py, intern!(py, "write"), (data,))).map(|_| ()) + } + + /// Wait for queued data to be sent on this channel. + pub async fn drain(&self) -> PyResult<()> { + Python::with_gil(|py| { + self.0 + .call_method0(py, intern!(py, "drain")) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(|_| ()) + } + + /// Register a callback to be called when the channel is closed. + pub fn on_close( + &mut self, + callback: impl Fn(Python) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, _args, _kwargs| callback(py)); + + Python::with_gil(|py| { + self.0 + .call_method1(py, intern!(py, "add_listener"), ("close", boxed)) + }) + .map(|_| ()) + } + + /// Register a callback to be called when the channel receives data. + pub fn set_sink( + &mut self, + callback: impl Fn(Python, &[u8]) -> PyResult<()> + Send + 'static, + ) -> PyResult<()> { + let boxed = ClosureCallback::new(move |py, args, _kwargs| { + callback(py, args.get_item(0)?.extract()?) + }); + Python::with_gil(|py| self.0.setattr(py, intern!(py, "sink"), boxed)).map(|_| ()) + } + + /// Disconnect the l2cap channel. + /// Must be called from a thread with a Python event loop, which should be true on + /// `tokio::main` and `async_std::main`. + /// + /// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars. + pub async fn disconnect(self) -> PyResult<()> { + Python::with_gil(|py| { + self.0 + .call_method0(py, intern!(py, "disconnect")) + .and_then(|coroutine| pyo3_asyncio::tokio::into_future(coroutine.as_ref(py))) + })? + .await + .map(|_| ()) + } + + /// Returns some information about the channel as a [String]. + pub fn debug_string(&self) -> PyResult<String> { + Python::with_gil(|py| { + let str_obj = self.0.call_method0(py, intern!(py, "__str__"))?; + str_obj.gil_ref(py).extract() + }) + } +} diff --git a/rust/src/wrapper/mod.rs b/rust/src/wrapper/mod.rs index cb0730b..94ac15a 100644 --- a/rust/src/wrapper/mod.rs +++ b/rust/src/wrapper/mod.rs @@ -31,11 +31,11 @@ pub use pyo3_asyncio; pub mod assigned_numbers; pub mod core; pub mod device; - pub mod drivers; pub mod gatt_client; pub mod hci; pub mod host; +pub mod l2cap; pub mod logging; pub mod profile; pub mod transport; @@ -71,6 +71,21 @@ impl PyObjectExt for PyObject { } } +/// Convenience extensions to [PyDict] +pub trait PyDictExt { + /// Set item in dict only if value is Some, otherwise do nothing. + fn set_opt_item<K: ToPyObject, V: ToPyObject>(&self, key: K, value: Option<V>) -> PyResult<()>; +} + +impl PyDictExt for PyDict { + fn set_opt_item<K: ToPyObject, V: ToPyObject>(&self, key: K, value: Option<V>) -> PyResult<()> { + if let Some(value) = value { + self.set_item(key, value)? + } + Ok(()) + } +} + /// Wrapper to make Rust closures ([Fn] implementations) callable from Python. /// /// The Python callable form returns a Python `None`. |