aboutsummaryrefslogtreecommitdiff
path: root/src/io/bsd/poll_aio.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io/bsd/poll_aio.rs')
-rw-r--r--src/io/bsd/poll_aio.rs195
1 files changed, 195 insertions, 0 deletions
diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs
new file mode 100644
index 0000000..f1ac4b2
--- /dev/null
+++ b/src/io/bsd/poll_aio.rs
@@ -0,0 +1,195 @@
+//! Use POSIX AIO futures with Tokio.
+
+use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
+use mio::event::Source;
+use mio::Registry;
+use mio::Token;
+use std::fmt;
+use std::io;
+use std::ops::{Deref, DerefMut};
+use std::os::unix::io::AsRawFd;
+use std::os::unix::prelude::RawFd;
+use std::task::{Context, Poll};
+
+/// Like [`mio::event::Source`], but for POSIX AIO only.
+///
+/// Tokio's consumer must pass an implementor of this trait to create a
+/// [`Aio`] object.
+pub trait AioSource {
+ /// Registers this AIO event source with Tokio's reactor.
+ fn register(&mut self, kq: RawFd, token: usize);
+
+ /// Deregisters this AIO event source with Tokio's reactor.
+ fn deregister(&mut self);
+}
+
+/// Wraps the user's AioSource in order to implement mio::event::Source, which
+/// is what the rest of the crate wants.
+struct MioSource<T>(T);
+
+impl<T: AioSource> Source for MioSource<T> {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: mio::Interest,
+ ) -> io::Result<()> {
+ assert!(interests.is_aio() || interests.is_lio());
+ self.0.register(registry.as_raw_fd(), usize::from(token));
+ Ok(())
+ }
+
+ fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
+ self.0.deregister();
+ Ok(())
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: mio::Interest,
+ ) -> io::Result<()> {
+ assert!(interests.is_aio() || interests.is_lio());
+ self.0.register(registry.as_raw_fd(), usize::from(token));
+ Ok(())
+ }
+}
+
+/// Associates a POSIX AIO control block with the reactor that drives it.
+///
+/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
+/// by the reactor.
+///
+/// The wrapped source may be accessed through the `Aio` via the `Deref` and
+/// `DerefMut` traits.
+///
+/// ## Clearing readiness
+///
+/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
+/// Source is not completely ready and must return to the Pending state,
+/// [`Aio::clear_ready`] may be used. This can be useful with
+/// [`lio_listio`], which may generate a kevent when only a portion of the
+/// operations have completed.
+///
+/// ## Platforms
+///
+/// Only FreeBSD implements POSIX AIO with kqueue notification, so
+/// `Aio` is only available for that operating system.
+///
+/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+// Note: Unlike every other kqueue event source, POSIX AIO registers events not
+// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
+// aio_write, etc. It needs the kqueue's file descriptor to do that. So
+// AsyncFd can't be used for POSIX AIO.
+//
+// Note that Aio doesn't implement Drop. There's no need. Unlike other
+// kqueue sources, simply dropping the object effectively deregisters it.
+pub struct Aio<E> {
+ io: MioSource<E>,
+ registration: Registration,
+}
+
+// ===== impl Aio =====
+
+impl<E: AioSource> Aio<E> {
+ /// Creates a new `Aio` suitable for use with POSIX AIO functions.
+ ///
+ /// It will be associated with the default reactor. The runtime is usually
+ /// set implicitly when this function is called from a future driven by a
+ /// Tokio runtime, otherwise runtime can be set explicitly with
+ /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn new_for_aio(io: E) -> io::Result<Self> {
+ Self::new_with_interest(io, Interest::AIO)
+ }
+
+ /// Creates a new `Aio` suitable for use with [`lio_listio`].
+ ///
+ /// It will be associated with the default reactor. The runtime is usually
+ /// set implicitly when this function is called from a future driven by a
+ /// Tokio runtime, otherwise runtime can be set explicitly with
+ /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ ///
+ /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ pub fn new_for_lio(io: E) -> io::Result<Self> {
+ Self::new_with_interest(io, Interest::LIO)
+ }
+
+ fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
+ let mut io = MioSource(io);
+ let handle = Handle::current();
+ let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
+ Ok(Self { io, registration })
+ }
+
+ /// Indicates to Tokio that the source is no longer ready. The internal
+ /// readiness flag will be cleared, and tokio will wait for the next
+ /// edge-triggered readiness notification from the OS.
+ ///
+ /// It is critical that this method not be called unless your code
+ /// _actually observes_ that the source is _not_ ready. The OS must
+ /// deliver a subsequent notification, or this source will block
+ /// forever. It is equally critical that you `do` call this method if you
+ /// resubmit the same structure to the kernel and poll it again.
+ ///
+ /// This method is not very useful with AIO readiness, since each `aiocb`
+ /// structure is typically only used once. It's main use with
+ /// [`lio_listio`], which will sometimes send notification when only a
+ /// portion of its elements are complete. In that case, the caller must
+ /// call `clear_ready` before resubmitting it.
+ ///
+ /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ pub fn clear_ready(&self, ev: AioEvent) {
+ self.registration.clear_readiness(ev.0)
+ }
+
+ /// Destroy the [`Aio`] and return its inner source.
+ pub fn into_inner(self) -> E {
+ self.io.0
+ }
+
+ /// Polls for readiness. Either AIO or LIO counts.
+ ///
+ /// This method returns:
+ /// * `Poll::Pending` if the underlying operation is not complete, whether
+ /// or not it completed successfully. This will be true if the OS is
+ /// still processing it, or if it has not yet been submitted to the OS.
+ /// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
+ /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
+ /// _not_ indicate that the underlying operation encountered an error.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
+ /// is scheduled to receive a wakeup when the underlying operation
+ /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
+ /// `Context` passed to the most recent call is scheduled to receive a wakeup.
+ pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
+ let ev = ready!(self.registration.poll_read_ready(cx))?;
+ Poll::Ready(Ok(AioEvent(ev)))
+ }
+}
+
+impl<E: AioSource> Deref for Aio<E> {
+ type Target = E;
+
+ fn deref(&self) -> &E {
+ &self.io.0
+ }
+}
+
+impl<E: AioSource> DerefMut for Aio<E> {
+ fn deref_mut(&mut self) -> &mut E {
+ &mut self.io.0
+ }
+}
+
+impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Aio").field("io", &self.io.0).finish()
+ }
+}
+
+/// Opaque data returned by [`Aio::poll_ready`].
+///
+/// It can be fed back to [`Aio::clear_ready`].
+#[derive(Debug)]
+pub struct AioEvent(ReadyEvent);