aboutsummaryrefslogtreecommitdiff
path: root/src/sys/unix
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2020-10-23 08:03:13 -0700
committerJoel Galenson <jgalenson@google.com>2020-10-23 08:03:13 -0700
commit4bf0c30e286d482eb711dc677be906adcba4650b (patch)
treede038d1f2268222ebb5925c40db9bda0e2d8e565 /src/sys/unix
parent33e7b955494b6d15ed72863ad35c620e904302ed (diff)
downloadmio-4bf0c30e286d482eb711dc677be906adcba4650b.tar.gz
Import mio-0.7.3
Test: None Change-Id: I7df903972aaf06adb1ecb20a63793fcf128edb8f
Diffstat (limited to 'src/sys/unix')
-rw-r--r--src/sys/unix/mod.rs74
-rw-r--r--src/sys/unix/net.rs103
-rw-r--r--src/sys/unix/selector/epoll.rs235
-rw-r--r--src/sys/unix/selector/kqueue.rs676
-rw-r--r--src/sys/unix/selector/mod.rs35
-rw-r--r--src/sys/unix/sourcefd.rs108
-rw-r--r--src/sys/unix/tcp.rs103
-rw-r--r--src/sys/unix/udp.rs23
-rw-r--r--src/sys/unix/uds/datagram.rs56
-rw-r--r--src/sys/unix/uds/listener.rs83
-rw-r--r--src/sys/unix/uds/mod.rs149
-rw-r--r--src/sys/unix/uds/socketaddr.rs120
-rw-r--r--src/sys/unix/uds/stream.rs39
-rw-r--r--src/sys/unix/waker.rs174
14 files changed, 1978 insertions, 0 deletions
diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs
new file mode 100644
index 0000000..96d7f4d
--- /dev/null
+++ b/src/sys/unix/mod.rs
@@ -0,0 +1,74 @@
+/// Helper macro to execute a system call that returns an `io::Result`.
+//
+// Macro must be defined before any modules that uses them.
+#[allow(unused_macros)]
+macro_rules! syscall {
+ ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
+ let res = unsafe { libc::$fn($($arg, )*) };
+ if res == -1 {
+ Err(std::io::Error::last_os_error())
+ } else {
+ Ok(res)
+ }
+ }};
+}
+
+cfg_os_poll! {
+ mod net;
+
+ mod selector;
+ pub(crate) use self::selector::{event, Event, Events, Selector};
+
+ mod sourcefd;
+ pub use self::sourcefd::SourceFd;
+
+ mod waker;
+ pub(crate) use self::waker::Waker;
+
+ cfg_tcp! {
+ pub(crate) mod tcp;
+ }
+
+ cfg_udp! {
+ pub(crate) mod udp;
+ }
+
+ cfg_uds! {
+ pub(crate) mod uds;
+ pub use self::uds::SocketAddr;
+ }
+
+ cfg_net! {
+ use std::io;
+
+ // Both `kqueue` and `epoll` don't need to hold any user space state.
+ pub(crate) struct IoSourceState;
+
+ impl IoSourceState {
+ pub fn new() -> IoSourceState {
+ IoSourceState
+ }
+
+ pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
+ where
+ F: FnOnce(&T) -> io::Result<R>,
+ {
+ // We don't hold state, so we can just call the function and
+ // return.
+ f(io)
+ }
+ }
+ }
+}
+
+cfg_not_os_poll! {
+ cfg_uds! {
+ mod uds;
+ pub use self::uds::SocketAddr;
+ }
+
+ cfg_any_os_util! {
+ mod sourcefd;
+ pub use self::sourcefd::SourceFd;
+ }
+}
diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs
new file mode 100644
index 0000000..2671b42
--- /dev/null
+++ b/src/sys/unix/net.rs
@@ -0,0 +1,103 @@
+#[cfg(all(feature = "os-poll", any(feature = "tcp", feature = "udp")))]
+use std::net::SocketAddr;
+
+#[cfg(all(feature = "os-poll", any(feature = "udp")))]
+pub(crate) fn new_ip_socket(
+ addr: SocketAddr,
+ socket_type: libc::c_int,
+) -> std::io::Result<libc::c_int> {
+ let domain = match addr {
+ SocketAddr::V4(..) => libc::AF_INET,
+ SocketAddr::V6(..) => libc::AF_INET6,
+ };
+
+ new_socket(domain, socket_type)
+}
+
+/// Create a new non-blocking socket.
+#[cfg(all(
+ feature = "os-poll",
+ any(feature = "tcp", feature = "udp", feature = "uds")
+))]
+pub(crate) fn new_socket(
+ domain: libc::c_int,
+ socket_type: libc::c_int,
+) -> std::io::Result<libc::c_int> {
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
+
+ // Gives a warning for platforms without SOCK_NONBLOCK.
+ #[allow(clippy::let_and_return)]
+ let socket = syscall!(socket(domain, socket_type, 0));
+
+ // Mimick `libstd` and set `SO_NOSIGPIPE` on apple systems.
+ #[cfg(target_vendor = "apple")]
+ let socket = socket.and_then(|socket| {
+ syscall!(setsockopt(
+ socket,
+ libc::SOL_SOCKET,
+ libc::SO_NOSIGPIPE,
+ &1 as *const libc::c_int as *const libc::c_void,
+ std::mem::size_of::<libc::c_int>() as libc::socklen_t
+ ))
+ .map(|_| socket)
+ });
+
+ // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
+ // Solaris, couldn't find anything online.
+ #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ let socket = socket.and_then(|socket| {
+ // For platforms that don't support flags in socket, we need to
+ // set the flags ourselves.
+ syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))
+ .and_then(|_| syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket))
+ .map_err(|e| {
+ // If either of the `fcntl` calls failed, ensure the socket is
+ // closed and return the error.
+ let _ = syscall!(close(socket));
+ e
+ })
+ });
+
+ socket
+}
+
+#[cfg(all(feature = "os-poll", any(feature = "tcp", feature = "udp")))]
+pub(crate) fn socket_addr(addr: &SocketAddr) -> (*const libc::sockaddr, libc::socklen_t) {
+ use std::mem::size_of_val;
+
+ match addr {
+ SocketAddr::V4(ref addr) => (
+ addr as *const _ as *const libc::sockaddr,
+ size_of_val(addr) as libc::socklen_t,
+ ),
+ SocketAddr::V6(ref addr) => (
+ addr as *const _ as *const libc::sockaddr,
+ size_of_val(addr) as libc::socklen_t,
+ ),
+ }
+}
+
+/// `storage` must be initialised to `sockaddr_in` or `sockaddr_in6`.
+#[cfg(all(feature = "os-poll", feature = "tcp"))]
+pub(crate) unsafe fn to_socket_addr(
+ storage: *const libc::sockaddr_storage,
+) -> std::io::Result<SocketAddr> {
+ match (*storage).ss_family as libc::c_int {
+ libc::AF_INET => Ok(SocketAddr::V4(
+ *(storage as *const libc::sockaddr_in as *const _),
+ )),
+ libc::AF_INET6 => Ok(SocketAddr::V6(
+ *(storage as *const libc::sockaddr_in6 as *const _),
+ )),
+ _ => Err(std::io::ErrorKind::InvalidInput.into()),
+ }
+}
diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs
new file mode 100644
index 0000000..13f1617
--- /dev/null
+++ b/src/sys/unix/selector/epoll.rs
@@ -0,0 +1,235 @@
+use crate::{Interest, Token};
+
+use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP};
+use log::error;
+use std::os::unix::io::{AsRawFd, RawFd};
+#[cfg(debug_assertions)]
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Duration;
+use std::{cmp, i32, io, ptr};
+
+/// Unique id for use as `SelectorId`.
+#[cfg(debug_assertions)]
+static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
+
+#[derive(Debug)]
+pub struct Selector {
+ #[cfg(debug_assertions)]
+ id: usize,
+ ep: RawFd,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
+ // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
+ // so we use it instead.
+ #[cfg(target_os = "android")]
+ let flag = libc::O_CLOEXEC;
+ #[cfg(not(target_os = "android"))]
+ let flag = libc::EPOLL_CLOEXEC;
+
+ syscall!(epoll_create1(flag)).map(|ep| Selector {
+ #[cfg(debug_assertions)]
+ id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ ep,
+ })
+ }
+
+ pub fn try_clone(&self) -> io::Result<Selector> {
+ syscall!(dup(self.ep)).map(|ep| Selector {
+ // It's the same selector, so we use the same id.
+ #[cfg(debug_assertions)]
+ id: self.id,
+ ep,
+ })
+ }
+
+ pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
+ // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
+ // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
+ // architectures. The magic number is the same constant used by libuv.
+ #[cfg(target_pointer_width = "32")]
+ const MAX_SAFE_TIMEOUT: u128 = 1789569;
+ #[cfg(not(target_pointer_width = "32"))]
+ const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
+
+ let timeout = timeout
+ .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int)
+ .unwrap_or(-1);
+
+ events.clear();
+ syscall!(epoll_wait(
+ self.ep,
+ events.as_mut_ptr(),
+ events.capacity() as i32,
+ timeout,
+ ))
+ .map(|n_events| {
+ // This is safe because `epoll_wait` ensures that `n_events` are
+ // assigned.
+ unsafe { events.set_len(n_events as usize) };
+ })
+ }
+
+ pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
+ let mut event = libc::epoll_event {
+ events: interests_to_epoll(interests),
+ u64: usize::from(token) as u64,
+ };
+
+ syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
+ }
+
+ pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
+ let mut event = libc::epoll_event {
+ events: interests_to_epoll(interests),
+ u64: usize::from(token) as u64,
+ };
+
+ syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
+ }
+
+ pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
+ syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
+ }
+}
+
+cfg_net! {
+ impl Selector {
+ #[cfg(debug_assertions)]
+ pub fn id(&self) -> usize {
+ self.id
+ }
+ }
+}
+
+impl AsRawFd for Selector {
+ fn as_raw_fd(&self) -> RawFd {
+ self.ep
+ }
+}
+
+impl Drop for Selector {
+ fn drop(&mut self) {
+ if let Err(err) = syscall!(close(self.ep)) {
+ error!("error closing epoll: {}", err);
+ }
+ }
+}
+
+fn interests_to_epoll(interests: Interest) -> u32 {
+ let mut kind = EPOLLET;
+
+ if interests.is_readable() {
+ kind = kind | EPOLLIN | EPOLLRDHUP;
+ }
+
+ if interests.is_writable() {
+ kind |= EPOLLOUT;
+ }
+
+ kind as u32
+}
+
+pub type Event = libc::epoll_event;
+pub type Events = Vec<Event>;
+
+pub mod event {
+ use std::fmt;
+
+ use crate::sys::Event;
+ use crate::Token;
+
+ pub fn token(event: &Event) -> Token {
+ Token(event.u64 as usize)
+ }
+
+ pub fn is_readable(event: &Event) -> bool {
+ (event.events as libc::c_int & libc::EPOLLIN) != 0
+ || (event.events as libc::c_int & libc::EPOLLPRI) != 0
+ }
+
+ pub fn is_writable(event: &Event) -> bool {
+ (event.events as libc::c_int & libc::EPOLLOUT) != 0
+ }
+
+ pub fn is_error(event: &Event) -> bool {
+ (event.events as libc::c_int & libc::EPOLLERR) != 0
+ }
+
+ pub fn is_read_closed(event: &Event) -> bool {
+ // Both halves of the socket have closed
+ event.events as libc::c_int & libc::EPOLLHUP != 0
+ // Socket has received FIN or called shutdown(SHUT_RD)
+ || (event.events as libc::c_int & libc::EPOLLIN != 0
+ && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
+ }
+
+ pub fn is_write_closed(event: &Event) -> bool {
+ // Both halves of the socket have closed
+ event.events as libc::c_int & libc::EPOLLHUP != 0
+ // Unix pipe write end has closed
+ || (event.events as libc::c_int & libc::EPOLLOUT != 0
+ && event.events as libc::c_int & libc::EPOLLERR != 0)
+ // The other side (read end) of a Unix pipe has closed.
+ || event.events as libc::c_int == libc::EPOLLERR
+ }
+
+ pub fn is_priority(event: &Event) -> bool {
+ (event.events as libc::c_int & libc::EPOLLPRI) != 0
+ }
+
+ pub fn is_aio(_: &Event) -> bool {
+ // Not supported in the kernel, only in libc.
+ false
+ }
+
+ pub fn is_lio(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
+ #[allow(clippy::trivially_copy_pass_by_ref)]
+ fn check_events(got: &u32, want: &libc::c_int) -> bool {
+ (*got as libc::c_int & want) != 0
+ }
+ debug_detail!(
+ EventsDetails(u32),
+ check_events,
+ libc::EPOLLIN,
+ libc::EPOLLPRI,
+ libc::EPOLLOUT,
+ libc::EPOLLRDNORM,
+ libc::EPOLLRDBAND,
+ libc::EPOLLWRNORM,
+ libc::EPOLLWRBAND,
+ libc::EPOLLMSG,
+ libc::EPOLLERR,
+ libc::EPOLLHUP,
+ libc::EPOLLET,
+ libc::EPOLLRDHUP,
+ libc::EPOLLONESHOT,
+ #[cfg(any(target_os = "linux", target_os = "solaris"))]
+ libc::EPOLLEXCLUSIVE,
+ #[cfg(any(target_os = "android", target_os = "linux"))]
+ libc::EPOLLWAKEUP,
+ libc::EPOLL_CLOEXEC,
+ );
+
+ // Can't reference fields in packed structures.
+ let e_u64 = event.u64;
+ f.debug_struct("epoll_event")
+ .field("events", &EventsDetails(event.events))
+ .field("u64", &e_u64)
+ .finish()
+ }
+}
+
+#[cfg(target_os = "android")]
+#[test]
+fn assert_close_on_exec_flag() {
+ // This assertion need to be true for Selector::new.
+ assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
+}
diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs
new file mode 100644
index 0000000..2ebac9a
--- /dev/null
+++ b/src/sys/unix/selector/kqueue.rs
@@ -0,0 +1,676 @@
+use crate::{Interest, Token};
+use log::error;
+use std::mem::MaybeUninit;
+use std::ops::{Deref, DerefMut};
+use std::os::unix::io::{AsRawFd, RawFd};
+#[cfg(debug_assertions)]
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Duration;
+use std::{cmp, io, ptr, slice};
+
+/// Unique id for use as `SelectorId`.
+#[cfg(debug_assertions)]
+static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
+
+// Type of the `nchanges` and `nevents` parameters in the `kevent` function.
+#[cfg(not(target_os = "netbsd"))]
+type Count = libc::c_int;
+#[cfg(target_os = "netbsd")]
+type Count = libc::size_t;
+
+// Type of the `filter` field in the `kevent` structure.
+#[cfg(any(target_os = "freebsd", target_os = "openbsd"))]
+type Filter = libc::c_short;
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+type Filter = i16;
+#[cfg(target_os = "netbsd")]
+type Filter = u32;
+
+// Type of the `flags` field in the `kevent` structure.
+#[cfg(any(target_os = "freebsd", target_os = "openbsd"))]
+type Flags = libc::c_ushort;
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+type Flags = u16;
+#[cfg(target_os = "netbsd")]
+type Flags = u32;
+
+// Type of the `data` field in the `kevent` structure.
+#[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+))]
+type Data = libc::intptr_t;
+#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
+type Data = i64;
+
+// Type of the `udata` field in the `kevent` structure.
+#[cfg(not(target_os = "netbsd"))]
+type UData = *mut libc::c_void;
+#[cfg(target_os = "netbsd")]
+type UData = libc::intptr_t;
+
+macro_rules! kevent {
+ ($id: expr, $filter: expr, $flags: expr, $data: expr) => {
+ libc::kevent {
+ ident: $id as libc::uintptr_t,
+ filter: $filter as Filter,
+ flags: $flags,
+ fflags: 0,
+ data: 0,
+ udata: $data as UData,
+ }
+ };
+}
+
+#[derive(Debug)]
+pub struct Selector {
+ #[cfg(debug_assertions)]
+ id: usize,
+ kq: RawFd,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ syscall!(kqueue())
+ .and_then(|kq| syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| kq))
+ .map(|kq| Selector {
+ #[cfg(debug_assertions)]
+ id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ kq,
+ })
+ }
+
+ pub fn try_clone(&self) -> io::Result<Selector> {
+ syscall!(dup(self.kq)).map(|kq| Selector {
+ // It's the same selector, so we use the same id.
+ #[cfg(debug_assertions)]
+ id: self.id,
+ kq,
+ })
+ }
+
+ pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
+ let timeout = timeout.map(|to| libc::timespec {
+ tv_sec: cmp::min(to.as_secs(), libc::time_t::max_value() as u64) as libc::time_t,
+ // `Duration::subsec_nanos` is guaranteed to be less than one
+ // billion (the number of nanoseconds in a second), making the
+ // cast to i32 safe. The cast itself is needed for platforms
+ // where C's long is only 32 bits.
+ tv_nsec: libc::c_long::from(to.subsec_nanos() as i32),
+ });
+ let timeout = timeout
+ .as_ref()
+ .map(|s| s as *const _)
+ .unwrap_or(ptr::null_mut());
+
+ events.clear();
+ syscall!(kevent(
+ self.kq,
+ ptr::null(),
+ 0,
+ events.as_mut_ptr(),
+ events.capacity() as Count,
+ timeout,
+ ))
+ .map(|n_events| {
+ // This is safe because `kevent` ensures that `n_events` are
+ // assigned.
+ unsafe { events.set_len(n_events as usize) };
+ })
+ }
+
+ pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
+ let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD;
+ // At most we need two changes, but maybe we only need 1.
+ let mut changes: [MaybeUninit<libc::kevent>; 2] =
+ [MaybeUninit::uninit(), MaybeUninit::uninit()];
+ let mut n_changes = 0;
+
+ if interests.is_writable() {
+ let kevent = kevent!(fd, libc::EVFILT_WRITE, flags, token.0);
+ changes[n_changes] = MaybeUninit::new(kevent);
+ n_changes += 1;
+ }
+
+ if interests.is_readable() {
+ let kevent = kevent!(fd, libc::EVFILT_READ, flags, token.0);
+ changes[n_changes] = MaybeUninit::new(kevent);
+ n_changes += 1;
+ }
+
+ // Older versions of macOS (OS X 10.11 and 10.10 have been witnessed)
+ // can return EPIPE when registering a pipe file descriptor where the
+ // other end has already disappeared. For example code that creates a
+ // pipe, closes a file descriptor, and then registers the other end will
+ // see an EPIPE returned from `register`.
+ //
+ // It also turns out that kevent will still report events on the file
+ // descriptor, telling us that it's readable/hup at least after we've
+ // done this registration. As a result we just ignore `EPIPE` here
+ // instead of propagating it.
+ //
+ // More info can be found at tokio-rs/mio#582.
+ let changes = unsafe {
+ // This is safe because we ensure that at least `n_changes` are in
+ // the array.
+ slice::from_raw_parts_mut(changes[0].as_mut_ptr(), n_changes)
+ };
+ kevent_register(self.kq, changes, &[libc::EPIPE as Data])
+ }
+
+ pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
+ let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
+ let write_flags = if interests.is_writable() {
+ flags | libc::EV_ADD
+ } else {
+ flags | libc::EV_DELETE
+ };
+ let read_flags = if interests.is_readable() {
+ flags | libc::EV_ADD
+ } else {
+ flags | libc::EV_DELETE
+ };
+
+ let mut changes: [libc::kevent; 2] = [
+ kevent!(fd, libc::EVFILT_WRITE, write_flags, token.0),
+ kevent!(fd, libc::EVFILT_READ, read_flags, token.0),
+ ];
+
+ // Since there is no way to check with which interests the fd was
+ // registered we modify both readable and write, adding it when required
+ // and removing it otherwise, ignoring the ENOENT error when it comes
+ // up. The ENOENT error informs us that a filter we're trying to remove
+ // wasn't there in first place, but we don't really care since our goal
+ // is accomplished.
+ //
+ // For the explanation of ignoring `EPIPE` see `register`.
+ kevent_register(
+ self.kq,
+ &mut changes,
+ &[libc::ENOENT as Data, libc::EPIPE as Data],
+ )
+ }
+
+ pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
+ let flags = libc::EV_DELETE | libc::EV_RECEIPT;
+ let mut changes: [libc::kevent; 2] = [
+ kevent!(fd, libc::EVFILT_WRITE, flags, 0),
+ kevent!(fd, libc::EVFILT_READ, flags, 0),
+ ];
+
+ // Since there is no way to check with which interests the fd was
+ // registered we remove both filters (readable and writeable) and ignore
+ // the ENOENT error when it comes up. The ENOENT error informs us that
+ // the filter wasn't there in first place, but we don't really care
+ // about that since our goal is to remove it.
+ kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
+ }
+
+ // Used by `Waker`.
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ pub fn setup_waker(&self, token: Token) -> io::Result<()> {
+ // First attempt to accept user space notifications.
+ let mut kevent = kevent!(
+ 0,
+ libc::EVFILT_USER,
+ libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
+ token.0
+ );
+
+ syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| {
+ if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 {
+ Err(io::Error::from_raw_os_error(kevent.data as i32))
+ } else {
+ Ok(())
+ }
+ })
+ }
+
+ // Used by `Waker`.
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ pub fn wake(&self, token: Token) -> io::Result<()> {
+ let mut kevent = kevent!(
+ 0,
+ libc::EVFILT_USER,
+ libc::EV_ADD | libc::EV_RECEIPT,
+ token.0
+ );
+ kevent.fflags = libc::NOTE_TRIGGER;
+
+ syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| {
+ if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 {
+ Err(io::Error::from_raw_os_error(kevent.data as i32))
+ } else {
+ Ok(())
+ }
+ })
+ }
+}
+
+/// Register `changes` with `kq`ueue.
+fn kevent_register(
+ kq: RawFd,
+ changes: &mut [libc::kevent],
+ ignored_errors: &[Data],
+) -> io::Result<()> {
+ syscall!(kevent(
+ kq,
+ changes.as_ptr(),
+ changes.len() as Count,
+ changes.as_mut_ptr(),
+ changes.len() as Count,
+ ptr::null(),
+ ))
+ .map(|_| ())
+ .or_else(|err| {
+ // According to the manual page of FreeBSD: "When kevent() call fails
+ // with EINTR error, all changes in the changelist have been applied",
+ // so we can safely ignore it.
+ if err.raw_os_error() == Some(libc::EINTR) {
+ Ok(())
+ } else {
+ Err(err)
+ }
+ })
+ .and_then(|()| check_errors(&changes, ignored_errors))
+}
+
+/// Check all events for possible errors, it returns the first error found.
+fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<()> {
+ for event in events {
+ // We can't use references to packed structures (in checking the ignored
+ // errors), so we need copy the data out before use.
+ let data = event.data;
+ // Check for the error flag, the actual error will be in the `data`
+ // field.
+ if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored_errors.contains(&data) {
+ return Err(io::Error::from_raw_os_error(data as i32));
+ }
+ }
+ Ok(())
+}
+
+cfg_net! {
+ #[cfg(debug_assertions)]
+ impl Selector {
+ pub fn id(&self) -> usize {
+ self.id
+ }
+ }
+}
+
+impl AsRawFd for Selector {
+ fn as_raw_fd(&self) -> RawFd {
+ self.kq
+ }
+}
+
+impl Drop for Selector {
+ fn drop(&mut self) {
+ if let Err(err) = syscall!(close(self.kq)) {
+ error!("error closing kqueue: {}", err);
+ }
+ }
+}
+
+pub type Event = libc::kevent;
+pub struct Events(Vec<libc::kevent>);
+
+impl Events {
+ pub fn with_capacity(capacity: usize) -> Events {
+ Events(Vec::with_capacity(capacity))
+ }
+}
+
+impl Deref for Events {
+ type Target = Vec<libc::kevent>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for Events {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+// `Events` cannot derive `Send` or `Sync` because of the
+// `udata: *mut ::c_void` field in `libc::kevent`. However, `Events`'s public
+// API treats the `udata` field as a `uintptr_t` which is `Send`. `Sync` is
+// safe because with a `events: &Events` value, the only access to the `udata`
+// field is through `fn token(event: &Event)` which cannot mutate the field.
+unsafe impl Send for Events {}
+unsafe impl Sync for Events {}
+
+pub mod event {
+ use std::fmt;
+
+ use crate::sys::Event;
+ use crate::Token;
+
+ use super::{Filter, Flags};
+
+ pub fn token(event: &Event) -> Token {
+ Token(event.udata as usize)
+ }
+
+ pub fn is_readable(event: &Event) -> bool {
+ event.filter == libc::EVFILT_READ || {
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ // Used by the `Awakener`. On platforms that use `eventfd` or a unix
+ // pipe it will emit a readable event so we'll fake that here as
+ // well.
+ {
+ event.filter == libc::EVFILT_USER
+ }
+ #[cfg(not(any(target_os = "freebsd", target_os = "ios", target_os = "macos")))]
+ {
+ false
+ }
+ }
+ }
+
+ pub fn is_writable(event: &Event) -> bool {
+ event.filter == libc::EVFILT_WRITE
+ }
+
+ pub fn is_error(event: &Event) -> bool {
+ (event.flags & libc::EV_ERROR) != 0 ||
+ // When the read end of the socket is closed, EV_EOF is set on
+ // flags, and fflags contains the error if there is one.
+ (event.flags & libc::EV_EOF) != 0 && event.fflags != 0
+ }
+
+ pub fn is_read_closed(event: &Event) -> bool {
+ event.filter == libc::EVFILT_READ && event.flags & libc::EV_EOF != 0
+ }
+
+ pub fn is_write_closed(event: &Event) -> bool {
+ event.filter == libc::EVFILT_WRITE && event.flags & libc::EV_EOF != 0
+ }
+
+ pub fn is_priority(_: &Event) -> bool {
+ // kqueue doesn't have priority indicators.
+ false
+ }
+
+ #[allow(unused_variables)] // `event` is not used on some platforms.
+ pub fn is_aio(event: &Event) -> bool {
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ {
+ event.filter == libc::EVFILT_AIO
+ }
+ #[cfg(not(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ )))]
+ {
+ false
+ }
+ }
+
+ #[allow(unused_variables)] // `event` is only used on FreeBSD.
+ pub fn is_lio(event: &Event) -> bool {
+ #[cfg(target_os = "freebsd")]
+ {
+ event.filter == libc::EVFILT_LIO
+ }
+ #[cfg(not(target_os = "freebsd"))]
+ {
+ false
+ }
+ }
+
+ pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
+ debug_detail!(
+ FilterDetails(Filter),
+ PartialEq::eq,
+ libc::EVFILT_READ,
+ libc::EVFILT_WRITE,
+ libc::EVFILT_AIO,
+ libc::EVFILT_VNODE,
+ libc::EVFILT_PROC,
+ libc::EVFILT_SIGNAL,
+ libc::EVFILT_TIMER,
+ #[cfg(target_os = "freebsd")]
+ libc::EVFILT_PROCDESC,
+ #[cfg(any(
+ target_os = "freebsd",
+ target_os = "dragonfly",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::EVFILT_FS,
+ #[cfg(target_os = "freebsd")]
+ libc::EVFILT_LIO,
+ #[cfg(any(
+ target_os = "freebsd",
+ target_os = "dragonfly",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::EVFILT_USER,
+ #[cfg(target_os = "freebsd")]
+ libc::EVFILT_SENDFILE,
+ #[cfg(target_os = "freebsd")]
+ libc::EVFILT_EMPTY,
+ #[cfg(target_os = "dragonfly")]
+ libc::EVFILT_EXCEPT,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::EVFILT_MACHPORT,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::EVFILT_VM,
+ );
+
+ #[allow(clippy::trivially_copy_pass_by_ref)]
+ fn check_flag(got: &Flags, want: &Flags) -> bool {
+ (got & want) != 0
+ }
+ debug_detail!(
+ FlagsDetails(Flags),
+ check_flag,
+ libc::EV_ADD,
+ libc::EV_DELETE,
+ libc::EV_ENABLE,
+ libc::EV_DISABLE,
+ libc::EV_ONESHOT,
+ libc::EV_CLEAR,
+ libc::EV_RECEIPT,
+ libc::EV_DISPATCH,
+ #[cfg(target_os = "freebsd")]
+ libc::EV_DROP,
+ libc::EV_FLAG1,
+ libc::EV_ERROR,
+ libc::EV_EOF,
+ libc::EV_SYSFLAGS,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::EV_FLAG0,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::EV_POLL,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::EV_OOBAND,
+ #[cfg(target_os = "dragonfly")]
+ libc::EV_NODATA,
+ );
+
+ #[allow(clippy::trivially_copy_pass_by_ref)]
+ fn check_fflag(got: &u32, want: &u32) -> bool {
+ (got & want) != 0
+ }
+ debug_detail!(
+ FflagsDetails(u32),
+ check_fflag,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_TRIGGER,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_FFNOP,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_FFAND,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_FFOR,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_FFCOPY,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_FFCTRLMASK,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos"
+ ))]
+ libc::NOTE_FFLAGSMASK,
+ libc::NOTE_LOWAT,
+ libc::NOTE_DELETE,
+ libc::NOTE_WRITE,
+ #[cfg(target_os = "dragonfly")]
+ libc::NOTE_OOB,
+ #[cfg(target_os = "openbsd")]
+ libc::NOTE_EOF,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXTEND,
+ libc::NOTE_ATTRIB,
+ libc::NOTE_LINK,
+ libc::NOTE_RENAME,
+ libc::NOTE_REVOKE,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_NONE,
+ #[cfg(any(target_os = "openbsd"))]
+ libc::NOTE_TRUNCATE,
+ libc::NOTE_EXIT,
+ libc::NOTE_FORK,
+ libc::NOTE_EXEC,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_SIGNAL,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXITSTATUS,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXIT_DETAIL,
+ libc::NOTE_PDATAMASK,
+ libc::NOTE_PCTRLMASK,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ libc::NOTE_TRACK,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ libc::NOTE_TRACKERR,
+ #[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ libc::NOTE_CHILD,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXIT_DETAIL_MASK,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXIT_DECRYPTFAIL,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXIT_MEMORY,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_EXIT_CSERROR,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_VM_PRESSURE,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_VM_PRESSURE_TERMINATE,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_VM_PRESSURE_SUDDEN_TERMINATE,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_VM_ERROR,
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ libc::NOTE_SECONDS,
+ #[cfg(any(target_os = "freebsd"))]
+ libc::NOTE_MSECONDS,
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ libc::NOTE_USECONDS,
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ libc::NOTE_NSECONDS,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ libc::NOTE_ABSOLUTE,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_LEEWAY,
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ libc::NOTE_CRITICAL,
+ #[cfg(any(target_os = "dragonfly"))]
+ libc::NOTE_BACKGROUND,
+ );
+
+ // Can't reference fields in packed structures.
+ let ident = event.ident;
+ let data = event.data;
+ let udata = event.udata;
+ f.debug_struct("kevent")
+ .field("ident", &ident)
+ .field("filter", &FilterDetails(event.filter))
+ .field("flags", &FlagsDetails(event.flags))
+ .field("fflags", &FflagsDetails(event.fflags))
+ .field("data", &data)
+ .field("udata", &udata)
+ .finish()
+ }
+}
+
+#[test]
+fn does_not_register_rw() {
+ use crate::unix::SourceFd;
+ use crate::{Poll, Token};
+
+ let kq = unsafe { libc::kqueue() };
+ let mut kqf = SourceFd(&kq);
+ let poll = Poll::new().unwrap();
+
+ // Registering kqueue fd will fail if write is requested (On anything but
+ // some versions of macOS).
+ poll.registry()
+ .register(&mut kqf, Token(1234), Interest::READABLE)
+ .unwrap();
+}
diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs
new file mode 100644
index 0000000..7525898
--- /dev/null
+++ b/src/sys/unix/selector/mod.rs
@@ -0,0 +1,35 @@
+#[cfg(any(
+ target_os = "android",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "solaris"
+))]
+mod epoll;
+
+#[cfg(any(
+ target_os = "android",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "solaris"
+))]
+pub(crate) use self::epoll::{event, Event, Events, Selector};
+
+#[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "netbsd",
+ target_os = "openbsd"
+))]
+mod kqueue;
+
+#[cfg(any(
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "netbsd",
+ target_os = "openbsd"
+))]
+pub(crate) use self::kqueue::{event, Event, Events, Selector};
diff --git a/src/sys/unix/sourcefd.rs b/src/sys/unix/sourcefd.rs
new file mode 100644
index 0000000..68511d7
--- /dev/null
+++ b/src/sys/unix/sourcefd.rs
@@ -0,0 +1,108 @@
+use crate::{event, poll, Interest, Registry, Token};
+
+use std::io;
+use std::os::unix::io::RawFd;
+
+/// Adapter for [`RawFd`] providing an [`event::Source`] implementation.
+///
+/// `SourceFd` enables registering any type with an FD with [`Poll`].
+///
+/// While only implementations for TCP and UDP are provided, Mio supports
+/// registering any FD that can be registered with the underlying OS selector.
+/// `SourceFd` provides the necessary bridge.
+///
+/// Note that `SourceFd` takes a `&RawFd`. This is because `SourceFd` **does
+/// not** take ownership of the FD. Specifically, it will not manage any
+/// lifecycle related operations, such as closing the FD on drop. It is expected
+/// that the `SourceFd` is constructed right before a call to
+/// [`Registry::register`]. See the examples for more detail.
+///
+/// [`event::Source`]: ../event/trait.Source.html
+/// [`Poll`]: ../struct.Poll.html
+/// [`Registry::register`]: ../struct.Registry.html#method.register
+///
+/// # Examples
+///
+/// Basic usage.
+///
+/// ```
+/// # use std::error::Error;
+/// # fn main() -> Result<(), Box<dyn Error>> {
+/// use mio::{Interest, Poll, Token};
+/// use mio::unix::SourceFd;
+///
+/// use std::os::unix::io::AsRawFd;
+/// use std::net::TcpListener;
+///
+/// // Bind a std listener
+/// let listener = TcpListener::bind("127.0.0.1:0")?;
+///
+/// let poll = Poll::new()?;
+///
+/// // Register the listener
+/// poll.registry().register(
+/// &mut SourceFd(&listener.as_raw_fd()),
+/// Token(0),
+/// Interest::READABLE)?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Implementing [`event::Source`] for a custom type backed by a [`RawFd`].
+///
+/// ```
+/// use mio::{event, Interest, Registry, Token};
+/// use mio::unix::SourceFd;
+///
+/// use std::os::unix::io::RawFd;
+/// use std::io;
+///
+/// # #[allow(dead_code)]
+/// pub struct MyIo {
+/// fd: RawFd,
+/// }
+///
+/// impl event::Source for MyIo {
+/// fn register(&mut self, registry: &Registry, token: Token, interests: Interest)
+/// -> io::Result<()>
+/// {
+/// SourceFd(&self.fd).register(registry, token, interests)
+/// }
+///
+/// fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest)
+/// -> io::Result<()>
+/// {
+/// SourceFd(&self.fd).reregister(registry, token, interests)
+/// }
+///
+/// fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+/// SourceFd(&self.fd).deregister(registry)
+/// }
+/// }
+/// ```
+#[derive(Debug)]
+pub struct SourceFd<'a>(pub &'a RawFd);
+
+impl<'a> event::Source for SourceFd<'a> {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ poll::selector(registry).register(*self.0, token, interests)
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ poll::selector(registry).reregister(*self.0, token, interests)
+ }
+
+ fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+ poll::selector(registry).deregister(*self.0)
+ }
+}
diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs
new file mode 100644
index 0000000..9d59632
--- /dev/null
+++ b/src/sys/unix/tcp.rs
@@ -0,0 +1,103 @@
+use std::io;
+use std::mem::{size_of, MaybeUninit};
+use std::net::{self, SocketAddr};
+use std::os::unix::io::{AsRawFd, FromRawFd};
+
+use crate::sys::unix::net::{new_socket, socket_addr, to_socket_addr};
+
+pub type TcpSocket = libc::c_int;
+
+pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
+ new_socket(libc::AF_INET, libc::SOCK_STREAM)
+}
+
+pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
+ new_socket(libc::AF_INET6, libc::SOCK_STREAM)
+}
+
+pub(crate) fn bind(socket: TcpSocket, addr: SocketAddr) -> io::Result<()> {
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+ syscall!(bind(socket, raw_addr, raw_addr_length))?;
+ Ok(())
+}
+
+pub(crate) fn connect(socket: TcpSocket, addr: SocketAddr) -> io::Result<net::TcpStream> {
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+
+ match syscall!(connect(socket, raw_addr, raw_addr_length)) {
+ Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => {
+ Err(err)
+ }
+ _ => {
+ Ok(unsafe { net::TcpStream::from_raw_fd(socket) })
+ }
+ }
+}
+
+pub(crate) fn listen(socket: TcpSocket, backlog: u32) -> io::Result<net::TcpListener> {
+ use std::convert::TryInto;
+
+ let backlog = backlog.try_into().unwrap_or(i32::max_value());
+ syscall!(listen(socket, backlog))?;
+ Ok(unsafe { net::TcpListener::from_raw_fd(socket) })
+}
+
+pub(crate) fn close(socket: TcpSocket) {
+ let _ = unsafe { net::TcpStream::from_raw_fd(socket) };
+}
+
+pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()> {
+ let val: libc::c_int = if reuseaddr { 1 } else { 0 };
+ syscall!(setsockopt(
+ socket,
+ libc::SOL_SOCKET,
+ libc::SO_REUSEADDR,
+ &val as *const libc::c_int as *const libc::c_void,
+ size_of::<libc::c_int>() as libc::socklen_t,
+ ))?;
+ Ok(())
+}
+
+pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+ let mut addr: MaybeUninit<libc::sockaddr_storage> = MaybeUninit::uninit();
+ let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+
+ // On platforms that support it we can use `accept4(2)` to set `NONBLOCK`
+ // and `CLOEXEC` in the call to accept the connection.
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ let stream = {
+ syscall!(accept4(
+ listener.as_raw_fd(),
+ addr.as_mut_ptr() as *mut _,
+ &mut length,
+ libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
+ ))
+ .map(|socket| unsafe { net::TcpStream::from_raw_fd(socket) })
+ }?;
+
+ // But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
+ // OSes inherit the non-blocking flag from the listener, so we just have to
+ // set `CLOEXEC`.
+ #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ let stream = {
+ syscall!(accept(
+ listener.as_raw_fd(),
+ addr.as_mut_ptr() as *mut _,
+ &mut length
+ ))
+ .map(|socket| unsafe { net::TcpStream::from_raw_fd(socket) })
+ .and_then(|s| syscall!(fcntl(s.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| s))
+ }?;
+
+ // This is safe because `accept` calls above ensures the address
+ // initialised.
+ unsafe { to_socket_addr(addr.as_ptr()) }.map(|addr| (stream, addr))
+}
diff --git a/src/sys/unix/udp.rs b/src/sys/unix/udp.rs
new file mode 100644
index 0000000..947a60a
--- /dev/null
+++ b/src/sys/unix/udp.rs
@@ -0,0 +1,23 @@
+use crate::sys::unix::net::{new_ip_socket, socket_addr};
+
+use std::io;
+use std::net::{self, SocketAddr};
+use std::os::unix::io::FromRawFd;
+
+pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> {
+ // Gives a warning for non Apple platforms.
+ #[allow(clippy::let_and_return)]
+ let socket = new_ip_socket(addr, libc::SOCK_DGRAM);
+
+ socket.and_then(|socket| {
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+ syscall!(bind(socket, raw_addr, raw_addr_length))
+ .map_err(|err| {
+ // Close the socket if we hit an error, ignoring the error
+ // from closing since we can't pass back two errors.
+ let _ = unsafe { libc::close(socket) };
+ err
+ })
+ .map(|_| unsafe { net::UdpSocket::from_raw_fd(socket) })
+ })
+}
diff --git a/src/sys/unix/uds/datagram.rs b/src/sys/unix/uds/datagram.rs
new file mode 100644
index 0000000..d3e5314
--- /dev/null
+++ b/src/sys/unix/uds/datagram.rs
@@ -0,0 +1,56 @@
+use super::{socket_addr, SocketAddr};
+use crate::sys::unix::net::new_socket;
+
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd};
+use std::os::unix::net;
+use std::path::Path;
+
+pub(crate) fn bind(path: &Path) -> io::Result<net::UnixDatagram> {
+ let fd = new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)?;
+ // Ensure the fd is closed.
+ let socket = unsafe { net::UnixDatagram::from_raw_fd(fd) };
+ let (sockaddr, socklen) = socket_addr(path)?;
+ let sockaddr = &sockaddr as *const libc::sockaddr_un as *const _;
+ syscall!(bind(fd, sockaddr, socklen))?;
+ Ok(socket)
+}
+
+pub(crate) fn unbound() -> io::Result<net::UnixDatagram> {
+ new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)
+ .map(|socket| unsafe { net::UnixDatagram::from_raw_fd(socket) })
+}
+
+pub(crate) fn pair() -> io::Result<(net::UnixDatagram, net::UnixDatagram)> {
+ super::pair(libc::SOCK_DGRAM)
+}
+
+pub(crate) fn local_addr(socket: &net::UnixDatagram) -> io::Result<SocketAddr> {
+ super::local_addr(socket.as_raw_fd())
+}
+
+pub(crate) fn peer_addr(socket: &net::UnixDatagram) -> io::Result<SocketAddr> {
+ super::peer_addr(socket.as_raw_fd())
+}
+
+pub(crate) fn recv_from(
+ socket: &net::UnixDatagram,
+ dst: &mut [u8],
+) -> io::Result<(usize, SocketAddr)> {
+ let mut count = 0;
+ let socketaddr = SocketAddr::new(|sockaddr, socklen| {
+ syscall!(recvfrom(
+ socket.as_raw_fd(),
+ dst.as_mut_ptr() as *mut _,
+ dst.len(),
+ 0,
+ sockaddr,
+ socklen,
+ ))
+ .map(|c| {
+ count = c;
+ c as libc::c_int
+ })
+ })?;
+ Ok((count as usize, socketaddr))
+}
diff --git a/src/sys/unix/uds/listener.rs b/src/sys/unix/uds/listener.rs
new file mode 100644
index 0000000..b8fb5a9
--- /dev/null
+++ b/src/sys/unix/uds/listener.rs
@@ -0,0 +1,83 @@
+use super::socket_addr;
+use crate::net::{SocketAddr, UnixStream};
+use crate::sys::unix::net::new_socket;
+use std::os::unix::io::{AsRawFd, FromRawFd};
+use std::os::unix::net;
+use std::path::Path;
+use std::{io, mem};
+
+pub(crate) fn bind(path: &Path) -> io::Result<net::UnixListener> {
+ let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?;
+ let (sockaddr, socklen) = socket_addr(path)?;
+ let sockaddr = &sockaddr as *const libc::sockaddr_un as *const libc::sockaddr;
+
+ syscall!(bind(socket, sockaddr, socklen))
+ .and_then(|_| syscall!(listen(socket, 1024)))
+ .map_err(|err| {
+ // Close the socket if we hit an error, ignoring the error from
+ // closing since we can't pass back two errors.
+ let _ = unsafe { libc::close(socket) };
+ err
+ })
+ .map(|_| unsafe { net::UnixListener::from_raw_fd(socket) })
+}
+
+pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, SocketAddr)> {
+ let sockaddr = mem::MaybeUninit::<libc::sockaddr_un>::zeroed();
+
+ // This is safe to assume because a `libc::sockaddr_un` filled with `0`
+ // bytes is properly initialized.
+ //
+ // `0` is a valid value for `sockaddr_un::sun_family`; it is
+ // `libc::AF_UNSPEC`.
+ //
+ // `[0; 108]` is a valid value for `sockaddr_un::sun_path`; it begins an
+ // abstract path.
+ let mut sockaddr = unsafe { sockaddr.assume_init() };
+
+ sockaddr.sun_family = libc::AF_UNIX as libc::sa_family_t;
+ let mut socklen = mem::size_of_val(&sockaddr) as libc::socklen_t;
+
+ #[cfg(not(any(
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "netbsd",
+ target_os = "solaris"
+ )))]
+ let socket = {
+ let flags = libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
+ syscall!(accept4(
+ listener.as_raw_fd(),
+ &mut sockaddr as *mut libc::sockaddr_un as *mut libc::sockaddr,
+ &mut socklen,
+ flags
+ ))
+ .map(|socket| unsafe { net::UnixStream::from_raw_fd(socket) })
+ };
+
+ #[cfg(any(
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "netbsd",
+ target_os = "solaris"
+ ))]
+ let socket = syscall!(accept(
+ listener.as_raw_fd(),
+ &mut sockaddr as *mut libc::sockaddr_un as *mut libc::sockaddr,
+ &mut socklen,
+ ))
+ .and_then(|socket| {
+ // Ensure the socket is closed if either of the `fcntl` calls
+ // error below.
+ let s = unsafe { net::UnixStream::from_raw_fd(socket) };
+ syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| s)
+ });
+
+ socket
+ .map(UnixStream::from_std)
+ .map(|stream| (stream, SocketAddr::from_parts(sockaddr, socklen)))
+}
+
+pub(crate) fn local_addr(listener: &net::UnixListener) -> io::Result<SocketAddr> {
+ super::local_addr(listener.as_raw_fd())
+}
diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs
new file mode 100644
index 0000000..3ec829f
--- /dev/null
+++ b/src/sys/unix/uds/mod.rs
@@ -0,0 +1,149 @@
+mod socketaddr;
+pub use self::socketaddr::SocketAddr;
+
+/// Get the `sun_path` field offset of `sockaddr_un` for the target OS.
+///
+/// On Linux, this funtion equates to the same value as
+/// `size_of::<sa_family_t>()`, but some other implementations include
+/// other fields before `sun_path`, so the expression more portably
+/// describes the size of the address structure.
+pub(in crate::sys) fn path_offset(sockaddr: &libc::sockaddr_un) -> usize {
+ let base = sockaddr as *const _ as usize;
+ let path = &sockaddr.sun_path as *const _ as usize;
+ path - base
+}
+
+cfg_os_poll! {
+ use std::cmp::Ordering;
+ use std::os::unix::ffi::OsStrExt;
+ use std::os::unix::io::{RawFd, FromRawFd};
+ use std::path::Path;
+ use std::{io, mem};
+
+ pub(crate) mod datagram;
+ pub(crate) mod listener;
+ pub(crate) mod stream;
+
+ pub(in crate::sys) fn socket_addr(path: &Path) -> io::Result<(libc::sockaddr_un, libc::socklen_t)> {
+ let sockaddr = mem::MaybeUninit::<libc::sockaddr_un>::zeroed();
+
+ // This is safe to assume because a `libc::sockaddr_un` filled with `0`
+ // bytes is properly initialized.
+ //
+ // `0` is a valid value for `sockaddr_un::sun_family`; it is
+ // `libc::AF_UNSPEC`.
+ //
+ // `[0; 108]` is a valid value for `sockaddr_un::sun_path`; it begins an
+ // abstract path.
+ let mut sockaddr = unsafe { sockaddr.assume_init() };
+
+ sockaddr.sun_family = libc::AF_UNIX as libc::sa_family_t;
+
+ let bytes = path.as_os_str().as_bytes();
+ match (bytes.get(0), bytes.len().cmp(&sockaddr.sun_path.len())) {
+ // Abstract paths don't need a null terminator
+ (Some(&0), Ordering::Greater) => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path must be no longer than libc::sockaddr_un.sun_path",
+ ));
+ }
+ (_, Ordering::Greater) | (_, Ordering::Equal) => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path must be shorter than libc::sockaddr_un.sun_path",
+ ));
+ }
+ _ => {}
+ }
+
+ for (dst, src) in sockaddr.sun_path.iter_mut().zip(bytes.iter()) {
+ *dst = *src as libc::c_char;
+ }
+
+ let offset = path_offset(&sockaddr);
+ let mut socklen = offset + bytes.len();
+
+ match bytes.get(0) {
+ // The struct has already been zeroes so the null byte for pathname
+ // addresses is already there.
+ Some(&0) | None => {}
+ Some(_) => socklen += 1,
+ }
+
+ Ok((sockaddr, socklen as libc::socklen_t))
+ }
+
+ fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)>
+ where T: FromRawFd,
+ {
+ #[cfg(not(any(target_os = "ios", target_os = "macos", target_os = "solaris")))]
+ let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
+
+ let mut fds = [-1; 2];
+ syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?;
+ let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) };
+
+ // Darwin and Solaris do not have SOCK_NONBLOCK or SOCK_CLOEXEC.
+ //
+ // In order to set those flags, additional `fcntl` sys calls must be
+ // performed. If a `fnctl` fails after the sockets have been created,
+ // the file descriptors will leak. Creating `pair` above ensures that if
+ // there is an error, the file descriptors are closed.
+ #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ {
+ syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?;
+ syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?;
+ syscall!(fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK))?;
+ syscall!(fcntl(fds[1], libc::F_SETFD, libc::FD_CLOEXEC))?;
+ }
+ Ok(pair)
+ }
+
+ // The following functions can't simply be replaced with a call to
+ // `net::UnixDatagram` because of our `SocketAddr` type.
+
+ fn local_addr(socket: RawFd) -> io::Result<SocketAddr> {
+ SocketAddr::new(|sockaddr, socklen| syscall!(getsockname(socket, sockaddr, socklen)))
+ }
+
+ fn peer_addr(socket: RawFd) -> io::Result<SocketAddr> {
+ SocketAddr::new(|sockaddr, socklen| syscall!(getpeername(socket, sockaddr, socklen)))
+ }
+
+ #[cfg(test)]
+ mod tests {
+ use super::{path_offset, socket_addr};
+ use std::path::Path;
+ use std::str;
+
+ #[test]
+ fn pathname_address() {
+ const PATH: &str = "./foo/bar.txt";
+ const PATH_LEN: usize = 13;
+
+ // Pathname addresses do have a null terminator, so `socklen` is
+ // expected to be `PATH_LEN` + `offset` + 1.
+ let path = Path::new(PATH);
+ let (sockaddr, actual) = socket_addr(path).unwrap();
+ let offset = path_offset(&sockaddr);
+ let expected = PATH_LEN + offset + 1;
+ assert_eq!(expected as libc::socklen_t, actual)
+ }
+
+ #[test]
+ fn abstract_address() {
+ const PATH: &[u8] = &[0, 116, 111, 107, 105, 111];
+ const PATH_LEN: usize = 6;
+
+ // Abstract addresses do not have a null terminator, so `socklen` is
+ // expected to be `PATH_LEN` + `offset`.
+ let abstract_path = str::from_utf8(PATH).unwrap();
+ let path = Path::new(abstract_path);
+ let (sockaddr, actual) = socket_addr(path).unwrap();
+ let offset = path_offset(&sockaddr);
+ let expected = PATH_LEN + offset;
+ assert_eq!(expected as libc::socklen_t, actual)
+ }
+ }
+}
diff --git a/src/sys/unix/uds/socketaddr.rs b/src/sys/unix/uds/socketaddr.rs
new file mode 100644
index 0000000..69f311a
--- /dev/null
+++ b/src/sys/unix/uds/socketaddr.rs
@@ -0,0 +1,120 @@
+use super::path_offset;
+use std::ffi::OsStr;
+use std::os::unix::ffi::OsStrExt;
+use std::path::Path;
+use std::{ascii, fmt};
+
+/// An address associated with a `mio` specific Unix socket.
+///
+/// This is implemented instead of imported from [`net::SocketAddr`] because
+/// there is no way to create a [`net::SocketAddr`]. One must be returned by
+/// [`accept`], so this is returned instead.
+///
+/// [`net::SocketAddr`]: std::os::unix::net::SocketAddr
+/// [`accept`]: #method.accept
+pub struct SocketAddr {
+ sockaddr: libc::sockaddr_un,
+ socklen: libc::socklen_t,
+}
+
+struct AsciiEscaped<'a>(&'a [u8]);
+
+enum AddressKind<'a> {
+ Unnamed,
+ Pathname(&'a Path),
+ Abstract(&'a [u8]),
+}
+
+impl SocketAddr {
+ fn address(&self) -> AddressKind<'_> {
+ let offset = path_offset(&self.sockaddr);
+ let len = self.socklen as usize - offset;
+ let path = unsafe { &*(&self.sockaddr.sun_path as *const [libc::c_char] as *const [u8]) };
+
+ // macOS seems to return a len of 16 and a zeroed sun_path for unnamed addresses
+ if len == 0
+ || (cfg!(not(any(target_os = "linux", target_os = "android")))
+ && self.sockaddr.sun_path[0] == 0)
+ {
+ AddressKind::Unnamed
+ } else if self.sockaddr.sun_path[0] == 0 {
+ AddressKind::Abstract(&path[1..len])
+ } else {
+ AddressKind::Pathname(OsStr::from_bytes(&path[..len - 1]).as_ref())
+ }
+ }
+}
+
+cfg_os_poll! {
+ use std::{io, mem};
+
+ impl SocketAddr {
+ pub(crate) fn new<F>(f: F) -> io::Result<SocketAddr>
+ where
+ F: FnOnce(*mut libc::sockaddr, &mut libc::socklen_t) -> io::Result<libc::c_int>,
+ {
+ let mut sockaddr = {
+ let sockaddr = mem::MaybeUninit::<libc::sockaddr_un>::zeroed();
+ unsafe { sockaddr.assume_init() }
+ };
+
+ let raw_sockaddr = &mut sockaddr as *mut libc::sockaddr_un as *mut libc::sockaddr;
+ let mut socklen = mem::size_of_val(&sockaddr) as libc::socklen_t;
+
+ f(raw_sockaddr, &mut socklen)?;
+ Ok(SocketAddr::from_parts(sockaddr, socklen))
+ }
+
+ pub(crate) fn from_parts(sockaddr: libc::sockaddr_un, socklen: libc::socklen_t) -> SocketAddr {
+ SocketAddr { sockaddr, socklen }
+ }
+
+ /// Returns `true` if the address is unnamed.
+ ///
+ /// Documentation reflected in [`SocketAddr`]
+ ///
+ /// [`SocketAddr`]: std::os::unix::net::SocketAddr
+ pub fn is_unnamed(&self) -> bool {
+ if let AddressKind::Unnamed = self.address() {
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns the contents of this address if it is a `pathname` address.
+ ///
+ /// Documentation reflected in [`SocketAddr`]
+ ///
+ /// [`SocketAddr`]: std::os::unix::net::SocketAddr
+ pub fn as_pathname(&self) -> Option<&Path> {
+ if let AddressKind::Pathname(path) = self.address() {
+ Some(path)
+ } else {
+ None
+ }
+ }
+ }
+}
+
+impl fmt::Debug for SocketAddr {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.address() {
+ AddressKind::Unnamed => write!(fmt, "(unnamed)"),
+ AddressKind::Abstract(name) => write!(fmt, "{} (abstract)", AsciiEscaped(name)),
+ AddressKind::Pathname(path) => write!(fmt, "{:?} (pathname)", path),
+ }
+ }
+}
+
+// ===== impl AsciiEscaped =====
+
+impl<'a> fmt::Display for AsciiEscaped<'a> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "\"")?;
+ for byte in self.0.iter().cloned().flat_map(ascii::escape_default) {
+ write!(fmt, "{}", byte as char)?;
+ }
+ write!(fmt, "\"")
+ }
+}
diff --git a/src/sys/unix/uds/stream.rs b/src/sys/unix/uds/stream.rs
new file mode 100644
index 0000000..149dd14
--- /dev/null
+++ b/src/sys/unix/uds/stream.rs
@@ -0,0 +1,39 @@
+use super::{socket_addr, SocketAddr};
+use crate::sys::unix::net::new_socket;
+
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd};
+use std::os::unix::net;
+use std::path::Path;
+
+pub(crate) fn connect(path: &Path) -> io::Result<net::UnixStream> {
+ let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?;
+ let (sockaddr, socklen) = socket_addr(path)?;
+ let sockaddr = &sockaddr as *const libc::sockaddr_un as *const libc::sockaddr;
+
+ match syscall!(connect(socket, sockaddr, socklen)) {
+ Ok(_) => {}
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ Err(e) => {
+ // Close the socket if we hit an error, ignoring the error
+ // from closing since we can't pass back two errors.
+ let _ = unsafe { libc::close(socket) };
+
+ return Err(e);
+ }
+ }
+
+ Ok(unsafe { net::UnixStream::from_raw_fd(socket) })
+}
+
+pub(crate) fn pair() -> io::Result<(net::UnixStream, net::UnixStream)> {
+ super::pair(libc::SOCK_STREAM)
+}
+
+pub(crate) fn local_addr(socket: &net::UnixStream) -> io::Result<SocketAddr> {
+ super::local_addr(socket.as_raw_fd())
+}
+
+pub(crate) fn peer_addr(socket: &net::UnixStream) -> io::Result<SocketAddr> {
+ super::peer_addr(socket.as_raw_fd())
+}
diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs
new file mode 100644
index 0000000..1305bd6
--- /dev/null
+++ b/src/sys/unix/waker.rs
@@ -0,0 +1,174 @@
+#[cfg(any(target_os = "linux", target_os = "android"))]
+mod eventfd {
+ use crate::sys::Selector;
+ use crate::{Interest, Token};
+
+ use std::fs::File;
+ use std::io::{self, Read, Write};
+ use std::os::unix::io::FromRawFd;
+
+ /// Waker backed by `eventfd`.
+ ///
+ /// `eventfd` is effectively an 64 bit counter. All writes must be of 8
+ /// bytes (64 bits) and are converted (native endian) into an 64 bit
+ /// unsigned integer and added to the count. Reads must also be 8 bytes and
+ /// reset the count to 0, returning the count.
+ #[derive(Debug)]
+ pub struct Waker {
+ fd: File,
+ }
+
+ impl Waker {
+ pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
+ syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
+ // Turn the file descriptor into a file first so we're ensured
+ // it's closed when dropped, e.g. when register below fails.
+ let file = unsafe { File::from_raw_fd(fd) };
+ selector
+ .register(fd, token, Interest::READABLE)
+ .map(|()| Waker { fd: file })
+ })
+ }
+
+ pub fn wake(&self) -> io::Result<()> {
+ let buf: [u8; 8] = 1u64.to_ne_bytes();
+ match (&self.fd).write(&buf) {
+ Ok(_) => Ok(()),
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ // Writing only blocks if the counter is going to overflow.
+ // So we'll reset the counter to 0 and wake it again.
+ self.reset()?;
+ self.wake()
+ }
+ Err(err) => Err(err),
+ }
+ }
+
+ /// Reset the eventfd object, only need to call this if `wake` fails.
+ fn reset(&self) -> io::Result<()> {
+ let mut buf: [u8; 8] = 0u64.to_ne_bytes();
+ match (&self.fd).read(&mut buf) {
+ Ok(_) => Ok(()),
+ // If the `Waker` hasn't been awoken yet this will return a
+ // `WouldBlock` error which we can safely ignore.
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
+ Err(err) => Err(err),
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "linux", target_os = "android"))]
+pub use self::eventfd::Waker;
+
+#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+mod kqueue {
+ use crate::sys::Selector;
+ use crate::Token;
+
+ use std::io;
+
+ /// Waker backed by kqueue user space notifications (`EVFILT_USER`).
+ ///
+ /// The implementation is fairly simple, first the kqueue must be setup to
+ /// receive waker events this done by calling `Selector.setup_waker`. Next
+ /// we need access to kqueue, thus we need to duplicate the file descriptor.
+ /// Now waking is as simple as adding an event to the kqueue.
+ #[derive(Debug)]
+ pub struct Waker {
+ selector: Selector,
+ token: Token,
+ }
+
+ impl Waker {
+ pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
+ selector.try_clone().and_then(|selector| {
+ selector
+ .setup_waker(token)
+ .map(|()| Waker { selector, token })
+ })
+ }
+
+ pub fn wake(&self) -> io::Result<()> {
+ self.selector.wake(self.token)
+ }
+ }
+}
+
+#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+pub use self::kqueue::Waker;
+
+#[cfg(any(
+ target_os = "dragonfly",
+ target_os = "illumos",
+ target_os = "netbsd",
+ target_os = "openbsd",
+ target_os = "solaris"
+))]
+mod pipe {
+ use crate::sys::unix::Selector;
+ use crate::{Interest, Token};
+
+ use std::fs::File;
+ use std::io::{self, Read, Write};
+ use std::os::unix::io::FromRawFd;
+
+ /// Waker backed by a unix pipe.
+ ///
+ /// Waker controls both the sending and receiving ends and empties the pipe
+ /// if writing to it (waking) fails.
+ #[derive(Debug)]
+ pub struct Waker {
+ sender: File,
+ receiver: File,
+ }
+
+ impl Waker {
+ pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
+ let mut fds = [-1; 2];
+ syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
+ // Turn the file descriptors into files first so we're ensured
+ // they're closed when dropped, e.g. when register below fails.
+ let sender = unsafe { File::from_raw_fd(fds[1]) };
+ let receiver = unsafe { File::from_raw_fd(fds[0]) };
+ selector
+ .register(fds[0], token, Interest::READABLE)
+ .map(|()| Waker { sender, receiver })
+ }
+
+ pub fn wake(&self) -> io::Result<()> {
+ match (&self.sender).write(&[1]) {
+ Ok(_) => Ok(()),
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ // The reading end is full so we'll empty the buffer and try
+ // again.
+ self.empty();
+ self.wake()
+ }
+ Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(),
+ Err(err) => Err(err),
+ }
+ }
+
+ /// Empty the pipe's buffer, only need to call this if `wake` fails.
+ /// This ignores any errors.
+ fn empty(&self) {
+ let mut buf = [0; 4096];
+ loop {
+ match (&self.receiver).read(&mut buf) {
+ Ok(n) if n > 0 => continue,
+ _ => return,
+ }
+ }
+ }
+ }
+}
+
+#[cfg(any(
+ target_os = "dragonfly",
+ target_os = "illumos",
+ target_os = "netbsd",
+ target_os = "openbsd",
+ target_os = "solaris"
+))]
+pub use self::pipe::Waker;