diff options
Diffstat (limited to 'src/io/driver/scheduled_io.rs')
-rw-r--r-- | src/io/driver/scheduled_io.rs | 501 |
1 files changed, 425 insertions, 76 deletions
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs index 7f6446e..b1354a0 100644 --- a/src/io/driver/scheduled_io.rs +++ b/src/io/driver/scheduled_io.rs @@ -1,47 +1,109 @@ -use crate::loom::future::AtomicWaker; +use super::{Direction, Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; use crate::util::bit; -use crate::util::slab::{Address, Entry, Generation}; +use crate::util::slab::Entry; -use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::{Context, Poll, Waker}; +cfg_io_readiness! { + use crate::util::linked_list::{self, LinkedList}; + + use std::cell::UnsafeCell; + use std::future::Future; + use std::marker::PhantomPinned; + use std::pin::Pin; + use std::ptr::NonNull; +} + +/// Stored in the I/O driver resource slab. #[derive(Debug)] pub(crate) struct ScheduledIo { + /// Packs the resource's readiness with the resource's generation. readiness: AtomicUsize, - pub(crate) reader: AtomicWaker, - pub(crate) writer: AtomicWaker, + + waiters: Mutex<Waiters>, } -const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH); +cfg_io_readiness! { + type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; +} -impl Entry for ScheduledIo { - fn generation(&self) -> Generation { - unpack_generation(self.readiness.load(SeqCst)) +#[derive(Debug, Default)] +struct Waiters { + #[cfg(feature = "net")] + /// List of all current waiters + list: WaitList, + + /// Waker used for AsyncRead + reader: Option<Waker>, + + /// Waker used for AsyncWrite + writer: Option<Waker>, +} + +cfg_io_readiness! { + #[derive(Debug)] + struct Waiter { + pointers: linked_list::Pointers<Waiter>, + + /// The waker for this task + waker: Option<Waker>, + + /// The interest this waiter is waiting on + interest: mio::Interest, + + is_ready: bool, + + /// Should never be `!Unpin` + _p: PhantomPinned, } - fn reset(&self, generation: Generation) -> bool { - let mut current = self.readiness.load(Acquire); + /// Future returned by `readiness()` + struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, - loop { - if unpack_generation(current) != generation { - return false; - } + state: State, - let next = PACK.pack(generation.next().to_usize(), 0); + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell<Waiter>, + } - match self - .readiness - .compare_exchange(current, next, AcqRel, Acquire) - { - Ok(_) => break, - Err(actual) => current = actual, - } - } + enum State { + Init, + Waiting, + Done, + } +} + +// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. +// +// | reserved | generation | driver tick | readinesss | +// |----------+------------+--------------+------------| +// | 1 bit | 7 bits + 8 bits + 16 bits | + +const READINESS: bit::Pack = bit::Pack::least_significant(16); + +const TICK: bit::Pack = READINESS.then(8); - drop(self.reader.take_waker()); - drop(self.writer.take_waker()); +const GENERATION: bit::Pack = TICK.then(7); - true +#[test] +fn test_generations_assert_same() { + assert_eq!(super::GENERATION, GENERATION); +} + +// ===== impl ScheduledIo ===== + +impl Entry for ScheduledIo { + fn reset(&self) { + let state = self.readiness.load(Acquire); + + let generation = GENERATION.unpack(state); + let next = GENERATION.pack_lossy(generation + 1, 0); + + self.readiness.store(next, Release); } } @@ -49,31 +111,14 @@ impl Default for ScheduledIo { fn default() -> ScheduledIo { ScheduledIo { readiness: AtomicUsize::new(0), - reader: AtomicWaker::new(), - writer: AtomicWaker::new(), + waiters: Mutex::new(Default::default()), } } } impl ScheduledIo { - #[cfg(all(test, loom))] - /// Returns the current readiness value of this `ScheduledIo`, if the - /// provided `token` is still a valid access. - /// - /// # Returns - /// - /// If the given token's generation no longer matches the `ScheduledIo`'s - /// generation, then the corresponding IO resource has been removed and - /// replaced with a new resource. In that case, this method returns `None`. - /// Otherwise, this returns the current readiness. - pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> { - let ready = self.readiness.load(Acquire); - - if unpack_generation(ready) != address.generation() { - return None; - } - - Some(ready & !PACK.mask()) + pub(crate) fn generation(&self) -> usize { + GENERATION.unpack(self.readiness.load(Acquire)) } /// Sets the readiness on this `ScheduledIo` by invoking the given closure on @@ -81,6 +126,8 @@ impl ScheduledIo { /// /// # Arguments /// - `token`: the token for this `ScheduledIo`. + /// - `tick`: whether setting the tick or trying to clear readiness for a + /// specific tick. /// - `f`: a closure returning a new readiness value given the previous /// readiness. /// @@ -90,52 +137,354 @@ impl ScheduledIo { /// generation, then the corresponding IO resource has been removed and /// replaced with a new resource. In that case, this method returns `Err`. /// Otherwise, this returns the previous readiness. - pub(crate) fn set_readiness( + pub(super) fn set_readiness( &self, - address: Address, - f: impl Fn(usize) -> usize, - ) -> Result<usize, ()> { - let generation = address.generation(); - + token: Option<usize>, + tick: Tick, + f: impl Fn(Ready) -> Ready, + ) -> Result<(), ()> { let mut current = self.readiness.load(Acquire); loop { - // Check that the generation for this access is still the current - // one. - if unpack_generation(current) != generation { - return Err(()); + let current_generation = GENERATION.unpack(current); + + if let Some(token) = token { + // Check that the generation for this access is still the + // current one. + if GENERATION.unpack(token) != current_generation { + return Err(()); + } } - // Mask out the generation bits so that the modifying function - // doesn't see them. - let current_readiness = current & mio::Ready::all().as_usize(); + + // Mask out the tick/generation bits so that the modifying + // function doesn't see them. + let current_readiness = Ready::from_usize(current); let new = f(current_readiness); - debug_assert!( - new <= !PACK.max_value(), - "new readiness value would overwrite generation bits!" - ); - - match self.readiness.compare_exchange( - current, - PACK.pack(generation.to_usize(), new), - AcqRel, - Acquire, - ) { - Ok(_) => return Ok(current), + let packed = match tick { + Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), + Tick::Clear(t) => { + if TICK.unpack(current) as u8 != t { + // Trying to clear readiness with an old event! + return Err(()); + } + + TICK.pack(t as usize, new.as_usize()) + } + }; + + let next = GENERATION.pack(current_generation, packed); + + match self + .readiness + .compare_exchange(current, next, AcqRel, Acquire) + { + Ok(_) => return Ok(()), // we lost the race, retry! Err(actual) => current = actual, } } } + + /// Notifies all pending waiters that have registered interest in `ready`. + /// + /// There may be many waiters to notify. Waking the pending task **must** be + /// done from outside of the lock otherwise there is a potential for a + /// deadlock. + /// + /// A stack array of wakers is created and filled with wakers to notify, the + /// lock is released, and the wakers are notified. Because there may be more + /// than 32 wakers to notify, if the stack array fills up, the lock is + /// released, the array is cleared, and the iteration continues. + pub(super) fn wake(&self, ready: Ready) { + const NUM_WAKERS: usize = 32; + + let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); + let mut curr = 0; + + let mut waiters = self.waiters.lock(); + + // check for AsyncRead slot + if ready.is_readable() { + if let Some(waker) = waiters.reader.take() { + wakers[curr] = Some(waker); + curr += 1; + } + } + + // check for AsyncWrite slot + if ready.is_writable() { + if let Some(waker) = waiters.writer.take() { + wakers[curr] = Some(waker); + curr += 1; + } + } + + #[cfg(feature = "net")] + 'outer: loop { + let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); + + while curr < NUM_WAKERS { + match iter.next() { + Some(waiter) => { + let waiter = unsafe { &mut *waiter.as_ptr() }; + + if let Some(waker) = waiter.waker.take() { + waiter.is_ready = true; + wakers[curr] = Some(waker); + curr += 1; + } + } + None => { + break 'outer; + } + } + } + + drop(waiters); + + for waker in wakers.iter_mut().take(curr) { + waker.take().unwrap().wake(); + } + + curr = 0; + + // Acquire the lock again. + waiters = self.waiters.lock(); + } + + // Release the lock before notifying + drop(waiters); + + for waker in wakers.iter_mut().take(curr) { + waker.take().unwrap().wake(); + } + } + + /// Poll version of checking readiness for a certain direction. + /// + /// These are to support `AsyncRead` and `AsyncWrite` polling methods, + /// which cannot use the `async fn` version. This uses reserved reader + /// and writer slots. + pub(in crate::io) fn poll_readiness( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<ReadyEvent> { + let curr = self.readiness.load(Acquire); + + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + + if ready.is_empty() { + // Update the task info + let mut waiters = self.waiters.lock(); + let slot = match direction { + Direction::Read => &mut waiters.reader, + Direction::Write => &mut waiters.writer, + }; + *slot = Some(cx.waker().clone()); + + // Try again, in case the readiness was changed while we were + // taking the waiters lock + let curr = self.readiness.load(Acquire); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + if ready.is_empty() { + Poll::Pending + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + }) + } + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + }) + } + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + // This consumes the current readiness state **except** for closed + // states. Closed states are excluded because they are final states. + let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; + + // result isn't important + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); + } } impl Drop for ScheduledIo { fn drop(&mut self) { - self.writer.wake(); - self.reader.wake(); + self.wake(Ready::ALL); } } -fn unpack_generation(src: usize) -> Generation { - Generation::new(PACK.unpack(src)) +unsafe impl Send for ScheduledIo {} +unsafe impl Sync for ScheduledIo {} + +cfg_io_readiness! { + impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers + pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent { + self.readiness_fut(interest).await + } + + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + is_ready: false, + interest, + _p: PhantomPinned, + }), + } + } + } + + unsafe impl linked_list::Link for Waiter { + type Handle = NonNull<Waiter>; + type Target = Waiter; + + fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { + *handle + } + + unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { + ptr + } + + unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { + NonNull::from(&mut target.as_mut().pointers) + } + } + + // ===== impl Readiness ===== + + impl Future for Readiness<'_> { + type Output = ReadyEvent; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + use std::sync::atomic::Ordering::SeqCst; + + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; + + loop { + match *state { + State::Init => { + // Optimistically check existing readiness + let curr = scheduled_io.readiness.load(SeqCst); + let ready = Ready::from_usize(READINESS.unpack(curr)); + + // Safety: `waiter.interest` never changes + let interest = unsafe { (*waiter.get()).interest }; + let ready = ready.intersection(interest); + + if !ready.is_empty() { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { ready, tick }); + } + + // Wasn't ready, take the lock (and check again while locked). + let mut waiters = scheduled_io.waiters.lock(); + + let curr = scheduled_io.readiness.load(SeqCst); + let ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = ready.intersection(interest); + + if !ready.is_empty() { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { ready, tick }); + } + + // Not ready even after locked, insert into list... + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; + } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.is_ready { + // Our waker has been notified. + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; + + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; + + return Poll::Ready(ReadyEvent { + tick, + ready: Ready::from_interest(w.interest), + }); + } + } + } + } + } + + impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock(); + + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } + } + + unsafe impl Send for Readiness<'_> {} + unsafe impl Sync for Readiness<'_> {} } |