diff options
Diffstat (limited to 'src/rust')
-rw-r--r-- | src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs | 318 | ||||
-rw-r--r-- | src/rust/uwb_core/src/uci/uci_logger_pcapng.rs | 6 |
2 files changed, 177 insertions, 147 deletions
diff --git a/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs b/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs index d6d584b..9bcadf3 100644 --- a/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs +++ b/src/rust/uwb_core/src/uci/pcapng_uci_logger_factory.rs @@ -129,6 +129,7 @@ impl PcapngUciLoggerFactoryBuilder { pub(crate) enum PcapngLoggerMessage { ByteStream(Vec<u8>), NewChip((String, u32)), + Flush(mpsc::UnboundedSender<bool>), } /// LogWriterActor performs the log writing and file operations asynchronously. @@ -215,6 +216,22 @@ impl LogWriterActor { break; } } + Some(PcapngLoggerMessage::Flush(flush_sender)) => { + if self.current_file.is_some() { + match self.current_file.as_mut().unwrap().flush_file() { + Some(_) => { + let _ = flush_sender.send(true); + } + None => { + error!("UCI log: failed flushing the file"); + let _ = flush_sender.send(false); + } + } + } else { + error!("UCI log: current_file not present"); + let _ = flush_sender.send(false); + } + } None => { debug!("UCI log: LogWriterActor dropping."); break; @@ -264,6 +281,19 @@ impl LogWriter { } } + pub fn flush(&mut self) -> Option<mpsc::UnboundedReceiver<bool>> { + let log_sender = self.log_sender.as_ref()?; + let (flush_sender, flush_receiver) = mpsc::unbounded_channel(); + match log_sender.send(PcapngLoggerMessage::Flush(flush_sender)) { + Ok(_) => Some(flush_receiver), + Err(e) => { + error!("UCI log: LogWriterActor dead unexpectedly, sender error: {:?}", e); + self.log_sender = None; + None + } + } + } + fn send_chip(&mut self, chip_id: String, interface_id: u32) -> Option<()> { let log_sender = self.log_sender.as_ref()?; match log_sender.send(PcapngLoggerMessage::NewChip((chip_id, interface_id))) { @@ -391,6 +421,7 @@ impl BufferedFile { ); } } + let file = match fs::OpenOptions::new().write(true).create_new(true).open(file_path) { Ok(f) => f, Err(e) => { @@ -427,6 +458,13 @@ impl BufferedFile { self.file.flush().ok() } + + pub fn flush_file(&mut self) -> Option<()> { + // Flush the buffer and then the file to storage. + self.flush_buffer(); + self.file.sync_all().ok(); + Some(()) + } } /// Manual Drop implementation. @@ -441,7 +479,7 @@ impl Drop for BufferedFile { mod tests { use super::*; - use std::{fs, thread, time}; + use std::fs; use tempfile::tempdir; use tokio::runtime::Builder; @@ -476,23 +514,31 @@ mod tests { Some(block_info) } + async fn flush_loggers(loggers: Vec<UciLoggerPcapng>) { + for mut logger in loggers { + let flush_receiver = logger.flush(); + flush_receiver.unwrap().recv().await; + } + } + #[test] fn test_no_file_write() { let dir = tempdir().unwrap(); - { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let mut file_manager = PcapngUciLoggerFactoryBuilder::new() - .buffer_size(1024) - .filename_prefix("log".to_owned()) - .log_path(dir.as_ref().to_owned()) - .runtime_handle(runtime.handle().to_owned()) - .build() - .unwrap(); - let _logger_0 = file_manager.build_logger("logger 0").unwrap(); - let _logger_1 = file_manager.build_logger("logger 1").unwrap(); - // Sleep needed to guarantee handling pending logs before runtime goes out of scope. - thread::sleep(time::Duration::from_millis(10)); - } + + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let mut file_manager = PcapngUciLoggerFactoryBuilder::new() + .buffer_size(1024) + .filename_prefix("log".to_owned()) + .log_path(dir.as_ref().to_owned()) + .runtime_handle(runtime.handle().to_owned()) + .build() + .unwrap(); + let logger_0 = file_manager.build_logger("logger 0").unwrap(); + let logger_1 = file_manager.build_logger("logger 1").unwrap(); + + // Flush the loggers so that the files are created. + runtime.block_on(flush_loggers(vec![logger_0, logger_1])); + // Expect no log file created as no packet is received. let log_path = dir.as_ref().to_owned().join("log.pcapng"); assert!(fs::read(log_path).is_err()); @@ -503,29 +549,22 @@ mod tests { let dir_root = Path::new("./uwb_test_dir_123"); let dir = dir_root.join("this/path/doesnt/exist"); let log_path = dir.join("log.pcapng"); - { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let mut file_manager = PcapngUciLoggerFactoryBuilder::new() - .buffer_size(1024) - .filename_prefix("log".to_owned()) - .log_path(dir.clone()) - .runtime_handle(runtime.handle().to_owned()) - .build() - .unwrap(); - let mut logger_0 = file_manager.build_logger("logger 0").unwrap(); - let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); - logger_0.log_uci_control_packet(packet_0.into()); - // Sleep needed to guarantee handling pending logs before runtime goes out of scope. - let mut timeout = 100; - let timeout_slice = 10; - loop { - if log_path.exists() || timeout == 0 { - break; - } - thread::sleep(time::Duration::from_millis(timeout_slice)); - timeout -= timeout_slice; - } - } + + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let mut file_manager = PcapngUciLoggerFactoryBuilder::new() + .buffer_size(1024) + .filename_prefix("log".to_owned()) + .log_path(dir.clone()) + .runtime_handle(runtime.handle().to_owned()) + .build() + .unwrap(); + let mut logger_0 = file_manager.build_logger("logger 0").unwrap(); + let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); + logger_0.log_uci_control_packet(packet_0.into()); + + // Flush all the loggers so that the files are created and all packets written. + runtime.block_on(flush_loggers(vec![logger_0])); + // Expect the dir was created. assert!(dir.is_dir()); // Expect the log file exists. @@ -537,38 +576,30 @@ mod tests { #[test] fn test_single_file_write() { let dir = tempdir().unwrap(); - let last_file_expected = dir.as_ref().to_owned().join("log.pcapng"); - { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let mut file_manager = PcapngUciLoggerFactoryBuilder::new() - .buffer_size(1024) - .filename_prefix("log".to_owned()) - .log_path(dir.as_ref().to_owned()) - .runtime_handle(runtime.handle().to_owned()) - .build() - .unwrap(); - let mut logger_0 = file_manager.build_logger("logger 0").unwrap(); - let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); - logger_0.log_uci_control_packet(packet_0.into()); - let mut logger_1 = file_manager.build_logger("logger 1").unwrap(); - let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); - logger_1.log_uci_control_packet(packet_1.into()); - let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); - logger_0.log_uci_control_packet(packet_2.into()); - // Sleep needed to guarantee handling pending logs before runtime goes out of scope. - let mut timeout = 100; - let timeout_slice = 10; - loop { - if last_file_expected.exists() || timeout == 0 { - break; - } - thread::sleep(time::Duration::from_millis(timeout_slice)); - timeout -= timeout_slice; - } - } + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let mut file_manager = PcapngUciLoggerFactoryBuilder::new() + .buffer_size(1024) + .filename_prefix("log".to_owned()) + .log_path(dir.as_ref().to_owned()) + .runtime_handle(runtime.handle().to_owned()) + .build() + .unwrap(); + let mut logger_0 = file_manager.build_logger("logger 0").unwrap(); + let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); + logger_0.log_uci_control_packet(packet_0.into()); + let mut logger_1 = file_manager.build_logger("logger 1").unwrap(); + let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); + logger_1.log_uci_control_packet(packet_1.into()); + let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); + logger_0.log_uci_control_packet(packet_2.into()); + + // Flush all the loggers so that the files are created and all packets written. + runtime.block_on(flush_loggers(vec![logger_0, logger_1])); + // Expect file log.pcapng consist of SHB->IDB(logger 0)->EPB(packet 0)->IDB(logger 1) // ->EPB(packet 1)->EPB(packet 2) let log_path = dir.as_ref().to_owned().join("log.pcapng"); + assert!(log_path.is_file()); let log_content = fs::read(log_path).unwrap(); let block_info = get_block_info(log_content).unwrap(); assert_eq!(block_info.len(), 6); @@ -583,41 +614,33 @@ mod tests { #[test] fn test_file_switch_epb_unfit_case() { let dir = tempdir().unwrap(); - let last_file_expected = dir.as_ref().to_owned().join("log_2.pcapng"); - { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let mut file_manager_140 = PcapngUciLoggerFactoryBuilder::new() - .buffer_size(1024) - .filename_prefix("log".to_owned()) - .log_path(dir.as_ref().to_owned()) - .file_size(140) - .runtime_handle(runtime.handle().to_owned()) - .build() - .unwrap(); - let mut logger_0 = file_manager_140.build_logger("logger 0").unwrap(); - let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); - logger_0.log_uci_control_packet(packet_0.into()); - let mut logger_1 = file_manager_140.build_logger("logger 1").unwrap(); - let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); - logger_1.log_uci_control_packet(packet_1.into()); - let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); - logger_0.log_uci_control_packet(packet_2.into()); - // Sleep needed to guarantee handling pending logs before runtime goes out of scope. - let mut timeout = 100; - let timeout_slice = 10; - loop { - if last_file_expected.exists() || timeout == 0 { - break; - } - thread::sleep(time::Duration::from_millis(timeout_slice)); - timeout -= timeout_slice; - } - } + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let mut file_manager_140 = PcapngUciLoggerFactoryBuilder::new() + .buffer_size(1024) + .filename_prefix("log".to_owned()) + .log_path(dir.as_ref().to_owned()) + .file_size(140) + .runtime_handle(runtime.handle().to_owned()) + .build() + .unwrap(); + let mut logger_0 = file_manager_140.build_logger("logger 0").unwrap(); + let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); + logger_0.log_uci_control_packet(packet_0.into()); + let mut logger_1 = file_manager_140.build_logger("logger 1").unwrap(); + let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); + logger_1.log_uci_control_packet(packet_1.into()); + let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); + logger_0.log_uci_control_packet(packet_2.into()); + + // Flush all the loggers so that the files are created and all packets written. + runtime.block_on(flush_loggers(vec![logger_0, logger_1])); + // Expect (Old to new): // File 2: SHB->IDB->EPB->IDB (cannot fit next) // File 1: SHB->IDB->IDB->EPB (cannot fit next) // File 0: SHB->IDB->IDB->EPB let log_path = dir.as_ref().to_owned().join("log_2.pcapng"); + assert!(log_path.is_file()); let log_content = fs::read(log_path).unwrap(); let block_info = get_block_info(log_content).unwrap(); assert_eq!(block_info.len(), 4); @@ -626,6 +649,7 @@ mod tests { assert_eq!(block_info[2].0, 0x6); // EPB assert_eq!(block_info[3].0, 0x1); // IDB let log_path = dir.as_ref().to_owned().join("log_1.pcapng"); + assert!(log_path.is_file()); let log_content = fs::read(log_path).unwrap(); let block_info = get_block_info(log_content).unwrap(); assert_eq!(block_info.len(), 4); @@ -634,6 +658,7 @@ mod tests { assert_eq!(block_info[2].0, 0x1); // IDB assert_eq!(block_info[3].0, 0x6); // EPB let log_path = dir.as_ref().to_owned().join("log.pcapng"); + assert!(log_path.is_file()); let log_content = fs::read(log_path).unwrap(); let block_info = get_block_info(log_content).unwrap(); assert_eq!(block_info.len(), 4); @@ -644,43 +669,34 @@ mod tests { } #[test] - #[ignore] // Temporarily disable due to flakiness - b/287161361. fn test_file_switch_idb_unfit_case() { let dir = tempdir().unwrap(); - let last_file_expected = dir.as_ref().to_owned().join("log_1.pcapng"); - { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let mut file_manager_144 = PcapngUciLoggerFactoryBuilder::new() - .buffer_size(1024) - .filename_prefix("log".to_owned()) - .log_path(dir.as_ref().to_owned()) - .file_size(144) - .runtime_handle(runtime.handle().to_owned()) - .build() - .unwrap(); - let mut logger_0 = file_manager_144.build_logger("logger 0").unwrap(); - let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); - logger_0.log_uci_control_packet(packet_0.into()); - let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); - logger_0.log_uci_control_packet(packet_2.into()); - let mut logger_1 = file_manager_144.build_logger("logger 1").unwrap(); - let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); - logger_1.log_uci_control_packet(packet_1.into()); - // Sleep needed to guarantee handling pending logs before runtime goes out of scope. - let mut timeout = 100; - let timeout_slice = 10; - loop { - if last_file_expected.exists() || timeout == 0 { - break; - } - thread::sleep(time::Duration::from_millis(timeout_slice)); - timeout -= timeout_slice; - } - } + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let mut file_manager_144 = PcapngUciLoggerFactoryBuilder::new() + .buffer_size(1024) + .filename_prefix("log".to_owned()) + .log_path(dir.as_ref().to_owned()) + .file_size(144) + .runtime_handle(runtime.handle().to_owned()) + .build() + .unwrap(); + let mut logger_0 = file_manager_144.build_logger("logger 0").unwrap(); + let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); + logger_0.log_uci_control_packet(packet_0.into()); + let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); + logger_0.log_uci_control_packet(packet_2.into()); + let mut logger_1 = file_manager_144.build_logger("logger 1").unwrap(); + let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); + logger_1.log_uci_control_packet(packet_1.into()); + + // Flush all the loggers so that the files are created and all packets written. + runtime.block_on(flush_loggers(vec![logger_0, logger_1])); + // Expect (Old to new): // File 1: SHB->IDB->EPB->EPB (cannot fit next) // File 0: SHB->IDB->IDB->EPB let log_path = dir.as_ref().to_owned().join("log_1.pcapng"); + assert!(log_path.is_file()); let log_content = fs::read(log_path).unwrap(); let block_info = get_block_info(log_content).unwrap(); assert_eq!(block_info.len(), 4); @@ -689,6 +705,7 @@ mod tests { assert_eq!(block_info[2].0, 0x6); // EPB assert_eq!(block_info[3].0, 0x6); // EPB let log_path = dir.as_ref().to_owned().join("log.pcapng"); + assert!(log_path.is_file()); let log_content = fs::read(log_path).unwrap(); let block_info = get_block_info(log_content).unwrap(); assert_eq!(block_info.len(), 4); @@ -702,24 +719,31 @@ mod tests { #[test] fn test_log_fail_safe() { let dir = tempdir().unwrap(); - { - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let mut file_manager_96 = PcapngUciLoggerFactoryBuilder::new() - .buffer_size(1024) - .filename_prefix("log".to_owned()) - .log_path(dir.as_ref().to_owned()) - .file_size(96) // Fails logging, as metadata takes 100 - .runtime_handle(runtime.handle().to_owned()) - .build() - .unwrap(); - let mut logger_0 = file_manager_96.build_logger("logger 0").unwrap(); - let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); - logger_0.log_uci_control_packet(packet_0.into()); - let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); - logger_0.log_uci_control_packet(packet_2.into()); - let mut logger_1 = file_manager_96.build_logger("logger 1").unwrap(); - let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); - logger_1.log_uci_control_packet(packet_1.into()); - } + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let mut file_manager_96 = PcapngUciLoggerFactoryBuilder::new() + .buffer_size(1024) + .filename_prefix("log".to_owned()) + .log_path(dir.as_ref().to_owned()) + .file_size(96) // Fails logging, as metadata takes 100 + .runtime_handle(runtime.handle().to_owned()) + .build() + .unwrap(); + let mut logger_0 = file_manager_96.build_logger("logger 0").unwrap(); + let packet_0 = UciVendor_A_NotificationBuilder { opcode: 0, payload: None }.build(); + logger_0.log_uci_control_packet(packet_0.into()); + let packet_2 = UciVendor_A_NotificationBuilder { opcode: 2, payload: None }.build(); + logger_0.log_uci_control_packet(packet_2.into()); + let mut logger_1 = file_manager_96.build_logger("logger 1").unwrap(); + let packet_1 = UciVendor_A_NotificationBuilder { opcode: 1, payload: None }.build(); + logger_1.log_uci_control_packet(packet_1.into()); + + // Flush all the loggers so that the files are created and all packets written. + runtime.block_on(flush_loggers(vec![logger_0, logger_1])); + + // Verify existence of the log files. + let log_path = dir.as_ref().to_owned().join("log_1.pcapng"); + assert!(log_path.is_file()); + let log_path = dir.as_ref().to_owned().join("log.pcapng"); + assert!(log_path.is_file()); } } diff --git a/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs b/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs index 279713b..5743913 100644 --- a/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs +++ b/src/rust/uwb_core/src/uci/uci_logger_pcapng.rs @@ -15,6 +15,7 @@ //! Implements UciLoggerPcapng, a UciLogger with PCAPNG format log. use log::warn; +use tokio::sync::mpsc; use uwb_uci_packets::{UciControlPacket, UciDataPacket}; use crate::uci::pcapng_block::{BlockBuilder, BlockOption, EnhancedPacketBlockBuilder}; @@ -39,6 +40,11 @@ impl UciLoggerPcapng { warn!("UCI log: Logging to LogWritter failed.") } } + + /// Flush the logs. + pub fn flush(&mut self) -> Option<mpsc::UnboundedReceiver<bool>> { + self.log_writer.flush() + } } impl UciLogger for UciLoggerPcapng { |