aboutsummaryrefslogtreecommitdiff
path: root/src/sys
diff options
context:
space:
mode:
Diffstat (limited to 'src/sys')
-rw-r--r--src/sys/mod.rs6
-rw-r--r--src/sys/shell/mod.rs2
-rw-r--r--src/sys/shell/selector.rs21
-rw-r--r--src/sys/shell/tcp.rs118
-rw-r--r--src/sys/shell/udp.rs1
-rw-r--r--src/sys/unix/net.rs63
-rw-r--r--src/sys/unix/pipe.rs172
-rw-r--r--src/sys/unix/selector/epoll.rs55
-rw-r--r--src/sys/unix/selector/kqueue.rs83
-rw-r--r--src/sys/unix/selector/mod.rs12
-rw-r--r--src/sys/unix/sourcefd.rs22
-rw-r--r--src/sys/unix/tcp.rs441
-rw-r--r--src/sys/unix/udp.rs20
-rw-r--r--src/sys/unix/uds/datagram.rs12
-rw-r--r--src/sys/unix/uds/listener.rs36
-rw-r--r--src/sys/unix/uds/mod.rs10
-rw-r--r--src/sys/unix/uds/socketaddr.rs20
-rw-r--r--src/sys/unix/uds/stream.rs17
-rw-r--r--src/sys/unix/waker.rs33
-rw-r--r--src/sys/wasi/mod.rs370
-rw-r--r--src/sys/windows/afd.rs108
-rw-r--r--src/sys/windows/event.rs3
-rw-r--r--src/sys/windows/handle.rs30
-rw-r--r--src/sys/windows/io_status_block.rs6
-rw-r--r--src/sys/windows/iocp.rs270
-rw-r--r--src/sys/windows/mod.rs19
-rw-r--r--src/sys/windows/named_pipe.rs405
-rw-r--r--src/sys/windows/net.rs73
-rw-r--r--src/sys/windows/overlapped.rs10
-rw-r--r--src/sys/windows/selector.rs68
-rw-r--r--src/sys/windows/tcp.rs319
-rw-r--r--src/sys/windows/udp.rs49
-rw-r--r--src/sys/windows/waker.rs2
33 files changed, 1614 insertions, 1262 deletions
diff --git a/src/sys/mod.rs b/src/sys/mod.rs
index 81ae6d2..2a968b2 100644
--- a/src/sys/mod.rs
+++ b/src/sys/mod.rs
@@ -63,6 +63,12 @@ cfg_os_poll! {
pub use self::windows::*;
}
+#[cfg(target_os = "wasi")]
+cfg_os_poll! {
+ mod wasi;
+ pub(crate) use self::wasi::*;
+}
+
cfg_not_os_poll! {
mod shell;
pub(crate) use self::shell::*;
diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs
index 7e1533f..8a3175f 100644
--- a/src/sys/shell/mod.rs
+++ b/src/sys/shell/mod.rs
@@ -7,7 +7,9 @@ macro_rules! os_required {
mod selector;
pub(crate) use self::selector::{event, Event, Events, Selector};
+#[cfg(not(target_os = "wasi"))]
mod waker;
+#[cfg(not(target_os = "wasi"))]
pub(crate) use self::waker::Waker;
cfg_net! {
diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs
index 91fc0bf..bfc598a 100644
--- a/src/sys/shell/selector.rs
+++ b/src/sys/shell/selector.rs
@@ -19,7 +19,7 @@ impl Selector {
os_required!();
}
- #[cfg(debug_assertions)]
+ #[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub fn register_waker(&self) -> bool {
os_required!();
}
@@ -44,6 +44,25 @@ cfg_any_os_ext! {
}
}
+#[cfg(target_os = "wasi")]
+cfg_any_os_ext! {
+ use crate::{Interest, Token};
+
+ impl Selector {
+ pub fn register(&self, _: wasi::Fd, _: Token, _: Interest) -> io::Result<()> {
+ os_required!();
+ }
+
+ pub fn reregister(&self, _: wasi::Fd, _: Token, _: Interest) -> io::Result<()> {
+ os_required!();
+ }
+
+ pub fn deregister(&self, _: wasi::Fd) -> io::Result<()> {
+ os_required!();
+ }
+ }
+}
+
cfg_io_source! {
#[cfg(debug_assertions)]
impl Selector {
diff --git a/src/sys/shell/tcp.rs b/src/sys/shell/tcp.rs
index 0ed225f..260763a 100644
--- a/src/sys/shell/tcp.rs
+++ b/src/sys/shell/tcp.rs
@@ -1,127 +1,31 @@
-use crate::net::TcpKeepalive;
use std::io;
use std::net::{self, SocketAddr};
-use std::time::Duration;
-pub(crate) type TcpSocket = i32;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- os_required!();
-}
-
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
- os_required!();
-}
-
-pub(crate) fn bind(_socket: TcpSocket, _addr: SocketAddr) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn connect(_: TcpSocket, _addr: SocketAddr) -> io::Result<net::TcpStream> {
- os_required!();
-}
-
-pub(crate) fn listen(_: TcpSocket, _: u32) -> io::Result<net::TcpListener> {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn new_for_addr(_: SocketAddr) -> io::Result<i32> {
os_required!();
}
-pub(crate) fn close(_: TcpSocket) {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn bind(_: &net::TcpListener, _: SocketAddr) -> io::Result<()> {
os_required!();
}
-pub(crate) fn set_reuseaddr(_: TcpSocket, _: bool) -> io::Result<()> {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn connect(_: &net::TcpStream, _: SocketAddr) -> io::Result<()> {
os_required!();
}
-pub(crate) fn get_reuseaddr(_: TcpSocket) -> io::Result<bool> {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn listen(_: &net::TcpListener, _: u32) -> io::Result<()> {
os_required!();
}
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn set_reuseport(_: TcpSocket, _: bool) -> io::Result<()> {
- os_required!();
-}
-
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn get_reuseport(_: TcpSocket) -> io::Result<bool> {
- os_required!();
-}
-
-pub(crate) fn set_linger(_: TcpSocket, _: Option<Duration>) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_linger(_: TcpSocket) -> io::Result<Option<Duration>> {
- os_required!();
-}
-
-pub(crate) fn set_recv_buffer_size(_: TcpSocket, _: u32) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_recv_buffer_size(_: TcpSocket) -> io::Result<u32> {
- os_required!();
-}
-
-pub(crate) fn set_send_buffer_size(_: TcpSocket, _: u32) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_send_buffer_size(_: TcpSocket) -> io::Result<u32> {
- os_required!();
-}
-
-pub(crate) fn set_keepalive(_: TcpSocket, _: bool) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_keepalive(_: TcpSocket) -> io::Result<bool> {
- os_required!();
-}
-
-pub(crate) fn set_keepalive_params(_: TcpSocket, _: TcpKeepalive) -> io::Result<()> {
- os_required!()
-}
-
-#[cfg(any(
- target_os = "android",
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- target_os = "solaris",
-))]
-pub(crate) fn get_keepalive_time(_: TcpSocket) -> io::Result<Option<Duration>> {
- os_required!()
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_interval(_: TcpSocket) -> io::Result<Option<Duration>> {
- os_required!()
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_retries(_: TcpSocket) -> io::Result<Option<u32>> {
- os_required!()
-}
-
-pub fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+#[cfg(unix)]
+pub(crate) fn set_reuseaddr(_: &net::TcpListener, _: bool) -> io::Result<()> {
os_required!();
}
-pub(crate) fn get_localaddr(_: TcpSocket) -> io::Result<SocketAddr> {
+pub(crate) fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
os_required!();
}
diff --git a/src/sys/shell/udp.rs b/src/sys/shell/udp.rs
index 48ccac7..6a48b69 100644
--- a/src/sys/shell/udp.rs
+++ b/src/sys/shell/udp.rs
@@ -1,3 +1,4 @@
+#![cfg(not(target_os = "wasi"))]
use std::io;
use std::net::{self, SocketAddr};
diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs
index 2f8d618..2396ab9 100644
--- a/src/sys/unix/net.rs
+++ b/src/sys/unix/net.rs
@@ -20,44 +20,39 @@ pub(crate) fn new_socket(domain: libc::c_int, socket_type: libc::c_int) -> io::R
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
- // Gives a warning for platforms without SOCK_NONBLOCK.
- #[allow(clippy::let_and_return)]
- let socket = syscall!(socket(domain, socket_type, 0));
+ let socket = syscall!(socket(domain, socket_type, 0))?;
// Mimick `libstd` and set `SO_NOSIGPIPE` on apple systems.
- #[cfg(target_vendor = "apple")]
- let socket = socket.and_then(|socket| {
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_NOSIGPIPE,
- &1 as *const libc::c_int as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| socket)
- });
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ if let Err(err) = syscall!(setsockopt(
+ socket,
+ libc::SOL_SOCKET,
+ libc::SO_NOSIGPIPE,
+ &1 as *const libc::c_int as *const libc::c_void,
+ size_of::<libc::c_int>() as libc::socklen_t
+ )) {
+ let _ = syscall!(close(socket));
+ return Err(err);
+ }
- // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
- // Solaris, couldn't find anything online.
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
- let socket = socket.and_then(|socket| {
- // For platforms that don't support flags in socket, we need to
- // set the flags ourselves.
- syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))
- .and_then(|_| syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket))
- .map_err(|e| {
- // If either of the `fcntl` calls failed, ensure the socket is
- // closed and return the error.
- let _ = syscall!(close(socket));
- e
- })
- });
+ // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC.
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
+ {
+ if let Err(err) = syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK)) {
+ let _ = syscall!(close(socket));
+ return Err(err);
+ }
+ if let Err(err) = syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)) {
+ let _ = syscall!(close(socket));
+ return Err(err);
+ }
+ }
- socket
+ Ok(socket)
}
/// A type with the same memory layout as `libc::sockaddr`. Used in converting Rust level
@@ -97,7 +92,7 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
sin_len: 0,
};
@@ -121,10 +116,10 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
sin6_len: 0,
- #[cfg(any(target_os = "solaris", target_os = "illumos"))]
+ #[cfg(target_os = "illumos")]
__sin6_src_id: 0,
};
diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs
index ccf5252..7b7e4db 100644
--- a/src/sys/unix/pipe.rs
+++ b/src/sys/unix/pipe.rs
@@ -155,6 +155,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "netbsd",
target_os = "openbsd",
target_os = "illumos",
+ target_os = "redox",
))]
unsafe {
if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
@@ -162,7 +163,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
}
}
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
unsafe {
// For platforms that don't have `pipe2(2)` we need to manually set the
// correct flags on the file descriptor.
@@ -175,7 +176,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
|| libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
{
let err = io::Error::last_os_error();
- // Don't leak file descriptors. Can't handle error though.
+ // Don't leak file descriptors. Can't handle closing error though.
let _ = libc::close(fds[0]);
let _ = libc::close(fds[1]);
return Err(err);
@@ -187,19 +188,20 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
+ target_os = "illumos",
+ target_os = "ios",
target_os = "linux",
+ target_os = "macos",
target_os = "netbsd",
target_os = "openbsd",
- target_os = "ios",
- target_os = "macos",
- target_os = "solaris",
- target_os = "illumos",
+ target_os = "redox",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");
- // Safety: we just initialised the `fds` above.
+ // SAFETY: we just initialised the `fds` above.
let r = unsafe { Receiver::from_raw_fd(fds[0]) };
let w = unsafe { Sender::from_raw_fd(fds[1]) };
+
Ok((w, r))
}
@@ -216,6 +218,74 @@ impl Sender {
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
set_nonblocking(self.inner.as_raw_fd(), nonblocking)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::unix::pipe;
+ ///
+ /// let (sender, receiver) = pipe::new()?;
+ ///
+ /// // Wait until the sender is writable...
+ ///
+ /// // Write to the sender using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = sender.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the receiver is readable...
+ ///
+ /// // Read from the receiver using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = receiver.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for Sender {
@@ -244,29 +314,29 @@ impl event::Source for Sender {
impl Write for Sender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write(buf))
+ self.inner.do_io(|mut sender| sender.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|sender| (&*sender).flush())
+ self.inner.do_io(|mut sender| sender.flush())
}
}
impl Write for &Sender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write(buf))
+ self.inner.do_io(|mut sender| sender.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|sender| (&*sender).flush())
+ self.inner.do_io(|mut sender| sender.flush())
}
}
@@ -313,6 +383,74 @@ impl Receiver {
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
set_nonblocking(self.inner.as_raw_fd(), nonblocking)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::unix::pipe;
+ ///
+ /// let (sender, receiver) = pipe::new()?;
+ ///
+ /// // Wait until the sender is writable...
+ ///
+ /// // Write to the sender using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = sender.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the receiver is readable...
+ ///
+ /// // Read from the receiver using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = receiver.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for Receiver {
@@ -341,21 +479,21 @@ impl event::Source for Receiver {
impl Read for Receiver {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read(buf))
+ self.inner.do_io(|mut sender| sender.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.read_vectored(bufs))
}
}
impl Read for &Receiver {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read(buf))
+ self.inner.do_io(|mut sender| sender.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.read_vectored(bufs))
}
}
diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs
index 38667d6..1809a2b 100644
--- a/src/sys/unix/selector/epoll.rs
+++ b/src/sys/unix/selector/epoll.rs
@@ -1,6 +1,6 @@
use crate::{Interest, Token};
-use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP};
+use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
use log::error;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
@@ -23,15 +23,41 @@ pub struct Selector {
impl Selector {
pub fn new() -> io::Result<Selector> {
+ #[cfg(not(target_os = "android"))]
+ let res = syscall!(epoll_create1(libc::EPOLL_CLOEXEC));
+
+ // On Android < API level 16 `epoll_create1` is not defined, so use a
+ // raw system call.
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
// 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
// so we use it instead.
#[cfg(target_os = "android")]
- let flag = libc::O_CLOEXEC;
- #[cfg(not(target_os = "android"))]
- let flag = libc::EPOLL_CLOEXEC;
+ let res = syscall!(syscall(libc::SYS_epoll_create1, libc::O_CLOEXEC));
+
+ let ep = match res {
+ Ok(ep) => ep as RawFd,
+ Err(err) => {
+ // When `epoll_create1` is not available fall back to use
+ // `epoll_create` followed by `fcntl`.
+ if let Some(libc::ENOSYS) = err.raw_os_error() {
+ match syscall!(epoll_create(1024)) {
+ Ok(ep) => match syscall!(fcntl(ep, libc::F_SETFD, libc::FD_CLOEXEC)) {
+ Ok(ep) => ep as RawFd,
+ Err(err) => {
+ // `fcntl` failed, cleanup `ep`.
+ let _ = unsafe { libc::close(ep) };
+ return Err(err);
+ }
+ },
+ Err(err) => return Err(err),
+ }
+ } else {
+ return Err(err);
+ }
+ }
+ };
- syscall!(epoll_create1(flag)).map(|ep| Selector {
+ Ok(Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ep,
@@ -61,7 +87,14 @@ impl Selector {
const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
let timeout = timeout
- .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int)
+ .map(|to| {
+ // `Duration::as_millis` truncates, so round up. This avoids
+ // turning sub-millisecond timeouts into a zero timeout, unless
+ // the caller explicitly requests that by specifying a zero
+ // timeout.
+ let to_ms = (to + Duration::from_nanos(999_999)).as_millis();
+ cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
+ })
.unwrap_or(-1);
events.clear();
@@ -82,6 +115,8 @@ impl Selector {
let mut event = libc::epoll_event {
events: interests_to_epoll(interests),
u64: usize::from(token) as u64,
+ #[cfg(target_os = "redox")]
+ _pad: 0,
};
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
@@ -91,6 +126,8 @@ impl Selector {
let mut event = libc::epoll_event {
events: interests_to_epoll(interests),
u64: usize::from(token) as u64,
+ #[cfg(target_os = "redox")]
+ _pad: 0,
};
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
@@ -140,6 +177,10 @@ fn interests_to_epoll(interests: Interest) -> u32 {
kind |= EPOLLOUT;
}
+ if interests.is_priority() {
+ kind |= EPOLLPRI;
+ }
+
kind as u32
}
@@ -222,7 +263,7 @@ pub mod event {
libc::EPOLLET,
libc::EPOLLRDHUP,
libc::EPOLLONESHOT,
- #[cfg(any(target_os = "linux", target_os = "solaris"))]
+ #[cfg(target_os = "linux")]
libc::EPOLLEXCLUSIVE,
#[cfg(any(target_os = "android", target_os = "linux"))]
libc::EPOLLWAKEUP,
diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs
index b36a537..1eedec0 100644
--- a/src/sys/unix/selector/kqueue.rs
+++ b/src/sys/unix/selector/kqueue.rs
@@ -1,6 +1,6 @@
use crate::{Interest, Token};
use log::error;
-use std::mem::MaybeUninit;
+use std::mem::{self, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
@@ -21,7 +21,7 @@ type Count = libc::size_t;
// Type of the `filter` field in the `kevent` structure.
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))]
type Filter = libc::c_short;
-#[cfg(any(target_os = "macos", target_os = "ios"))]
+#[cfg(any(target_os = "ios", target_os = "macos"))]
type Filter = i16;
#[cfg(target_os = "netbsd")]
type Filter = u32;
@@ -29,22 +29,11 @@ type Filter = u32;
// Type of the `flags` field in the `kevent` structure.
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))]
type Flags = libc::c_ushort;
-#[cfg(any(target_os = "macos", target_os = "ios"))]
+#[cfg(any(target_os = "ios", target_os = "macos"))]
type Flags = u16;
#[cfg(target_os = "netbsd")]
type Flags = u32;
-// Type of the `data` field in the `kevent` structure.
-#[cfg(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "ios",
- target_os = "macos"
-))]
-type Data = libc::intptr_t;
-#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
-type Data = i64;
-
// Type of the `udata` field in the `kevent` structure.
#[cfg(not(target_os = "netbsd"))]
type UData = *mut libc::c_void;
@@ -57,9 +46,8 @@ macro_rules! kevent {
ident: $id as libc::uintptr_t,
filter: $filter as Filter,
flags: $flags,
- fflags: 0,
- data: 0,
udata: $data as UData,
+ ..unsafe { mem::zeroed() }
}
};
}
@@ -75,15 +63,17 @@ pub struct Selector {
impl Selector {
pub fn new() -> io::Result<Selector> {
- syscall!(kqueue())
- .and_then(|kq| syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| kq))
- .map(|kq| Selector {
- #[cfg(debug_assertions)]
- id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
- kq,
- #[cfg(debug_assertions)]
- has_waker: AtomicBool::new(false),
- })
+ let kq = syscall!(kqueue())?;
+ let selector = Selector {
+ #[cfg(debug_assertions)]
+ id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ kq,
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(false),
+ };
+
+ syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
+ Ok(selector)
}
pub fn try_clone(&self) -> io::Result<Selector> {
@@ -163,7 +153,7 @@ impl Selector {
// the array.
slice::from_raw_parts_mut(changes[0].as_mut_ptr(), n_changes)
};
- kevent_register(self.kq, changes, &[libc::EPIPE as Data])
+ kevent_register(self.kq, changes, &[libc::EPIPE as i64])
}
pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
@@ -195,7 +185,7 @@ impl Selector {
kevent_register(
self.kq,
&mut changes,
- &[libc::ENOENT as Data, libc::EPIPE as Data],
+ &[libc::ENOENT as i64, libc::EPIPE as i64],
)
}
@@ -211,7 +201,7 @@ impl Selector {
// the ENOENT error when it comes up. The ENOENT error informs us that
// the filter wasn't there in first place, but we don't really care
// about that since our goal is to remove it.
- kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
+ kevent_register(self.kq, &mut changes, &[libc::ENOENT as i64])
}
#[cfg(debug_assertions)]
@@ -264,7 +254,7 @@ impl Selector {
fn kevent_register(
kq: RawFd,
changes: &mut [libc::kevent],
- ignored_errors: &[Data],
+ ignored_errors: &[i64],
) -> io::Result<()> {
syscall!(kevent(
kq,
@@ -285,15 +275,15 @@ fn kevent_register(
Err(err)
}
})
- .and_then(|()| check_errors(&changes, ignored_errors))
+ .and_then(|()| check_errors(changes, ignored_errors))
}
/// Check all events for possible errors, it returns the first error found.
-fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<()> {
+fn check_errors(events: &[libc::kevent], ignored_errors: &[i64]) -> io::Result<()> {
for event in events {
// We can't use references to packed structures (in checking the ignored
// errors), so we need copy the data out before use.
- let data = event.data;
+ let data = event.data as _;
// Check for the error flag, the actual error will be in the `data`
// field.
if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored_errors.contains(&data) {
@@ -415,7 +405,7 @@ pub mod event {
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
{
event.filter == libc::EVFILT_AIO
@@ -424,7 +414,7 @@ pub mod event {
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
)))]
{
false
@@ -460,7 +450,7 @@ pub mod event {
target_os = "freebsd",
target_os = "dragonfly",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::EVFILT_FS,
#[cfg(target_os = "freebsd")]
@@ -469,7 +459,7 @@ pub mod event {
target_os = "freebsd",
target_os = "dragonfly",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::EVFILT_USER,
#[cfg(target_os = "freebsd")]
@@ -526,49 +516,49 @@ pub mod event {
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_TRIGGER,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_FFNOP,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_FFAND,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_FFOR,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_FFCOPY,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_FFCTRLMASK,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
- target_os = "macos"
+ target_os = "macos",
))]
libc::NOTE_FFLAGSMASK,
libc::NOTE_LOWAT,
@@ -603,21 +593,21 @@ pub mod event {
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
libc::NOTE_TRACK,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
libc::NOTE_TRACKERR,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
libc::NOTE_CHILD,
#[cfg(any(target_os = "ios", target_os = "macos"))]
@@ -645,7 +635,6 @@ pub mod event {
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
libc::NOTE_NSECONDS,
#[cfg(any(target_os = "ios", target_os = "macos"))]
- #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
libc::NOTE_ABSOLUTE,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_LEEWAY,
diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs
index b73d645..c06c7b1 100644
--- a/src/sys/unix/selector/mod.rs
+++ b/src/sys/unix/selector/mod.rs
@@ -2,7 +2,7 @@
target_os = "android",
target_os = "illumos",
target_os = "linux",
- target_os = "solaris"
+ target_os = "redox",
))]
mod epoll;
@@ -10,27 +10,27 @@ mod epoll;
target_os = "android",
target_os = "illumos",
target_os = "linux",
- target_os = "solaris"
+ target_os = "redox",
))]
pub(crate) use self::epoll::{event, Event, Events, Selector};
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
+ target_os = "netbsd",
+ target_os = "openbsd",
target_os = "ios",
target_os = "macos",
- target_os = "netbsd",
- target_os = "openbsd"
))]
mod kqueue;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
+ target_os = "netbsd",
+ target_os = "openbsd",
target_os = "ios",
target_os = "macos",
- target_os = "netbsd",
- target_os = "openbsd"
))]
pub(crate) use self::kqueue::{event, Event, Events, Selector};
diff --git a/src/sys/unix/sourcefd.rs b/src/sys/unix/sourcefd.rs
index ba52b38..84e776d 100644
--- a/src/sys/unix/sourcefd.rs
+++ b/src/sys/unix/sourcefd.rs
@@ -1,4 +1,4 @@
-use crate::{event, poll, Interest, Registry, Token};
+use crate::{event, Interest, Registry, Token};
use std::io;
use std::os::unix::io::RawFd;
@@ -25,8 +25,14 @@ use std::os::unix::io::RawFd;
///
/// Basic usage.
///
-#[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+#[cfg_attr(
+ all(feature = "os-poll", feature = "net", feature = "os-ext"),
+ doc = "```"
+)]
+#[cfg_attr(
+ not(all(feature = "os-poll", feature = "net", feature = "os-ext")),
+ doc = "```ignore"
+)]
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Interest, Poll, Token};
@@ -51,8 +57,8 @@ use std::os::unix::io::RawFd;
///
/// Implementing [`event::Source`] for a custom type backed by a [`RawFd`].
///
-#[cfg_attr(all(feature = "os-poll", features = "os-ext"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "os-ext")), doc = "```ignore")]
+#[cfg_attr(all(feature = "os-poll", feature = "os-ext"), doc = "```")]
+#[cfg_attr(not(all(feature = "os-poll", feature = "os-ext")), doc = "```ignore")]
/// use mio::{event, Interest, Registry, Token};
/// use mio::unix::SourceFd;
///
@@ -92,7 +98,7 @@ impl<'a> event::Source for SourceFd<'a> {
token: Token,
interests: Interest,
) -> io::Result<()> {
- poll::selector(registry).register(*self.0, token, interests)
+ registry.selector().register(*self.0, token, interests)
}
fn reregister(
@@ -101,10 +107,10 @@ impl<'a> event::Source for SourceFd<'a> {
token: Token,
interests: Interest,
) -> io::Result<()> {
- poll::selector(registry).reregister(*self.0, token, interests)
+ registry.selector().reregister(*self.0, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
- poll::selector(registry).deregister(*self.0)
+ registry.selector().deregister(*self.0)
}
}
diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs
index 59642c6..9513cc0 100644
--- a/src/sys/unix/tcp.rs
+++ b/src/sys/unix/tcp.rs
@@ -1,428 +1,57 @@
use std::convert::TryInto;
use std::io;
-use std::mem;
use std::mem::{size_of, MaybeUninit};
use std::net::{self, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd};
-use std::time::Duration;
use crate::sys::unix::net::{new_socket, socket_addr, to_socket_addr};
-use crate::net::TcpKeepalive;
-#[cfg(any(target_os = "openbsd", target_os = "netbsd", target_os = "haiku"))]
-use libc::SO_KEEPALIVE as KEEPALIVE_TIME;
-#[cfg(any(target_os = "macos", target_os = "ios"))]
-use libc::TCP_KEEPALIVE as KEEPALIVE_TIME;
-#[cfg(not(any(
- target_os = "macos",
- target_os = "ios",
- target_os = "openbsd",
- target_os = "netbsd",
- target_os = "haiku"
-)))]
-use libc::TCP_KEEPIDLE as KEEPALIVE_TIME;
-pub type TcpSocket = libc::c_int;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- new_socket(libc::AF_INET, libc::SOCK_STREAM)
-}
-
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
- new_socket(libc::AF_INET6, libc::SOCK_STREAM)
+pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result<libc::c_int> {
+ let domain = match address {
+ SocketAddr::V4(_) => libc::AF_INET,
+ SocketAddr::V6(_) => libc::AF_INET6,
+ };
+ new_socket(domain, libc::SOCK_STREAM)
}
-pub(crate) fn bind(socket: TcpSocket, addr: SocketAddr) -> io::Result<()> {
+pub(crate) fn bind(socket: &net::TcpListener, addr: SocketAddr) -> io::Result<()> {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
- syscall!(bind(socket, raw_addr.as_ptr(), raw_addr_length))?;
+ syscall!(bind(socket.as_raw_fd(), raw_addr.as_ptr(), raw_addr_length))?;
Ok(())
}
-pub(crate) fn connect(socket: TcpSocket, addr: SocketAddr) -> io::Result<net::TcpStream> {
+pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
- match syscall!(connect(socket, raw_addr.as_ptr(), raw_addr_length)) {
- Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => {
- Err(err)
- }
- _ => {
- Ok(unsafe { net::TcpStream::from_raw_fd(socket) })
- }
+ match syscall!(connect(
+ socket.as_raw_fd(),
+ raw_addr.as_ptr(),
+ raw_addr_length
+ )) {
+ Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
+ _ => Ok(()),
}
}
-pub(crate) fn listen(socket: TcpSocket, backlog: u32) -> io::Result<net::TcpListener> {
+pub(crate) fn listen(socket: &net::TcpListener, backlog: u32) -> io::Result<()> {
let backlog = backlog.try_into().unwrap_or(i32::max_value());
- syscall!(listen(socket, backlog))?;
- Ok(unsafe { net::TcpListener::from_raw_fd(socket) })
-}
-
-pub(crate) fn close(socket: TcpSocket) {
- let _ = unsafe { net::TcpStream::from_raw_fd(socket) };
+ syscall!(listen(socket.as_raw_fd(), backlog))?;
+ Ok(())
}
-pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()> {
- let val: libc::c_int = if reuseaddr { 1 } else { 0 };
+pub(crate) fn set_reuseaddr(socket: &net::TcpListener, reuseaddr: bool) -> io::Result<()> {
+ let val: libc::c_int = i32::from(reuseaddr);
syscall!(setsockopt(
- socket,
+ socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&val as *const libc::c_int as *const libc::c_void,
size_of::<libc::c_int>() as libc::socklen_t,
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_REUSEADDR,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval != 0)
-}
-
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn set_reuseport(socket: TcpSocket, reuseport: bool) -> io::Result<()> {
- let val: libc::c_int = if reuseport { 1 } else { 0 };
-
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_REUSEPORT,
- &val as *const libc::c_int as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t,
- ))
- .map(|_| ())
-}
-
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn get_reuseport(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_REUSEPORT,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval != 0)
-}
-
-pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
- let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
- let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
-
- syscall!(getsockname(
- socket,
- &mut addr as *mut _ as *mut _,
- &mut length
- ))?;
-
- unsafe { to_socket_addr(&addr) }
-}
-
-pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
- let val: libc::linger = libc::linger {
- l_onoff: if dur.is_some() { 1 } else { 0 },
- l_linger: dur
- .map(|dur| dur.as_secs() as libc::c_int)
- .unwrap_or_default(),
- };
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- #[cfg(target_vendor = "apple")]
- libc::SO_LINGER_SEC,
- #[cfg(not(target_vendor = "apple"))]
- libc::SO_LINGER,
- &val as *const libc::linger as *const libc::c_void,
- size_of::<libc::linger>() as libc::socklen_t,
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_linger(socket: TcpSocket) -> io::Result<Option<Duration>> {
- let mut val: libc::linger = unsafe { std::mem::zeroed() };
- let mut len = mem::size_of::<libc::linger>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- #[cfg(target_vendor = "apple")]
- libc::SO_LINGER_SEC,
- #[cfg(not(target_vendor = "apple"))]
- libc::SO_LINGER,
- &mut val as *mut _ as *mut _,
- &mut len,
- ))?;
-
- if val.l_onoff == 0 {
- Ok(None)
- } else {
- Ok(Some(Duration::from_secs(val.l_linger as u64)))
- }
-}
-
-pub(crate) fn set_recv_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_RCVBUF,
- &size as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_recv_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: libc::c_int = 0;
- let mut optlen = size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_RCVBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval as u32)
-}
-
-pub(crate) fn set_send_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_SNDBUF,
- &size as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_send_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: libc::c_int = 0;
- let mut optlen = size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_SNDBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
))?;
-
- Ok(optval as u32)
-}
-
-pub(crate) fn set_keepalive(socket: TcpSocket, keepalive: bool) -> io::Result<()> {
- let val: libc::c_int = if keepalive { 1 } else { 0 };
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_KEEPALIVE,
- &val as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_keepalive(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_KEEPALIVE,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval != 0)
-}
-
-pub(crate) fn set_keepalive_params(socket: TcpSocket, keepalive: TcpKeepalive) -> io::Result<()> {
- if let Some(dur) = keepalive.time {
- set_keepalive_time(socket, dur)?;
- }
-
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- ))]
- {
- if let Some(dur) = keepalive.interval {
- set_keepalive_interval(socket, dur)?;
- }
-
- if let Some(retries) = keepalive.retries {
- set_keepalive_retries(socket, retries)?;
- }
- }
-
-
Ok(())
}
-fn set_keepalive_time(socket: TcpSocket, time: Duration) -> io::Result<()> {
- let time_secs = time
- .as_secs()
- .try_into()
- .ok()
- .unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::IPPROTO_TCP,
- KEEPALIVE_TIME,
- &(time_secs as libc::c_int) as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_keepalive_time(socket: TcpSocket) -> io::Result<Option<Duration>> {
- if !get_keepalive(socket)? {
- return Ok(None);
- }
-
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::IPPROTO_TCP,
- KEEPALIVE_TIME,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(Some(Duration::from_secs(optval as u64)))
-}
-
-/// Linux, FreeBSD, and NetBSD support setting the keepalive interval via
-/// `TCP_KEEPINTVL`.
-/// See:
-/// - https://man7.org/linux/man-pages/man7/tcp.7.html
-/// - https://www.freebsd.org/cgi/man.cgi?query=tcp#end
-/// - http://man.netbsd.org/tcp.4#DESCRIPTION
-///
-/// OpenBSD does not:
-/// https://man.openbsd.org/tcp
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-fn set_keepalive_interval(socket: TcpSocket, interval: Duration) -> io::Result<()> {
- let interval_secs = interval
- .as_secs()
- .try_into()
- .ok()
- .unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPINTVL,
- &(interval_secs as libc::c_int) as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_interval(socket: TcpSocket) -> io::Result<Option<Duration>> {
- if !get_keepalive(socket)? {
- return Ok(None);
- }
-
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPINTVL,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(Some(Duration::from_secs(optval as u64)))
-}
-
-/// Linux, macOS/iOS, FreeBSD, and NetBSD support setting the number of TCP
-/// keepalive retries via `TCP_KEEPCNT`.
-/// See:
-/// - https://man7.org/linux/man-pages/man7/tcp.7.html
-/// - https://www.freebsd.org/cgi/man.cgi?query=tcp#end
-/// - http://man.netbsd.org/tcp.4#DESCRIPTION
-///
-/// OpenBSD does not:
-/// https://man.openbsd.org/tcp
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-fn set_keepalive_retries(socket: TcpSocket, retries: u32) -> io::Result<()> {
- let retries = retries.try_into().ok().unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPCNT,
- &(retries as libc::c_int) as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_retries(socket: TcpSocket) -> io::Result<Option<u32>> {
- if !get_keepalive(socket)? {
- return Ok(None);
- }
-
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPCNT,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(Some(optval as u32))
-}
-
-pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
let mut addr: MaybeUninit<libc::sockaddr_storage> = MaybeUninit::uninit();
let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
@@ -431,16 +60,13 @@ pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, Socket
#[cfg(any(
// Android x86's seccomp profile forbids calls to `accept4(2)`
// See https://github.com/tokio-rs/mio/issues/1445 for details
- all(
- not(target_arch="x86"),
- target_os = "android"
- ),
+ all(not(target_arch="x86"), target_os = "android"),
target_os = "dragonfly",
target_os = "freebsd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
- target_os = "openbsd"
+ target_os = "openbsd",
))]
let stream = {
syscall!(accept4(
@@ -456,13 +82,10 @@ pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, Socket
// OSes inherit the non-blocking flag from the listener, so we just have to
// set `CLOEXEC`.
#[cfg(any(
- all(
- target_arch = "x86",
- target_os = "android"
- ),
- target_os = "ios",
- target_os = "macos",
- target_os = "solaris"
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "redox",
+ all(target_arch = "x86", target_os = "android"),
))]
let stream = {
syscall!(accept(
@@ -473,11 +96,11 @@ pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, Socket
.map(|socket| unsafe { net::TcpStream::from_raw_fd(socket) })
.and_then(|s| {
syscall!(fcntl(s.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?;
-
+
// See https://github.com/tokio-rs/mio/issues/1450
- #[cfg(all(target_arch = "x86",target_os = "android"))]
+ #[cfg(all(target_arch = "x86", target_os = "android"))]
syscall!(fcntl(s.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK))?;
-
+
Ok(s)
})
}?;
diff --git a/src/sys/unix/udp.rs b/src/sys/unix/udp.rs
index 5a97cbd..843ae88 100644
--- a/src/sys/unix/udp.rs
+++ b/src/sys/unix/udp.rs
@@ -6,21 +6,13 @@ use std::net::{self, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd};
pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> {
- // Gives a warning for non Apple platforms.
- #[allow(clippy::let_and_return)]
- let socket = new_ip_socket(addr, libc::SOCK_DGRAM);
+ let fd = new_ip_socket(addr, libc::SOCK_DGRAM)?;
+ let socket = unsafe { net::UdpSocket::from_raw_fd(fd) };
- socket.and_then(|socket| {
- let (raw_addr, raw_addr_length) = socket_addr(&addr);
- syscall!(bind(socket, raw_addr.as_ptr(), raw_addr_length))
- .map_err(|err| {
- // Close the socket if we hit an error, ignoring the error
- // from closing since we can't pass back two errors.
- let _ = unsafe { libc::close(socket) };
- err
- })
- .map(|_| unsafe { net::UdpSocket::from_raw_fd(socket) })
- })
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+ syscall!(bind(fd, raw_addr.as_ptr(), raw_addr_length))?;
+
+ Ok(socket)
}
pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
diff --git a/src/sys/unix/uds/datagram.rs b/src/sys/unix/uds/datagram.rs
index d3e5314..a5ada72 100644
--- a/src/sys/unix/uds/datagram.rs
+++ b/src/sys/unix/uds/datagram.rs
@@ -7,18 +7,18 @@ use std::os::unix::net;
use std::path::Path;
pub(crate) fn bind(path: &Path) -> io::Result<net::UnixDatagram> {
- let fd = new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)?;
- // Ensure the fd is closed.
- let socket = unsafe { net::UnixDatagram::from_raw_fd(fd) };
let (sockaddr, socklen) = socket_addr(path)?;
let sockaddr = &sockaddr as *const libc::sockaddr_un as *const _;
- syscall!(bind(fd, sockaddr, socklen))?;
+
+ let socket = unbound()?;
+ syscall!(bind(socket.as_raw_fd(), sockaddr, socklen))?;
+
Ok(socket)
}
pub(crate) fn unbound() -> io::Result<net::UnixDatagram> {
- new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)
- .map(|socket| unsafe { net::UnixDatagram::from_raw_fd(socket) })
+ let fd = new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)?;
+ Ok(unsafe { net::UnixDatagram::from_raw_fd(fd) })
}
pub(crate) fn pair() -> io::Result<(net::UnixDatagram, net::UnixDatagram)> {
diff --git a/src/sys/unix/uds/listener.rs b/src/sys/unix/uds/listener.rs
index 547ff57..3e33b30 100644
--- a/src/sys/unix/uds/listener.rs
+++ b/src/sys/unix/uds/listener.rs
@@ -7,19 +7,15 @@ use std::path::Path;
use std::{io, mem};
pub(crate) fn bind(path: &Path) -> io::Result<net::UnixListener> {
- let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?;
let (sockaddr, socklen) = socket_addr(path)?;
let sockaddr = &sockaddr as *const libc::sockaddr_un as *const libc::sockaddr;
- syscall!(bind(socket, sockaddr, socklen))
- .and_then(|_| syscall!(listen(socket, 1024)))
- .map_err(|err| {
- // Close the socket if we hit an error, ignoring the error from
- // closing since we can't pass back two errors.
- let _ = unsafe { libc::close(socket) };
- err
- })
- .map(|_| unsafe { net::UnixListener::from_raw_fd(socket) })
+ let fd = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?;
+ let socket = unsafe { net::UnixListener::from_raw_fd(fd) };
+ syscall!(bind(fd, sockaddr, socklen))?;
+ syscall!(listen(fd, 1024))?;
+
+ Ok(socket)
}
pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, SocketAddr)> {
@@ -42,13 +38,10 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
- target_os = "solaris",
+ target_os = "redox",
// Android x86's seccomp profile forbids calls to `accept4(2)`
// See https://github.com/tokio-rs/mio/issues/1445 for details
- all(
- target_arch = "x86",
- target_os = "android"
- )
+ all(target_arch = "x86", target_os = "android"),
)))]
let socket = {
let flags = libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
@@ -62,14 +55,11 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
};
#[cfg(any(
+ target_os = "netbsd",
+ target_os = "redox",
target_os = "ios",
target_os = "macos",
- target_os = "netbsd",
- target_os = "solaris",
- all(
- target_arch = "x86",
- target_os = "android"
- )
+ all(target_arch = "x86", target_os = "android")
))]
let socket = syscall!(accept(
listener.as_raw_fd(),
@@ -83,9 +73,9 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC))?;
// See https://github.com/tokio-rs/mio/issues/1450
- #[cfg(all(target_arch = "x86",target_os = "android"))]
+ #[cfg(all(target_arch = "x86", target_os = "android"))]
syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))?;
-
+
Ok(s)
});
diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs
index 3ec829f..526bbdf 100644
--- a/src/sys/unix/uds/mod.rs
+++ b/src/sys/unix/uds/mod.rs
@@ -40,7 +40,7 @@ cfg_os_poll! {
sockaddr.sun_family = libc::AF_UNIX as libc::sa_family_t;
let bytes = path.as_os_str().as_bytes();
- match (bytes.get(0), bytes.len().cmp(&sockaddr.sun_path.len())) {
+ match (bytes.first(), bytes.len().cmp(&sockaddr.sun_path.len())) {
// Abstract paths don't need a null terminator
(Some(&0), Ordering::Greater) => {
return Err(io::Error::new(
@@ -64,7 +64,7 @@ cfg_os_poll! {
let offset = path_offset(&sockaddr);
let mut socklen = offset + bytes.len();
- match bytes.get(0) {
+ match bytes.first() {
// The struct has already been zeroes so the null byte for pathname
// addresses is already there.
Some(&0) | None => {}
@@ -77,20 +77,20 @@ cfg_os_poll! {
fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)>
where T: FromRawFd,
{
- #[cfg(not(any(target_os = "ios", target_os = "macos", target_os = "solaris")))]
+ #[cfg(not(any(target_os = "ios", target_os = "macos")))]
let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
let mut fds = [-1; 2];
syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?;
let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) };
- // Darwin and Solaris do not have SOCK_NONBLOCK or SOCK_CLOEXEC.
+ // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC.
//
// In order to set those flags, additional `fcntl` sys calls must be
// performed. If a `fnctl` fails after the sockets have been created,
// the file descriptors will leak. Creating `pair` above ensures that if
// there is an error, the file descriptors are closed.
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
{
syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?;
syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?;
diff --git a/src/sys/unix/uds/socketaddr.rs b/src/sys/unix/uds/socketaddr.rs
index a9f9ea9..4c7c411 100644
--- a/src/sys/unix/uds/socketaddr.rs
+++ b/src/sys/unix/uds/socketaddr.rs
@@ -78,14 +78,8 @@ cfg_os_poll! {
/// Documentation reflected in [`SocketAddr`]
///
/// [`SocketAddr`]: std::os::unix::net::SocketAddr
- // FIXME: The matches macro requires rust 1.42.0 and we still support 1.39.0
- #[allow(clippy::match_like_matches_macro)]
pub fn is_unnamed(&self) -> bool {
- if let AddressKind::Unnamed = self.address() {
- true
- } else {
- false
- }
+ matches!(self.address(), AddressKind::Unnamed)
}
/// Returns the contents of this address if it is a `pathname` address.
@@ -100,6 +94,18 @@ cfg_os_poll! {
None
}
}
+
+ /// Returns the contents of this address if it is an abstract namespace
+ /// without the leading null byte.
+ // Link to std::os::unix::net::SocketAddr pending
+ // https://github.com/rust-lang/rust/issues/85410.
+ pub fn as_abstract_namespace(&self) -> Option<&[u8]> {
+ if let AddressKind::Abstract(path) = self.address() {
+ Some(path)
+ } else {
+ None
+ }
+ }
}
}
diff --git a/src/sys/unix/uds/stream.rs b/src/sys/unix/uds/stream.rs
index 149dd14..461917c 100644
--- a/src/sys/unix/uds/stream.rs
+++ b/src/sys/unix/uds/stream.rs
@@ -7,23 +7,18 @@ use std::os::unix::net;
use std::path::Path;
pub(crate) fn connect(path: &Path) -> io::Result<net::UnixStream> {
- let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?;
let (sockaddr, socklen) = socket_addr(path)?;
let sockaddr = &sockaddr as *const libc::sockaddr_un as *const libc::sockaddr;
- match syscall!(connect(socket, sockaddr, socklen)) {
+ let fd = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?;
+ let socket = unsafe { net::UnixStream::from_raw_fd(fd) };
+ match syscall!(connect(fd, sockaddr, socklen)) {
Ok(_) => {}
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
- Err(e) => {
- // Close the socket if we hit an error, ignoring the error
- // from closing since we can't pass back two errors.
- let _ = unsafe { libc::close(socket) };
-
- return Err(e);
- }
+ Err(ref err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
+ Err(e) => return Err(e),
}
- Ok(unsafe { net::UnixStream::from_raw_fd(socket) })
+ Ok(socket)
}
pub(crate) fn pair() -> io::Result<(net::UnixStream, net::UnixStream)> {
diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs
index a7cf484..d8764c2 100644
--- a/src/sys/unix/waker.rs
+++ b/src/sys/unix/waker.rs
@@ -20,14 +20,11 @@ mod eventfd {
impl Waker {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
- syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
- // Turn the file descriptor into a file first so we're ensured
- // it's closed when dropped, e.g. when register below fails.
- let file = unsafe { File::from_raw_fd(fd) };
- selector
- .register(fd, token, Interest::READABLE)
- .map(|()| Waker { fd: file })
- })
+ let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
+ let file = unsafe { File::from_raw_fd(fd) };
+
+ selector.register(fd, token, Interest::READABLE)?;
+ Ok(Waker { fd: file })
}
pub fn wake(&self) -> io::Result<()> {
@@ -82,11 +79,9 @@ mod kqueue {
impl Waker {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
- selector.try_clone().and_then(|selector| {
- selector
- .setup_waker(token)
- .map(|()| Waker { selector, token })
- })
+ let selector = selector.try_clone()?;
+ selector.setup_waker(token)?;
+ Ok(Waker { selector, token })
}
pub fn wake(&self) -> io::Result<()> {
@@ -103,7 +98,7 @@ pub use self::kqueue::Waker;
target_os = "illumos",
target_os = "netbsd",
target_os = "openbsd",
- target_os = "solaris"
+ target_os = "redox",
))]
mod pipe {
use crate::sys::unix::Selector;
@@ -127,13 +122,11 @@ mod pipe {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
let mut fds = [-1; 2];
syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
- // Turn the file descriptors into files first so we're ensured
- // they're closed when dropped, e.g. when register below fails.
let sender = unsafe { File::from_raw_fd(fds[1]) };
let receiver = unsafe { File::from_raw_fd(fds[0]) };
- selector
- .register(fds[0], token, Interest::READABLE)
- .map(|()| Waker { sender, receiver })
+
+ selector.register(fds[0], token, Interest::READABLE)?;
+ Ok(Waker { sender, receiver })
}
pub fn wake(&self) -> io::Result<()> {
@@ -175,6 +168,6 @@ mod pipe {
target_os = "illumos",
target_os = "netbsd",
target_os = "openbsd",
- target_os = "solaris"
+ target_os = "redox",
))]
pub use self::pipe::Waker;
diff --git a/src/sys/wasi/mod.rs b/src/sys/wasi/mod.rs
new file mode 100644
index 0000000..eeea0f2
--- /dev/null
+++ b/src/sys/wasi/mod.rs
@@ -0,0 +1,370 @@
+//! # Notes
+//!
+//! The current implementation is somewhat limited. The `Waker` is not
+//! implemented, as at the time of writing there is no way to support to wake-up
+//! a thread from calling `poll_oneoff`.
+//!
+//! Furthermore the (re/de)register functions also don't work while concurrently
+//! polling as both registering and polling requires a lock on the
+//! `subscriptions`.
+//!
+//! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't
+//! work. However this could be implemented by use of an `Arc`.
+//!
+//! In summary, this only (barely) works using a single thread.
+
+use std::cmp::min;
+use std::io;
+#[cfg(all(feature = "net", debug_assertions))]
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+#[cfg(feature = "net")]
+use crate::{Interest, Token};
+
+cfg_net! {
+ pub(crate) mod tcp {
+ use std::io;
+ use std::net::{self, SocketAddr};
+
+ pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+ let (stream, addr) = listener.accept()?;
+ stream.set_nonblocking(true)?;
+ Ok((stream, addr))
+ }
+ }
+}
+
+/// Unique id for use as `SelectorId`.
+#[cfg(all(debug_assertions, feature = "net"))]
+static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
+
+pub(crate) struct Selector {
+ #[cfg(all(debug_assertions, feature = "net"))]
+ id: usize,
+ /// Subscriptions (reads events) we're interested in.
+ subscriptions: Arc<Mutex<Vec<wasi::Subscription>>>,
+}
+
+impl Selector {
+ pub(crate) fn new() -> io::Result<Selector> {
+ Ok(Selector {
+ #[cfg(all(debug_assertions, feature = "net"))]
+ id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ subscriptions: Arc::new(Mutex::new(Vec::new())),
+ })
+ }
+
+ #[cfg(all(debug_assertions, feature = "net"))]
+ pub(crate) fn id(&self) -> usize {
+ self.id
+ }
+
+ pub(crate) fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
+ events.clear();
+
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+
+ // If we want to a use a timeout in the `wasi_poll_oneoff()` function
+ // we need another subscription to the list.
+ if let Some(timeout) = timeout {
+ subscriptions.push(timeout_subscription(timeout));
+ }
+
+ // `poll_oneoff` needs the same number of events as subscriptions.
+ let length = subscriptions.len();
+ events.reserve(length);
+
+ debug_assert!(events.capacity() >= length);
+ #[cfg(debug_assertions)]
+ if length == 0 {
+ log::warn!(
+ "calling mio::Poll::poll with empty subscriptions, this likely not what you want"
+ );
+ }
+
+ let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) };
+
+ // Remove the timeout subscription we possibly added above.
+ if timeout.is_some() {
+ let timeout_sub = subscriptions.pop();
+ debug_assert_eq!(
+ timeout_sub.unwrap().u.tag,
+ wasi::EVENTTYPE_CLOCK.raw(),
+ "failed to remove timeout subscription"
+ );
+ }
+
+ drop(subscriptions); // Unlock.
+
+ match res {
+ Ok(n_events) => {
+ // Safety: `poll_oneoff` initialises the `events` for us.
+ unsafe { events.set_len(n_events) };
+
+ // Remove the timeout event.
+ if timeout.is_some() {
+ if let Some(index) = events.iter().position(is_timeout_event) {
+ events.swap_remove(index);
+ }
+ }
+
+ check_errors(&events)
+ }
+ Err(err) => Err(io_err(err)),
+ }
+ }
+
+ pub(crate) fn try_clone(&self) -> io::Result<Selector> {
+ Ok(Selector {
+ #[cfg(all(debug_assertions, feature = "net"))]
+ id: self.id,
+ subscriptions: self.subscriptions.clone(),
+ })
+ }
+
+ #[cfg(feature = "net")]
+ pub(crate) fn register(
+ &self,
+ fd: wasi::Fd,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+
+ if interests.is_writable() {
+ let subscription = wasi::Subscription {
+ userdata: token.0 as wasi::Userdata,
+ u: wasi::SubscriptionU {
+ tag: wasi::EVENTTYPE_FD_WRITE.raw(),
+ u: wasi::SubscriptionUU {
+ fd_write: wasi::SubscriptionFdReadwrite {
+ file_descriptor: fd,
+ },
+ },
+ },
+ };
+ subscriptions.push(subscription);
+ }
+
+ if interests.is_readable() {
+ let subscription = wasi::Subscription {
+ userdata: token.0 as wasi::Userdata,
+ u: wasi::SubscriptionU {
+ tag: wasi::EVENTTYPE_FD_READ.raw(),
+ u: wasi::SubscriptionUU {
+ fd_read: wasi::SubscriptionFdReadwrite {
+ file_descriptor: fd,
+ },
+ },
+ },
+ };
+ subscriptions.push(subscription);
+ }
+
+ Ok(())
+ }
+
+ #[cfg(feature = "net")]
+ pub(crate) fn reregister(
+ &self,
+ fd: wasi::Fd,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ self.deregister(fd)
+ .and_then(|()| self.register(fd, token, interests))
+ }
+
+ #[cfg(feature = "net")]
+ pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> {
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+
+ let predicate = |subscription: &wasi::Subscription| {
+ // Safety: `subscription.u.tag` defines the type of the union in
+ // `subscription.u.u`.
+ match subscription.u.tag {
+ t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe {
+ subscription.u.u.fd_write.file_descriptor == fd
+ },
+ t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe {
+ subscription.u.u.fd_read.file_descriptor == fd
+ },
+ _ => false,
+ }
+ };
+
+ let mut ret = Err(io::ErrorKind::NotFound.into());
+
+ while let Some(index) = subscriptions.iter().position(predicate) {
+ subscriptions.swap_remove(index);
+ ret = Ok(())
+ }
+
+ ret
+ }
+}
+
+/// Token used to a add a timeout subscription, also used in removing it again.
+const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::max_value();
+
+/// Returns a `wasi::Subscription` for `timeout`.
+fn timeout_subscription(timeout: Duration) -> wasi::Subscription {
+ wasi::Subscription {
+ userdata: TIMEOUT_TOKEN,
+ u: wasi::SubscriptionU {
+ tag: wasi::EVENTTYPE_CLOCK.raw(),
+ u: wasi::SubscriptionUU {
+ clock: wasi::SubscriptionClock {
+ id: wasi::CLOCKID_MONOTONIC,
+ // Timestamp is in nanoseconds.
+ timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos())
+ as wasi::Timestamp,
+ // Give the implementation another millisecond to coalesce
+ // events.
+ precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp,
+ // Zero means the `timeout` is considered relative to the
+ // current time.
+ flags: 0,
+ },
+ },
+ },
+ }
+}
+
+fn is_timeout_event(event: &wasi::Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN
+}
+
+/// Check all events for possible errors, it returns the first error found.
+fn check_errors(events: &[Event]) -> io::Result<()> {
+ for event in events {
+ if event.error != wasi::ERRNO_SUCCESS {
+ return Err(io_err(event.error));
+ }
+ }
+ Ok(())
+}
+
+/// Convert `wasi::Errno` into an `io::Error`.
+fn io_err(errno: wasi::Errno) -> io::Error {
+ // TODO: check if this is valid.
+ io::Error::from_raw_os_error(errno.raw() as i32)
+}
+
+pub(crate) type Events = Vec<Event>;
+
+pub(crate) type Event = wasi::Event;
+
+pub(crate) mod event {
+ use std::fmt;
+
+ use crate::sys::Event;
+ use crate::Token;
+
+ pub(crate) fn token(event: &Event) -> Token {
+ Token(event.userdata as usize)
+ }
+
+ pub(crate) fn is_readable(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_READ
+ }
+
+ pub(crate) fn is_writable(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_WRITE
+ }
+
+ pub(crate) fn is_error(_: &Event) -> bool {
+ // Not supported? It could be that `wasi::Event.error` could be used for
+ // this, but the docs say `error that occurred while processing the
+ // subscription request`, so it's checked in `Select::select` already.
+ false
+ }
+
+ pub(crate) fn is_read_closed(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_READ
+ // Safety: checked the type of the union above.
+ && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
+ }
+
+ pub(crate) fn is_write_closed(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_WRITE
+ // Safety: checked the type of the union above.
+ && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
+ }
+
+ pub(crate) fn is_priority(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub(crate) fn is_aio(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub(crate) fn is_lio(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
+ debug_detail!(
+ TypeDetails(wasi::Eventtype),
+ PartialEq::eq,
+ wasi::EVENTTYPE_CLOCK,
+ wasi::EVENTTYPE_FD_READ,
+ wasi::EVENTTYPE_FD_WRITE,
+ );
+
+ #[allow(clippy::trivially_copy_pass_by_ref)]
+ fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool {
+ (got & want) != 0
+ }
+ debug_detail!(
+ EventrwflagsDetails(wasi::Eventrwflags),
+ check_flag,
+ wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP,
+ );
+
+ struct EventFdReadwriteDetails(wasi::EventFdReadwrite);
+
+ impl fmt::Debug for EventFdReadwriteDetails {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("EventFdReadwrite")
+ .field("nbytes", &self.0.nbytes)
+ .field("flags", &self.0.flags)
+ .finish()
+ }
+ }
+
+ f.debug_struct("Event")
+ .field("userdata", &event.userdata)
+ .field("error", &event.error)
+ .field("type", &TypeDetails(event.type_))
+ .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite))
+ .finish()
+ }
+}
+
+cfg_os_poll! {
+ cfg_io_source! {
+ pub(crate) struct IoSourceState;
+
+ impl IoSourceState {
+ pub(crate) fn new() -> IoSourceState {
+ IoSourceState
+ }
+
+ pub(crate) fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
+ where
+ F: FnOnce(&T) -> io::Result<R>,
+ {
+ // We don't hold state, so we can just call the function and
+ // return.
+ f(io)
+ }
+ }
+ }
+}
diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs
index 6241a45..0308e2f 100644
--- a/src/sys/windows/afd.rs
+++ b/src/sys/windows/afd.rs
@@ -1,17 +1,32 @@
-use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
-use ntapi::ntioapi::{NtCancelIoFileEx, NtDeviceIoControlFile};
-use ntapi::ntrtl::RtlNtStatusToDosError;
+use std::ffi::c_void;
use std::fmt;
use std::fs::File;
use std::io;
use std::mem::size_of;
use std::os::windows::io::AsRawHandle;
-use std::ptr::null_mut;
-use winapi::shared::ntdef::{HANDLE, LARGE_INTEGER, NTSTATUS, PVOID, ULONG};
-use winapi::shared::ntstatus::{STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS};
-const IOCTL_AFD_POLL: ULONG = 0x00012024;
+use windows_sys::Win32::Foundation::{
+ RtlNtStatusToDosError, HANDLE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
+};
+use windows_sys::Win32::System::WindowsProgramming::{
+ NtDeviceIoControlFile, IO_STATUS_BLOCK, IO_STATUS_BLOCK_0,
+};
+const IOCTL_AFD_POLL: u32 = 0x00012024;
+
+#[link(name = "ntdll")]
+extern "system" {
+ /// See <https://processhacker.sourceforge.io/doc/ntioapi_8h.html#a0d4d550cad4d62d75b76961e25f6550c>
+ ///
+ /// This is an undocumented API and as such not part of <https://github.com/microsoft/win32metadata>
+ /// from which `windows-sys` is generated, and also unlikely to be added, so
+ /// we manually declare it here
+ fn NtCancelIoFileEx(
+ FileHandle: HANDLE,
+ IoRequestToCancel: *mut IO_STATUS_BLOCK,
+ IoStatusBlock: *mut IO_STATUS_BLOCK,
+ ) -> NTSTATUS;
+}
/// Winsock2 AFD driver instance.
///
/// All operations are unsafe due to IO_STATUS_BLOCK parameter are being used by Afd driver during STATUS_PENDING before I/O Completion Port returns its result.
@@ -24,7 +39,7 @@ pub struct Afd {
#[derive(Debug)]
pub struct AfdPollHandleInfo {
pub handle: HANDLE,
- pub events: ULONG,
+ pub events: u32,
pub status: NTSTATUS,
}
@@ -32,10 +47,10 @@ unsafe impl Send for AfdPollHandleInfo {}
#[repr(C)]
pub struct AfdPollInfo {
- pub timeout: LARGE_INTEGER,
+ pub timeout: i64,
// Can have only value 1.
- pub number_of_handles: ULONG,
- pub exclusive: ULONG,
+ pub number_of_handles: u32,
+ pub exclusive: u32,
pub handles: [AfdPollHandleInfo; 1],
}
@@ -58,13 +73,13 @@ impl Afd {
&self,
info: &mut AfdPollInfo,
iosb: *mut IO_STATUS_BLOCK,
- overlapped: PVOID,
+ overlapped: *mut c_void,
) -> io::Result<bool> {
- let info_ptr: PVOID = info as *mut _ as PVOID;
- (*iosb).u.Status = STATUS_PENDING;
+ let info_ptr = info as *mut _ as *mut c_void;
+ (*iosb).Anonymous.Status = STATUS_PENDING;
let status = NtDeviceIoControlFile(
- self.fd.as_raw_handle(),
- null_mut(),
+ self.fd.as_raw_handle() as HANDLE,
+ 0,
None,
overlapped,
iosb,
@@ -93,15 +108,15 @@ impl Afd {
/// Use it only with request is still being polled so that you have valid `IO_STATUS_BLOCK` to use.
/// User should NOT deallocate there overlapped value after the `cancel` to prevent double free.
pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> io::Result<()> {
- if (*iosb).u.Status != STATUS_PENDING {
+ if (*iosb).Anonymous.Status != STATUS_PENDING {
return Ok(());
}
let mut cancel_iosb = IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
};
- let status = NtCancelIoFileEx(self.fd.as_raw_handle(), iosb, &mut cancel_iosb);
+ let status = NtCancelIoFileEx(self.fd.as_raw_handle() as HANDLE, iosb, &mut cancel_iosb);
if status == STATUS_SUCCESS || status == STATUS_NOT_FOUND {
return Ok(());
}
@@ -114,18 +129,21 @@ impl Afd {
cfg_io_source! {
use std::mem::zeroed;
use std::os::windows::io::{FromRawHandle, RawHandle};
+ use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
- use miow::iocp::CompletionPort;
- use ntapi::ntioapi::{NtCreateFile, FILE_OPEN};
- use winapi::shared::ntdef::{OBJECT_ATTRIBUTES, UNICODE_STRING, USHORT, WCHAR};
- use winapi::um::handleapi::INVALID_HANDLE_VALUE;
- use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVENT_ON_HANDLE};
- use winapi::um::winnt::{SYNCHRONIZE, FILE_SHARE_READ, FILE_SHARE_WRITE};
+ use super::iocp::CompletionPort;
+ use windows_sys::Win32::{
+ Foundation::{UNICODE_STRING, INVALID_HANDLE_VALUE},
+ System::WindowsProgramming::{
+ OBJECT_ATTRIBUTES, FILE_SKIP_SET_EVENT_ON_HANDLE,
+ },
+ Storage::FileSystem::{FILE_OPEN, NtCreateFile, SetFileCompletionNotificationModes, SYNCHRONIZE, FILE_SHARE_READ, FILE_SHARE_WRITE},
+ };
const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES {
- Length: size_of::<OBJECT_ATTRIBUTES>() as ULONG,
- RootDirectory: null_mut(),
+ Length: size_of::<OBJECT_ATTRIBUTES>() as u32,
+ RootDirectory: 0,
ObjectName: &AFD_OBJ_NAME as *const _ as *mut _,
Attributes: 0,
SecurityDescriptor: null_mut(),
@@ -133,12 +151,12 @@ cfg_io_source! {
};
const AFD_OBJ_NAME: UNICODE_STRING = UNICODE_STRING {
- Length: (AFD_HELPER_NAME.len() * size_of::<WCHAR>()) as USHORT,
- MaximumLength: (AFD_HELPER_NAME.len() * size_of::<WCHAR>()) as USHORT,
+ Length: (AFD_HELPER_NAME.len() * size_of::<u16>()) as u16,
+ MaximumLength: (AFD_HELPER_NAME.len() * size_of::<u16>()) as u16,
Buffer: AFD_HELPER_NAME.as_ptr() as *mut _,
};
- const AFD_HELPER_NAME: &[WCHAR] = &[
+ const AFD_HELPER_NAME: &[u16] = &[
'\\' as _,
'D' as _,
'e' as _,
@@ -166,10 +184,10 @@ cfg_io_source! {
impl Afd {
/// Create new Afd instance.
- pub fn new(cp: &CompletionPort) -> io::Result<Afd> {
+ pub(crate) fn new(cp: &CompletionPort) -> io::Result<Afd> {
let mut afd_helper_handle: HANDLE = INVALID_HANDLE_VALUE;
let mut iosb = IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
};
@@ -180,12 +198,12 @@ cfg_io_source! {
&AFD_HELPER_ATTRIBUTES as *const _ as *mut _,
&mut iosb,
null_mut(),
- 0 as ULONG,
+ 0,
FILE_SHARE_READ | FILE_SHARE_WRITE,
FILE_OPEN,
- 0 as ULONG,
+ 0,
null_mut(),
- 0 as ULONG,
+ 0,
);
if status != STATUS_SUCCESS {
let raw_err = io::Error::from_raw_os_error(
@@ -204,7 +222,7 @@ cfg_io_source! {
cp.add_handle(token, &afd.fd)?;
match SetFileCompletionNotificationModes(
afd_helper_handle,
- FILE_SKIP_SET_EVENT_ON_HANDLE,
+ FILE_SKIP_SET_EVENT_ON_HANDLE as u8 // This is just 2, so fits in u8
) {
0 => Err(io::Error::last_os_error()),
_ => Ok(afd),
@@ -214,18 +232,18 @@ cfg_io_source! {
}
}
-pub const POLL_RECEIVE: u32 = 0b000_000_001;
-pub const POLL_RECEIVE_EXPEDITED: u32 = 0b000_000_010;
-pub const POLL_SEND: u32 = 0b000_000_100;
-pub const POLL_DISCONNECT: u32 = 0b000_001_000;
-pub const POLL_ABORT: u32 = 0b000_010_000;
-pub const POLL_LOCAL_CLOSE: u32 = 0b000_100_000;
+pub const POLL_RECEIVE: u32 = 0b0_0000_0001;
+pub const POLL_RECEIVE_EXPEDITED: u32 = 0b0_0000_0010;
+pub const POLL_SEND: u32 = 0b0_0000_0100;
+pub const POLL_DISCONNECT: u32 = 0b0_0000_1000;
+pub const POLL_ABORT: u32 = 0b0_0001_0000;
+pub const POLL_LOCAL_CLOSE: u32 = 0b0_0010_0000;
// Not used as it indicated in each event where a connection is connected, not
// just the first time a connection is established.
// Also see https://github.com/piscisaureus/wepoll/commit/8b7b340610f88af3d83f40fb728e7b850b090ece.
-pub const POLL_CONNECT: u32 = 0b001_000_000;
-pub const POLL_ACCEPT: u32 = 0b010_000_000;
-pub const POLL_CONNECT_FAIL: u32 = 0b100_000_000;
+pub const POLL_CONNECT: u32 = 0b0_0100_0000;
+pub const POLL_ACCEPT: u32 = 0b0_1000_0000;
+pub const POLL_CONNECT_FAIL: u32 = 0b1_0000_0000;
pub const KNOWN_EVENTS: u32 = POLL_RECEIVE
| POLL_RECEIVE_EXPEDITED
diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs
index a49252a..731bd60 100644
--- a/src/sys/windows/event.rs
+++ b/src/sys/windows/event.rs
@@ -1,8 +1,7 @@
use std::fmt;
-use miow::iocp::CompletionStatus;
-
use super::afd;
+use super::iocp::CompletionStatus;
use crate::Token;
#[derive(Clone)]
diff --git a/src/sys/windows/handle.rs b/src/sys/windows/handle.rs
new file mode 100644
index 0000000..5b9ac0b
--- /dev/null
+++ b/src/sys/windows/handle.rs
@@ -0,0 +1,30 @@
+use std::os::windows::io::RawHandle;
+use windows_sys::Win32::Foundation::{CloseHandle, HANDLE};
+
+/// Wrapper around a Windows HANDLE so that we close it upon drop in all scenarios
+#[derive(Debug)]
+pub struct Handle(HANDLE);
+
+impl Handle {
+ #[inline]
+ pub fn new(handle: HANDLE) -> Self {
+ Self(handle)
+ }
+
+ pub fn raw(&self) -> HANDLE {
+ self.0
+ }
+
+ pub fn into_raw(self) -> RawHandle {
+ let ret = self.0;
+ // This is super important so that drop is not called!
+ std::mem::forget(self);
+ ret as RawHandle
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ unsafe { CloseHandle(self.0) };
+ }
+}
diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs
index 3e60334..d7eda6a 100644
--- a/src/sys/windows/io_status_block.rs
+++ b/src/sys/windows/io_status_block.rs
@@ -1,17 +1,17 @@
use std::fmt;
use std::ops::{Deref, DerefMut};
-use ntapi::ntioapi::IO_STATUS_BLOCK;
+use windows_sys::Win32::System::WindowsProgramming::IO_STATUS_BLOCK;
pub struct IoStatusBlock(IO_STATUS_BLOCK);
cfg_io_source! {
- use ntapi::ntioapi::IO_STATUS_BLOCK_u;
+ use windows_sys::Win32::System::WindowsProgramming::{IO_STATUS_BLOCK_0};
impl IoStatusBlock {
pub fn zeroed() -> Self {
Self(IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
})
}
diff --git a/src/sys/windows/iocp.rs b/src/sys/windows/iocp.rs
new file mode 100644
index 0000000..262c8f2
--- /dev/null
+++ b/src/sys/windows/iocp.rs
@@ -0,0 +1,270 @@
+//! Bindings to IOCP, I/O Completion Ports
+
+use super::{Handle, Overlapped};
+use std::cmp;
+use std::fmt;
+use std::io;
+use std::mem;
+use std::os::windows::io::*;
+use std::time::Duration;
+
+use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
+use windows_sys::Win32::System::IO::{
+ CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, OVERLAPPED,
+ OVERLAPPED_ENTRY,
+};
+
+/// A handle to an Windows I/O Completion Port.
+#[derive(Debug)]
+pub(crate) struct CompletionPort {
+ handle: Handle,
+}
+
+/// A status message received from an I/O completion port.
+///
+/// These statuses can be created via the `new` or `empty` constructors and then
+/// provided to a completion port, or they are read out of a completion port.
+/// The fields of each status are read through its accessor methods.
+#[derive(Clone, Copy)]
+#[repr(transparent)]
+pub struct CompletionStatus(OVERLAPPED_ENTRY);
+
+impl fmt::Debug for CompletionStatus {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
+ }
+}
+
+unsafe impl Send for CompletionStatus {}
+unsafe impl Sync for CompletionStatus {}
+
+impl CompletionPort {
+ /// Creates a new I/O completion port with the specified concurrency value.
+ ///
+ /// The number of threads given corresponds to the level of concurrency
+ /// allowed for threads associated with this port. Consult the Windows
+ /// documentation for more information about this value.
+ pub fn new(threads: u32) -> io::Result<CompletionPort> {
+ let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, threads) };
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(CompletionPort {
+ handle: Handle::new(ret),
+ })
+ }
+ }
+
+ /// Associates a new `HANDLE` to this I/O completion port.
+ ///
+ /// This function will associate the given handle to this port with the
+ /// given `token` to be returned in status messages whenever it receives a
+ /// notification.
+ ///
+ /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
+ /// trait can be provided to this function, such as `std::fs::File` and
+ /// friends.
+ #[cfg(any(feature = "net", feature = "os-ext"))]
+ pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
+ let ret = unsafe {
+ CreateIoCompletionPort(t.as_raw_handle() as HANDLE, self.handle.raw(), token, 0)
+ };
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Dequeues a number of completion statuses from this I/O completion port.
+ ///
+ /// This function is the same as `get` except that it may return more than
+ /// one status. A buffer of "zero" statuses is provided (the contents are
+ /// not read) and then on success this function will return a sub-slice of
+ /// statuses which represent those which were dequeued from this port. This
+ /// function does not wait to fill up the entire list of statuses provided.
+ ///
+ /// Like with `get`, a timeout may be specified for this operation.
+ pub fn get_many<'a>(
+ &self,
+ list: &'a mut [CompletionStatus],
+ timeout: Option<Duration>,
+ ) -> io::Result<&'a mut [CompletionStatus]> {
+ debug_assert_eq!(
+ mem::size_of::<CompletionStatus>(),
+ mem::size_of::<OVERLAPPED_ENTRY>()
+ );
+ let mut removed = 0;
+ let timeout = duration_millis(timeout);
+ let len = cmp::min(list.len(), <u32>::max_value() as usize) as u32;
+ let ret = unsafe {
+ GetQueuedCompletionStatusEx(
+ self.handle.raw(),
+ list.as_ptr() as *mut _,
+ len,
+ &mut removed,
+ timeout,
+ 0,
+ )
+ };
+
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(&mut list[..removed as usize])
+ }
+ }
+
+ /// Posts a new completion status onto this I/O completion port.
+ ///
+ /// This function will post the given status, with custom parameters, to the
+ /// port. Threads blocked in `get` or `get_many` will eventually receive
+ /// this status.
+ pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
+ let ret = unsafe {
+ PostQueuedCompletionStatus(
+ self.handle.raw(),
+ status.0.dwNumberOfBytesTransferred,
+ status.0.lpCompletionKey,
+ status.0.lpOverlapped,
+ )
+ };
+
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl AsRawHandle for CompletionPort {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.handle.raw() as RawHandle
+ }
+}
+
+impl FromRawHandle for CompletionPort {
+ unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
+ CompletionPort {
+ handle: Handle::new(handle as HANDLE),
+ }
+ }
+}
+
+impl IntoRawHandle for CompletionPort {
+ fn into_raw_handle(self) -> RawHandle {
+ self.handle.into_raw()
+ }
+}
+
+impl CompletionStatus {
+ /// Creates a new completion status with the provided parameters.
+ ///
+ /// This function is useful when creating a status to send to a port with
+ /// the `post` method. The parameters are opaquely passed through and not
+ /// interpreted by the system at all.
+ pub(crate) fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self {
+ CompletionStatus(OVERLAPPED_ENTRY {
+ dwNumberOfBytesTransferred: bytes,
+ lpCompletionKey: token,
+ lpOverlapped: overlapped as *mut _,
+ Internal: 0,
+ })
+ }
+
+ /// Creates a new borrowed completion status from the borrowed
+ /// `OVERLAPPED_ENTRY` argument provided.
+ ///
+ /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
+ /// returning the wrapped structure.
+ #[cfg(feature = "os-ext")]
+ pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &Self {
+ // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
+ // a reference to one is guaranteed to be layout compatible with the
+ // reference to another.
+ unsafe { &*(entry as *const _ as *const _) }
+ }
+
+ /// Creates a new "zero" completion status.
+ ///
+ /// This function is useful when creating a stack buffer or vector of
+ /// completion statuses to be passed to the `get_many` function.
+ pub fn zero() -> Self {
+ Self::new(0, 0, std::ptr::null_mut())
+ }
+
+ /// Returns the number of bytes that were transferred for the I/O operation
+ /// associated with this completion status.
+ pub fn bytes_transferred(&self) -> u32 {
+ self.0.dwNumberOfBytesTransferred
+ }
+
+ /// Returns the completion key value associated with the file handle whose
+ /// I/O operation has completed.
+ ///
+ /// A completion key is a per-handle key that is specified when it is added
+ /// to an I/O completion port via `add_handle` or `add_socket`.
+ pub fn token(&self) -> usize {
+ self.0.lpCompletionKey as usize
+ }
+
+ /// Returns a pointer to the `Overlapped` structure that was specified when
+ /// the I/O operation was started.
+ pub fn overlapped(&self) -> *mut OVERLAPPED {
+ self.0.lpOverlapped
+ }
+
+ /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
+ pub fn entry(&self) -> &OVERLAPPED_ENTRY {
+ &self.0
+ }
+}
+
+#[inline]
+fn duration_millis(dur: Option<Duration>) -> u32 {
+ if let Some(dur) = dur {
+ // `Duration::as_millis` truncates, so round up. This avoids
+ // turning sub-millisecond timeouts into a zero timeout, unless
+ // the caller explicitly requests that by specifying a zero
+ // timeout.
+ let dur_ms = (dur + Duration::from_nanos(999_999)).as_millis();
+ cmp::min(dur_ms, u32::MAX as u128) as u32
+ } else {
+ u32::MAX
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{CompletionPort, CompletionStatus};
+
+ #[test]
+ fn is_send_sync() {
+ fn is_send_sync<T: Send + Sync>() {}
+ is_send_sync::<CompletionPort>();
+ }
+
+ #[test]
+ fn get_many() {
+ let c = CompletionPort::new(1).unwrap();
+
+ c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
+ c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
+
+ let mut s = vec![CompletionStatus::zero(); 4];
+ {
+ let s = c.get_many(&mut s, None).unwrap();
+ assert_eq!(s.len(), 2);
+ assert_eq!(s[0].bytes_transferred(), 1);
+ assert_eq!(s[0].token(), 2);
+ assert_eq!(s[0].overlapped(), 3 as *mut _);
+ assert_eq!(s[1].bytes_transferred(), 4);
+ assert_eq!(s[1].token(), 5);
+ assert_eq!(s[1].overlapped(), 6 as *mut _);
+ }
+ assert_eq!(s[2].bytes_transferred(), 0);
+ assert_eq!(s[2].token(), 0);
+ assert_eq!(s[2].overlapped(), 0 as *mut _);
+ }
+}
diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs
index 98b6fc6..f8b72fc 100644
--- a/src/sys/windows/mod.rs
+++ b/src/sys/windows/mod.rs
@@ -1,15 +1,20 @@
mod afd;
-mod io_status_block;
pub mod event;
pub use event::{Event, Events};
-mod selector;
-pub use selector::{Selector, SelectorInner, SockState};
+mod handle;
+use handle::Handle;
+
+mod io_status_block;
+mod iocp;
mod overlapped;
use overlapped::Overlapped;
+mod selector;
+pub use selector::{Selector, SelectorInner, SockState};
+
// Macros must be defined before the modules that use them
cfg_net! {
/// Helper macro to execute a system call that returns an `io::Result`.
@@ -45,7 +50,7 @@ cfg_io_source! {
use std::pin::Pin;
use std::sync::{Arc, Mutex};
- use crate::{poll, Interest, Registry, Token};
+ use crate::{Interest, Registry, Token};
struct InternalState {
selector: Arc<SelectorInner>,
@@ -101,7 +106,8 @@ cfg_io_source! {
if self.inner.is_some() {
Err(io::ErrorKind::AlreadyExists.into())
} else {
- poll::selector(registry)
+ registry
+ .selector()
.register(socket, token, interests)
.map(|state| {
self.inner = Some(Box::new(state));
@@ -117,7 +123,8 @@ cfg_io_source! {
) -> io::Result<()> {
match self.inner.as_mut() {
Some(state) => {
- poll::selector(registry)
+ registry
+ .selector()
.reregister(state.sock_state.clone(), token, interests)
.map(|()| {
state.token = token;
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs
index 8c81f38..23f85d1 100644
--- a/src/sys/windows/named_pipe.rs
+++ b/src/sys/windows/named_pipe.rs
@@ -1,41 +1,31 @@
-use crate::event::Source;
-use crate::sys::windows::{Event, Overlapped};
-use crate::{poll, Registry};
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
-
use std::ffi::OsStr;
-use std::fmt;
use std::io::{self, Read, Write};
-use std::mem;
-use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
-use std::slice;
+use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
+use std::{fmt, mem, slice};
+
+use windows_sys::Win32::Foundation::{
+ ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
+ ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
+};
+use windows_sys::Win32::Storage::FileSystem::{
+ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
+};
+use windows_sys::Win32::System::Pipes::{
+ ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+};
+use windows_sys::Win32::System::IO::{
+ CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
+};
+use crate::event::Source;
+use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
+use crate::sys::windows::{Event, Handle, Overlapped};
+use crate::Registry;
use crate::{Interest, Token};
-use miow::iocp::{CompletionPort, CompletionStatus};
-use miow::pipe;
-use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING};
-use winapi::um::ioapiset::CancelIoEx;
-
-/// # Safety
-///
-/// Only valid if the strict is annotated with `#[repr(C)]`. This is only used
-/// with `Overlapped` and `Inner`, which are correctly annotated.
-macro_rules! offset_of {
- ($t:ty, $($field:ident).+) => (
- &(*(0 as *const $t)).$($field).+ as *const _ as usize
- )
-}
-
-macro_rules! overlapped2arc {
- ($e:expr, $t:ty, $($field:ident).+) => ({
- let offset = offset_of!($t, $($field).+);
- debug_assert!(offset < mem::size_of::<$t>());
- Arc::from_raw(($e as usize - offset) as *mut $t)
- })
-}
/// Non-blocking windows named pipe.
///
@@ -83,30 +73,266 @@ pub struct NamedPipe {
inner: Arc<Inner>,
}
+/// # Notes
+///
+/// The memory layout of this structure must be fixed as the
+/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
#[repr(C)]
struct Inner {
- handle: pipe::NamedPipe,
-
+ // NOTE: careful modifying the order of these three fields, the `ptr_from_*`
+ // methods depend on the layout!
connect: Overlapped,
- connecting: AtomicBool,
-
read: Overlapped,
write: Overlapped,
-
+ // END NOTE.
+ handle: Handle,
+ connecting: AtomicBool,
io: Mutex<Io>,
-
pool: Mutex<BufferPool>,
}
+impl Inner {
+ /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
+ ///
+ /// # Unsafety
+ ///
+ /// Caller must ensure `ptr` is pointing to `Inner.connect`.
+ unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `connect` is the first field, so the pointer are the same.
+ ptr.cast()
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
+ unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
+ unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped` and `read: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
+ }
+
+ /// Issue a connection request with the specified overlapped operation.
+ ///
+ /// This function will issue a request to connect a client to this server,
+ /// returning immediately after starting the overlapped operation.
+ ///
+ /// If this function immediately succeeds then `Ok(true)` is returned. If
+ /// the overlapped operation is enqueued and pending, then `Ok(false)` is
+ /// returned. Otherwise an error is returned indicating what went wrong.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `overlapped` pointer is valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that this pointer is
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
+ if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
+ return Ok(true);
+ }
+
+ let err = io::Error::last_os_error();
+
+ match err.raw_os_error().map(|e| e as u32) {
+ Some(ERROR_PIPE_CONNECTED) => Ok(true),
+ Some(ERROR_NO_DATA) => Ok(true),
+ Some(ERROR_IO_PENDING) => Ok(false),
+ _ => Err(err),
+ }
+ }
+
+ /// Disconnects this named pipe from any connected client.
+ pub fn disconnect(&self) -> io::Result<()> {
+ if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Issues an overlapped read operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous read to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes read. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn read_overlapped(
+ &self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = ReadFile(
+ self.handle.raw(),
+ buf.as_mut_ptr() as *mut _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Issues an overlapped write operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous write to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes written. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn write_overlapped(
+ &self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = WriteFile(
+ self.handle.raw(),
+ buf.as_ptr() as *const _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `Overlapped` instance.
+ #[inline]
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
+ let mut transferred = 0;
+ let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
+ if r == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(transferred as usize)
+ }
+ }
+}
+
+#[test]
+fn ptr_from() {
+ use std::mem::ManuallyDrop;
+ use std::ptr;
+
+ let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
+ let inner: &Inner = &pipe.inner;
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_conn_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_read_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_write_overlapped` incorrect"
+ );
+}
+
struct Io {
// Uniquely identifies the selector associated with this named pipe
cp: Option<Arc<CompletionPort>>,
// Token used to identify events
token: Option<Token>,
read: State,
- read_interest: bool,
write: State,
- write_interest: bool,
connect_error: Option<io::Error>,
}
@@ -129,10 +355,30 @@ impl NamedPipe {
/// Creates a new named pipe at the specified `addr` given a "reasonable
/// set" of initial configuration options.
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
- let pipe = pipe::NamedPipe::new(addr)?;
- // Safety: nothing actually unsafe about this. The trait fn includes
- // `unsafe`.
- Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) })
+ use std::os::windows::ffi::OsStrExt;
+ let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();
+
+ // Safety: syscall
+ let h = unsafe {
+ CreateNamedPipeW(
+ name.as_ptr(),
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ std::ptr::null_mut(),
+ )
+ };
+
+ if h == INVALID_HANDLE_VALUE {
+ Err(io::Error::last_os_error())
+ } else {
+ // Safety: nothing actually unsafe about this. The trait fn includes
+ // `unsafe`.
+ Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
+ }
}
/// Attempts to call `ConnectNamedPipe`, if possible.
@@ -167,7 +413,7 @@ impl NamedPipe {
// internal state accordingly.
let res = unsafe {
let overlapped = self.inner.connect.as_ptr() as *mut _;
- self.inner.handle.connect_overlapped(overlapped)
+ self.inner.connect_overlapped(overlapped)
};
match res {
@@ -219,7 +465,7 @@ impl NamedPipe {
/// After a `disconnect` is issued, then a `connect` may be called again to
/// connect to another client.
pub fn disconnect(&self) -> io::Result<()> {
- self.inner.handle.disconnect()
+ self.inner.disconnect()
}
}
@@ -227,10 +473,7 @@ impl FromRawHandle for NamedPipe {
unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
NamedPipe {
inner: Arc::new(Inner {
- // Safety: not really unsafe
- handle: pipe::NamedPipe::from_raw_handle(handle),
- // transmutes to straddle winapi versions (mio 0.6 is on an
- // older winapi)
+ handle: Handle::new(handle as HANDLE),
connect: Overlapped::new(connect_done),
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
@@ -239,9 +482,7 @@ impl FromRawHandle for NamedPipe {
cp: None,
token: None,
read: State::None,
- read_interest: false,
write: State::None,
- write_interest: false,
connect_error: None,
}),
pool: Mutex::new(BufferPool::with_capacity(2)),
@@ -356,12 +597,7 @@ impl<'a> Write for &'a NamedPipe {
}
impl Source for NamedPipe {
- fn register(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, false)?;
@@ -374,18 +610,15 @@ impl Source for NamedPipe {
}
if io.cp.is_none() {
- io.cp = Some(poll::selector(registry).clone_port());
+ let selector = registry.selector();
+
+ io.cp = Some(selector.clone_port());
let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
- poll::selector(registry)
- .inner
- .cp
- .add_handle(inner_token, &self.inner.handle)?;
+ selector.inner.cp.add_handle(inner_token, self)?;
}
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -393,19 +626,12 @@ impl Source for NamedPipe {
Ok(())
}
- fn reregister(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, true)?;
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -432,7 +658,7 @@ impl Source for NamedPipe {
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
- self.inner.handle.as_raw_handle()
+ self.inner.handle.raw() as RawHandle
}
}
@@ -452,12 +678,8 @@ impl Drop for NamedPipe {
}
let io = self.inner.io.lock().unwrap();
-
- match io.read {
- State::Pending(..) => {
- drop(cancel(&self.inner.handle, &self.inner.read));
- }
- _ => {}
+ if let State::Pending(..) = io.read {
+ drop(cancel(&self.inner.handle, &self.inner.read));
}
}
}
@@ -483,7 +705,7 @@ impl Inner {
let e = unsafe {
let overlapped = me.read.as_ptr() as *mut _;
let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
- me.handle.read_overlapped(slice, overlapped)
+ me.read_overlapped(slice, overlapped)
};
match e {
@@ -522,7 +744,7 @@ impl Inner {
// Very similar to `schedule_read` above, just done for the write half.
let e = unsafe {
let overlapped = me.write.as_ptr() as *mut _;
- me.handle.write_overlapped(&buf[pos..], overlapped)
+ me.write_overlapped(&buf[pos..], overlapped)
};
// See `connect` above for the rationale behind `forget`
@@ -572,7 +794,8 @@ impl Inner {
fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
let mut io = me.io.lock().unwrap();
- if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) {
+ #[allow(clippy::needless_option_as_deref)]
+ if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
}
@@ -588,8 +811,8 @@ impl Inner {
}
}
-unsafe fn cancel<T: AsRawHandle>(handle: &T, overlapped: &Overlapped) -> io::Result<()> {
- let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _);
+unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
+ let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
// `CancelIoEx` returns 0 on error:
// https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
if ret == 0 {
@@ -605,7 +828,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `connect` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };
// Flag ourselves as no longer using the `connect` overlapped instances.
let prev = me.connecting.swap(false, SeqCst);
@@ -614,7 +837,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Stash away our connect error if one happened
debug_assert_eq!(status.bytes_transferred(), 0);
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => debug_assert_eq!(n, 0),
Err(e) => me.io.lock().unwrap().connect_error = Some(e),
}
@@ -631,7 +854,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_read` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };
// Move from the `Pending` to `Ok` state.
let mut io = me.io.lock().unwrap();
@@ -640,7 +863,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
_ => unreachable!(),
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
buf.set_len(status.bytes_transferred() as usize);
@@ -663,7 +886,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };
// Make the state change out of `Pending`. If we wrote the entire buffer
// then we're writable again and otherwise we schedule another write.
@@ -680,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
let new_pos = pos + (status.bytes_transferred() as usize);
@@ -703,7 +926,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
- Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
+ Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"I/O source already registered with a different `Registry`",
)),
diff --git a/src/sys/windows/net.rs b/src/sys/windows/net.rs
index 2de98fa..44f459a 100644
--- a/src/sys/windows/net.rs
+++ b/src/sys/windows/net.rs
@@ -3,15 +3,14 @@ use std::mem;
use std::net::SocketAddr;
use std::sync::Once;
-use winapi::ctypes::c_int;
-use winapi::shared::inaddr::{in_addr_S_un, IN_ADDR};
-use winapi::shared::in6addr::{in6_addr_u, IN6_ADDR};
-use winapi::shared::ws2def::{AF_INET, AF_INET6, ADDRESS_FAMILY, SOCKADDR, SOCKADDR_IN};
-use winapi::shared::ws2ipdef::{SOCKADDR_IN6_LH, SOCKADDR_IN6_LH_u};
-use winapi::um::winsock2::{ioctlsocket, socket, FIONBIO, INVALID_SOCKET, SOCKET};
+use windows_sys::Win32::Networking::WinSock::{
+ closesocket, ioctlsocket, socket, AF_INET, AF_INET6, FIONBIO, IN6_ADDR, IN6_ADDR_0,
+ INVALID_SOCKET, IN_ADDR, IN_ADDR_0, SOCKADDR, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0,
+ SOCKET,
+};
/// Initialise the network stack for Windows.
-pub(crate) fn init() {
+fn init() {
static INIT: Once = Once::new();
INIT.call_once(|| {
// Let standard library call `WSAStartup` for us, we can't do it
@@ -22,26 +21,30 @@ pub(crate) fn init() {
}
/// Create a new non-blocking socket.
-pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: c_int) -> io::Result<SOCKET> {
- use winapi::um::winsock2::{PF_INET, PF_INET6};
-
+pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: u16) -> io::Result<SOCKET> {
let domain = match addr {
- SocketAddr::V4(..) => PF_INET,
- SocketAddr::V6(..) => PF_INET6,
+ SocketAddr::V4(..) => AF_INET,
+ SocketAddr::V6(..) => AF_INET6,
};
- new_socket(domain, socket_type)
+ new_socket(domain.into(), socket_type)
}
-pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result<SOCKET> {
- syscall!(
- socket(domain, socket_type, 0),
+pub(crate) fn new_socket(domain: u32, socket_type: u16) -> io::Result<SOCKET> {
+ init();
+
+ let socket = syscall!(
+ socket(domain as i32, socket_type as i32, 0),
PartialEq::eq,
INVALID_SOCKET
- )
- .and_then(|socket| {
- syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0).map(|_| socket as SOCKET)
- })
+ )?;
+
+ if let Err(err) = syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0) {
+ let _ = unsafe { closesocket(socket) };
+ return Err(err);
+ }
+
+ Ok(socket as SOCKET)
}
/// A type with the same memory layout as `SOCKADDR`. Used in converting Rust level
@@ -51,7 +54,7 @@ pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result<SOCKET
#[repr(C)]
pub(crate) union SocketAddrCRepr {
v4: SOCKADDR_IN,
- v6: SOCKADDR_IN6_LH,
+ v6: SOCKADDR_IN6,
}
impl SocketAddrCRepr {
@@ -60,49 +63,49 @@ impl SocketAddrCRepr {
}
}
-pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, c_int) {
+pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) {
match addr {
SocketAddr::V4(ref addr) => {
// `s_addr` is stored as BE on all machine and the array is in BE order.
// So the native endian conversion method is used so that it's never swapped.
let sin_addr = unsafe {
- let mut s_un = mem::zeroed::<in_addr_S_un>();
- *s_un.S_addr_mut() = u32::from_ne_bytes(addr.ip().octets());
+ let mut s_un = mem::zeroed::<IN_ADDR_0>();
+ s_un.S_addr = u32::from_ne_bytes(addr.ip().octets());
IN_ADDR { S_un: s_un }
};
let sockaddr_in = SOCKADDR_IN {
- sin_family: AF_INET as ADDRESS_FAMILY,
+ sin_family: AF_INET as u16, // 1
sin_port: addr.port().to_be(),
sin_addr,
sin_zero: [0; 8],
};
let sockaddr = SocketAddrCRepr { v4: sockaddr_in };
- (sockaddr, mem::size_of::<SOCKADDR_IN>() as c_int)
- },
+ (sockaddr, mem::size_of::<SOCKADDR_IN>() as i32)
+ }
SocketAddr::V6(ref addr) => {
let sin6_addr = unsafe {
- let mut u = mem::zeroed::<in6_addr_u>();
- *u.Byte_mut() = addr.ip().octets();
+ let mut u = mem::zeroed::<IN6_ADDR_0>();
+ u.Byte = addr.ip().octets();
IN6_ADDR { u }
};
let u = unsafe {
- let mut u = mem::zeroed::<SOCKADDR_IN6_LH_u>();
- *u.sin6_scope_id_mut() = addr.scope_id();
+ let mut u = mem::zeroed::<SOCKADDR_IN6_0>();
+ u.sin6_scope_id = addr.scope_id();
u
};
- let sockaddr_in6 = SOCKADDR_IN6_LH {
- sin6_family: AF_INET6 as ADDRESS_FAMILY,
+ let sockaddr_in6 = SOCKADDR_IN6 {
+ sin6_family: AF_INET6 as u16, // 23
sin6_port: addr.port().to_be(),
sin6_addr,
sin6_flowinfo: addr.flowinfo(),
- u,
+ Anonymous: u,
};
let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 };
- (sockaddr, mem::size_of::<SOCKADDR_IN6_LH>() as c_int)
+ (sockaddr, mem::size_of::<SOCKADDR_IN6>() as i32)
}
}
}
diff --git a/src/sys/windows/overlapped.rs b/src/sys/windows/overlapped.rs
index 837b78b..d1456de 100644
--- a/src/sys/windows/overlapped.rs
+++ b/src/sys/windows/overlapped.rs
@@ -3,13 +3,11 @@ use crate::sys::windows::Event;
use std::cell::UnsafeCell;
use std::fmt;
-#[cfg(feature = "os-ext")]
-use winapi::um::minwinbase::OVERLAPPED;
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
+use windows_sys::Win32::System::IO::{OVERLAPPED, OVERLAPPED_ENTRY};
#[repr(C)]
pub(crate) struct Overlapped {
- inner: UnsafeCell<miow::Overlapped>,
+ inner: UnsafeCell<OVERLAPPED>,
pub(crate) callback: fn(&OVERLAPPED_ENTRY, Option<&mut Vec<Event>>),
}
@@ -17,13 +15,13 @@ pub(crate) struct Overlapped {
impl Overlapped {
pub(crate) fn new(cb: fn(&OVERLAPPED_ENTRY, Option<&mut Vec<Event>>)) -> Overlapped {
Overlapped {
- inner: UnsafeCell::new(miow::Overlapped::zero()),
+ inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
callback: cb,
}
}
pub(crate) fn as_ptr(&self) -> *const OVERLAPPED {
- unsafe { (*self.inner.get()).raw() }
+ self.inner.get()
}
}
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 572a9a9..9f3cf68 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -10,8 +10,9 @@ cfg_net! {
use crate::Interest;
}
-use miow::iocp::{CompletionPort, CompletionStatus};
+use super::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
+use std::ffi::c_void;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
@@ -21,14 +22,15 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use winapi::shared::ntdef::NT_SUCCESS;
-use winapi::shared::ntdef::{HANDLE, PVOID};
-use winapi::shared::ntstatus::STATUS_CANCELLED;
-use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
-use winapi::um::minwinbase::OVERLAPPED;
+
+use windows_sys::Win32::Foundation::{
+ ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED, WAIT_TIMEOUT,
+};
+use windows_sys::Win32::System::IO::OVERLAPPED;
#[derive(Debug)]
struct AfdGroup {
+ #[cfg_attr(not(feature = "net"), allow(dead_code))]
cp: Arc<CompletionPort>,
afd_group: Mutex<Vec<Arc<Afd>>>,
}
@@ -43,7 +45,7 @@ impl AfdGroup {
pub fn release_unused_afd(&self) {
let mut afd_group = self.afd_group.lock().unwrap();
- afd_group.retain(|g| Arc::strong_count(&g) > 1);
+ afd_group.retain(|g| Arc::strong_count(g) > 1);
}
}
@@ -57,7 +59,7 @@ cfg_io_source! {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
- if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
+ if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP__MAX_GROUP_SIZE {
self._alloc_afd_group(&mut afd_group)?;
}
}
@@ -93,7 +95,6 @@ pub struct SockState {
poll_info: AfdPollInfo,
afd: Arc<Afd>,
- raw_socket: RawSocket,
base_socket: RawSocket,
user_evts: u32,
@@ -107,7 +108,7 @@ pub struct SockState {
// last raw os error
error: Option<i32>,
- pinned: PhantomPinned,
+ _pinned: PhantomPinned,
}
impl SockState {
@@ -141,7 +142,7 @@ impl SockState {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
- *unsafe { self.poll_info.timeout.QuadPart_mut() } = std::i64::MAX;
+ self.poll_info.timeout = i64::MAX;
self.poll_info.handles[0].handle = self.base_socket as HANDLE;
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;
@@ -204,9 +205,9 @@ impl SockState {
unsafe {
if self.delete_pending {
return None;
- } else if self.iosb.u.Status == STATUS_CANCELLED {
+ } else if self.iosb.Anonymous.Status == STATUS_CANCELLED {
/* The poll request was cancelled by CancelIoEx. */
- } else if !NT_SUCCESS(self.iosb.u.Status) {
+ } else if self.iosb.Anonymous.Status < 0 {
/* The overlapped request itself failed in an unexpected way. */
afd_events = afd::POLL_CONNECT_FAIL;
} else if self.poll_info.number_of_handles < 1 {
@@ -263,7 +264,6 @@ cfg_io_source! {
iosb: IoStatusBlock::zeroed(),
poll_info: AfdPollInfo::zeroed(),
afd,
- raw_socket,
base_socket: get_base_socket(raw_socket)?,
user_evts: 0,
pending_evts: 0,
@@ -271,7 +271,7 @@ cfg_io_source! {
poll_status: SockPollStatus::Idle,
delete_pending: false,
error: None,
- pinned: PhantomPinned,
+ _pinned: PhantomPinned,
})
}
@@ -296,7 +296,7 @@ impl Drop for SockState {
/// Converts the pointer to a `SockState` into a raw pointer.
/// To revert see `from_overlapped`.
-fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> PVOID {
+fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void {
let overlapped_ptr: *const Mutex<SockState> =
unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
overlapped_ptr as *mut _
@@ -448,11 +448,11 @@ impl SelectorInner {
if len == 0 {
continue;
}
- return Ok(());
+ break Ok(());
}
} else {
self.select2(&mut events.statuses, &mut events.events, timeout)?;
- return Ok(());
+ Ok(())
}
}
@@ -462,7 +462,7 @@ impl SelectorInner {
events: &mut Vec<Event>,
timeout: Option<Duration>,
) -> io::Result<usize> {
- assert_eq!(self.is_polling.swap(true, Ordering::AcqRel), false);
+ assert!(!self.is_polling.swap(true, Ordering::AcqRel));
unsafe { self.update_sockets_events() }?;
@@ -482,7 +482,7 @@ impl SelectorInner {
for sock in update_queue.iter_mut() {
let mut sock_internal = sock.lock().unwrap();
if !sock_internal.is_pending_deletion() {
- sock_internal.update(&sock)?;
+ sock_internal.update(sock)?;
}
}
@@ -518,12 +518,9 @@ impl SelectorInner {
let sock_state = from_overlapped(iocp_event.overlapped());
let mut sock_guard = sock_state.lock().unwrap();
- match sock_guard.feed_event() {
- Some(e) => {
- events.push(e);
- n += 1;
- }
- None => {}
+ if let Some(e) = sock_guard.feed_event() {
+ events.push(e);
+ n += 1;
}
if !sock_guard.is_pending_deletion() {
@@ -538,9 +535,12 @@ impl SelectorInner {
cfg_io_source! {
use std::mem::size_of;
use std::ptr::null_mut;
- use winapi::um::mswsock;
- use winapi::um::winsock2::WSAGetLastError;
- use winapi::um::winsock2::{WSAIoctl, SOCKET_ERROR};
+
+ use windows_sys::Win32::Networking::WinSock::{
+ WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE,
+ SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE_SELECT, SOCKET_ERROR,
+ };
+
impl SelectorInner {
fn register(
@@ -644,7 +644,7 @@ cfg_io_source! {
ioctl,
null_mut(),
0,
- &mut base_socket as *mut _ as PVOID,
+ &mut base_socket as *mut _ as *mut c_void,
size_of::<RawSocket>() as u32,
&mut bytes,
null_mut(),
@@ -659,7 +659,7 @@ cfg_io_source! {
}
fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
- let res = try_get_base_socket(raw_socket, mswsock::SIO_BASE_HANDLE);
+ let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE);
if let Ok(base_socket) = res {
return Ok(base_socket);
}
@@ -670,9 +670,9 @@ cfg_io_source! {
// However, at least one known LSP deliberately breaks it, so we try
// some alternative IOCTLs, starting with the most appropriate one.
for &ioctl in &[
- mswsock::SIO_BSP_HANDLE_SELECT,
- mswsock::SIO_BSP_HANDLE_POLL,
- mswsock::SIO_BSP_HANDLE,
+ SIO_BSP_HANDLE_SELECT,
+ SIO_BSP_HANDLE_POLL,
+ SIO_BSP_HANDLE,
] {
if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
// Since we know now that we're dealing with an LSP (otherwise
diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs
index 6757b44..addd1e8 100644
--- a/src/sys/windows/tcp.rs
+++ b/src/sys/windows/tcp.rs
@@ -1,321 +1,62 @@
use std::io;
-use std::convert::TryInto;
-use std::mem::size_of;
-use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
-use std::time::Duration;
-use std::ptr;
-use std::os::windows::io::FromRawSocket;
-use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
+use std::net::{self, SocketAddr};
+use std::os::windows::io::AsRawSocket;
-use winapi::ctypes::{c_char, c_int, c_ushort, c_ulong};
-use winapi::shared::ws2def::{SOCKADDR_STORAGE, AF_INET, AF_INET6, SOCKADDR_IN};
-use winapi::shared::ws2ipdef::SOCKADDR_IN6_LH;
-use winapi::shared::mstcpip;
+use windows_sys::Win32::Networking::WinSock::{self, SOCKET, SOCKET_ERROR, SOCK_STREAM};
-use winapi::shared::minwindef::{BOOL, TRUE, FALSE, DWORD, LPVOID, LPDWORD};
-use winapi::um::winsock2::{
- self, closesocket, linger, setsockopt, getsockopt, getsockname, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
- SOCK_STREAM, SOL_SOCKET, SO_LINGER, SO_REUSEADDR, SO_RCVBUF, SO_SNDBUF, SO_KEEPALIVE, WSAIoctl, LPWSAOVERLAPPED,
-};
+use crate::sys::windows::net::{new_ip_socket, socket_addr};
-use crate::sys::windows::net::{init, new_socket, socket_addr};
-use crate::net::TcpKeepalive;
-
-pub(crate) type TcpSocket = SOCKET;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- init();
- new_socket(PF_INET, SOCK_STREAM)
+pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result<SOCKET> {
+ new_ip_socket(address, SOCK_STREAM)
}
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
- init();
- new_socket(PF_INET6, SOCK_STREAM)
-}
-
-pub(crate) fn bind(socket: TcpSocket, addr: SocketAddr) -> io::Result<()> {
- use winsock2::bind;
+pub(crate) fn bind(socket: &net::TcpListener, addr: SocketAddr) -> io::Result<()> {
+ use WinSock::bind;
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(
- bind(socket, raw_addr.as_ptr(), raw_addr_length),
+ bind(
+ socket.as_raw_socket() as _,
+ raw_addr.as_ptr(),
+ raw_addr_length
+ ),
PartialEq::eq,
SOCKET_ERROR
)?;
Ok(())
}
-pub(crate) fn connect(socket: TcpSocket, addr: SocketAddr) -> io::Result<net::TcpStream> {
- use winsock2::connect;
+pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> {
+ use WinSock::connect;
let (raw_addr, raw_addr_length) = socket_addr(&addr);
-
let res = syscall!(
- connect(socket, raw_addr.as_ptr(), raw_addr_length),
+ connect(
+ socket.as_raw_socket() as _,
+ raw_addr.as_ptr(),
+ raw_addr_length
+ ),
PartialEq::eq,
SOCKET_ERROR
);
match res {
- Err(err) if err.kind() != io::ErrorKind::WouldBlock => {
- Err(err)
- }
- _ => {
- Ok(unsafe { net::TcpStream::from_raw_socket(socket as StdSocket) })
- }
+ Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err),
+ _ => Ok(()),
}
}
-pub(crate) fn listen(socket: TcpSocket, backlog: u32) -> io::Result<net::TcpListener> {
- use winsock2::listen;
+pub(crate) fn listen(socket: &net::TcpListener, backlog: u32) -> io::Result<()> {
use std::convert::TryInto;
+ use WinSock::listen;
let backlog = backlog.try_into().unwrap_or(i32::max_value());
- syscall!(listen(socket, backlog), PartialEq::eq, SOCKET_ERROR)?;
- Ok(unsafe { net::TcpListener::from_raw_socket(socket as StdSocket) })
-}
-
-pub(crate) fn close(socket: TcpSocket) {
- let _ = unsafe { closesocket(socket) };
-}
-
-pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()> {
- let val: BOOL = if reuseaddr { TRUE } else { FALSE };
-
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &val as *const _ as *const c_char,
- size_of::<BOOL>() as c_int,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: c_char = 0;
- let mut optlen = size_of::<BOOL>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval != 0),
- }
-}
-
-pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
- let mut storage: SOCKADDR_STORAGE = unsafe { std::mem::zeroed() };
- let mut length = std::mem::size_of_val(&storage) as c_int;
-
- match unsafe { getsockname(
- socket,
- &mut storage as *mut _ as *mut _,
- &mut length
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => {
- if storage.ss_family as c_int == AF_INET {
- // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
- let addr: &SOCKADDR_IN = unsafe { &*(&storage as *const _ as *const SOCKADDR_IN) };
- let ip_bytes = unsafe { addr.sin_addr.S_un.S_un_b() };
- let ip = Ipv4Addr::from([ip_bytes.s_b1, ip_bytes.s_b2, ip_bytes.s_b3, ip_bytes.s_b4]);
- let port = u16::from_be(addr.sin_port);
- Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
- } else if storage.ss_family as c_int == AF_INET6 {
- // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
- let addr: &SOCKADDR_IN6_LH = unsafe { &*(&storage as *const _ as *const SOCKADDR_IN6_LH) };
- let ip = Ipv6Addr::from(*unsafe { addr.sin6_addr.u.Byte() });
- let port = u16::from_be(addr.sin6_port);
- let scope_id = unsafe { *addr.u.sin6_scope_id() };
- Ok(SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)))
- } else {
- Err(std::io::ErrorKind::InvalidInput.into())
- }
- },
- }
-}
-
-pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
- let val: linger = linger {
- l_onoff: if dur.is_some() { 1 } else { 0 },
- l_linger: dur.map(|dur| dur.as_secs() as c_ushort).unwrap_or_default(),
- };
-
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_LINGER,
- &val as *const _ as *const c_char,
- size_of::<linger>() as c_int,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_linger(socket: TcpSocket) -> io::Result<Option<Duration>> {
- let mut val: linger = unsafe { std::mem::zeroed() };
- let mut len = size_of::<linger>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_LINGER,
- &mut val as *mut _ as *mut _,
- &mut len,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => {
- if val.l_onoff == 0 {
- Ok(None)
- } else {
- Ok(Some(Duration::from_secs(val.l_linger as u64)))
- }
- },
- }
-}
-
-
-pub(crate) fn set_recv_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_RCVBUF,
- &size as *const _ as *const c_char,
- size_of::<c_int>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_recv_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: c_int = 0;
- let mut optlen = size_of::<c_int>() as c_int;
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_RCVBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen as *mut _,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval as u32),
- }
-}
-
-pub(crate) fn set_send_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_SNDBUF,
- &size as *const _ as *const c_char,
- size_of::<c_int>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_send_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: c_int = 0;
- let mut optlen = size_of::<c_int>() as c_int;
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_SNDBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen as *mut _,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval as u32),
- }
-}
-
-pub(crate) fn set_keepalive(socket: TcpSocket, keepalive: bool) -> io::Result<()> {
- let val: BOOL = if keepalive { TRUE } else { FALSE };
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_KEEPALIVE,
- &val as *const _ as *const c_char,
- size_of::<BOOL>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_keepalive(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: c_char = 0;
- let mut optlen = size_of::<BOOL>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_KEEPALIVE,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval != FALSE as c_char),
- }
-}
-
-pub(crate) fn set_keepalive_params(socket: TcpSocket, keepalive: TcpKeepalive) -> io::Result<()> {
- /// Windows configures keepalive time/interval in a u32 of milliseconds.
- fn dur_to_ulong_ms(dur: Duration) -> c_ulong {
- dur.as_millis().try_into().ok().unwrap_or_else(u32::max_value)
- }
-
- // If any of the fields on the `tcp_keepalive` struct were not provided by
- // the user, just leaving them zero will clobber any existing value.
- // Unfortunately, we can't access the current value, so we will use the
- // defaults if a value for the time or interval was not not provided.
- let time = keepalive.time.unwrap_or_else(|| {
- // The default value is two hours, as per
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-keepalive-vals
- let two_hours = 2 * 60 * 60;
- Duration::from_secs(two_hours)
- });
-
- let interval = keepalive.interval.unwrap_or_else(|| {
- // The default value is one second, as per
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-keepalive-vals
- Duration::from_secs(1)
- });
-
- let mut keepalive = mstcpip::tcp_keepalive {
- // Enable keepalive
- onoff: 1,
- keepalivetime: dur_to_ulong_ms(time),
- keepaliveinterval: dur_to_ulong_ms(interval),
- };
-
- let mut out = 0;
- match unsafe { WSAIoctl(
- socket,
- mstcpip::SIO_KEEPALIVE_VALS,
- &mut keepalive as *mut _ as LPVOID,
- size_of::<mstcpip::tcp_keepalive>() as DWORD,
- ptr::null_mut() as LPVOID,
- 0 as DWORD,
- &mut out as *mut _ as LPDWORD,
- 0 as LPWSAOVERLAPPED,
- None,
- ) } {
- 0 => Ok(()),
- _ => Err(io::Error::last_os_error())
- }
+ syscall!(
+ listen(socket.as_raw_socket() as _, backlog),
+ PartialEq::eq,
+ SOCKET_ERROR
+ )?;
+ Ok(())
}
pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
diff --git a/src/sys/windows/udp.rs b/src/sys/windows/udp.rs
index 825eccc..87e269f 100644
--- a/src/sys/windows/udp.rs
+++ b/src/sys/windows/udp.rs
@@ -2,43 +2,36 @@ use std::io;
use std::mem::{self, MaybeUninit};
use std::net::{self, SocketAddr};
use std::os::windows::io::{AsRawSocket, FromRawSocket};
-use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
+use std::os::windows::raw::SOCKET as StdSocket; // windows-sys uses usize, stdlib uses u32/u64.
-use winapi::ctypes::c_int;
-use winapi::shared::ws2def::IPPROTO_IPV6;
-use winapi::shared::ws2ipdef::IPV6_V6ONLY;
-use winapi::um::winsock2::{bind as win_bind, closesocket, getsockopt, SOCKET_ERROR, SOCK_DGRAM};
-
-use crate::sys::windows::net::{init, new_ip_socket, socket_addr};
+use crate::sys::windows::net::{new_ip_socket, socket_addr};
+use windows_sys::Win32::Networking::WinSock::{
+ bind as win_bind, getsockopt, IPPROTO_IPV6, IPV6_V6ONLY, SOCKET_ERROR, SOCK_DGRAM,
+};
pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> {
- init();
- new_ip_socket(addr, SOCK_DGRAM).and_then(|socket| {
- let (raw_addr, raw_addr_length) = socket_addr(&addr);
- syscall!(
- win_bind(socket, raw_addr.as_ptr(), raw_addr_length,),
- PartialEq::eq,
- SOCKET_ERROR
- )
- .map_err(|err| {
- // Close the socket if we hit an error, ignoring the error
- // from closing since we can't pass back two errors.
- let _ = unsafe { closesocket(socket) };
- err
- })
- .map(|_| unsafe { net::UdpSocket::from_raw_socket(socket as StdSocket) })
- })
+ let raw_socket = new_ip_socket(addr, SOCK_DGRAM)?;
+ let socket = unsafe { net::UdpSocket::from_raw_socket(raw_socket as StdSocket) };
+
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+ syscall!(
+ win_bind(raw_socket, raw_addr.as_ptr(), raw_addr_length),
+ PartialEq::eq,
+ SOCKET_ERROR
+ )?;
+
+ Ok(socket)
}
pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
- let mut optval: MaybeUninit<c_int> = MaybeUninit::uninit();
- let mut optlen = mem::size_of::<c_int>() as c_int;
+ let mut optval: MaybeUninit<i32> = MaybeUninit::uninit();
+ let mut optlen = mem::size_of::<i32>() as i32;
syscall!(
getsockopt(
socket.as_raw_socket() as usize,
- IPPROTO_IPV6 as c_int,
- IPV6_V6ONLY as c_int,
+ IPPROTO_IPV6 as i32,
+ IPV6_V6ONLY as i32,
optval.as_mut_ptr().cast(),
&mut optlen,
),
@@ -46,7 +39,7 @@ pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
SOCKET_ERROR
)?;
- debug_assert_eq!(optlen as usize, mem::size_of::<c_int>());
+ debug_assert_eq!(optlen as usize, mem::size_of::<i32>());
// Safety: `getsockopt` initialised `optval` for us.
let optval = unsafe { optval.assume_init() };
Ok(optval != 0)
diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs
index ab12c3c..103aa01 100644
--- a/src/sys/windows/waker.rs
+++ b/src/sys/windows/waker.rs
@@ -2,7 +2,7 @@ use crate::sys::windows::Event;
use crate::sys::windows::Selector;
use crate::Token;
-use miow::iocp::CompletionPort;
+use super::iocp::CompletionPort;
use std::io;
use std::sync::Arc;