aboutsummaryrefslogtreecommitdiff
path: root/src/io_source.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io_source.rs')
-rw-r--r--src/io_source.rs292
1 files changed, 292 insertions, 0 deletions
diff --git a/src/io_source.rs b/src/io_source.rs
new file mode 100644
index 0000000..6939c0d
--- /dev/null
+++ b/src/io_source.rs
@@ -0,0 +1,292 @@
+use std::ops::{Deref, DerefMut};
+#[cfg(unix)]
+use std::os::unix::io::AsRawFd;
+#[cfg(windows)]
+use std::os::windows::io::AsRawSocket;
+#[cfg(debug_assertions)]
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{fmt, io};
+
+#[cfg(any(unix, debug_assertions))]
+use crate::poll;
+use crate::sys::IoSourceState;
+use crate::{event, Interest, Registry, Token};
+
+/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
+/// implementation.
+///
+/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
+///
+/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
+/// Mio supports registering any FD or socket that can be registered with the
+/// underlying OS selector. `IoSource` provides the necessary bridge.
+///
+/// [`RawFd`]: std::os::unix::io::RawFd
+/// [`RawSocket`]: std::os::windows::io::RawSocket
+///
+/// # Notes
+///
+/// To handle the registrations and events properly **all** I/O operations (such
+/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
+/// internal state is updated accordingly.
+///
+/// [`Poll`]: crate::Poll
+/// [`do_io`]: IoSource::do_io
+/*
+///
+/// # Examples
+///
+/// Basic usage.
+///
+/// ```
+/// # use std::error::Error;
+/// # fn main() -> Result<(), Box<dyn Error>> {
+/// use mio::{Interest, Poll, Token};
+/// use mio::IoSource;
+///
+/// use std::net;
+///
+/// let poll = Poll::new()?;
+///
+/// // Bind a std TCP listener.
+/// let listener = net::TcpListener::bind("127.0.0.1:0")?;
+/// // Wrap it in the `IoSource` type.
+/// let mut listener = IoSource::new(listener);
+///
+/// // Register the listener.
+/// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
+/// # Ok(())
+/// # }
+/// ```
+*/
+pub struct IoSource<T> {
+ state: IoSourceState,
+ inner: T,
+ #[cfg(debug_assertions)]
+ selector_id: SelectorId,
+}
+
+impl<T> IoSource<T> {
+ /// Create a new `IoSource`.
+ pub fn new(io: T) -> IoSource<T> {
+ IoSource {
+ state: IoSourceState::new(),
+ inner: io,
+ #[cfg(debug_assertions)]
+ selector_id: SelectorId::new(),
+ }
+ }
+
+ /// Execute an I/O operations ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
+ where
+ F: FnOnce(&T) -> io::Result<R>,
+ {
+ self.state.do_io(f, &self.inner)
+ }
+
+ /// Returns the I/O source, dropping the state.
+ ///
+ /// # Notes
+ ///
+ /// To ensure no more events are to be received for this I/O source first
+ /// [`deregister`] it.
+ ///
+ /// [`deregister`]: Registry::deregister
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+/// Be careful when using this method. All I/O operations that may block must go
+/// through the [`do_io`] method.
+///
+/// [`do_io`]: IoSource::do_io
+impl<T> Deref for IoSource<T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+/// Be careful when using this method. All I/O operations that may block must go
+/// through the [`do_io`] method.
+///
+/// [`do_io`]: IoSource::do_io
+impl<T> DerefMut for IoSource<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.inner
+ }
+}
+
+#[cfg(unix)]
+impl<T> event::Source for IoSource<T>
+where
+ T: AsRawFd,
+{
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.associate(registry)?;
+ poll::selector(registry).register(self.inner.as_raw_fd(), token, interests)
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.check_association(registry)?;
+ poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests)
+ }
+
+ fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.remove_association(registry)?;
+ poll::selector(registry).deregister(self.inner.as_raw_fd())
+ }
+}
+
+#[cfg(windows)]
+impl<T> event::Source for IoSource<T>
+where
+ T: AsRawSocket,
+{
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.associate(registry)?;
+ self.state
+ .register(registry, token, interests, self.inner.as_raw_socket())
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.check_association(registry)?;
+ self.state.reregister(registry, token, interests)
+ }
+
+ fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.remove_association(_registry)?;
+ self.state.deregister()
+ }
+}
+
+impl<T> fmt::Debug for IoSource<T>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.inner.fmt(f)
+ }
+}
+
+/// Used to associate an `IoSource` with a `sys::Selector`.
+#[cfg(debug_assertions)]
+#[derive(Debug)]
+struct SelectorId {
+ id: AtomicUsize,
+}
+
+#[cfg(debug_assertions)]
+impl SelectorId {
+ /// Value of `id` if `SelectorId` is not associated with any
+ /// `sys::Selector`. Valid selector ids start at 1.
+ const UNASSOCIATED: usize = 0;
+
+ /// Create a new `SelectorId`.
+ const fn new() -> SelectorId {
+ SelectorId {
+ id: AtomicUsize::new(Self::UNASSOCIATED),
+ }
+ }
+
+ /// Associate an I/O source with `registry`, returning an error if its
+ /// already registered.
+ fn associate(&self, registry: &Registry) -> io::Result<()> {
+ let registry_id = poll::selector(&registry).id();
+ let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
+
+ if previous_id == Self::UNASSOCIATED {
+ Ok(())
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::AlreadyExists,
+ "I/O source already registered with a `Registry`",
+ ))
+ }
+ }
+
+ /// Check the association of an I/O source with `registry`, returning an
+ /// error if its registered with a different `Registry` or not registered at
+ /// all.
+ fn check_association(&self, registry: &Registry) -> io::Result<()> {
+ let registry_id = poll::selector(&registry).id();
+ let id = self.id.load(Ordering::Acquire);
+
+ if id == registry_id {
+ Ok(())
+ } else if id == Self::UNASSOCIATED {
+ Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ "I/O source not registered with `Registry`",
+ ))
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::AlreadyExists,
+ "I/O source already registered with a different `Registry`",
+ ))
+ }
+ }
+
+ /// Remove a previously made association from `registry`, returns an error
+ /// if it was not previously associated with `registry`.
+ fn remove_association(&self, registry: &Registry) -> io::Result<()> {
+ let registry_id = poll::selector(&registry).id();
+ let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
+
+ if previous_id == registry_id {
+ Ok(())
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ "I/O source not registered with `Registry`",
+ ))
+ }
+ }
+}
+
+#[cfg(debug_assertions)]
+impl Clone for SelectorId {
+ fn clone(&self) -> SelectorId {
+ SelectorId {
+ id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
+ }
+ }
+}