aboutsummaryrefslogtreecommitdiff
path: root/src/io/registration.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/registration.rs')
-rw-r--r--src/io/registration.rs286
1 files changed, 48 insertions, 238 deletions
diff --git a/src/io/registration.rs b/src/io/registration.rs
index 77fe6db..ce6cffd 100644
--- a/src/io/registration.rs
+++ b/src/io/registration.rs
@@ -1,7 +1,7 @@
-use crate::io::driver::{platform, Direction, Handle};
-use crate::util::slab::Address;
+use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
+use crate::util::slab;
-use mio::{self, Evented};
+use mio::event::Source;
use std::io;
use std::task::{Context, Poll};
@@ -38,74 +38,38 @@ cfg_io_driver! {
/// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
- pub struct Registration {
+ pub(crate) struct Registration {
+ /// Handle to the associated driver.
handle: Handle,
- address: Address,
+
+ /// Reference to state stored by the driver.
+ shared: slab::Ref<ScheduledIo>,
}
}
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
// ===== impl Registration =====
impl Registration {
- /// Registers the I/O resource with the default reactor.
- ///
- /// # Return
- ///
- /// - `Ok` if the registration happened successfully
- /// - `Err` if an error was encountered during registration
- ///
- ///
- /// # Panics
- ///
- /// This function panics if thread-local runtime is not set.
- ///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
- pub fn new<T>(io: &T) -> io::Result<Registration>
- where
- T: Evented,
- {
- Registration::new_with_ready(io, mio::Ready::all())
- }
-
- /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state.
- /// `new_with_ready` should be used over `new` when you need control over the readiness state,
+ /// Registers the I/O resource with the default reactor, for a specific `mio::Interest`.
+ /// `new_with_interest` should be used over `new` when you need control over the readiness state,
/// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
/// you are interested in those states, you will need to add them to the readiness state passed
/// to this function.
///
- /// An example to listen to read only
- ///
- /// ```rust
- /// ##[cfg(unix)]
- /// mio::Ready::from_usize(
- /// mio::Ready::readable().as_usize()
- /// | mio::unix::UnixReady::error().as_usize()
- /// | mio::unix::UnixReady::hup().as_usize()
- /// );
- /// ```
- ///
/// # Return
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
- ///
- ///
- /// # Panics
- ///
- /// This function panics if thread-local runtime is not set.
- ///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
- pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
- where
- T: Evented,
- {
- let handle = Handle::current();
- let address = if let Some(inner) = handle.inner() {
- inner.add_source(io, ready)?
+ pub(crate) fn new_with_interest_and_handle(
+ io: &mut impl Source,
+ interest: mio::Interest,
+ handle: Handle,
+ ) -> io::Result<Registration> {
+ let shared = if let Some(inner) = handle.inner() {
+ inner.add_source(io, interest)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
@@ -113,7 +77,7 @@ impl Registration {
));
};
- Ok(Registration { handle, address })
+ Ok(Registration { handle, shared })
}
/// Deregisters the I/O resource from the reactor it is associated with.
@@ -132,10 +96,7 @@ impl Registration {
/// no longer result in notifications getting sent for this registration.
///
/// `Err` is returned if an error is encountered.
- pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
- where
- T: Evented,
- {
+ pub(super) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
@@ -143,198 +104,47 @@ impl Registration {
inner.deregister_source(io)
}
- /// Polls for events on the I/O resource's read readiness stream.
- ///
- /// If the I/O resource receives a new read readiness event since the last
- /// call to `poll_read_ready`, it is returned. If it has not, the current
- /// task is notified once a new event is received.
- ///
- /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
- /// the function will always return `Ready(HUP)`. This should be treated as
- /// the end of the readiness stream.
- ///
- /// # Return value
- ///
- /// There are several possible return values:
- ///
- /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
- /// a new readiness event. The readiness value is included.
- ///
- /// * `Poll::Pending` means that no new readiness events have been received
- /// since the last call to `poll_read_ready`.
- ///
- /// * `Poll::Ready(Err(err))` means that the registration has encountered an
- /// error. This could represent a permanent internal error for example.
- ///
- /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
- ///
- /// # Panics
- ///
- /// This function will panic if called from outside of a task context.
- pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| {
- coop.made_progress();
- e
- })?;
- match v {
- Some(v) => {
- coop.made_progress();
- Poll::Ready(Ok(v))
- }
- None => Poll::Pending,
- }
- }
-
- /// Consume any pending read readiness event.
- ///
- /// This function is identical to [`poll_read_ready`] **except** that it
- /// will not notify the current task when a new event is received. As such,
- /// it is safe to call this function from outside of a task context.
- ///
- /// [`poll_read_ready`]: method@Self::poll_read_ready
- pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
- self.poll_ready(Direction::Read, None)
- }
-
- /// Polls for events on the I/O resource's write readiness stream.
- ///
- /// If the I/O resource receives a new write readiness event since the last
- /// call to `poll_write_ready`, it is returned. If it has not, the current
- /// task is notified once a new event is received.
- ///
- /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
- /// the function will always return `Ready(HUP)`. This should be treated as
- /// the end of the readiness stream.
- ///
- /// # Return value
- ///
- /// There are several possible return values:
- ///
- /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
- /// a new readiness event. The readiness value is included.
- ///
- /// * `Poll::Pending` means that no new readiness events have been received
- /// since the last call to `poll_write_ready`.
- ///
- /// * `Poll::Ready(Err(err))` means that the registration has encountered an
- /// error. This could represent a permanent internal error for example.
- ///
- /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
- ///
- /// # Panics
- ///
- /// This function will panic if called from outside of a task context.
- pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| {
- coop.made_progress();
- e
- })?;
- match v {
- Some(v) => {
- coop.made_progress();
- Poll::Ready(Ok(v))
- }
- None => Poll::Pending,
- }
- }
-
- /// Consumes any pending write readiness event.
- ///
- /// This function is identical to [`poll_write_ready`] **except** that it
- /// will not notify the current task when a new event is received. As such,
- /// it is safe to call this function from outside of a task context.
- ///
- /// [`poll_write_ready`]: method@Self::poll_write_ready
- pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
- self.poll_ready(Direction::Write, None)
+ pub(super) fn clear_readiness(&self, event: ReadyEvent) {
+ self.shared.clear_readiness(event);
}
/// Polls for events on the I/O resource's `direction` readiness stream.
///
/// If called with a task context, notify the task when a new event is
/// received.
- fn poll_ready(
+ pub(super) fn poll_readiness(
&self,
+ cx: &mut Context<'_>,
direction: Direction,
- cx: Option<&mut Context<'_>>,
- ) -> io::Result<Option<mio::Ready>> {
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
- };
-
- // If the task should be notified about new events, ensure that it has
- // been registered
- if let Some(ref cx) = cx {
- inner.register(self.address, direction, cx.waker().clone())
+ ) -> Poll<io::Result<ReadyEvent>> {
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}
- let mask = direction.mask();
- let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize();
-
- let sched = inner.io_dispatch.get(self.address).unwrap();
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+ let ev = ready!(self.shared.poll_readiness(cx, direction));
+ coop.made_progress();
+ Poll::Ready(Ok(ev))
+ }
+}
- // This consumes the current readiness state **except** for HUP and
- // error. HUP and error are excluded because a) they are final states
- // and never transitition out and b) both the read AND the write
- // directions need to be able to obvserve these states.
- //
- // # Platform-specific behavior
- //
- // HUP and error readiness are platform-specific. On epoll platforms,
- // HUP has specific conditions that must be met by both peers of a
- // connection in order to be triggered.
- //
- // On epoll platforms, `EPOLLERR` is signaled through
- // `UnixReady::error()` and is important to be observable by both read
- // AND write. A specific case that `EPOLLERR` occurs is when the read
- // end of a pipe is closed. When this occurs, a peer blocked by
- // writing to the pipe should be notified.
- let curr_ready = sched
- .set_readiness(self.address, |curr| curr & (!mask_no_hup))
- .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
+cfg_io_readiness! {
+ impl Registration {
+ pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
+ use std::future::Future;
+ use std::pin::Pin;
- let mut ready = mask & mio::Ready::from_usize(curr_ready);
+ let fut = self.shared.readiness(interest);
+ pin!(fut);
- if ready.is_empty() {
- if let Some(cx) = cx {
- // Update the task info
- match direction {
- Direction::Read => sched.reader.register_by_ref(cx.waker()),
- Direction::Write => sched.writer.register_by_ref(cx.waker()),
+ crate::future::poll_fn(|cx| {
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}
- // Try again
- let curr_ready = sched
- .set_readiness(self.address, |curr| curr & (!mask_no_hup))
- .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
- ready = mask & mio::Ready::from_usize(curr_ready);
- }
- }
-
- if ready.is_empty() {
- Ok(None)
- } else {
- Ok(Some(ready))
+ Pin::new(&mut fut).poll(cx).map(Ok)
+ }).await
}
}
}
-
-unsafe impl Send for Registration {}
-unsafe impl Sync for Registration {}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return,
- };
- inner.drop_source(self.address);
- }
-}