use crate::{Interest, Token}; use log::error; use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, io, ptr, slice}; /// Unique id for use as `SelectorId`. #[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(1); // Type of the `nchanges` and `nevents` parameters in the `kevent` function. #[cfg(not(target_os = "netbsd"))] type Count = libc::c_int; #[cfg(target_os = "netbsd")] type Count = libc::size_t; // Type of the `filter` field in the `kevent` structure. #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))] type Filter = libc::c_short; #[cfg(any(target_os = "macos", target_os = "ios"))] type Filter = i16; #[cfg(target_os = "netbsd")] type Filter = u32; // Type of the `flags` field in the `kevent` structure. #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))] type Flags = libc::c_ushort; #[cfg(any(target_os = "macos", target_os = "ios"))] type Flags = u16; #[cfg(target_os = "netbsd")] type Flags = u32; // Type of the `data` field in the `kevent` structure. #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] type Data = libc::intptr_t; #[cfg(any(target_os = "netbsd", target_os = "openbsd"))] type Data = i64; // Type of the `udata` field in the `kevent` structure. #[cfg(not(target_os = "netbsd"))] type UData = *mut libc::c_void; #[cfg(target_os = "netbsd")] type UData = libc::intptr_t; macro_rules! kevent { ($id: expr, $filter: expr, $flags: expr, $data: expr) => { libc::kevent { ident: $id as libc::uintptr_t, filter: $filter as Filter, flags: $flags, fflags: 0, data: 0, udata: $data as UData, } }; } #[derive(Debug)] pub struct Selector { #[cfg(debug_assertions)] id: usize, kq: RawFd, #[cfg(debug_assertions)] has_waker: AtomicBool, } impl Selector { pub fn new() -> io::Result { syscall!(kqueue()) .and_then(|kq| syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| kq)) .map(|kq| Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), kq, #[cfg(debug_assertions)] has_waker: AtomicBool::new(false), }) } pub fn try_clone(&self) -> io::Result { syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|kq| Selector { // It's the same selector, so we use the same id. #[cfg(debug_assertions)] id: self.id, kq, #[cfg(debug_assertions)] has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { let timeout = timeout.map(|to| libc::timespec { tv_sec: cmp::min(to.as_secs(), libc::time_t::max_value() as u64) as libc::time_t, // `Duration::subsec_nanos` is guaranteed to be less than one // billion (the number of nanoseconds in a second), making the // cast to i32 safe. The cast itself is needed for platforms // where C's long is only 32 bits. tv_nsec: libc::c_long::from(to.subsec_nanos() as i32), }); let timeout = timeout .as_ref() .map(|s| s as *const _) .unwrap_or(ptr::null_mut()); events.clear(); syscall!(kevent( self.kq, ptr::null(), 0, events.as_mut_ptr(), events.capacity() as Count, timeout, )) .map(|n_events| { // This is safe because `kevent` ensures that `n_events` are // assigned. unsafe { events.set_len(n_events as usize) }; }) } pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD; // At most we need two changes, but maybe we only need 1. let mut changes: [MaybeUninit; 2] = [MaybeUninit::uninit(), MaybeUninit::uninit()]; let mut n_changes = 0; if interests.is_writable() { let kevent = kevent!(fd, libc::EVFILT_WRITE, flags, token.0); changes[n_changes] = MaybeUninit::new(kevent); n_changes += 1; } if interests.is_readable() { let kevent = kevent!(fd, libc::EVFILT_READ, flags, token.0); changes[n_changes] = MaybeUninit::new(kevent); n_changes += 1; } // Older versions of macOS (OS X 10.11 and 10.10 have been witnessed) // can return EPIPE when registering a pipe file descriptor where the // other end has already disappeared. For example code that creates a // pipe, closes a file descriptor, and then registers the other end will // see an EPIPE returned from `register`. // // It also turns out that kevent will still report events on the file // descriptor, telling us that it's readable/hup at least after we've // done this registration. As a result we just ignore `EPIPE` here // instead of propagating it. // // More info can be found at tokio-rs/mio#582. let changes = unsafe { // This is safe because we ensure that at least `n_changes` are in // the array. slice::from_raw_parts_mut(changes[0].as_mut_ptr(), n_changes) }; kevent_register(self.kq, changes, &[libc::EPIPE as Data]) } pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { let flags = libc::EV_CLEAR | libc::EV_RECEIPT; let write_flags = if interests.is_writable() { flags | libc::EV_ADD } else { flags | libc::EV_DELETE }; let read_flags = if interests.is_readable() { flags | libc::EV_ADD } else { flags | libc::EV_DELETE }; let mut changes: [libc::kevent; 2] = [ kevent!(fd, libc::EVFILT_WRITE, write_flags, token.0), kevent!(fd, libc::EVFILT_READ, read_flags, token.0), ]; // Since there is no way to check with which interests the fd was // registered we modify both readable and write, adding it when required // and removing it otherwise, ignoring the ENOENT error when it comes // up. The ENOENT error informs us that a filter we're trying to remove // wasn't there in first place, but we don't really care since our goal // is accomplished. // // For the explanation of ignoring `EPIPE` see `register`. kevent_register( self.kq, &mut changes, &[libc::ENOENT as Data, libc::EPIPE as Data], ) } pub fn deregister(&self, fd: RawFd) -> io::Result<()> { let flags = libc::EV_DELETE | libc::EV_RECEIPT; let mut changes: [libc::kevent; 2] = [ kevent!(fd, libc::EVFILT_WRITE, flags, 0), kevent!(fd, libc::EVFILT_READ, flags, 0), ]; // Since there is no way to check with which interests the fd was // registered we remove both filters (readable and writeable) and ignore // the ENOENT error when it comes up. The ENOENT error informs us that // the filter wasn't there in first place, but we don't really care // about that since our goal is to remove it. kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data]) } #[cfg(debug_assertions)] pub fn register_waker(&self) -> bool { self.has_waker.swap(true, Ordering::AcqRel) } // Used by `Waker`. #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] pub fn setup_waker(&self, token: Token) -> io::Result<()> { // First attempt to accept user space notifications. let mut kevent = kevent!( 0, libc::EVFILT_USER, libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT, token.0 ); syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| { if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 { Err(io::Error::from_raw_os_error(kevent.data as i32)) } else { Ok(()) } }) } // Used by `Waker`. #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] pub fn wake(&self, token: Token) -> io::Result<()> { let mut kevent = kevent!( 0, libc::EVFILT_USER, libc::EV_ADD | libc::EV_RECEIPT, token.0 ); kevent.fflags = libc::NOTE_TRIGGER; syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| { if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 { Err(io::Error::from_raw_os_error(kevent.data as i32)) } else { Ok(()) } }) } } /// Register `changes` with `kq`ueue. fn kevent_register( kq: RawFd, changes: &mut [libc::kevent], ignored_errors: &[Data], ) -> io::Result<()> { syscall!(kevent( kq, changes.as_ptr(), changes.len() as Count, changes.as_mut_ptr(), changes.len() as Count, ptr::null(), )) .map(|_| ()) .or_else(|err| { // According to the manual page of FreeBSD: "When kevent() call fails // with EINTR error, all changes in the changelist have been applied", // so we can safely ignore it. if err.raw_os_error() == Some(libc::EINTR) { Ok(()) } else { Err(err) } }) .and_then(|()| check_errors(&changes, ignored_errors)) } /// Check all events for possible errors, it returns the first error found. fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<()> { for event in events { // We can't use references to packed structures (in checking the ignored // errors), so we need copy the data out before use. let data = event.data; // Check for the error flag, the actual error will be in the `data` // field. if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored_errors.contains(&data) { return Err(io::Error::from_raw_os_error(data as i32)); } } Ok(()) } cfg_io_source! { #[cfg(debug_assertions)] impl Selector { pub fn id(&self) -> usize { self.id } } } impl AsRawFd for Selector { fn as_raw_fd(&self) -> RawFd { self.kq } } impl Drop for Selector { fn drop(&mut self) { if let Err(err) = syscall!(close(self.kq)) { error!("error closing kqueue: {}", err); } } } pub type Event = libc::kevent; pub struct Events(Vec); impl Events { pub fn with_capacity(capacity: usize) -> Events { Events(Vec::with_capacity(capacity)) } } impl Deref for Events { type Target = Vec; fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for Events { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } // `Events` cannot derive `Send` or `Sync` because of the // `udata: *mut ::c_void` field in `libc::kevent`. However, `Events`'s public // API treats the `udata` field as a `uintptr_t` which is `Send`. `Sync` is // safe because with a `events: &Events` value, the only access to the `udata` // field is through `fn token(event: &Event)` which cannot mutate the field. unsafe impl Send for Events {} unsafe impl Sync for Events {} pub mod event { use std::fmt; use crate::sys::Event; use crate::Token; use super::{Filter, Flags}; pub fn token(event: &Event) -> Token { Token(event.udata as usize) } pub fn is_readable(event: &Event) -> bool { event.filter == libc::EVFILT_READ || { #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] // Used by the `Awakener`. On platforms that use `eventfd` or a unix // pipe it will emit a readable event so we'll fake that here as // well. { event.filter == libc::EVFILT_USER } #[cfg(not(any(target_os = "freebsd", target_os = "ios", target_os = "macos")))] { false } } } pub fn is_writable(event: &Event) -> bool { event.filter == libc::EVFILT_WRITE } pub fn is_error(event: &Event) -> bool { (event.flags & libc::EV_ERROR) != 0 || // When the read end of the socket is closed, EV_EOF is set on // flags, and fflags contains the error if there is one. (event.flags & libc::EV_EOF) != 0 && event.fflags != 0 } pub fn is_read_closed(event: &Event) -> bool { event.filter == libc::EVFILT_READ && event.flags & libc::EV_EOF != 0 } pub fn is_write_closed(event: &Event) -> bool { event.filter == libc::EVFILT_WRITE && event.flags & libc::EV_EOF != 0 } pub fn is_priority(_: &Event) -> bool { // kqueue doesn't have priority indicators. false } #[allow(unused_variables)] // `event` is not used on some platforms. pub fn is_aio(event: &Event) -> bool { #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] { event.filter == libc::EVFILT_AIO } #[cfg(not(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" )))] { false } } #[allow(unused_variables)] // `event` is only used on FreeBSD. pub fn is_lio(event: &Event) -> bool { #[cfg(target_os = "freebsd")] { event.filter == libc::EVFILT_LIO } #[cfg(not(target_os = "freebsd"))] { false } } pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { debug_detail!( FilterDetails(Filter), PartialEq::eq, libc::EVFILT_READ, libc::EVFILT_WRITE, libc::EVFILT_AIO, libc::EVFILT_VNODE, libc::EVFILT_PROC, libc::EVFILT_SIGNAL, libc::EVFILT_TIMER, #[cfg(target_os = "freebsd")] libc::EVFILT_PROCDESC, #[cfg(any( target_os = "freebsd", target_os = "dragonfly", target_os = "ios", target_os = "macos" ))] libc::EVFILT_FS, #[cfg(target_os = "freebsd")] libc::EVFILT_LIO, #[cfg(any( target_os = "freebsd", target_os = "dragonfly", target_os = "ios", target_os = "macos" ))] libc::EVFILT_USER, #[cfg(target_os = "freebsd")] libc::EVFILT_SENDFILE, #[cfg(target_os = "freebsd")] libc::EVFILT_EMPTY, #[cfg(target_os = "dragonfly")] libc::EVFILT_EXCEPT, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::EVFILT_MACHPORT, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::EVFILT_VM, ); #[allow(clippy::trivially_copy_pass_by_ref)] fn check_flag(got: &Flags, want: &Flags) -> bool { (got & want) != 0 } debug_detail!( FlagsDetails(Flags), check_flag, libc::EV_ADD, libc::EV_DELETE, libc::EV_ENABLE, libc::EV_DISABLE, libc::EV_ONESHOT, libc::EV_CLEAR, libc::EV_RECEIPT, libc::EV_DISPATCH, #[cfg(target_os = "freebsd")] libc::EV_DROP, libc::EV_FLAG1, libc::EV_ERROR, libc::EV_EOF, libc::EV_SYSFLAGS, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::EV_FLAG0, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::EV_POLL, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::EV_OOBAND, #[cfg(target_os = "dragonfly")] libc::EV_NODATA, ); #[allow(clippy::trivially_copy_pass_by_ref)] fn check_fflag(got: &u32, want: &u32) -> bool { (got & want) != 0 } debug_detail!( FflagsDetails(u32), check_fflag, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_TRIGGER, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_FFNOP, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_FFAND, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_FFOR, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_FFCOPY, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_FFCTRLMASK, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos" ))] libc::NOTE_FFLAGSMASK, libc::NOTE_LOWAT, libc::NOTE_DELETE, libc::NOTE_WRITE, #[cfg(target_os = "dragonfly")] libc::NOTE_OOB, #[cfg(target_os = "openbsd")] libc::NOTE_EOF, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXTEND, libc::NOTE_ATTRIB, libc::NOTE_LINK, libc::NOTE_RENAME, libc::NOTE_REVOKE, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_NONE, #[cfg(any(target_os = "openbsd"))] libc::NOTE_TRUNCATE, libc::NOTE_EXIT, libc::NOTE_FORK, libc::NOTE_EXEC, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_SIGNAL, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXITSTATUS, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXIT_DETAIL, libc::NOTE_PDATAMASK, libc::NOTE_PCTRLMASK, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] libc::NOTE_TRACK, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] libc::NOTE_TRACKERR, #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] libc::NOTE_CHILD, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXIT_DETAIL_MASK, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXIT_DECRYPTFAIL, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXIT_MEMORY, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_EXIT_CSERROR, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_VM_PRESSURE, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_VM_PRESSURE_TERMINATE, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_VM_PRESSURE_SUDDEN_TERMINATE, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_VM_ERROR, #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] libc::NOTE_SECONDS, #[cfg(any(target_os = "freebsd"))] libc::NOTE_MSECONDS, #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] libc::NOTE_USECONDS, #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] libc::NOTE_NSECONDS, #[cfg(any(target_os = "ios", target_os = "macos"))] #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] libc::NOTE_ABSOLUTE, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_LEEWAY, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_CRITICAL, #[cfg(any(target_os = "ios", target_os = "macos"))] libc::NOTE_BACKGROUND, ); // Can't reference fields in packed structures. let ident = event.ident; let data = event.data; let udata = event.udata; f.debug_struct("kevent") .field("ident", &ident) .field("filter", &FilterDetails(event.filter)) .field("flags", &FlagsDetails(event.flags)) .field("fflags", &FflagsDetails(event.fflags)) .field("data", &data) .field("udata", &udata) .finish() } } #[test] #[cfg(feature = "os-ext")] fn does_not_register_rw() { use crate::unix::SourceFd; use crate::{Poll, Token}; let kq = unsafe { libc::kqueue() }; let mut kqf = SourceFd(&kq); let poll = Poll::new().unwrap(); // Registering kqueue fd will fail if write is requested (On anything but // some versions of macOS). poll.registry() .register(&mut kqf, Token(1234), Interest::READABLE) .unwrap(); }