use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; use crate::util::TryLock; use std::collections::VecDeque; use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; struct AssertDropHandle { is_dropped: Arc, } impl AssertDropHandle { #[track_caller] fn assert_dropped(&self) { assert!(self.is_dropped.load(Ordering::SeqCst)); } #[track_caller] fn assert_not_dropped(&self) { assert!(!self.is_dropped.load(Ordering::SeqCst)); } } struct AssertDrop { is_dropped: Arc, } impl AssertDrop { fn new() -> (Self, AssertDropHandle) { let shared = Arc::new(AtomicBool::new(false)); ( AssertDrop { is_dropped: shared.clone(), }, AssertDropHandle { is_dropped: shared.clone(), }, ) } } impl Drop for AssertDrop { fn drop(&mut self) { self.is_dropped.store(true, Ordering::SeqCst); } } // A Notified does not shut down on drop, but it is dropped once the ref-count // hits zero. #[test] fn create_drop1() { let (ad, handle) = AssertDrop::new(); let (notified, join) = unowned( async { drop(ad); unreachable!() }, NoopSchedule, Id::next(), ); drop(notified); handle.assert_not_dropped(); drop(join); handle.assert_dropped(); } #[test] fn create_drop2() { let (ad, handle) = AssertDrop::new(); let (notified, join) = unowned( async { drop(ad); unreachable!() }, NoopSchedule, Id::next(), ); drop(join); handle.assert_not_dropped(); drop(notified); handle.assert_dropped(); } #[test] fn drop_abort_handle1() { let (ad, handle) = AssertDrop::new(); let (notified, join) = unowned( async { drop(ad); unreachable!() }, NoopSchedule, Id::next(), ); let abort = join.abort_handle(); drop(join); handle.assert_not_dropped(); drop(notified); handle.assert_not_dropped(); drop(abort); handle.assert_dropped(); } #[test] fn drop_abort_handle2() { let (ad, handle) = AssertDrop::new(); let (notified, join) = unowned( async { drop(ad); unreachable!() }, NoopSchedule, Id::next(), ); let abort = join.abort_handle(); drop(notified); handle.assert_not_dropped(); drop(abort); handle.assert_not_dropped(); drop(join); handle.assert_dropped(); } // Shutting down through Notified works #[test] fn create_shutdown1() { let (ad, handle) = AssertDrop::new(); let (notified, join) = unowned( async { drop(ad); unreachable!() }, NoopSchedule, Id::next(), ); drop(join); handle.assert_not_dropped(); notified.shutdown(); handle.assert_dropped(); } #[test] fn create_shutdown2() { let (ad, handle) = AssertDrop::new(); let (notified, join) = unowned( async { drop(ad); unreachable!() }, NoopSchedule, Id::next(), ); handle.assert_not_dropped(); notified.shutdown(); handle.assert_dropped(); drop(join); } #[test] fn unowned_poll() { let (task, _) = unowned(async {}, NoopSchedule, Id::next()); task.run(); } #[test] fn schedule() { with(|rt| { rt.spawn(async { crate::task::yield_now().await; }); assert_eq!(2, rt.tick()); rt.shutdown(); }) } #[test] fn shutdown() { with(|rt| { rt.spawn(async { loop { crate::task::yield_now().await; } }); rt.tick_max(1); rt.shutdown(); }) } #[test] fn shutdown_immediately() { with(|rt| { rt.spawn(async { loop { crate::task::yield_now().await; } }); rt.shutdown(); }) } #[test] fn spawn_during_shutdown() { static DID_SPAWN: AtomicBool = AtomicBool::new(false); struct SpawnOnDrop(Runtime); impl Drop for SpawnOnDrop { fn drop(&mut self) { DID_SPAWN.store(true, Ordering::SeqCst); self.0.spawn(async {}); } } with(|rt| { let rt2 = rt.clone(); rt.spawn(async move { let _spawn_on_drop = SpawnOnDrop(rt2); loop { crate::task::yield_now().await; } }); rt.tick_max(1); rt.shutdown(); }); assert!(DID_SPAWN.load(Ordering::SeqCst)); } fn with(f: impl FnOnce(Runtime)) { struct Reset; impl Drop for Reset { fn drop(&mut self) { let _rt = CURRENT.try_lock().unwrap().take(); } } let _reset = Reset; let rt = Runtime(Arc::new(Inner { owned: OwnedTasks::new(), core: TryLock::new(Core { queue: VecDeque::new(), }), })); *CURRENT.try_lock().unwrap() = Some(rt.clone()); f(rt) } #[derive(Clone)] struct Runtime(Arc); struct Inner { core: TryLock, owned: OwnedTasks, } struct Core { queue: VecDeque>, } static CURRENT: TryLock> = TryLock::new(None); impl Runtime { fn spawn(&self, future: T) -> JoinHandle where T: 'static + Send + Future, T::Output: 'static + Send, { let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next()); if let Some(notified) = notified { self.schedule(notified); } handle } fn tick(&self) -> usize { self.tick_max(usize::MAX) } fn tick_max(&self, max: usize) -> usize { let mut n = 0; while !self.is_empty() && n < max { let task = self.next_task(); n += 1; let task = self.0.owned.assert_owner(task); task.run(); } n } fn is_empty(&self) -> bool { self.0.core.try_lock().unwrap().queue.is_empty() } fn next_task(&self) -> task::Notified { self.0.core.try_lock().unwrap().queue.pop_front().unwrap() } fn shutdown(&self) { let mut core = self.0.core.try_lock().unwrap(); self.0.owned.close_and_shutdown_all(); while let Some(task) = core.queue.pop_back() { drop(task); } drop(core); assert!(self.0.owned.is_empty()); } } impl Schedule for Runtime { fn release(&self, task: &Task) -> Option> { self.0.owned.remove(task) } fn schedule(&self, task: task::Notified) { self.0.core.try_lock().unwrap().queue.push_back(task); } }