aboutsummaryrefslogtreecommitdiff
path: root/src/sys/windows
diff options
context:
space:
mode:
Diffstat (limited to 'src/sys/windows')
-rw-r--r--src/sys/windows/afd.rs108
-rw-r--r--src/sys/windows/event.rs3
-rw-r--r--src/sys/windows/handle.rs30
-rw-r--r--src/sys/windows/io_status_block.rs6
-rw-r--r--src/sys/windows/iocp.rs270
-rw-r--r--src/sys/windows/mod.rs19
-rw-r--r--src/sys/windows/named_pipe.rs405
-rw-r--r--src/sys/windows/net.rs73
-rw-r--r--src/sys/windows/overlapped.rs10
-rw-r--r--src/sys/windows/selector.rs68
-rw-r--r--src/sys/windows/tcp.rs319
-rw-r--r--src/sys/windows/udp.rs49
-rw-r--r--src/sys/windows/waker.rs2
13 files changed, 822 insertions, 540 deletions
diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs
index 6241a45..0308e2f 100644
--- a/src/sys/windows/afd.rs
+++ b/src/sys/windows/afd.rs
@@ -1,17 +1,32 @@
-use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
-use ntapi::ntioapi::{NtCancelIoFileEx, NtDeviceIoControlFile};
-use ntapi::ntrtl::RtlNtStatusToDosError;
+use std::ffi::c_void;
use std::fmt;
use std::fs::File;
use std::io;
use std::mem::size_of;
use std::os::windows::io::AsRawHandle;
-use std::ptr::null_mut;
-use winapi::shared::ntdef::{HANDLE, LARGE_INTEGER, NTSTATUS, PVOID, ULONG};
-use winapi::shared::ntstatus::{STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS};
-const IOCTL_AFD_POLL: ULONG = 0x00012024;
+use windows_sys::Win32::Foundation::{
+ RtlNtStatusToDosError, HANDLE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
+};
+use windows_sys::Win32::System::WindowsProgramming::{
+ NtDeviceIoControlFile, IO_STATUS_BLOCK, IO_STATUS_BLOCK_0,
+};
+const IOCTL_AFD_POLL: u32 = 0x00012024;
+
+#[link(name = "ntdll")]
+extern "system" {
+ /// See <https://processhacker.sourceforge.io/doc/ntioapi_8h.html#a0d4d550cad4d62d75b76961e25f6550c>
+ ///
+ /// This is an undocumented API and as such not part of <https://github.com/microsoft/win32metadata>
+ /// from which `windows-sys` is generated, and also unlikely to be added, so
+ /// we manually declare it here
+ fn NtCancelIoFileEx(
+ FileHandle: HANDLE,
+ IoRequestToCancel: *mut IO_STATUS_BLOCK,
+ IoStatusBlock: *mut IO_STATUS_BLOCK,
+ ) -> NTSTATUS;
+}
/// Winsock2 AFD driver instance.
///
/// All operations are unsafe due to IO_STATUS_BLOCK parameter are being used by Afd driver during STATUS_PENDING before I/O Completion Port returns its result.
@@ -24,7 +39,7 @@ pub struct Afd {
#[derive(Debug)]
pub struct AfdPollHandleInfo {
pub handle: HANDLE,
- pub events: ULONG,
+ pub events: u32,
pub status: NTSTATUS,
}
@@ -32,10 +47,10 @@ unsafe impl Send for AfdPollHandleInfo {}
#[repr(C)]
pub struct AfdPollInfo {
- pub timeout: LARGE_INTEGER,
+ pub timeout: i64,
// Can have only value 1.
- pub number_of_handles: ULONG,
- pub exclusive: ULONG,
+ pub number_of_handles: u32,
+ pub exclusive: u32,
pub handles: [AfdPollHandleInfo; 1],
}
@@ -58,13 +73,13 @@ impl Afd {
&self,
info: &mut AfdPollInfo,
iosb: *mut IO_STATUS_BLOCK,
- overlapped: PVOID,
+ overlapped: *mut c_void,
) -> io::Result<bool> {
- let info_ptr: PVOID = info as *mut _ as PVOID;
- (*iosb).u.Status = STATUS_PENDING;
+ let info_ptr = info as *mut _ as *mut c_void;
+ (*iosb).Anonymous.Status = STATUS_PENDING;
let status = NtDeviceIoControlFile(
- self.fd.as_raw_handle(),
- null_mut(),
+ self.fd.as_raw_handle() as HANDLE,
+ 0,
None,
overlapped,
iosb,
@@ -93,15 +108,15 @@ impl Afd {
/// Use it only with request is still being polled so that you have valid `IO_STATUS_BLOCK` to use.
/// User should NOT deallocate there overlapped value after the `cancel` to prevent double free.
pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> io::Result<()> {
- if (*iosb).u.Status != STATUS_PENDING {
+ if (*iosb).Anonymous.Status != STATUS_PENDING {
return Ok(());
}
let mut cancel_iosb = IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
};
- let status = NtCancelIoFileEx(self.fd.as_raw_handle(), iosb, &mut cancel_iosb);
+ let status = NtCancelIoFileEx(self.fd.as_raw_handle() as HANDLE, iosb, &mut cancel_iosb);
if status == STATUS_SUCCESS || status == STATUS_NOT_FOUND {
return Ok(());
}
@@ -114,18 +129,21 @@ impl Afd {
cfg_io_source! {
use std::mem::zeroed;
use std::os::windows::io::{FromRawHandle, RawHandle};
+ use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
- use miow::iocp::CompletionPort;
- use ntapi::ntioapi::{NtCreateFile, FILE_OPEN};
- use winapi::shared::ntdef::{OBJECT_ATTRIBUTES, UNICODE_STRING, USHORT, WCHAR};
- use winapi::um::handleapi::INVALID_HANDLE_VALUE;
- use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVENT_ON_HANDLE};
- use winapi::um::winnt::{SYNCHRONIZE, FILE_SHARE_READ, FILE_SHARE_WRITE};
+ use super::iocp::CompletionPort;
+ use windows_sys::Win32::{
+ Foundation::{UNICODE_STRING, INVALID_HANDLE_VALUE},
+ System::WindowsProgramming::{
+ OBJECT_ATTRIBUTES, FILE_SKIP_SET_EVENT_ON_HANDLE,
+ },
+ Storage::FileSystem::{FILE_OPEN, NtCreateFile, SetFileCompletionNotificationModes, SYNCHRONIZE, FILE_SHARE_READ, FILE_SHARE_WRITE},
+ };
const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES {
- Length: size_of::<OBJECT_ATTRIBUTES>() as ULONG,
- RootDirectory: null_mut(),
+ Length: size_of::<OBJECT_ATTRIBUTES>() as u32,
+ RootDirectory: 0,
ObjectName: &AFD_OBJ_NAME as *const _ as *mut _,
Attributes: 0,
SecurityDescriptor: null_mut(),
@@ -133,12 +151,12 @@ cfg_io_source! {
};
const AFD_OBJ_NAME: UNICODE_STRING = UNICODE_STRING {
- Length: (AFD_HELPER_NAME.len() * size_of::<WCHAR>()) as USHORT,
- MaximumLength: (AFD_HELPER_NAME.len() * size_of::<WCHAR>()) as USHORT,
+ Length: (AFD_HELPER_NAME.len() * size_of::<u16>()) as u16,
+ MaximumLength: (AFD_HELPER_NAME.len() * size_of::<u16>()) as u16,
Buffer: AFD_HELPER_NAME.as_ptr() as *mut _,
};
- const AFD_HELPER_NAME: &[WCHAR] = &[
+ const AFD_HELPER_NAME: &[u16] = &[
'\\' as _,
'D' as _,
'e' as _,
@@ -166,10 +184,10 @@ cfg_io_source! {
impl Afd {
/// Create new Afd instance.
- pub fn new(cp: &CompletionPort) -> io::Result<Afd> {
+ pub(crate) fn new(cp: &CompletionPort) -> io::Result<Afd> {
let mut afd_helper_handle: HANDLE = INVALID_HANDLE_VALUE;
let mut iosb = IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
};
@@ -180,12 +198,12 @@ cfg_io_source! {
&AFD_HELPER_ATTRIBUTES as *const _ as *mut _,
&mut iosb,
null_mut(),
- 0 as ULONG,
+ 0,
FILE_SHARE_READ | FILE_SHARE_WRITE,
FILE_OPEN,
- 0 as ULONG,
+ 0,
null_mut(),
- 0 as ULONG,
+ 0,
);
if status != STATUS_SUCCESS {
let raw_err = io::Error::from_raw_os_error(
@@ -204,7 +222,7 @@ cfg_io_source! {
cp.add_handle(token, &afd.fd)?;
match SetFileCompletionNotificationModes(
afd_helper_handle,
- FILE_SKIP_SET_EVENT_ON_HANDLE,
+ FILE_SKIP_SET_EVENT_ON_HANDLE as u8 // This is just 2, so fits in u8
) {
0 => Err(io::Error::last_os_error()),
_ => Ok(afd),
@@ -214,18 +232,18 @@ cfg_io_source! {
}
}
-pub const POLL_RECEIVE: u32 = 0b000_000_001;
-pub const POLL_RECEIVE_EXPEDITED: u32 = 0b000_000_010;
-pub const POLL_SEND: u32 = 0b000_000_100;
-pub const POLL_DISCONNECT: u32 = 0b000_001_000;
-pub const POLL_ABORT: u32 = 0b000_010_000;
-pub const POLL_LOCAL_CLOSE: u32 = 0b000_100_000;
+pub const POLL_RECEIVE: u32 = 0b0_0000_0001;
+pub const POLL_RECEIVE_EXPEDITED: u32 = 0b0_0000_0010;
+pub const POLL_SEND: u32 = 0b0_0000_0100;
+pub const POLL_DISCONNECT: u32 = 0b0_0000_1000;
+pub const POLL_ABORT: u32 = 0b0_0001_0000;
+pub const POLL_LOCAL_CLOSE: u32 = 0b0_0010_0000;
// Not used as it indicated in each event where a connection is connected, not
// just the first time a connection is established.
// Also see https://github.com/piscisaureus/wepoll/commit/8b7b340610f88af3d83f40fb728e7b850b090ece.
-pub const POLL_CONNECT: u32 = 0b001_000_000;
-pub const POLL_ACCEPT: u32 = 0b010_000_000;
-pub const POLL_CONNECT_FAIL: u32 = 0b100_000_000;
+pub const POLL_CONNECT: u32 = 0b0_0100_0000;
+pub const POLL_ACCEPT: u32 = 0b0_1000_0000;
+pub const POLL_CONNECT_FAIL: u32 = 0b1_0000_0000;
pub const KNOWN_EVENTS: u32 = POLL_RECEIVE
| POLL_RECEIVE_EXPEDITED
diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs
index a49252a..731bd60 100644
--- a/src/sys/windows/event.rs
+++ b/src/sys/windows/event.rs
@@ -1,8 +1,7 @@
use std::fmt;
-use miow::iocp::CompletionStatus;
-
use super::afd;
+use super::iocp::CompletionStatus;
use crate::Token;
#[derive(Clone)]
diff --git a/src/sys/windows/handle.rs b/src/sys/windows/handle.rs
new file mode 100644
index 0000000..5b9ac0b
--- /dev/null
+++ b/src/sys/windows/handle.rs
@@ -0,0 +1,30 @@
+use std::os::windows::io::RawHandle;
+use windows_sys::Win32::Foundation::{CloseHandle, HANDLE};
+
+/// Wrapper around a Windows HANDLE so that we close it upon drop in all scenarios
+#[derive(Debug)]
+pub struct Handle(HANDLE);
+
+impl Handle {
+ #[inline]
+ pub fn new(handle: HANDLE) -> Self {
+ Self(handle)
+ }
+
+ pub fn raw(&self) -> HANDLE {
+ self.0
+ }
+
+ pub fn into_raw(self) -> RawHandle {
+ let ret = self.0;
+ // This is super important so that drop is not called!
+ std::mem::forget(self);
+ ret as RawHandle
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ unsafe { CloseHandle(self.0) };
+ }
+}
diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs
index 3e60334..d7eda6a 100644
--- a/src/sys/windows/io_status_block.rs
+++ b/src/sys/windows/io_status_block.rs
@@ -1,17 +1,17 @@
use std::fmt;
use std::ops::{Deref, DerefMut};
-use ntapi::ntioapi::IO_STATUS_BLOCK;
+use windows_sys::Win32::System::WindowsProgramming::IO_STATUS_BLOCK;
pub struct IoStatusBlock(IO_STATUS_BLOCK);
cfg_io_source! {
- use ntapi::ntioapi::IO_STATUS_BLOCK_u;
+ use windows_sys::Win32::System::WindowsProgramming::{IO_STATUS_BLOCK_0};
impl IoStatusBlock {
pub fn zeroed() -> Self {
Self(IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
})
}
diff --git a/src/sys/windows/iocp.rs b/src/sys/windows/iocp.rs
new file mode 100644
index 0000000..262c8f2
--- /dev/null
+++ b/src/sys/windows/iocp.rs
@@ -0,0 +1,270 @@
+//! Bindings to IOCP, I/O Completion Ports
+
+use super::{Handle, Overlapped};
+use std::cmp;
+use std::fmt;
+use std::io;
+use std::mem;
+use std::os::windows::io::*;
+use std::time::Duration;
+
+use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
+use windows_sys::Win32::System::IO::{
+ CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, OVERLAPPED,
+ OVERLAPPED_ENTRY,
+};
+
+/// A handle to an Windows I/O Completion Port.
+#[derive(Debug)]
+pub(crate) struct CompletionPort {
+ handle: Handle,
+}
+
+/// A status message received from an I/O completion port.
+///
+/// These statuses can be created via the `new` or `empty` constructors and then
+/// provided to a completion port, or they are read out of a completion port.
+/// The fields of each status are read through its accessor methods.
+#[derive(Clone, Copy)]
+#[repr(transparent)]
+pub struct CompletionStatus(OVERLAPPED_ENTRY);
+
+impl fmt::Debug for CompletionStatus {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
+ }
+}
+
+unsafe impl Send for CompletionStatus {}
+unsafe impl Sync for CompletionStatus {}
+
+impl CompletionPort {
+ /// Creates a new I/O completion port with the specified concurrency value.
+ ///
+ /// The number of threads given corresponds to the level of concurrency
+ /// allowed for threads associated with this port. Consult the Windows
+ /// documentation for more information about this value.
+ pub fn new(threads: u32) -> io::Result<CompletionPort> {
+ let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, threads) };
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(CompletionPort {
+ handle: Handle::new(ret),
+ })
+ }
+ }
+
+ /// Associates a new `HANDLE` to this I/O completion port.
+ ///
+ /// This function will associate the given handle to this port with the
+ /// given `token` to be returned in status messages whenever it receives a
+ /// notification.
+ ///
+ /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
+ /// trait can be provided to this function, such as `std::fs::File` and
+ /// friends.
+ #[cfg(any(feature = "net", feature = "os-ext"))]
+ pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
+ let ret = unsafe {
+ CreateIoCompletionPort(t.as_raw_handle() as HANDLE, self.handle.raw(), token, 0)
+ };
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Dequeues a number of completion statuses from this I/O completion port.
+ ///
+ /// This function is the same as `get` except that it may return more than
+ /// one status. A buffer of "zero" statuses is provided (the contents are
+ /// not read) and then on success this function will return a sub-slice of
+ /// statuses which represent those which were dequeued from this port. This
+ /// function does not wait to fill up the entire list of statuses provided.
+ ///
+ /// Like with `get`, a timeout may be specified for this operation.
+ pub fn get_many<'a>(
+ &self,
+ list: &'a mut [CompletionStatus],
+ timeout: Option<Duration>,
+ ) -> io::Result<&'a mut [CompletionStatus]> {
+ debug_assert_eq!(
+ mem::size_of::<CompletionStatus>(),
+ mem::size_of::<OVERLAPPED_ENTRY>()
+ );
+ let mut removed = 0;
+ let timeout = duration_millis(timeout);
+ let len = cmp::min(list.len(), <u32>::max_value() as usize) as u32;
+ let ret = unsafe {
+ GetQueuedCompletionStatusEx(
+ self.handle.raw(),
+ list.as_ptr() as *mut _,
+ len,
+ &mut removed,
+ timeout,
+ 0,
+ )
+ };
+
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(&mut list[..removed as usize])
+ }
+ }
+
+ /// Posts a new completion status onto this I/O completion port.
+ ///
+ /// This function will post the given status, with custom parameters, to the
+ /// port. Threads blocked in `get` or `get_many` will eventually receive
+ /// this status.
+ pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
+ let ret = unsafe {
+ PostQueuedCompletionStatus(
+ self.handle.raw(),
+ status.0.dwNumberOfBytesTransferred,
+ status.0.lpCompletionKey,
+ status.0.lpOverlapped,
+ )
+ };
+
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl AsRawHandle for CompletionPort {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.handle.raw() as RawHandle
+ }
+}
+
+impl FromRawHandle for CompletionPort {
+ unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
+ CompletionPort {
+ handle: Handle::new(handle as HANDLE),
+ }
+ }
+}
+
+impl IntoRawHandle for CompletionPort {
+ fn into_raw_handle(self) -> RawHandle {
+ self.handle.into_raw()
+ }
+}
+
+impl CompletionStatus {
+ /// Creates a new completion status with the provided parameters.
+ ///
+ /// This function is useful when creating a status to send to a port with
+ /// the `post` method. The parameters are opaquely passed through and not
+ /// interpreted by the system at all.
+ pub(crate) fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self {
+ CompletionStatus(OVERLAPPED_ENTRY {
+ dwNumberOfBytesTransferred: bytes,
+ lpCompletionKey: token,
+ lpOverlapped: overlapped as *mut _,
+ Internal: 0,
+ })
+ }
+
+ /// Creates a new borrowed completion status from the borrowed
+ /// `OVERLAPPED_ENTRY` argument provided.
+ ///
+ /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
+ /// returning the wrapped structure.
+ #[cfg(feature = "os-ext")]
+ pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &Self {
+ // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
+ // a reference to one is guaranteed to be layout compatible with the
+ // reference to another.
+ unsafe { &*(entry as *const _ as *const _) }
+ }
+
+ /// Creates a new "zero" completion status.
+ ///
+ /// This function is useful when creating a stack buffer or vector of
+ /// completion statuses to be passed to the `get_many` function.
+ pub fn zero() -> Self {
+ Self::new(0, 0, std::ptr::null_mut())
+ }
+
+ /// Returns the number of bytes that were transferred for the I/O operation
+ /// associated with this completion status.
+ pub fn bytes_transferred(&self) -> u32 {
+ self.0.dwNumberOfBytesTransferred
+ }
+
+ /// Returns the completion key value associated with the file handle whose
+ /// I/O operation has completed.
+ ///
+ /// A completion key is a per-handle key that is specified when it is added
+ /// to an I/O completion port via `add_handle` or `add_socket`.
+ pub fn token(&self) -> usize {
+ self.0.lpCompletionKey as usize
+ }
+
+ /// Returns a pointer to the `Overlapped` structure that was specified when
+ /// the I/O operation was started.
+ pub fn overlapped(&self) -> *mut OVERLAPPED {
+ self.0.lpOverlapped
+ }
+
+ /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
+ pub fn entry(&self) -> &OVERLAPPED_ENTRY {
+ &self.0
+ }
+}
+
+#[inline]
+fn duration_millis(dur: Option<Duration>) -> u32 {
+ if let Some(dur) = dur {
+ // `Duration::as_millis` truncates, so round up. This avoids
+ // turning sub-millisecond timeouts into a zero timeout, unless
+ // the caller explicitly requests that by specifying a zero
+ // timeout.
+ let dur_ms = (dur + Duration::from_nanos(999_999)).as_millis();
+ cmp::min(dur_ms, u32::MAX as u128) as u32
+ } else {
+ u32::MAX
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{CompletionPort, CompletionStatus};
+
+ #[test]
+ fn is_send_sync() {
+ fn is_send_sync<T: Send + Sync>() {}
+ is_send_sync::<CompletionPort>();
+ }
+
+ #[test]
+ fn get_many() {
+ let c = CompletionPort::new(1).unwrap();
+
+ c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
+ c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
+
+ let mut s = vec![CompletionStatus::zero(); 4];
+ {
+ let s = c.get_many(&mut s, None).unwrap();
+ assert_eq!(s.len(), 2);
+ assert_eq!(s[0].bytes_transferred(), 1);
+ assert_eq!(s[0].token(), 2);
+ assert_eq!(s[0].overlapped(), 3 as *mut _);
+ assert_eq!(s[1].bytes_transferred(), 4);
+ assert_eq!(s[1].token(), 5);
+ assert_eq!(s[1].overlapped(), 6 as *mut _);
+ }
+ assert_eq!(s[2].bytes_transferred(), 0);
+ assert_eq!(s[2].token(), 0);
+ assert_eq!(s[2].overlapped(), 0 as *mut _);
+ }
+}
diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs
index 98b6fc6..f8b72fc 100644
--- a/src/sys/windows/mod.rs
+++ b/src/sys/windows/mod.rs
@@ -1,15 +1,20 @@
mod afd;
-mod io_status_block;
pub mod event;
pub use event::{Event, Events};
-mod selector;
-pub use selector::{Selector, SelectorInner, SockState};
+mod handle;
+use handle::Handle;
+
+mod io_status_block;
+mod iocp;
mod overlapped;
use overlapped::Overlapped;
+mod selector;
+pub use selector::{Selector, SelectorInner, SockState};
+
// Macros must be defined before the modules that use them
cfg_net! {
/// Helper macro to execute a system call that returns an `io::Result`.
@@ -45,7 +50,7 @@ cfg_io_source! {
use std::pin::Pin;
use std::sync::{Arc, Mutex};
- use crate::{poll, Interest, Registry, Token};
+ use crate::{Interest, Registry, Token};
struct InternalState {
selector: Arc<SelectorInner>,
@@ -101,7 +106,8 @@ cfg_io_source! {
if self.inner.is_some() {
Err(io::ErrorKind::AlreadyExists.into())
} else {
- poll::selector(registry)
+ registry
+ .selector()
.register(socket, token, interests)
.map(|state| {
self.inner = Some(Box::new(state));
@@ -117,7 +123,8 @@ cfg_io_source! {
) -> io::Result<()> {
match self.inner.as_mut() {
Some(state) => {
- poll::selector(registry)
+ registry
+ .selector()
.reregister(state.sock_state.clone(), token, interests)
.map(|()| {
state.token = token;
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs
index 8c81f38..23f85d1 100644
--- a/src/sys/windows/named_pipe.rs
+++ b/src/sys/windows/named_pipe.rs
@@ -1,41 +1,31 @@
-use crate::event::Source;
-use crate::sys::windows::{Event, Overlapped};
-use crate::{poll, Registry};
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
-
use std::ffi::OsStr;
-use std::fmt;
use std::io::{self, Read, Write};
-use std::mem;
-use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
-use std::slice;
+use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
+use std::{fmt, mem, slice};
+
+use windows_sys::Win32::Foundation::{
+ ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
+ ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
+};
+use windows_sys::Win32::Storage::FileSystem::{
+ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
+};
+use windows_sys::Win32::System::Pipes::{
+ ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+};
+use windows_sys::Win32::System::IO::{
+ CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
+};
+use crate::event::Source;
+use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
+use crate::sys::windows::{Event, Handle, Overlapped};
+use crate::Registry;
use crate::{Interest, Token};
-use miow::iocp::{CompletionPort, CompletionStatus};
-use miow::pipe;
-use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING};
-use winapi::um::ioapiset::CancelIoEx;
-
-/// # Safety
-///
-/// Only valid if the strict is annotated with `#[repr(C)]`. This is only used
-/// with `Overlapped` and `Inner`, which are correctly annotated.
-macro_rules! offset_of {
- ($t:ty, $($field:ident).+) => (
- &(*(0 as *const $t)).$($field).+ as *const _ as usize
- )
-}
-
-macro_rules! overlapped2arc {
- ($e:expr, $t:ty, $($field:ident).+) => ({
- let offset = offset_of!($t, $($field).+);
- debug_assert!(offset < mem::size_of::<$t>());
- Arc::from_raw(($e as usize - offset) as *mut $t)
- })
-}
/// Non-blocking windows named pipe.
///
@@ -83,30 +73,266 @@ pub struct NamedPipe {
inner: Arc<Inner>,
}
+/// # Notes
+///
+/// The memory layout of this structure must be fixed as the
+/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
#[repr(C)]
struct Inner {
- handle: pipe::NamedPipe,
-
+ // NOTE: careful modifying the order of these three fields, the `ptr_from_*`
+ // methods depend on the layout!
connect: Overlapped,
- connecting: AtomicBool,
-
read: Overlapped,
write: Overlapped,
-
+ // END NOTE.
+ handle: Handle,
+ connecting: AtomicBool,
io: Mutex<Io>,
-
pool: Mutex<BufferPool>,
}
+impl Inner {
+ /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
+ ///
+ /// # Unsafety
+ ///
+ /// Caller must ensure `ptr` is pointing to `Inner.connect`.
+ unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `connect` is the first field, so the pointer are the same.
+ ptr.cast()
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
+ unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
+ unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped` and `read: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
+ }
+
+ /// Issue a connection request with the specified overlapped operation.
+ ///
+ /// This function will issue a request to connect a client to this server,
+ /// returning immediately after starting the overlapped operation.
+ ///
+ /// If this function immediately succeeds then `Ok(true)` is returned. If
+ /// the overlapped operation is enqueued and pending, then `Ok(false)` is
+ /// returned. Otherwise an error is returned indicating what went wrong.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `overlapped` pointer is valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that this pointer is
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
+ if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
+ return Ok(true);
+ }
+
+ let err = io::Error::last_os_error();
+
+ match err.raw_os_error().map(|e| e as u32) {
+ Some(ERROR_PIPE_CONNECTED) => Ok(true),
+ Some(ERROR_NO_DATA) => Ok(true),
+ Some(ERROR_IO_PENDING) => Ok(false),
+ _ => Err(err),
+ }
+ }
+
+ /// Disconnects this named pipe from any connected client.
+ pub fn disconnect(&self) -> io::Result<()> {
+ if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Issues an overlapped read operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous read to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes read. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn read_overlapped(
+ &self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = ReadFile(
+ self.handle.raw(),
+ buf.as_mut_ptr() as *mut _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Issues an overlapped write operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous write to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes written. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn write_overlapped(
+ &self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = WriteFile(
+ self.handle.raw(),
+ buf.as_ptr() as *const _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `Overlapped` instance.
+ #[inline]
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
+ let mut transferred = 0;
+ let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
+ if r == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(transferred as usize)
+ }
+ }
+}
+
+#[test]
+fn ptr_from() {
+ use std::mem::ManuallyDrop;
+ use std::ptr;
+
+ let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
+ let inner: &Inner = &pipe.inner;
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_conn_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_read_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_write_overlapped` incorrect"
+ );
+}
+
struct Io {
// Uniquely identifies the selector associated with this named pipe
cp: Option<Arc<CompletionPort>>,
// Token used to identify events
token: Option<Token>,
read: State,
- read_interest: bool,
write: State,
- write_interest: bool,
connect_error: Option<io::Error>,
}
@@ -129,10 +355,30 @@ impl NamedPipe {
/// Creates a new named pipe at the specified `addr` given a "reasonable
/// set" of initial configuration options.
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
- let pipe = pipe::NamedPipe::new(addr)?;
- // Safety: nothing actually unsafe about this. The trait fn includes
- // `unsafe`.
- Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) })
+ use std::os::windows::ffi::OsStrExt;
+ let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();
+
+ // Safety: syscall
+ let h = unsafe {
+ CreateNamedPipeW(
+ name.as_ptr(),
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ std::ptr::null_mut(),
+ )
+ };
+
+ if h == INVALID_HANDLE_VALUE {
+ Err(io::Error::last_os_error())
+ } else {
+ // Safety: nothing actually unsafe about this. The trait fn includes
+ // `unsafe`.
+ Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
+ }
}
/// Attempts to call `ConnectNamedPipe`, if possible.
@@ -167,7 +413,7 @@ impl NamedPipe {
// internal state accordingly.
let res = unsafe {
let overlapped = self.inner.connect.as_ptr() as *mut _;
- self.inner.handle.connect_overlapped(overlapped)
+ self.inner.connect_overlapped(overlapped)
};
match res {
@@ -219,7 +465,7 @@ impl NamedPipe {
/// After a `disconnect` is issued, then a `connect` may be called again to
/// connect to another client.
pub fn disconnect(&self) -> io::Result<()> {
- self.inner.handle.disconnect()
+ self.inner.disconnect()
}
}
@@ -227,10 +473,7 @@ impl FromRawHandle for NamedPipe {
unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
NamedPipe {
inner: Arc::new(Inner {
- // Safety: not really unsafe
- handle: pipe::NamedPipe::from_raw_handle(handle),
- // transmutes to straddle winapi versions (mio 0.6 is on an
- // older winapi)
+ handle: Handle::new(handle as HANDLE),
connect: Overlapped::new(connect_done),
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
@@ -239,9 +482,7 @@ impl FromRawHandle for NamedPipe {
cp: None,
token: None,
read: State::None,
- read_interest: false,
write: State::None,
- write_interest: false,
connect_error: None,
}),
pool: Mutex::new(BufferPool::with_capacity(2)),
@@ -356,12 +597,7 @@ impl<'a> Write for &'a NamedPipe {
}
impl Source for NamedPipe {
- fn register(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, false)?;
@@ -374,18 +610,15 @@ impl Source for NamedPipe {
}
if io.cp.is_none() {
- io.cp = Some(poll::selector(registry).clone_port());
+ let selector = registry.selector();
+
+ io.cp = Some(selector.clone_port());
let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
- poll::selector(registry)
- .inner
- .cp
- .add_handle(inner_token, &self.inner.handle)?;
+ selector.inner.cp.add_handle(inner_token, self)?;
}
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -393,19 +626,12 @@ impl Source for NamedPipe {
Ok(())
}
- fn reregister(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, true)?;
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -432,7 +658,7 @@ impl Source for NamedPipe {
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
- self.inner.handle.as_raw_handle()
+ self.inner.handle.raw() as RawHandle
}
}
@@ -452,12 +678,8 @@ impl Drop for NamedPipe {
}
let io = self.inner.io.lock().unwrap();
-
- match io.read {
- State::Pending(..) => {
- drop(cancel(&self.inner.handle, &self.inner.read));
- }
- _ => {}
+ if let State::Pending(..) = io.read {
+ drop(cancel(&self.inner.handle, &self.inner.read));
}
}
}
@@ -483,7 +705,7 @@ impl Inner {
let e = unsafe {
let overlapped = me.read.as_ptr() as *mut _;
let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
- me.handle.read_overlapped(slice, overlapped)
+ me.read_overlapped(slice, overlapped)
};
match e {
@@ -522,7 +744,7 @@ impl Inner {
// Very similar to `schedule_read` above, just done for the write half.
let e = unsafe {
let overlapped = me.write.as_ptr() as *mut _;
- me.handle.write_overlapped(&buf[pos..], overlapped)
+ me.write_overlapped(&buf[pos..], overlapped)
};
// See `connect` above for the rationale behind `forget`
@@ -572,7 +794,8 @@ impl Inner {
fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
let mut io = me.io.lock().unwrap();
- if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) {
+ #[allow(clippy::needless_option_as_deref)]
+ if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
}
@@ -588,8 +811,8 @@ impl Inner {
}
}
-unsafe fn cancel<T: AsRawHandle>(handle: &T, overlapped: &Overlapped) -> io::Result<()> {
- let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _);
+unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
+ let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
// `CancelIoEx` returns 0 on error:
// https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
if ret == 0 {
@@ -605,7 +828,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `connect` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };
// Flag ourselves as no longer using the `connect` overlapped instances.
let prev = me.connecting.swap(false, SeqCst);
@@ -614,7 +837,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Stash away our connect error if one happened
debug_assert_eq!(status.bytes_transferred(), 0);
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => debug_assert_eq!(n, 0),
Err(e) => me.io.lock().unwrap().connect_error = Some(e),
}
@@ -631,7 +854,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_read` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };
// Move from the `Pending` to `Ok` state.
let mut io = me.io.lock().unwrap();
@@ -640,7 +863,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
_ => unreachable!(),
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
buf.set_len(status.bytes_transferred() as usize);
@@ -663,7 +886,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };
// Make the state change out of `Pending`. If we wrote the entire buffer
// then we're writable again and otherwise we schedule another write.
@@ -680,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
let new_pos = pos + (status.bytes_transferred() as usize);
@@ -703,7 +926,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
- Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
+ Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"I/O source already registered with a different `Registry`",
)),
diff --git a/src/sys/windows/net.rs b/src/sys/windows/net.rs
index 2de98fa..44f459a 100644
--- a/src/sys/windows/net.rs
+++ b/src/sys/windows/net.rs
@@ -3,15 +3,14 @@ use std::mem;
use std::net::SocketAddr;
use std::sync::Once;
-use winapi::ctypes::c_int;
-use winapi::shared::inaddr::{in_addr_S_un, IN_ADDR};
-use winapi::shared::in6addr::{in6_addr_u, IN6_ADDR};
-use winapi::shared::ws2def::{AF_INET, AF_INET6, ADDRESS_FAMILY, SOCKADDR, SOCKADDR_IN};
-use winapi::shared::ws2ipdef::{SOCKADDR_IN6_LH, SOCKADDR_IN6_LH_u};
-use winapi::um::winsock2::{ioctlsocket, socket, FIONBIO, INVALID_SOCKET, SOCKET};
+use windows_sys::Win32::Networking::WinSock::{
+ closesocket, ioctlsocket, socket, AF_INET, AF_INET6, FIONBIO, IN6_ADDR, IN6_ADDR_0,
+ INVALID_SOCKET, IN_ADDR, IN_ADDR_0, SOCKADDR, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0,
+ SOCKET,
+};
/// Initialise the network stack for Windows.
-pub(crate) fn init() {
+fn init() {
static INIT: Once = Once::new();
INIT.call_once(|| {
// Let standard library call `WSAStartup` for us, we can't do it
@@ -22,26 +21,30 @@ pub(crate) fn init() {
}
/// Create a new non-blocking socket.
-pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: c_int) -> io::Result<SOCKET> {
- use winapi::um::winsock2::{PF_INET, PF_INET6};
-
+pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: u16) -> io::Result<SOCKET> {
let domain = match addr {
- SocketAddr::V4(..) => PF_INET,
- SocketAddr::V6(..) => PF_INET6,
+ SocketAddr::V4(..) => AF_INET,
+ SocketAddr::V6(..) => AF_INET6,
};
- new_socket(domain, socket_type)
+ new_socket(domain.into(), socket_type)
}
-pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result<SOCKET> {
- syscall!(
- socket(domain, socket_type, 0),
+pub(crate) fn new_socket(domain: u32, socket_type: u16) -> io::Result<SOCKET> {
+ init();
+
+ let socket = syscall!(
+ socket(domain as i32, socket_type as i32, 0),
PartialEq::eq,
INVALID_SOCKET
- )
- .and_then(|socket| {
- syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0).map(|_| socket as SOCKET)
- })
+ )?;
+
+ if let Err(err) = syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0) {
+ let _ = unsafe { closesocket(socket) };
+ return Err(err);
+ }
+
+ Ok(socket as SOCKET)
}
/// A type with the same memory layout as `SOCKADDR`. Used in converting Rust level
@@ -51,7 +54,7 @@ pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result<SOCKET
#[repr(C)]
pub(crate) union SocketAddrCRepr {
v4: SOCKADDR_IN,
- v6: SOCKADDR_IN6_LH,
+ v6: SOCKADDR_IN6,
}
impl SocketAddrCRepr {
@@ -60,49 +63,49 @@ impl SocketAddrCRepr {
}
}
-pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, c_int) {
+pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) {
match addr {
SocketAddr::V4(ref addr) => {
// `s_addr` is stored as BE on all machine and the array is in BE order.
// So the native endian conversion method is used so that it's never swapped.
let sin_addr = unsafe {
- let mut s_un = mem::zeroed::<in_addr_S_un>();
- *s_un.S_addr_mut() = u32::from_ne_bytes(addr.ip().octets());
+ let mut s_un = mem::zeroed::<IN_ADDR_0>();
+ s_un.S_addr = u32::from_ne_bytes(addr.ip().octets());
IN_ADDR { S_un: s_un }
};
let sockaddr_in = SOCKADDR_IN {
- sin_family: AF_INET as ADDRESS_FAMILY,
+ sin_family: AF_INET as u16, // 1
sin_port: addr.port().to_be(),
sin_addr,
sin_zero: [0; 8],
};
let sockaddr = SocketAddrCRepr { v4: sockaddr_in };
- (sockaddr, mem::size_of::<SOCKADDR_IN>() as c_int)
- },
+ (sockaddr, mem::size_of::<SOCKADDR_IN>() as i32)
+ }
SocketAddr::V6(ref addr) => {
let sin6_addr = unsafe {
- let mut u = mem::zeroed::<in6_addr_u>();
- *u.Byte_mut() = addr.ip().octets();
+ let mut u = mem::zeroed::<IN6_ADDR_0>();
+ u.Byte = addr.ip().octets();
IN6_ADDR { u }
};
let u = unsafe {
- let mut u = mem::zeroed::<SOCKADDR_IN6_LH_u>();
- *u.sin6_scope_id_mut() = addr.scope_id();
+ let mut u = mem::zeroed::<SOCKADDR_IN6_0>();
+ u.sin6_scope_id = addr.scope_id();
u
};
- let sockaddr_in6 = SOCKADDR_IN6_LH {
- sin6_family: AF_INET6 as ADDRESS_FAMILY,
+ let sockaddr_in6 = SOCKADDR_IN6 {
+ sin6_family: AF_INET6 as u16, // 23
sin6_port: addr.port().to_be(),
sin6_addr,
sin6_flowinfo: addr.flowinfo(),
- u,
+ Anonymous: u,
};
let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 };
- (sockaddr, mem::size_of::<SOCKADDR_IN6_LH>() as c_int)
+ (sockaddr, mem::size_of::<SOCKADDR_IN6>() as i32)
}
}
}
diff --git a/src/sys/windows/overlapped.rs b/src/sys/windows/overlapped.rs
index 837b78b..d1456de 100644
--- a/src/sys/windows/overlapped.rs
+++ b/src/sys/windows/overlapped.rs
@@ -3,13 +3,11 @@ use crate::sys::windows::Event;
use std::cell::UnsafeCell;
use std::fmt;
-#[cfg(feature = "os-ext")]
-use winapi::um::minwinbase::OVERLAPPED;
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
+use windows_sys::Win32::System::IO::{OVERLAPPED, OVERLAPPED_ENTRY};
#[repr(C)]
pub(crate) struct Overlapped {
- inner: UnsafeCell<miow::Overlapped>,
+ inner: UnsafeCell<OVERLAPPED>,
pub(crate) callback: fn(&OVERLAPPED_ENTRY, Option<&mut Vec<Event>>),
}
@@ -17,13 +15,13 @@ pub(crate) struct Overlapped {
impl Overlapped {
pub(crate) fn new(cb: fn(&OVERLAPPED_ENTRY, Option<&mut Vec<Event>>)) -> Overlapped {
Overlapped {
- inner: UnsafeCell::new(miow::Overlapped::zero()),
+ inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
callback: cb,
}
}
pub(crate) fn as_ptr(&self) -> *const OVERLAPPED {
- unsafe { (*self.inner.get()).raw() }
+ self.inner.get()
}
}
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 572a9a9..9f3cf68 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -10,8 +10,9 @@ cfg_net! {
use crate::Interest;
}
-use miow::iocp::{CompletionPort, CompletionStatus};
+use super::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
+use std::ffi::c_void;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
@@ -21,14 +22,15 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use winapi::shared::ntdef::NT_SUCCESS;
-use winapi::shared::ntdef::{HANDLE, PVOID};
-use winapi::shared::ntstatus::STATUS_CANCELLED;
-use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
-use winapi::um::minwinbase::OVERLAPPED;
+
+use windows_sys::Win32::Foundation::{
+ ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED, WAIT_TIMEOUT,
+};
+use windows_sys::Win32::System::IO::OVERLAPPED;
#[derive(Debug)]
struct AfdGroup {
+ #[cfg_attr(not(feature = "net"), allow(dead_code))]
cp: Arc<CompletionPort>,
afd_group: Mutex<Vec<Arc<Afd>>>,
}
@@ -43,7 +45,7 @@ impl AfdGroup {
pub fn release_unused_afd(&self) {
let mut afd_group = self.afd_group.lock().unwrap();
- afd_group.retain(|g| Arc::strong_count(&g) > 1);
+ afd_group.retain(|g| Arc::strong_count(g) > 1);
}
}
@@ -57,7 +59,7 @@ cfg_io_source! {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
- if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
+ if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP__MAX_GROUP_SIZE {
self._alloc_afd_group(&mut afd_group)?;
}
}
@@ -93,7 +95,6 @@ pub struct SockState {
poll_info: AfdPollInfo,
afd: Arc<Afd>,
- raw_socket: RawSocket,
base_socket: RawSocket,
user_evts: u32,
@@ -107,7 +108,7 @@ pub struct SockState {
// last raw os error
error: Option<i32>,
- pinned: PhantomPinned,
+ _pinned: PhantomPinned,
}
impl SockState {
@@ -141,7 +142,7 @@ impl SockState {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
- *unsafe { self.poll_info.timeout.QuadPart_mut() } = std::i64::MAX;
+ self.poll_info.timeout = i64::MAX;
self.poll_info.handles[0].handle = self.base_socket as HANDLE;
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;
@@ -204,9 +205,9 @@ impl SockState {
unsafe {
if self.delete_pending {
return None;
- } else if self.iosb.u.Status == STATUS_CANCELLED {
+ } else if self.iosb.Anonymous.Status == STATUS_CANCELLED {
/* The poll request was cancelled by CancelIoEx. */
- } else if !NT_SUCCESS(self.iosb.u.Status) {
+ } else if self.iosb.Anonymous.Status < 0 {
/* The overlapped request itself failed in an unexpected way. */
afd_events = afd::POLL_CONNECT_FAIL;
} else if self.poll_info.number_of_handles < 1 {
@@ -263,7 +264,6 @@ cfg_io_source! {
iosb: IoStatusBlock::zeroed(),
poll_info: AfdPollInfo::zeroed(),
afd,
- raw_socket,
base_socket: get_base_socket(raw_socket)?,
user_evts: 0,
pending_evts: 0,
@@ -271,7 +271,7 @@ cfg_io_source! {
poll_status: SockPollStatus::Idle,
delete_pending: false,
error: None,
- pinned: PhantomPinned,
+ _pinned: PhantomPinned,
})
}
@@ -296,7 +296,7 @@ impl Drop for SockState {
/// Converts the pointer to a `SockState` into a raw pointer.
/// To revert see `from_overlapped`.
-fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> PVOID {
+fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void {
let overlapped_ptr: *const Mutex<SockState> =
unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
overlapped_ptr as *mut _
@@ -448,11 +448,11 @@ impl SelectorInner {
if len == 0 {
continue;
}
- return Ok(());
+ break Ok(());
}
} else {
self.select2(&mut events.statuses, &mut events.events, timeout)?;
- return Ok(());
+ Ok(())
}
}
@@ -462,7 +462,7 @@ impl SelectorInner {
events: &mut Vec<Event>,
timeout: Option<Duration>,
) -> io::Result<usize> {
- assert_eq!(self.is_polling.swap(true, Ordering::AcqRel), false);
+ assert!(!self.is_polling.swap(true, Ordering::AcqRel));
unsafe { self.update_sockets_events() }?;
@@ -482,7 +482,7 @@ impl SelectorInner {
for sock in update_queue.iter_mut() {
let mut sock_internal = sock.lock().unwrap();
if !sock_internal.is_pending_deletion() {
- sock_internal.update(&sock)?;
+ sock_internal.update(sock)?;
}
}
@@ -518,12 +518,9 @@ impl SelectorInner {
let sock_state = from_overlapped(iocp_event.overlapped());
let mut sock_guard = sock_state.lock().unwrap();
- match sock_guard.feed_event() {
- Some(e) => {
- events.push(e);
- n += 1;
- }
- None => {}
+ if let Some(e) = sock_guard.feed_event() {
+ events.push(e);
+ n += 1;
}
if !sock_guard.is_pending_deletion() {
@@ -538,9 +535,12 @@ impl SelectorInner {
cfg_io_source! {
use std::mem::size_of;
use std::ptr::null_mut;
- use winapi::um::mswsock;
- use winapi::um::winsock2::WSAGetLastError;
- use winapi::um::winsock2::{WSAIoctl, SOCKET_ERROR};
+
+ use windows_sys::Win32::Networking::WinSock::{
+ WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE,
+ SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE_SELECT, SOCKET_ERROR,
+ };
+
impl SelectorInner {
fn register(
@@ -644,7 +644,7 @@ cfg_io_source! {
ioctl,
null_mut(),
0,
- &mut base_socket as *mut _ as PVOID,
+ &mut base_socket as *mut _ as *mut c_void,
size_of::<RawSocket>() as u32,
&mut bytes,
null_mut(),
@@ -659,7 +659,7 @@ cfg_io_source! {
}
fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
- let res = try_get_base_socket(raw_socket, mswsock::SIO_BASE_HANDLE);
+ let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE);
if let Ok(base_socket) = res {
return Ok(base_socket);
}
@@ -670,9 +670,9 @@ cfg_io_source! {
// However, at least one known LSP deliberately breaks it, so we try
// some alternative IOCTLs, starting with the most appropriate one.
for &ioctl in &[
- mswsock::SIO_BSP_HANDLE_SELECT,
- mswsock::SIO_BSP_HANDLE_POLL,
- mswsock::SIO_BSP_HANDLE,
+ SIO_BSP_HANDLE_SELECT,
+ SIO_BSP_HANDLE_POLL,
+ SIO_BSP_HANDLE,
] {
if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
// Since we know now that we're dealing with an LSP (otherwise
diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs
index 6757b44..addd1e8 100644
--- a/src/sys/windows/tcp.rs
+++ b/src/sys/windows/tcp.rs
@@ -1,321 +1,62 @@
use std::io;
-use std::convert::TryInto;
-use std::mem::size_of;
-use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
-use std::time::Duration;
-use std::ptr;
-use std::os::windows::io::FromRawSocket;
-use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
+use std::net::{self, SocketAddr};
+use std::os::windows::io::AsRawSocket;
-use winapi::ctypes::{c_char, c_int, c_ushort, c_ulong};
-use winapi::shared::ws2def::{SOCKADDR_STORAGE, AF_INET, AF_INET6, SOCKADDR_IN};
-use winapi::shared::ws2ipdef::SOCKADDR_IN6_LH;
-use winapi::shared::mstcpip;
+use windows_sys::Win32::Networking::WinSock::{self, SOCKET, SOCKET_ERROR, SOCK_STREAM};
-use winapi::shared::minwindef::{BOOL, TRUE, FALSE, DWORD, LPVOID, LPDWORD};
-use winapi::um::winsock2::{
- self, closesocket, linger, setsockopt, getsockopt, getsockname, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
- SOCK_STREAM, SOL_SOCKET, SO_LINGER, SO_REUSEADDR, SO_RCVBUF, SO_SNDBUF, SO_KEEPALIVE, WSAIoctl, LPWSAOVERLAPPED,
-};
+use crate::sys::windows::net::{new_ip_socket, socket_addr};
-use crate::sys::windows::net::{init, new_socket, socket_addr};
-use crate::net::TcpKeepalive;
-
-pub(crate) type TcpSocket = SOCKET;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- init();
- new_socket(PF_INET, SOCK_STREAM)
+pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result<SOCKET> {
+ new_ip_socket(address, SOCK_STREAM)
}
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
- init();
- new_socket(PF_INET6, SOCK_STREAM)
-}
-
-pub(crate) fn bind(socket: TcpSocket, addr: SocketAddr) -> io::Result<()> {
- use winsock2::bind;
+pub(crate) fn bind(socket: &net::TcpListener, addr: SocketAddr) -> io::Result<()> {
+ use WinSock::bind;
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(
- bind(socket, raw_addr.as_ptr(), raw_addr_length),
+ bind(
+ socket.as_raw_socket() as _,
+ raw_addr.as_ptr(),
+ raw_addr_length
+ ),
PartialEq::eq,
SOCKET_ERROR
)?;
Ok(())
}
-pub(crate) fn connect(socket: TcpSocket, addr: SocketAddr) -> io::Result<net::TcpStream> {
- use winsock2::connect;
+pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> {
+ use WinSock::connect;
let (raw_addr, raw_addr_length) = socket_addr(&addr);
-
let res = syscall!(
- connect(socket, raw_addr.as_ptr(), raw_addr_length),
+ connect(
+ socket.as_raw_socket() as _,
+ raw_addr.as_ptr(),
+ raw_addr_length
+ ),
PartialEq::eq,
SOCKET_ERROR
);
match res {
- Err(err) if err.kind() != io::ErrorKind::WouldBlock => {
- Err(err)
- }
- _ => {
- Ok(unsafe { net::TcpStream::from_raw_socket(socket as StdSocket) })
- }
+ Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err),
+ _ => Ok(()),
}
}
-pub(crate) fn listen(socket: TcpSocket, backlog: u32) -> io::Result<net::TcpListener> {
- use winsock2::listen;
+pub(crate) fn listen(socket: &net::TcpListener, backlog: u32) -> io::Result<()> {
use std::convert::TryInto;
+ use WinSock::listen;
let backlog = backlog.try_into().unwrap_or(i32::max_value());
- syscall!(listen(socket, backlog), PartialEq::eq, SOCKET_ERROR)?;
- Ok(unsafe { net::TcpListener::from_raw_socket(socket as StdSocket) })
-}
-
-pub(crate) fn close(socket: TcpSocket) {
- let _ = unsafe { closesocket(socket) };
-}
-
-pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()> {
- let val: BOOL = if reuseaddr { TRUE } else { FALSE };
-
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &val as *const _ as *const c_char,
- size_of::<BOOL>() as c_int,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: c_char = 0;
- let mut optlen = size_of::<BOOL>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval != 0),
- }
-}
-
-pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
- let mut storage: SOCKADDR_STORAGE = unsafe { std::mem::zeroed() };
- let mut length = std::mem::size_of_val(&storage) as c_int;
-
- match unsafe { getsockname(
- socket,
- &mut storage as *mut _ as *mut _,
- &mut length
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => {
- if storage.ss_family as c_int == AF_INET {
- // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
- let addr: &SOCKADDR_IN = unsafe { &*(&storage as *const _ as *const SOCKADDR_IN) };
- let ip_bytes = unsafe { addr.sin_addr.S_un.S_un_b() };
- let ip = Ipv4Addr::from([ip_bytes.s_b1, ip_bytes.s_b2, ip_bytes.s_b3, ip_bytes.s_b4]);
- let port = u16::from_be(addr.sin_port);
- Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
- } else if storage.ss_family as c_int == AF_INET6 {
- // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
- let addr: &SOCKADDR_IN6_LH = unsafe { &*(&storage as *const _ as *const SOCKADDR_IN6_LH) };
- let ip = Ipv6Addr::from(*unsafe { addr.sin6_addr.u.Byte() });
- let port = u16::from_be(addr.sin6_port);
- let scope_id = unsafe { *addr.u.sin6_scope_id() };
- Ok(SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)))
- } else {
- Err(std::io::ErrorKind::InvalidInput.into())
- }
- },
- }
-}
-
-pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
- let val: linger = linger {
- l_onoff: if dur.is_some() { 1 } else { 0 },
- l_linger: dur.map(|dur| dur.as_secs() as c_ushort).unwrap_or_default(),
- };
-
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_LINGER,
- &val as *const _ as *const c_char,
- size_of::<linger>() as c_int,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_linger(socket: TcpSocket) -> io::Result<Option<Duration>> {
- let mut val: linger = unsafe { std::mem::zeroed() };
- let mut len = size_of::<linger>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_LINGER,
- &mut val as *mut _ as *mut _,
- &mut len,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => {
- if val.l_onoff == 0 {
- Ok(None)
- } else {
- Ok(Some(Duration::from_secs(val.l_linger as u64)))
- }
- },
- }
-}
-
-
-pub(crate) fn set_recv_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_RCVBUF,
- &size as *const _ as *const c_char,
- size_of::<c_int>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_recv_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: c_int = 0;
- let mut optlen = size_of::<c_int>() as c_int;
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_RCVBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen as *mut _,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval as u32),
- }
-}
-
-pub(crate) fn set_send_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_SNDBUF,
- &size as *const _ as *const c_char,
- size_of::<c_int>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_send_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: c_int = 0;
- let mut optlen = size_of::<c_int>() as c_int;
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_SNDBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen as *mut _,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval as u32),
- }
-}
-
-pub(crate) fn set_keepalive(socket: TcpSocket, keepalive: bool) -> io::Result<()> {
- let val: BOOL = if keepalive { TRUE } else { FALSE };
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_KEEPALIVE,
- &val as *const _ as *const c_char,
- size_of::<BOOL>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_keepalive(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: c_char = 0;
- let mut optlen = size_of::<BOOL>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_KEEPALIVE,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval != FALSE as c_char),
- }
-}
-
-pub(crate) fn set_keepalive_params(socket: TcpSocket, keepalive: TcpKeepalive) -> io::Result<()> {
- /// Windows configures keepalive time/interval in a u32 of milliseconds.
- fn dur_to_ulong_ms(dur: Duration) -> c_ulong {
- dur.as_millis().try_into().ok().unwrap_or_else(u32::max_value)
- }
-
- // If any of the fields on the `tcp_keepalive` struct were not provided by
- // the user, just leaving them zero will clobber any existing value.
- // Unfortunately, we can't access the current value, so we will use the
- // defaults if a value for the time or interval was not not provided.
- let time = keepalive.time.unwrap_or_else(|| {
- // The default value is two hours, as per
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-keepalive-vals
- let two_hours = 2 * 60 * 60;
- Duration::from_secs(two_hours)
- });
-
- let interval = keepalive.interval.unwrap_or_else(|| {
- // The default value is one second, as per
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-keepalive-vals
- Duration::from_secs(1)
- });
-
- let mut keepalive = mstcpip::tcp_keepalive {
- // Enable keepalive
- onoff: 1,
- keepalivetime: dur_to_ulong_ms(time),
- keepaliveinterval: dur_to_ulong_ms(interval),
- };
-
- let mut out = 0;
- match unsafe { WSAIoctl(
- socket,
- mstcpip::SIO_KEEPALIVE_VALS,
- &mut keepalive as *mut _ as LPVOID,
- size_of::<mstcpip::tcp_keepalive>() as DWORD,
- ptr::null_mut() as LPVOID,
- 0 as DWORD,
- &mut out as *mut _ as LPDWORD,
- 0 as LPWSAOVERLAPPED,
- None,
- ) } {
- 0 => Ok(()),
- _ => Err(io::Error::last_os_error())
- }
+ syscall!(
+ listen(socket.as_raw_socket() as _, backlog),
+ PartialEq::eq,
+ SOCKET_ERROR
+ )?;
+ Ok(())
}
pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
diff --git a/src/sys/windows/udp.rs b/src/sys/windows/udp.rs
index 825eccc..87e269f 100644
--- a/src/sys/windows/udp.rs
+++ b/src/sys/windows/udp.rs
@@ -2,43 +2,36 @@ use std::io;
use std::mem::{self, MaybeUninit};
use std::net::{self, SocketAddr};
use std::os::windows::io::{AsRawSocket, FromRawSocket};
-use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
+use std::os::windows::raw::SOCKET as StdSocket; // windows-sys uses usize, stdlib uses u32/u64.
-use winapi::ctypes::c_int;
-use winapi::shared::ws2def::IPPROTO_IPV6;
-use winapi::shared::ws2ipdef::IPV6_V6ONLY;
-use winapi::um::winsock2::{bind as win_bind, closesocket, getsockopt, SOCKET_ERROR, SOCK_DGRAM};
-
-use crate::sys::windows::net::{init, new_ip_socket, socket_addr};
+use crate::sys::windows::net::{new_ip_socket, socket_addr};
+use windows_sys::Win32::Networking::WinSock::{
+ bind as win_bind, getsockopt, IPPROTO_IPV6, IPV6_V6ONLY, SOCKET_ERROR, SOCK_DGRAM,
+};
pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> {
- init();
- new_ip_socket(addr, SOCK_DGRAM).and_then(|socket| {
- let (raw_addr, raw_addr_length) = socket_addr(&addr);
- syscall!(
- win_bind(socket, raw_addr.as_ptr(), raw_addr_length,),
- PartialEq::eq,
- SOCKET_ERROR
- )
- .map_err(|err| {
- // Close the socket if we hit an error, ignoring the error
- // from closing since we can't pass back two errors.
- let _ = unsafe { closesocket(socket) };
- err
- })
- .map(|_| unsafe { net::UdpSocket::from_raw_socket(socket as StdSocket) })
- })
+ let raw_socket = new_ip_socket(addr, SOCK_DGRAM)?;
+ let socket = unsafe { net::UdpSocket::from_raw_socket(raw_socket as StdSocket) };
+
+ let (raw_addr, raw_addr_length) = socket_addr(&addr);
+ syscall!(
+ win_bind(raw_socket, raw_addr.as_ptr(), raw_addr_length),
+ PartialEq::eq,
+ SOCKET_ERROR
+ )?;
+
+ Ok(socket)
}
pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
- let mut optval: MaybeUninit<c_int> = MaybeUninit::uninit();
- let mut optlen = mem::size_of::<c_int>() as c_int;
+ let mut optval: MaybeUninit<i32> = MaybeUninit::uninit();
+ let mut optlen = mem::size_of::<i32>() as i32;
syscall!(
getsockopt(
socket.as_raw_socket() as usize,
- IPPROTO_IPV6 as c_int,
- IPV6_V6ONLY as c_int,
+ IPPROTO_IPV6 as i32,
+ IPV6_V6ONLY as i32,
optval.as_mut_ptr().cast(),
&mut optlen,
),
@@ -46,7 +39,7 @@ pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
SOCKET_ERROR
)?;
- debug_assert_eq!(optlen as usize, mem::size_of::<c_int>());
+ debug_assert_eq!(optlen as usize, mem::size_of::<i32>());
// Safety: `getsockopt` initialised `optval` for us.
let optval = unsafe { optval.assume_init() };
Ok(optval != 0)
diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs
index ab12c3c..103aa01 100644
--- a/src/sys/windows/waker.rs
+++ b/src/sys/windows/waker.rs
@@ -2,7 +2,7 @@ use crate::sys::windows::Event;
use crate::sys::windows::Selector;
use crate::Token;
-use miow::iocp::CompletionPort;
+use super::iocp::CompletionPort;
use std::io;
use std::sync::Arc;