path: root/src/sys
diff options
Diffstat (limited to 'src/sys')
14 files changed, 572 insertions, 22 deletions
diff --git a/src/sys/mod.rs b/src/sys/mod.rs
index 8852333..08bd271 100644
--- a/src/sys/mod.rs
+++ b/src/sys/mod.rs
@@ -72,7 +72,11 @@ cfg_os_poll! {
pub(crate) use self::unix::uds;
- cfg_net! {
+ cfg_pipe! {
+ pub(crate) use self::unix::pipe;
+ }
+ cfg_io_source! {
pub(crate) use self::unix::IoSourceState;
diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs
index 8303797..a63760a 100644
--- a/src/sys/shell/mod.rs
+++ b/src/sys/shell/mod.rs
@@ -23,7 +23,7 @@ cfg_uds! {
pub(crate) mod uds;
-cfg_net! {
+cfg_io_source! {
use std::io;
use std::os::windows::io::RawSocket;
diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs
index 0e0c031..69be370 100644
--- a/src/sys/shell/selector.rs
+++ b/src/sys/shell/selector.rs
@@ -18,6 +18,11 @@ impl Selector {
pub fn select(&self, _: &mut Events, _: Option<Duration>) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ pub fn register_waker(&self) -> bool {
+ os_required!();
+ }
@@ -39,7 +44,7 @@ cfg_any_os_util! {
-cfg_net! {
+cfg_io_source! {
impl Selector {
pub fn id(&self) -> usize {
diff --git a/src/sys/shell/tcp.rs b/src/sys/shell/tcp.rs
index de1520b..3073d42 100644
--- a/src/sys/shell/tcp.rs
+++ b/src/sys/shell/tcp.rs
@@ -32,6 +32,20 @@ pub(crate) fn set_reuseaddr(_: TcpSocket, _: bool) -> io::Result<()> {
+pub(crate) fn get_reuseaddr(_: TcpSocket) -> io::Result<bool> {
+ os_required!();
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn set_reuseport(_: TcpSocket, _: bool) -> io::Result<()> {
+ os_required!();
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn get_reuseport(_: TcpSocket) -> io::Result<bool> {
+ os_required!();
pub(crate) fn set_linger(_: TcpSocket, _: Option<Duration>) -> io::Result<()> {
@@ -39,3 +53,7 @@ pub(crate) fn set_linger(_: TcpSocket, _: Option<Duration>) -> io::Result<()> {
pub fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+pub(crate) fn get_localaddr(_: TcpSocket) -> io::Result<SocketAddr> {
+ os_required!();
diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs
index 96d7f4d..f045fb5 100644
--- a/src/sys/unix/mod.rs
+++ b/src/sys/unix/mod.rs
@@ -38,7 +38,7 @@ cfg_os_poll! {
pub use self::uds::SocketAddr;
- cfg_net! {
+ cfg_io_source! {
use std::io;
// Both `kqueue` and `epoll` don't need to hold any user space state.
@@ -59,6 +59,10 @@ cfg_os_poll! {
+ cfg_pipe! {
+ pub(crate) mod pipe;
+ }
cfg_not_os_poll! {
diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs
new file mode 100644
index 0000000..d838ebc
--- /dev/null
+++ b/src/sys/unix/pipe.rs
@@ -0,0 +1,383 @@
+//! Unix pipe.
+//! See the [`new`] function for documentation.
+use std::fs::File;
+use std::io::{self, IoSlice, IoSliceMut, Read, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::process::{ChildStderr, ChildStdin, ChildStdout};
+use crate::io_source::IoSource;
+use crate::{event, Interest, Registry, Token};
+/// Create a new non-blocking Unix pipe.
+/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
+/// inter-process or thread communication channel.
+/// This channel may be created before forking the process and then one end used
+/// in each process, e.g. the parent process has the sending end to send command
+/// to the child process.
+/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
+/// # Events
+/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
+/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
+/// written to the `Sender` the `Receiver` will receive an [readable event].
+/// In addition to those events, events will also be generated if the other side
+/// is dropped. To check if the `Sender` is dropped you'll need to check
+/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
+/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
+/// returns true the `Receiver` was dropped. Also see the second example below.
+/// [`WRITABLE`]: Interest::WRITABLE
+/// [writable events]: event::Event::is_writable
+/// [`READABLE`]: Interest::READABLE
+/// [readable event]: event::Event::is_readable
+/// [`is_read_closed`]: event::Event::is_read_closed
+/// [`is_write_closed`]: event::Event::is_write_closed
+/// # Deregistering
+/// Both `Sender` and `Receiver` will deregister themselves when dropped,
+/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
+/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
+/// # Examples
+/// Simple example that writes data into the sending end and read it from the
+/// receiving end.
+/// ```
+/// use std::io::{self, Read, Write};
+/// use mio::{Poll, Events, Interest, Token};
+/// use mio::unix::pipe;
+/// // Unique tokens for the two ends of the channel.
+/// const PIPE_RECV: Token = Token(0);
+/// const PIPE_SEND: Token = Token(1);
+/// # fn main() -> io::Result<()> {
+/// // Create our `Poll` instance and the `Events` container.
+/// let mut poll = Poll::new()?;
+/// let mut events = Events::with_capacity(8);
+/// // Create a new pipe.
+/// let (mut sender, mut receiver) = pipe::new()?;
+/// // Register both ends of the channel.
+/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
+/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
+/// const MSG: &[u8; 11] = b"Hello world";
+/// loop {
+/// poll.poll(&mut events, None)?;
+/// for event in events.iter() {
+/// match event.token() {
+/// PIPE_SEND => sender.write(MSG)
+/// .and_then(|n| if n != MSG.len() {
+/// // We'll consider a short write an error in this
+/// // example. NOTE: we can't use `write_all` with
+/// // non-blocking I/O.
+/// Err(io::ErrorKind::WriteZero.into())
+/// } else {
+/// Ok(())
+/// })?,
+/// PIPE_RECV => {
+/// let mut buf = [0; 11];
+/// let n = receiver.read(&mut buf)?;
+/// println!("received: {:?}", &buf[0..n]);
+/// assert_eq!(n, MSG.len());
+/// assert_eq!(&buf, &*MSG);
+/// return Ok(());
+/// },
+/// _ => unreachable!(),
+/// }
+/// }
+/// }
+/// # }
+/// ```
+/// Example that receives an event once the `Sender` is dropped.
+/// ```
+/// # use std::io;
+/// #
+/// # use mio::{Poll, Events, Interest, Token};
+/// # use mio::unix::pipe;
+/// #
+/// # const PIPE_RECV: Token = Token(0);
+/// # const PIPE_SEND: Token = Token(1);
+/// #
+/// # fn main() -> io::Result<()> {
+/// // Same setup as in the example above.
+/// let mut poll = Poll::new()?;
+/// let mut events = Events::with_capacity(8);
+/// let (mut sender, mut receiver) = pipe::new()?;
+/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
+/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
+/// // Drop the sender.
+/// drop(sender);
+/// poll.poll(&mut events, None)?;
+/// for event in events.iter() {
+/// match event.token() {
+/// PIPE_RECV if event.is_read_closed() => {
+/// // Detected that the sender was dropped.
+/// println!("Sender dropped!");
+/// return Ok(());
+/// },
+/// _ => unreachable!(),
+/// }
+/// }
+/// # unreachable!();
+/// # }
+/// ```
+pub fn new() -> io::Result<(Sender, Receiver)> {
+ let mut fds: [RawFd; 2] = [-1, -1];
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_os = "openbsd",
+ ))]
+ unsafe {
+ if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
+ return Err(io::Error::last_os_error());
+ }
+ }
+ #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ unsafe {
+ // For platforms that don't have `pipe2(2)` we need to manually set the
+ // correct flags on the file descriptor.
+ if libc::pipe(fds.as_mut_ptr()) != 0 {
+ return Err(io::Error::last_os_error());
+ }
+ for fd in &fds {
+ if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
+ || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
+ {
+ let err = io::Error::last_os_error();
+ // Don't leak file descriptors. Can't handle error though.
+ let _ = libc::close(fds[0]);
+ let _ = libc::close(fds[1]);
+ return Err(err);
+ }
+ }
+ }
+ #[cfg(not(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_os = "openbsd",
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "solaris",
+ )))]
+ compile_error!("unsupported target for `mio::unix::pipe`");
+ // Safety: we just initialised the `fds` above.
+ let r = unsafe { Receiver::from_raw_fd(fds[0]) };
+ let w = unsafe { Sender::from_raw_fd(fds[1]) };
+ Ok((w, r))
+/// Sending end of an Unix pipe.
+/// See [`new`] for documentation, including examples.
+pub struct Sender {
+ inner: IoSource<File>,
+impl Sender {
+ /// Set the `Sender` into or out of non-blocking mode.
+ pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
+ set_nonblocking(self.inner.as_raw_fd(), nonblocking)
+ }
+impl event::Source for Sender {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ self.inner.register(registry, token, interests)
+ }
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ self.inner.reregister(registry, token, interests)
+ }
+ fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+ self.inner.deregister(registry)
+ }
+impl Write for Sender {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.inner.do_io(|sender| (&*sender).write(buf))
+ }
+ fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
+ self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.inner.do_io(|sender| (&*sender).flush())
+ }
+/// # Notes
+/// The underlying pipe is **not** set to non-blocking.
+impl From<ChildStdin> for Sender {
+ fn from(stdin: ChildStdin) -> Sender {
+ // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
+ unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
+ }
+impl FromRawFd for Sender {
+ unsafe fn from_raw_fd(fd: RawFd) -> Sender {
+ Sender {
+ inner: IoSource::new(File::from_raw_fd(fd)),
+ }
+ }
+impl AsRawFd for Sender {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+impl IntoRawFd for Sender {
+ fn into_raw_fd(self) -> RawFd {
+ self.inner.into_inner().into_raw_fd()
+ }
+/// Receiving end of an Unix pipe.
+/// See [`new`] for documentation, including examples.
+pub struct Receiver {
+ inner: IoSource<File>,
+impl Receiver {
+ /// Set the `Receiver` into or out of non-blocking mode.
+ pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
+ set_nonblocking(self.inner.as_raw_fd(), nonblocking)
+ }
+impl event::Source for Receiver {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ self.inner.register(registry, token, interests)
+ }
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ self.inner.reregister(registry, token, interests)
+ }
+ fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+ self.inner.deregister(registry)
+ }
+impl Read for Receiver {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.do_io(|sender| (&*sender).read(buf))
+ }
+ fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
+ self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ }
+/// # Notes
+/// The underlying pipe is **not** set to non-blocking.
+impl From<ChildStdout> for Receiver {
+ fn from(stdout: ChildStdout) -> Receiver {
+ // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
+ unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
+ }
+/// # Notes
+/// The underlying pipe is **not** set to non-blocking.
+impl From<ChildStderr> for Receiver {
+ fn from(stderr: ChildStderr) -> Receiver {
+ // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
+ unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
+ }
+impl FromRawFd for Receiver {
+ unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
+ Receiver {
+ inner: IoSource::new(File::from_raw_fd(fd)),
+ }
+ }
+impl AsRawFd for Receiver {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+impl IntoRawFd for Receiver {
+ fn into_raw_fd(self) -> RawFd {
+ self.inner.into_inner().into_raw_fd()
+ }
+fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
+ let value = nonblocking as libc::c_int;
+ if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs
index 13f1617..76ee7f9 100644
--- a/src/sys/unix/selector/epoll.rs
+++ b/src/sys/unix/selector/epoll.rs
@@ -4,7 +4,7 @@ use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP};
use log::error;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, i32, io, ptr};
@@ -17,6 +17,8 @@ pub struct Selector {
id: usize,
ep: RawFd,
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool,
impl Selector {
@@ -33,6 +35,8 @@ impl Selector {
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(false),
@@ -42,6 +46,8 @@ impl Selector {
id: self.id,
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
@@ -93,9 +99,14 @@ impl Selector {
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
+ #[cfg(debug_assertions)]
+ pub fn register_waker(&self) -> bool {
+ self.has_waker.swap(true, Ordering::AcqRel)
+ }
-cfg_net! {
+cfg_io_source! {
impl Selector {
pub fn id(&self) -> usize {
diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs
index 2ebac9a..454f47d 100644
--- a/src/sys/unix/selector/kqueue.rs
+++ b/src/sys/unix/selector/kqueue.rs
@@ -4,7 +4,7 @@ use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, io, ptr, slice};
@@ -69,6 +69,8 @@ pub struct Selector {
id: usize,
kq: RawFd,
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool,
impl Selector {
@@ -79,6 +81,8 @@ impl Selector {
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(false),
@@ -88,6 +92,8 @@ impl Selector {
id: self.id,
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
@@ -208,6 +214,11 @@ impl Selector {
kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
+ #[cfg(debug_assertions)]
+ pub fn register_waker(&self) -> bool {
+ self.has_waker.swap(true, Ordering::AcqRel)
+ }
// Used by `Waker`.
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
pub fn setup_waker(&self, token: Token) -> io::Result<()> {
@@ -292,7 +303,7 @@ fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<
-cfg_net! {
+cfg_io_source! {
impl Selector {
pub fn id(&self) -> usize {
diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs
index 81b371b..65b7400 100644
--- a/src/sys/unix/tcp.rs
+++ b/src/sys/unix/tcp.rs
@@ -1,4 +1,5 @@
use std::io;
+use std::mem;
use std::mem::{size_of, MaybeUninit};
use std::net::{self, SocketAddr};
use std::time::Duration;
@@ -58,6 +59,63 @@ pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()
)).map(|_| ())
+pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
+ let mut optval: libc::c_int = 0;
+ let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
+ syscall!(getsockopt(
+ socket,
+ libc::SOL_SOCKET,
+ &mut optval as *mut _ as *mut _,
+ &mut optlen,
+ ))?;
+ Ok(optval != 0)
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn set_reuseport(socket: TcpSocket, reuseport: bool) -> io::Result<()> {
+ let val: libc::c_int = if reuseport { 1 } else { 0 };
+ syscall!(setsockopt(
+ socket,
+ libc::SOL_SOCKET,
+ &val as *const libc::c_int as *const libc::c_void,
+ size_of::<libc::c_int>() as libc::socklen_t,
+ )).map(|_| ())
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn get_reuseport(socket: TcpSocket) -> io::Result<bool> {
+ let mut optval: libc::c_int = 0;
+ let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
+ syscall!(getsockopt(
+ socket,
+ libc::SOL_SOCKET,
+ &mut optval as *mut _ as *mut _,
+ &mut optlen,
+ ))?;
+ Ok(optval != 0)
+pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
+ let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
+ let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+ syscall!(getsockname(
+ socket,
+ &mut addr as *mut _ as *mut _,
+ &mut length
+ ))?;
+ unsafe { to_socket_addr(&addr) }
pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
let val: libc::linger = libc::linger {
l_onoff: if dur.is_some() { 1 } else { 0 },
diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs
index 82c8e9e..b2e3b11 100644
--- a/src/sys/windows/afd.rs
+++ b/src/sys/windows/afd.rs
@@ -111,7 +111,7 @@ impl Afd {
-cfg_net! {
+cfg_io_source! {
use miow::iocp::CompletionPort;
use ntapi::ntioapi::FILE_OPEN;
use ntapi::ntioapi::NtCreateFile;
diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs
index db6729c..9da5e7a 100644
--- a/src/sys/windows/io_status_block.rs
+++ b/src/sys/windows/io_status_block.rs
@@ -4,7 +4,7 @@ use std::ops::{Deref, DerefMut};
pub struct IoStatusBlock(IO_STATUS_BLOCK);
-cfg_net! {
+cfg_io_source! {
use ntapi::ntioapi::IO_STATUS_BLOCK_u;
impl IoStatusBlock {
diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs
index 7bba6dd..25590c2 100644
--- a/src/sys/windows/mod.rs
+++ b/src/sys/windows/mod.rs
@@ -42,6 +42,10 @@ mod waker;
pub(crate) use waker::Waker;
cfg_net! {
+ mod net;
+cfg_io_source! {
use std::io;
use std::os::windows::io::RawSocket;
use std::pin::Pin;
@@ -49,8 +53,6 @@ cfg_net! {
use crate::{poll, Interest, Registry, Token};
- mod net;
struct InternalState {
selector: Arc<SelectorInner>,
token: Token,
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 4a38300..df2c3f0 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -12,6 +12,7 @@ cfg_net! {
use miow::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
+use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
use std::pin::Pin;
@@ -20,7 +21,6 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use std::io;
use winapi::shared::ntdef::NT_SUCCESS;
use winapi::shared::ntdef::{HANDLE, PVOID};
use winapi::shared::ntstatus::STATUS_CANCELLED;
@@ -47,7 +47,7 @@ impl AfdGroup {
-cfg_net! {
+cfg_io_source! {
const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;
impl AfdGroup {
@@ -256,7 +256,7 @@ impl SockState {
-cfg_net! {
+cfg_io_source! {
impl SockState {
fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
Ok(SockState {
@@ -327,8 +327,9 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
pub struct Selector {
id: usize,
pub(super) inner: Arc<SelectorInner>,
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool,
impl Selector {
@@ -340,6 +341,8 @@ impl Selector {
inner: Arc::new(inner),
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(false),
@@ -349,6 +352,8 @@ impl Selector {
id: self.id,
inner: Arc::clone(&self.inner),
+ #[cfg(debug_assertions)]
+ has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
@@ -360,6 +365,11 @@ impl Selector {
self.inner.select(events, timeout)
+ #[cfg(debug_assertions)]
+ pub fn register_waker(&self) -> bool {
+ self.has_waker.swap(true, Ordering::AcqRel)
+ }
pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
@@ -370,7 +380,7 @@ impl Selector {
-cfg_net! {
+cfg_io_source! {
use super::InternalState;
use crate::Token;
@@ -499,7 +509,7 @@ impl SelectorInner {
} else if iocp_event.token() % 2 == 1 {
// Handle is a named pipe. This could be extended to be any non-AFD event.
let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;
let len = events.len();
callback(iocp_event.entry(), Some(events));
n += events.len() - len;
@@ -525,7 +535,7 @@ impl SelectorInner {
-cfg_net! {
+cfg_io_source! {
use std::mem::size_of;
use std::ptr::null_mut;
use winapi::um::mswsock;
@@ -701,7 +711,7 @@ impl Drop for SelectorInner {
let callback = unsafe {
(*(iocp_event.overlapped() as *mut super::Overlapped)).callback
callback(iocp_event.entry(), None);
} else {
// drain sock state to release memory of Arc reference
diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs
index 46ac1ac..b78d864 100644
--- a/src/sys/windows/tcp.rs
+++ b/src/sys/windows/tcp.rs
@@ -1,14 +1,17 @@
use std::io;
use std::mem::size_of;
-use std::net::{self, SocketAddr};
+use std::net::{self, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::time::Duration;
use std::os::windows::io::FromRawSocket;
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
use winapi::ctypes::{c_char, c_int, c_ushort};
+use winapi::shared::ws2def::{SOCKADDR_STORAGE, AF_INET, SOCKADDR_IN};
+use winapi::shared::ws2ipdef::SOCKADDR_IN6_LH;
use winapi::shared::minwindef::{BOOL, TRUE, FALSE};
use winapi::um::winsock2::{
- self, closesocket, linger, setsockopt, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
+ self, closesocket, linger, setsockopt, getsockopt, getsockname, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
@@ -87,6 +90,47 @@ pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()
+pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
+ let mut optval: c_char = 0;
+ let mut optlen = size_of::<BOOL>() as c_int;
+ match unsafe { getsockopt(
+ socket,
+ &mut optval as *mut _ as *mut _,
+ &mut optlen,
+ ) } {
+ SOCKET_ERROR => Err(io::Error::last_os_error()),
+ _ => Ok(optval != 0),
+ }
+pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
+ let mut addr: SOCKADDR_STORAGE = unsafe { std::mem::zeroed() };
+ let mut length = std::mem::size_of_val(&addr) as c_int;
+ match unsafe { getsockname(
+ socket,
+ &mut addr as *mut _ as *mut _,
+ &mut length
+ ) } {
+ SOCKET_ERROR => Err(io::Error::last_os_error()),
+ _ => {
+ let storage: *const SOCKADDR_STORAGE = (&addr) as *const _;
+ if addr.ss_family as c_int == AF_INET {
+ let sock_addr : SocketAddrV4 = unsafe { *(storage as *const SOCKADDR_IN as *const _) };
+ Ok(sock_addr.into())
+ } else {
+ let sock_addr : SocketAddrV6 = unsafe { *(storage as *const SOCKADDR_IN6_LH as *const _) };
+ Ok(sock_addr.into())
+ }
+ },
+ }
pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
let val: linger = linger {
l_onoff: if dur.is_some() { 1 } else { 0 },