diff options
Diffstat (limited to 'src/sys/unix')
-rw-r--r-- | src/sys/unix/mod.rs | 6 | ||||
-rw-r--r-- | src/sys/unix/pipe.rs | 383 | ||||
-rw-r--r-- | src/sys/unix/selector/epoll.rs | 15 | ||||
-rw-r--r-- | src/sys/unix/selector/kqueue.rs | 15 | ||||
-rw-r--r-- | src/sys/unix/tcp.rs | 58 |
5 files changed, 472 insertions, 5 deletions
diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 96d7f4d..f045fb5 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -38,7 +38,7 @@ cfg_os_poll! { pub use self::uds::SocketAddr; } - cfg_net! { + cfg_io_source! { use std::io; // Both `kqueue` and `epoll` don't need to hold any user space state. @@ -59,6 +59,10 @@ cfg_os_poll! { } } } + + cfg_pipe! { + pub(crate) mod pipe; + } } cfg_not_os_poll! { diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs new file mode 100644 index 0000000..d838ebc --- /dev/null +++ b/src/sys/unix/pipe.rs @@ -0,0 +1,383 @@ +//! Unix pipe. +//! +//! See the [`new`] function for documentation. + +use std::fs::File; +use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::process::{ChildStderr, ChildStdin, ChildStdout}; + +use crate::io_source::IoSource; +use crate::{event, Interest, Registry, Token}; + +/// Create a new non-blocking Unix pipe. +/// +/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as +/// inter-process or thread communication channel. +/// +/// This channel may be created before forking the process and then one end used +/// in each process, e.g. the parent process has the sending end to send command +/// to the child process. +/// +/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html +/// +/// # Events +/// +/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive +/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is +/// written to the `Sender` the `Receiver` will receive an [readable event]. +/// +/// In addition to those events, events will also be generated if the other side +/// is dropped. To check if the `Sender` is dropped you'll need to check +/// [`is_read_closed`] on events for the `Receiver`, if it returns true the +/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it +/// returns true the `Receiver` was dropped. Also see the second example below. +/// +/// [`WRITABLE`]: Interest::WRITABLE +/// [writable events]: event::Event::is_writable +/// [`READABLE`]: Interest::READABLE +/// [readable event]: event::Event::is_readable +/// [`is_read_closed`]: event::Event::is_read_closed +/// [`is_write_closed`]: event::Event::is_write_closed +/// +/// # Deregistering +/// +/// Both `Sender` and `Receiver` will deregister themselves when dropped, +/// **iff** the file descriptors are not duplicated (via [`dup(2)`]). +/// +/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html +/// +/// # Examples +/// +/// Simple example that writes data into the sending end and read it from the +/// receiving end. +/// +/// ``` +/// use std::io::{self, Read, Write}; +/// +/// use mio::{Poll, Events, Interest, Token}; +/// use mio::unix::pipe; +/// +/// // Unique tokens for the two ends of the channel. +/// const PIPE_RECV: Token = Token(0); +/// const PIPE_SEND: Token = Token(1); +/// +/// # fn main() -> io::Result<()> { +/// // Create our `Poll` instance and the `Events` container. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// // Create a new pipe. +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// // Register both ends of the channel. +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// const MSG: &[u8; 11] = b"Hello world"; +/// +/// loop { +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_SEND => sender.write(MSG) +/// .and_then(|n| if n != MSG.len() { +/// // We'll consider a short write an error in this +/// // example. NOTE: we can't use `write_all` with +/// // non-blocking I/O. +/// Err(io::ErrorKind::WriteZero.into()) +/// } else { +/// Ok(()) +/// })?, +/// PIPE_RECV => { +/// let mut buf = [0; 11]; +/// let n = receiver.read(&mut buf)?; +/// println!("received: {:?}", &buf[0..n]); +/// assert_eq!(n, MSG.len()); +/// assert_eq!(&buf, &*MSG); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// } +/// # } +/// ``` +/// +/// Example that receives an event once the `Sender` is dropped. +/// +/// ``` +/// # use std::io; +/// # +/// # use mio::{Poll, Events, Interest, Token}; +/// # use mio::unix::pipe; +/// # +/// # const PIPE_RECV: Token = Token(0); +/// # const PIPE_SEND: Token = Token(1); +/// # +/// # fn main() -> io::Result<()> { +/// // Same setup as in the example above. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// // Drop the sender. +/// drop(sender); +/// +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_RECV if event.is_read_closed() => { +/// // Detected that the sender was dropped. +/// println!("Sender dropped!"); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// # unreachable!(); +/// # } +/// ``` +pub fn new() -> io::Result<(Sender, Receiver)> { + let mut fds: [RawFd; 2] = [-1, -1]; + + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + ))] + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))] + unsafe { + // For platforms that don't have `pipe2(2)` we need to manually set the + // correct flags on the file descriptor. + if libc::pipe(fds.as_mut_ptr()) != 0 { + return Err(io::Error::last_os_error()); + } + + for fd in &fds { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 + || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + // Don't leak file descriptors. Can't handle error though. + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "ios", + target_os = "macos", + target_os = "solaris", + )))] + compile_error!("unsupported target for `mio::unix::pipe`"); + + // Safety: we just initialised the `fds` above. + let r = unsafe { Receiver::from_raw_fd(fds[0]) }; + let w = unsafe { Sender::from_raw_fd(fds[1]) }; + Ok((w, r)) +} + +/// Sending end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Sender { + inner: IoSource<File>, +} + +impl Sender { + /// Set the `Sender` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Sender { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Write for Sender { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStdin> for Sender { + fn from(stdin: ChildStdin) -> Sender { + // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. + unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } + } +} + +impl FromRawFd for Sender { + unsafe fn from_raw_fd(fd: RawFd) -> Sender { + Sender { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Sender { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +/// Receiving end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Receiver { + inner: IoSource<File>, +} + +impl Receiver { + /// Set the `Receiver` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Read for Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStdout> for Receiver { + fn from(stdout: ChildStdout) -> Receiver { + // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStderr> for Receiver { + fn from(stderr: ChildStderr) -> Receiver { + // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } + } +} + +impl FromRawFd for Receiver { + unsafe fn from_raw_fd(fd: RawFd) -> Receiver { + Receiver { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Receiver { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let value = nonblocking as libc::c_int; + if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs index 13f1617..76ee7f9 100644 --- a/src/sys/unix/selector/epoll.rs +++ b/src/sys/unix/selector/epoll.rs @@ -4,7 +4,7 @@ use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP}; use log::error; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, i32, io, ptr}; @@ -17,6 +17,8 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, ep: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, } impl Selector { @@ -33,6 +35,8 @@ impl Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), ep, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), }) } @@ -42,6 +46,8 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, ep, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -93,9 +99,14 @@ impl Selector { pub fn deregister(&self, fd: RawFd) -> io::Result<()> { syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ()) } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } } -cfg_net! { +cfg_io_source! { impl Selector { #[cfg(debug_assertions)] pub fn id(&self) -> usize { diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs index 2ebac9a..454f47d 100644 --- a/src/sys/unix/selector/kqueue.rs +++ b/src/sys/unix/selector/kqueue.rs @@ -4,7 +4,7 @@ use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, io, ptr, slice}; @@ -69,6 +69,8 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, kq: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, } impl Selector { @@ -79,6 +81,8 @@ impl Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), kq, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), }) } @@ -88,6 +92,8 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, kq, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -208,6 +214,11 @@ impl Selector { kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data]) } + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + // Used by `Waker`. #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] pub fn setup_waker(&self, token: Token) -> io::Result<()> { @@ -292,7 +303,7 @@ fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result< Ok(()) } -cfg_net! { +cfg_io_source! { #[cfg(debug_assertions)] impl Selector { pub fn id(&self) -> usize { diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs index 81b371b..65b7400 100644 --- a/src/sys/unix/tcp.rs +++ b/src/sys/unix/tcp.rs @@ -1,4 +1,5 @@ use std::io; +use std::mem; use std::mem::{size_of, MaybeUninit}; use std::net::{self, SocketAddr}; use std::time::Duration; @@ -58,6 +59,63 @@ pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<() )).map(|_| ()) } +pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> { + let mut optval: libc::c_int = 0; + let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t; + + syscall!(getsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_REUSEADDR, + &mut optval as *mut _ as *mut _, + &mut optlen, + ))?; + + Ok(optval != 0) +} + +#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] +pub(crate) fn set_reuseport(socket: TcpSocket, reuseport: bool) -> io::Result<()> { + let val: libc::c_int = if reuseport { 1 } else { 0 }; + + syscall!(setsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_REUSEPORT, + &val as *const libc::c_int as *const libc::c_void, + size_of::<libc::c_int>() as libc::socklen_t, + )).map(|_| ()) +} + +#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] +pub(crate) fn get_reuseport(socket: TcpSocket) -> io::Result<bool> { + let mut optval: libc::c_int = 0; + let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t; + + syscall!(getsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_REUSEPORT, + &mut optval as *mut _ as *mut _, + &mut optlen, + ))?; + + Ok(optval != 0) +} + +pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> { + let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() }; + let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t; + + syscall!(getsockname( + socket, + &mut addr as *mut _ as *mut _, + &mut length + ))?; + + unsafe { to_socket_addr(&addr) } +} + pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> { let val: libc::linger = libc::linger { l_onoff: if dur.is_some() { 1 } else { 0 }, |