// Copyright 2020, The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! This module implements the handling of async tasks. //! The worker thread has a high priority and a low priority queue. Adding a job to either //! will cause one thread to be spawned if none exists. As a compromise between performance //! and resource consumption, the thread will linger for about 30 seconds after it has //! processed all tasks before it terminates. //! Note that low priority tasks are processed only when the high priority queue is empty. use std::{any::Any, any::TypeId, time::Duration}; use std::{ collections::{HashMap, VecDeque}, sync::Arc, sync::{Condvar, Mutex, MutexGuard}, thread, }; #[derive(Debug, PartialEq, Eq)] enum State { Exiting, Running, } /// The Shelf allows async tasks to store state across invocations. /// Note: Store elves at your own peril ;-). #[derive(Debug, Default)] pub struct Shelf(HashMap>); impl Shelf { /// Get a reference to the shelved data of type T. Returns Some if the data exists. pub fn get_downcast_ref(&self) -> Option<&T> { self.0.get(&TypeId::of::()).and_then(|v| v.downcast_ref::()) } /// Get a mutable reference to the shelved data of type T. If a T was inserted using put, /// get_mut, or get_or_put_with. pub fn get_downcast_mut(&mut self) -> Option<&mut T> { self.0.get_mut(&TypeId::of::()).and_then(|v| v.downcast_mut::()) } /// Remove the entry of the given type and returns the stored data if it existed. pub fn remove_downcast_ref(&mut self) -> Option { self.0.remove(&TypeId::of::()).and_then(|v| v.downcast::().ok().map(|b| *b)) } /// Puts data `v` on the shelf. If there already was an entry of type T it is returned. pub fn put(&mut self, v: T) -> Option { self.0 .insert(TypeId::of::(), Box::new(v) as Box) .and_then(|v| v.downcast::().ok().map(|b| *b)) } /// Gets a mutable reference to the entry of the given type and default creates it if necessary. /// The type must implement Default. pub fn get_mut(&mut self) -> &mut T { self.0 .entry(TypeId::of::()) .or_insert_with(|| Box::::default() as Box) .downcast_mut::() .unwrap() } /// Gets a mutable reference to the entry of the given type or creates it using the init /// function. Init is not executed if the entry already existed. pub fn get_or_put_with(&mut self, init: F) -> &mut T where F: FnOnce() -> T, { self.0 .entry(TypeId::of::()) .or_insert_with(|| Box::new(init()) as Box) .downcast_mut::() .unwrap() } } struct AsyncTaskState { state: State, thread: Option>, timeout: Duration, hi_prio_req: VecDeque>, lo_prio_req: VecDeque>, idle_fns: Vec>, /// The store allows tasks to store state across invocations. It is passed to each invocation /// of each task. Tasks need to cooperate on the ids they use for storing state. shelf: Option, } /// AsyncTask spawns one worker thread on demand to process jobs inserted into /// a low and a high priority work queue. The queues are processed FIFO, and low /// priority queue is processed if the high priority queue is empty. /// Note: Because there is only one worker thread at a time for a given AsyncTask instance, /// all scheduled requests are guaranteed to be serialized with respect to one another. pub struct AsyncTask { state: Arc<(Condvar, Mutex)>, } impl Default for AsyncTask { fn default() -> Self { Self::new(Duration::from_secs(30)) } } impl AsyncTask { /// Construct an [`AsyncTask`] with a specific timeout value. pub fn new(timeout: Duration) -> Self { Self { state: Arc::new(( Condvar::new(), Mutex::new(AsyncTaskState { state: State::Exiting, thread: None, timeout, hi_prio_req: VecDeque::new(), lo_prio_req: VecDeque::new(), idle_fns: Vec::new(), shelf: None, }), )), } } /// Adds a one-off job to the high priority queue. High priority jobs are /// completed before low priority jobs and can also overtake low priority /// jobs. But they cannot preempt them. pub fn queue_hi(&self, f: F) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static, { self.queue(f, true) } /// Adds a one-off job to the low priority queue. Low priority jobs are /// completed after high priority. And they are not executed as long as high /// priority jobs are present. Jobs always run to completion and are never /// preempted by high priority jobs. pub fn queue_lo(&self, f: F) where F: FnOnce(&mut Shelf) + Send + 'static, { self.queue(f, false) } /// Adds an idle callback. This will be invoked whenever the worker becomes /// idle (all high and low priority jobs have been performed). pub fn add_idle(&self, f: F) where F: Fn(&mut Shelf) + Send + Sync + 'static, { let (ref _condvar, ref state) = *self.state; let mut state = state.lock().unwrap(); state.idle_fns.push(Arc::new(f)); } fn queue(&self, f: F, hi_prio: bool) where F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static, { let (ref condvar, ref state) = *self.state; let mut state = state.lock().unwrap(); if hi_prio { state.hi_prio_req.push_back(Box::new(f)); } else { state.lo_prio_req.push_back(Box::new(f)); } if state.state != State::Running { self.spawn_thread(&mut state); } drop(state); condvar.notify_all(); } fn spawn_thread(&self, state: &mut MutexGuard) { if let Some(t) = state.thread.take() { t.join().expect("AsyncTask panicked."); } let cloned_state = self.state.clone(); let timeout_period = state.timeout; state.thread = Some(thread::spawn(move || { let (ref condvar, ref state) = *cloned_state; enum Action { QueuedFn(Box), IdleFns(Vec>), } let mut done_idle = false; // When the worker starts, it takes the shelf and puts it on the stack. let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default(); loop { if let Some(action) = { let state = state.lock().unwrap(); if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() { // No jobs queued so invoke the idle callbacks. Some(Action::IdleFns(state.idle_fns.clone())) } else { // Wait for either a queued job to arrive or a timeout. let (mut state, timeout) = condvar .wait_timeout_while(state, timeout_period, |state| { state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() }) .unwrap(); match ( state.hi_prio_req.pop_front(), state.lo_prio_req.is_empty(), timeout.timed_out(), ) { (Some(f), _, _) => Some(Action::QueuedFn(f)), (None, false, _) => { state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f)) } (None, true, true) => { // When the worker exits it puts the shelf back into the shared // state for the next worker to use. So state is preserved not // only across invocations but also across worker thread shut down. state.shelf = Some(shelf); state.state = State::Exiting; break; } (None, true, false) => None, } } } { // Now that the lock has been dropped, perform the action. match action { Action::QueuedFn(f) => { f(&mut shelf); done_idle = false; } Action::IdleFns(idle_fns) => { for idle_fn in idle_fns { idle_fn(&mut shelf); } done_idle = true; } } } } })); state.state = State::Running; } } #[cfg(test)] mod tests { use super::{AsyncTask, Shelf}; use std::sync::{ mpsc::{channel, sync_channel, RecvTimeoutError}, Arc, }; use std::time::Duration; #[test] fn test_shelf() { let mut shelf = Shelf::default(); let s = "A string".to_string(); assert_eq!(shelf.put(s), None); let s2 = "Another string".to_string(); assert_eq!(shelf.put(s2), Some("A string".to_string())); // Put something of a different type on the shelf. #[derive(Debug, PartialEq, Eq)] struct Elf { pub name: String, } let e1 = Elf { name: "Glorfindel".to_string() }; assert_eq!(shelf.put(e1), None); // The String value is still on the shelf. let s3 = shelf.get_downcast_ref::().unwrap(); assert_eq!(s3, "Another string"); // As is the Elf. { let e2 = shelf.get_downcast_mut::().unwrap(); assert_eq!(e2.name, "Glorfindel"); e2.name = "Celeborn".to_string(); } // Take the Elf off the shelf. let e3 = shelf.remove_downcast_ref::().unwrap(); assert_eq!(e3.name, "Celeborn"); assert_eq!(shelf.remove_downcast_ref::(), None); // No u64 value has been put on the shelf, so getting one gives the default value. { let i = shelf.get_mut::(); assert_eq!(*i, 0); *i = 42; } let i2 = shelf.get_downcast_ref::().unwrap(); assert_eq!(*i2, 42); // No i32 value has ever been seen near the shelf. assert_eq!(shelf.get_downcast_ref::(), None); assert_eq!(shelf.get_downcast_mut::(), None); assert_eq!(shelf.remove_downcast_ref::(), None); } #[test] fn test_async_task() { let at = AsyncTask::default(); // First queue up a job that blocks until we release it, to avoid // unpredictable synchronization. let (start_sender, start_receiver) = channel(); at.queue_hi(move |shelf| { start_receiver.recv().unwrap(); // Put a trace vector on the shelf shelf.put(Vec::::new()); }); // Queue up some high-priority and low-priority jobs. for i in 0..3 { let j = i; at.queue_lo(move |shelf| { let trace = shelf.get_mut::>(); trace.push(format!("L{}", j)); }); let j = i; at.queue_hi(move |shelf| { let trace = shelf.get_mut::>(); trace.push(format!("H{}", j)); }); } // Finally queue up a low priority job that emits the trace. let (trace_sender, trace_receiver) = channel(); at.queue_lo(move |shelf| { let trace = shelf.get_downcast_ref::>().unwrap(); trace_sender.send(trace.clone()).unwrap(); }); // Ready, set, go. start_sender.send(()).unwrap(); let trace = trace_receiver.recv().unwrap(); assert_eq!(trace, vec!["H0", "H1", "H2", "L0", "L1", "L2"]); } #[test] fn test_async_task_chain() { let at = Arc::new(AsyncTask::default()); let (sender, receiver) = channel(); // Queue up a job that will queue up another job. This confirms // that the job is not invoked with any internal AsyncTask locks held. let at_clone = at.clone(); at.queue_hi(move |_shelf| { at_clone.queue_lo(move |_shelf| { sender.send(()).unwrap(); }); }); receiver.recv().unwrap(); } #[test] #[should_panic] fn test_async_task_panic() { let at = AsyncTask::default(); at.queue_hi(|_shelf| { panic!("Panic from queued job"); }); // Queue another job afterwards to ensure that the async thread gets joined. let (done_sender, done_receiver) = channel(); at.queue_hi(move |_shelf| { done_sender.send(()).unwrap(); }); done_receiver.recv().unwrap(); } #[test] fn test_async_task_idle() { let at = AsyncTask::new(Duration::from_secs(3)); // Need a SyncSender as it is Send+Sync. let (idle_done_sender, idle_done_receiver) = sync_channel::<()>(3); at.add_idle(move |_shelf| { idle_done_sender.send(()).unwrap(); }); // Queue up some high-priority and low-priority jobs that take time. for _i in 0..3 { at.queue_lo(|_shelf| { std::thread::sleep(Duration::from_millis(500)); }); at.queue_hi(|_shelf| { std::thread::sleep(Duration::from_millis(500)); }); } // Final low-priority job. let (done_sender, done_receiver) = channel(); at.queue_lo(move |_shelf| { done_sender.send(()).unwrap(); }); // Nothing happens until the last job completes. assert_eq!( idle_done_receiver.recv_timeout(Duration::from_secs(1)), Err(RecvTimeoutError::Timeout) ); done_receiver.recv().unwrap(); // Now that the last low-priority job has completed, the idle task should // fire pretty much immediately. idle_done_receiver.recv_timeout(Duration::from_millis(50)).unwrap(); // Idle callback not executed again even if we wait for a while. assert_eq!( idle_done_receiver.recv_timeout(Duration::from_secs(3)), Err(RecvTimeoutError::Timeout) ); // However, if more work is done then there's another chance to go idle. let (done_sender, done_receiver) = channel(); at.queue_hi(move |_shelf| { std::thread::sleep(Duration::from_millis(500)); done_sender.send(()).unwrap(); }); // Idle callback not immediately executed, because the high priority // job is taking a while. assert_eq!( idle_done_receiver.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout) ); done_receiver.recv().unwrap(); idle_done_receiver.recv_timeout(Duration::from_millis(50)).unwrap(); } #[test] fn test_async_task_multiple_idle() { let at = AsyncTask::new(Duration::from_secs(3)); let (idle_sender, idle_receiver) = sync_channel::(5); // Queue a high priority job to start things off at.queue_hi(|_shelf| { std::thread::sleep(Duration::from_millis(500)); }); // Multiple idle callbacks. for i in 0..3 { let idle_sender = idle_sender.clone(); at.add_idle(move |_shelf| { idle_sender.send(i).unwrap(); }); } // Nothing happens immediately. assert_eq!( idle_receiver.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout) ); // Wait for a moment and the idle jobs should have run. std::thread::sleep(Duration::from_secs(1)); let mut results = Vec::new(); while let Ok(i) = idle_receiver.recv_timeout(Duration::from_millis(1)) { results.push(i); } assert_eq!(results, [0, 1, 2]); } #[test] fn test_async_task_idle_queues_job() { let at = Arc::new(AsyncTask::new(Duration::from_secs(1))); let at_clone = at.clone(); let (idle_sender, idle_receiver) = sync_channel::(100); // Add an idle callback that queues a low-priority job. at.add_idle(move |shelf| { at_clone.queue_lo(|_shelf| { // Slow things down so the channel doesn't fill up. std::thread::sleep(Duration::from_millis(50)); }); let i = shelf.get_mut::(); idle_sender.send(*i).unwrap(); *i += 1; }); // Nothing happens immediately. assert_eq!( idle_receiver.recv_timeout(Duration::from_millis(1500)), Err(RecvTimeoutError::Timeout) ); // Once we queue a normal job, things start. at.queue_hi(|_shelf| {}); assert_eq!(0, idle_receiver.recv_timeout(Duration::from_millis(200)).unwrap()); // The idle callback queues a job, and completion of that job // means the task is going idle again...so the idle callback will // be called repeatedly. assert_eq!(1, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap()); assert_eq!(2, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap()); assert_eq!(3, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap()); } #[test] #[should_panic] fn test_async_task_idle_panic() { let at = AsyncTask::new(Duration::from_secs(1)); let (idle_sender, idle_receiver) = sync_channel::<()>(3); // Add an idle callback that panics. at.add_idle(move |_shelf| { idle_sender.send(()).unwrap(); panic!("Panic from idle callback"); }); // Queue a job to trigger idleness and ensuing panic. at.queue_hi(|_shelf| {}); idle_receiver.recv().unwrap(); // Queue another job afterwards to ensure that the async thread gets joined // and the panic detected. let (done_sender, done_receiver) = channel(); at.queue_hi(move |_shelf| { done_sender.send(()).unwrap(); }); done_receiver.recv().unwrap(); } }