summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs318
-rw-r--r--src/rust/uwb_core/src/uci/uci_logger_pcapng.rs6
2 files changed, 177 insertions, 147 deletions
diff --git a/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs b/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs
index d6d584b..9bcadf3 100644
--- a/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs
+++ b/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs
@@ -129,6 +129,7 @@ impl PcapngUciLoggerFactoryBuilder {
pub(crate) enum PcapngLoggerMessage {
ByteStream(Vec<u8>),
NewChip((String, u32)),
+ Flush(mpsc::UnboundedSender<bool>),
}
/// LogWriterActor performs the log writing and file operations asynchronously.
@@ -215,6 +216,22 @@ impl LogWriterActor {
break;
}
}
+ Some(PcapngLoggerMessage::Flush(flush_sender)) => {
+ if self.current_file.is_some() {
+ match self.current_file.as_mut().unwrap().flush_file() {
+ Some(_) => {
+ let _ = flush_sender.send(true);
+ }
+ None => {
+ error!("UCI log: failed flushing the file");
+ let _ = flush_sender.send(false);
+ }
+ }
+ } else {
+ error!("UCI log: current_file not present");
+ let _ = flush_sender.send(false);
+ }
+ }
None => {
debug!("UCI log: LogWriterActor dropping.");
break;
@@ -264,6 +281,19 @@ impl LogWriter {
}
}
+ pub fn flush(&mut self) -> Option<mpsc::UnboundedReceiver<bool>> {
+ let log_sender = self.log_sender.as_ref()?;
+ let (flush_sender, flush_receiver) = mpsc::unbounded_channel();
+ match log_sender.send(PcapngLoggerMessage::Flush(flush_sender)) {
+ Ok(_) => Some(flush_receiver),
+ Err(e) => {
+ error!("UCI log: LogWriterActor dead unexpectedly, sender error: {:?}", e);
+ self.log_sender = None;
+ None
+ }
+ }
+ }
+
fn send_chip(&mut self, chip_id: String, interface_id: u32) -> Option<()> {
let log_sender = self.log_sender.as_ref()?;
match log_sender.send(PcapngLoggerMessage::NewChip((chip_id, interface_id))) {
@@ -391,6 +421,7 @@ impl BufferedFile {
);
}
}
+
let file = match fs::OpenOptions::new().write(true).create_new(true).open(file_path) {
Ok(f) => f,
Err(e) => {
@@ -427,6 +458,13 @@ impl BufferedFile {
self.file.flush().ok()
}
+
+ pub fn flush_file(&mut self) -> Option<()> {
+ // Flush the buffer and then the file to storage.
+ self.flush_buffer();
+ self.file.sync_all().ok();
+ Some(())
+ }
}
/// Manual Drop implementation.
@@ -441,7 +479,7 @@ impl Drop for BufferedFile {
mod tests {
use super::*;
- use std::{fs, thread, time};
+ use std::fs;
use tempfile::tempdir;
use tokio::runtime::Builder;
@@ -476,23 +514,31 @@ mod tests {
Some(block_info)
}
+ async fn flush_loggers(loggers: Vec<UciLoggerPcapng>) {
+ for mut logger in loggers {
+ let flush_receiver = logger.flush();
+ flush_receiver.unwrap().recv().await;
+ }
+ }
+
#[test]
fn test_no_file_write() {
let dir = tempdir().unwrap();
- {
- let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
- let mut file_manager = PcapngUciLoggerFactoryBuilder::new()
- .buffer_size(1024)
- .filename_prefix("log".to_owned())
- .log_path(dir.as_ref().to_owned())
- .runtime_handle(runtime.handle().to_owned())
- .build()
- .unwrap();
- let _logger_0 = file_manager.build_logger("logger 0").unwrap();
- let _logger_1 = file_manager.build_logger("logger 1").unwrap();
- // Sleep needed to guarantee handling pending logs before runtime goes out of scope.
- thread::sleep(time::Duration::from_millis(10));
- }
+
+ let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
+ let mut file_manager = PcapngUciLoggerFactoryBuilder::new()
+ .buffer_size(1024)
+ .filename_prefix("log".to_owned())
+ .log_path(dir.as_ref().to_owned())
+ .runtime_handle(runtime.handle().to_owned())
+ .build()
+ .unwrap();
+ let logger_0 = file_manager.build_logger("logger 0").unwrap();
+ let logger_1 = file_manager.build_logger("logger 1").unwrap();
+
+ // Flush the loggers so that the files are created.
+ runtime.block_on(flush_loggers(vec![logger_0, logger_1]));
+
// Expect no log file created as no packet is received.
let log_path = dir.as_ref().to_owned().join("log.pcapng");
assert!(fs::read(log_path).is_err());
@@ -503,29 +549,22 @@ mod tests {
let dir_root = Path::new("./uwb_test_dir_123");
let dir = dir_root.join("this/path/doesnt/exist");
let log_path = dir.join("log.pcapng");
- {
- let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
- let mut file_manager = PcapngUciLoggerFactoryBuilder::new()
- .buffer_size(1024)
- .filename_prefix("log".to_owned())
- .log_path(dir.clone())
- .runtime_handle(runtime.handle().to_owned())
- .build()
- .unwrap();
- let mut logger_0 = file_manager.build_logger("logger 0").unwrap();
- let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
- logger_0.log_uci_control_packet(packet_0.into());
- // Sleep needed to guarantee handling pending logs before runtime goes out of scope.
- let mut timeout = 100;
- let timeout_slice = 10;
- loop {
- if log_path.exists() || timeout == 0 {
- break;
- }
- thread::sleep(time::Duration::from_millis(timeout_slice));
- timeout -= timeout_slice;
- }
- }
+
+ let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
+ let mut file_manager = PcapngUciLoggerFactoryBuilder::new()
+ .buffer_size(1024)
+ .filename_prefix("log".to_owned())
+ .log_path(dir.clone())
+ .runtime_handle(runtime.handle().to_owned())
+ .build()
+ .unwrap();
+ let mut logger_0 = file_manager.build_logger("logger 0").unwrap();
+ let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_0.into());
+
+ // Flush all the loggers so that the files are created and all packets written.
+ runtime.block_on(flush_loggers(vec![logger_0]));
+
// Expect the dir was created.
assert!(dir.is_dir());
// Expect the log file exists.
@@ -537,38 +576,30 @@ mod tests {
#[test]
fn test_single_file_write() {
let dir = tempdir().unwrap();
- let last_file_expected = dir.as_ref().to_owned().join("log.pcapng");
- {
- let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
- let mut file_manager = PcapngUciLoggerFactoryBuilder::new()
- .buffer_size(1024)
- .filename_prefix("log".to_owned())
- .log_path(dir.as_ref().to_owned())
- .runtime_handle(runtime.handle().to_owned())
- .build()
- .unwrap();
- let mut logger_0 = file_manager.build_logger("logger 0").unwrap();
- let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
- logger_0.log_uci_control_packet(packet_0.into());
- let mut logger_1 = file_manager.build_logger("logger 1").unwrap();
- let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
- logger_1.log_uci_control_packet(packet_1.into());
- let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
- logger_0.log_uci_control_packet(packet_2.into());
- // Sleep needed to guarantee handling pending logs before runtime goes out of scope.
- let mut timeout = 100;
- let timeout_slice = 10;
- loop {
- if last_file_expected.exists() || timeout == 0 {
- break;
- }
- thread::sleep(time::Duration::from_millis(timeout_slice));
- timeout -= timeout_slice;
- }
- }
+ let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
+ let mut file_manager = PcapngUciLoggerFactoryBuilder::new()
+ .buffer_size(1024)
+ .filename_prefix("log".to_owned())
+ .log_path(dir.as_ref().to_owned())
+ .runtime_handle(runtime.handle().to_owned())
+ .build()
+ .unwrap();
+ let mut logger_0 = file_manager.build_logger("logger 0").unwrap();
+ let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_0.into());
+ let mut logger_1 = file_manager.build_logger("logger 1").unwrap();
+ let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
+ logger_1.log_uci_control_packet(packet_1.into());
+ let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_2.into());
+
+ // Flush all the loggers so that the files are created and all packets written.
+ runtime.block_on(flush_loggers(vec![logger_0, logger_1]));
+
// Expect file log.pcapng consist of SHB->IDB(logger 0)->EPB(packet 0)->IDB(logger 1)
// ->EPB(packet 1)->EPB(packet 2)
let log_path = dir.as_ref().to_owned().join("log.pcapng");
+ assert!(log_path.is_file());
let log_content = fs::read(log_path).unwrap();
let block_info = get_block_info(log_content).unwrap();
assert_eq!(block_info.len(), 6);
@@ -583,41 +614,33 @@ mod tests {
#[test]
fn test_file_switch_epb_unfit_case() {
let dir = tempdir().unwrap();
- let last_file_expected = dir.as_ref().to_owned().join("log_2.pcapng");
- {
- let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
- let mut file_manager_140 = PcapngUciLoggerFactoryBuilder::new()
- .buffer_size(1024)
- .filename_prefix("log".to_owned())
- .log_path(dir.as_ref().to_owned())
- .file_size(140)
- .runtime_handle(runtime.handle().to_owned())
- .build()
- .unwrap();
- let mut logger_0 = file_manager_140.build_logger("logger 0").unwrap();
- let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
- logger_0.log_uci_control_packet(packet_0.into());
- let mut logger_1 = file_manager_140.build_logger("logger 1").unwrap();
- let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
- logger_1.log_uci_control_packet(packet_1.into());
- let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
- logger_0.log_uci_control_packet(packet_2.into());
- // Sleep needed to guarantee handling pending logs before runtime goes out of scope.
- let mut timeout = 100;
- let timeout_slice = 10;
- loop {
- if last_file_expected.exists() || timeout == 0 {
- break;
- }
- thread::sleep(time::Duration::from_millis(timeout_slice));
- timeout -= timeout_slice;
- }
- }
+ let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
+ let mut file_manager_140 = PcapngUciLoggerFactoryBuilder::new()
+ .buffer_size(1024)
+ .filename_prefix("log".to_owned())
+ .log_path(dir.as_ref().to_owned())
+ .file_size(140)
+ .runtime_handle(runtime.handle().to_owned())
+ .build()
+ .unwrap();
+ let mut logger_0 = file_manager_140.build_logger("logger 0").unwrap();
+ let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_0.into());
+ let mut logger_1 = file_manager_140.build_logger("logger 1").unwrap();
+ let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
+ logger_1.log_uci_control_packet(packet_1.into());
+ let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_2.into());
+
+ // Flush all the loggers so that the files are created and all packets written.
+ runtime.block_on(flush_loggers(vec![logger_0, logger_1]));
+
// Expect (Old to new):
// File 2: SHB->IDB->EPB->IDB (cannot fit next)
// File 1: SHB->IDB->IDB->EPB (cannot fit next)
// File 0: SHB->IDB->IDB->EPB
let log_path = dir.as_ref().to_owned().join("log_2.pcapng");
+ assert!(log_path.is_file());
let log_content = fs::read(log_path).unwrap();
let block_info = get_block_info(log_content).unwrap();
assert_eq!(block_info.len(), 4);
@@ -626,6 +649,7 @@ mod tests {
assert_eq!(block_info[2].0, 0x6); // EPB
assert_eq!(block_info[3].0, 0x1); // IDB
let log_path = dir.as_ref().to_owned().join("log_1.pcapng");
+ assert!(log_path.is_file());
let log_content = fs::read(log_path).unwrap();
let block_info = get_block_info(log_content).unwrap();
assert_eq!(block_info.len(), 4);
@@ -634,6 +658,7 @@ mod tests {
assert_eq!(block_info[2].0, 0x1); // IDB
assert_eq!(block_info[3].0, 0x6); // EPB
let log_path = dir.as_ref().to_owned().join("log.pcapng");
+ assert!(log_path.is_file());
let log_content = fs::read(log_path).unwrap();
let block_info = get_block_info(log_content).unwrap();
assert_eq!(block_info.len(), 4);
@@ -644,43 +669,34 @@ mod tests {
}
#[test]
- #[ignore] // Temporarily disable due to flakiness - b/287161361.
fn test_file_switch_idb_unfit_case() {
let dir = tempdir().unwrap();
- let last_file_expected = dir.as_ref().to_owned().join("log_1.pcapng");
- {
- let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
- let mut file_manager_144 = PcapngUciLoggerFactoryBuilder::new()
- .buffer_size(1024)
- .filename_prefix("log".to_owned())
- .log_path(dir.as_ref().to_owned())
- .file_size(144)
- .runtime_handle(runtime.handle().to_owned())
- .build()
- .unwrap();
- let mut logger_0 = file_manager_144.build_logger("logger 0").unwrap();
- let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
- logger_0.log_uci_control_packet(packet_0.into());
- let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
- logger_0.log_uci_control_packet(packet_2.into());
- let mut logger_1 = file_manager_144.build_logger("logger 1").unwrap();
- let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
- logger_1.log_uci_control_packet(packet_1.into());
- // Sleep needed to guarantee handling pending logs before runtime goes out of scope.
- let mut timeout = 100;
- let timeout_slice = 10;
- loop {
- if last_file_expected.exists() || timeout == 0 {
- break;
- }
- thread::sleep(time::Duration::from_millis(timeout_slice));
- timeout -= timeout_slice;
- }
- }
+ let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
+ let mut file_manager_144 = PcapngUciLoggerFactoryBuilder::new()
+ .buffer_size(1024)
+ .filename_prefix("log".to_owned())
+ .log_path(dir.as_ref().to_owned())
+ .file_size(144)
+ .runtime_handle(runtime.handle().to_owned())
+ .build()
+ .unwrap();
+ let mut logger_0 = file_manager_144.build_logger("logger 0").unwrap();
+ let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_0.into());
+ let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_2.into());
+ let mut logger_1 = file_manager_144.build_logger("logger 1").unwrap();
+ let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
+ logger_1.log_uci_control_packet(packet_1.into());
+
+ // Flush all the loggers so that the files are created and all packets written.
+ runtime.block_on(flush_loggers(vec![logger_0, logger_1]));
+
// Expect (Old to new):
// File 1: SHB->IDB->EPB->EPB (cannot fit next)
// File 0: SHB->IDB->IDB->EPB
let log_path = dir.as_ref().to_owned().join("log_1.pcapng");
+ assert!(log_path.is_file());
let log_content = fs::read(log_path).unwrap();
let block_info = get_block_info(log_content).unwrap();
assert_eq!(block_info.len(), 4);
@@ -689,6 +705,7 @@ mod tests {
assert_eq!(block_info[2].0, 0x6); // EPB
assert_eq!(block_info[3].0, 0x6); // EPB
let log_path = dir.as_ref().to_owned().join("log.pcapng");
+ assert!(log_path.is_file());
let log_content = fs::read(log_path).unwrap();
let block_info = get_block_info(log_content).unwrap();
assert_eq!(block_info.len(), 4);
@@ -702,24 +719,31 @@ mod tests {
#[test]
fn test_log_fail_safe() {
let dir = tempdir().unwrap();
- {
- let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
- let mut file_manager_96 = PcapngUciLoggerFactoryBuilder::new()
- .buffer_size(1024)
- .filename_prefix("log".to_owned())
- .log_path(dir.as_ref().to_owned())
- .file_size(96) // Fails logging, as metadata takes 100
- .runtime_handle(runtime.handle().to_owned())
- .build()
- .unwrap();
- let mut logger_0 = file_manager_96.build_logger("logger 0").unwrap();
- let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
- logger_0.log_uci_control_packet(packet_0.into());
- let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
- logger_0.log_uci_control_packet(packet_2.into());
- let mut logger_1 = file_manager_96.build_logger("logger 1").unwrap();
- let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
- logger_1.log_uci_control_packet(packet_1.into());
- }
+ let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
+ let mut file_manager_96 = PcapngUciLoggerFactoryBuilder::new()
+ .buffer_size(1024)
+ .filename_prefix("log".to_owned())
+ .log_path(dir.as_ref().to_owned())
+ .file_size(96) // Fails logging, as metadata takes 100
+ .runtime_handle(runtime.handle().to_owned())
+ .build()
+ .unwrap();
+ let mut logger_0 = file_manager_96.build_logger("logger 0").unwrap();
+ let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_0.into());
+ let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build();
+ logger_0.log_uci_control_packet(packet_2.into());
+ let mut logger_1 = file_manager_96.build_logger("logger 1").unwrap();
+ let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build();
+ logger_1.log_uci_control_packet(packet_1.into());
+
+ // Flush all the loggers so that the files are created and all packets written.
+ runtime.block_on(flush_loggers(vec![logger_0, logger_1]));
+
+ // Verify existence of the log files.
+ let log_path = dir.as_ref().to_owned().join("log_1.pcapng");
+ assert!(log_path.is_file());
+ let log_path = dir.as_ref().to_owned().join("log.pcapng");
+ assert!(log_path.is_file());
}
}
diff --git a/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs b/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs
index 279713b..5743913 100644
--- a/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs
+++ b/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs
@@ -15,6 +15,7 @@
//! Implements UciLoggerPcapng, a UciLogger with PCAPNG format log.
use log::warn;
+use tokio::sync::mpsc;
use uwb_uci_packets::{UciControlPacket, UciDataPacket};
use crate::uci::pcapng_block::{BlockBuilder, BlockOption, EnhancedPacketBlockBuilder};
@@ -39,6 +40,11 @@ impl UciLoggerPcapng {
warn!("UCI log: Logging to LogWritter failed.")
}
}
+
+ /// Flush the logs.
+ pub fn flush(&mut self) -> Option<mpsc::UnboundedReceiver<bool>> {
+ self.log_writer.flush()
+ }
}
impl UciLogger for UciLoggerPcapng {