aboutsummaryrefslogtreecommitdiff
path: root/src/sys/windows/named_pipe.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sys/windows/named_pipe.rs')
-rw-r--r--src/sys/windows/named_pipe.rs405
1 files changed, 314 insertions, 91 deletions
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs
index 8c81f38..23f85d1 100644
--- a/src/sys/windows/named_pipe.rs
+++ b/src/sys/windows/named_pipe.rs
@@ -1,41 +1,31 @@
-use crate::event::Source;
-use crate::sys::windows::{Event, Overlapped};
-use crate::{poll, Registry};
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
-
use std::ffi::OsStr;
-use std::fmt;
use std::io::{self, Read, Write};
-use std::mem;
-use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
-use std::slice;
+use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
+use std::{fmt, mem, slice};
+
+use windows_sys::Win32::Foundation::{
+ ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
+ ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
+};
+use windows_sys::Win32::Storage::FileSystem::{
+ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
+};
+use windows_sys::Win32::System::Pipes::{
+ ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+};
+use windows_sys::Win32::System::IO::{
+ CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
+};
+use crate::event::Source;
+use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
+use crate::sys::windows::{Event, Handle, Overlapped};
+use crate::Registry;
use crate::{Interest, Token};
-use miow::iocp::{CompletionPort, CompletionStatus};
-use miow::pipe;
-use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING};
-use winapi::um::ioapiset::CancelIoEx;
-
-/// # Safety
-///
-/// Only valid if the strict is annotated with `#[repr(C)]`. This is only used
-/// with `Overlapped` and `Inner`, which are correctly annotated.
-macro_rules! offset_of {
- ($t:ty, $($field:ident).+) => (
- &(*(0 as *const $t)).$($field).+ as *const _ as usize
- )
-}
-
-macro_rules! overlapped2arc {
- ($e:expr, $t:ty, $($field:ident).+) => ({
- let offset = offset_of!($t, $($field).+);
- debug_assert!(offset < mem::size_of::<$t>());
- Arc::from_raw(($e as usize - offset) as *mut $t)
- })
-}
/// Non-blocking windows named pipe.
///
@@ -83,30 +73,266 @@ pub struct NamedPipe {
inner: Arc<Inner>,
}
+/// # Notes
+///
+/// The memory layout of this structure must be fixed as the
+/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
#[repr(C)]
struct Inner {
- handle: pipe::NamedPipe,
-
+ // NOTE: careful modifying the order of these three fields, the `ptr_from_*`
+ // methods depend on the layout!
connect: Overlapped,
- connecting: AtomicBool,
-
read: Overlapped,
write: Overlapped,
-
+ // END NOTE.
+ handle: Handle,
+ connecting: AtomicBool,
io: Mutex<Io>,
-
pool: Mutex<BufferPool>,
}
+impl Inner {
+ /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
+ ///
+ /// # Unsafety
+ ///
+ /// Caller must ensure `ptr` is pointing to `Inner.connect`.
+ unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `connect` is the first field, so the pointer are the same.
+ ptr.cast()
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
+ unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
+ unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped` and `read: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
+ }
+
+ /// Issue a connection request with the specified overlapped operation.
+ ///
+ /// This function will issue a request to connect a client to this server,
+ /// returning immediately after starting the overlapped operation.
+ ///
+ /// If this function immediately succeeds then `Ok(true)` is returned. If
+ /// the overlapped operation is enqueued and pending, then `Ok(false)` is
+ /// returned. Otherwise an error is returned indicating what went wrong.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `overlapped` pointer is valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that this pointer is
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
+ if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
+ return Ok(true);
+ }
+
+ let err = io::Error::last_os_error();
+
+ match err.raw_os_error().map(|e| e as u32) {
+ Some(ERROR_PIPE_CONNECTED) => Ok(true),
+ Some(ERROR_NO_DATA) => Ok(true),
+ Some(ERROR_IO_PENDING) => Ok(false),
+ _ => Err(err),
+ }
+ }
+
+ /// Disconnects this named pipe from any connected client.
+ pub fn disconnect(&self) -> io::Result<()> {
+ if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Issues an overlapped read operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous read to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes read. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn read_overlapped(
+ &self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = ReadFile(
+ self.handle.raw(),
+ buf.as_mut_ptr() as *mut _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Issues an overlapped write operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous write to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes written. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn write_overlapped(
+ &self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = WriteFile(
+ self.handle.raw(),
+ buf.as_ptr() as *const _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `Overlapped` instance.
+ #[inline]
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
+ let mut transferred = 0;
+ let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
+ if r == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(transferred as usize)
+ }
+ }
+}
+
+#[test]
+fn ptr_from() {
+ use std::mem::ManuallyDrop;
+ use std::ptr;
+
+ let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
+ let inner: &Inner = &pipe.inner;
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_conn_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_read_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_write_overlapped` incorrect"
+ );
+}
+
struct Io {
// Uniquely identifies the selector associated with this named pipe
cp: Option<Arc<CompletionPort>>,
// Token used to identify events
token: Option<Token>,
read: State,
- read_interest: bool,
write: State,
- write_interest: bool,
connect_error: Option<io::Error>,
}
@@ -129,10 +355,30 @@ impl NamedPipe {
/// Creates a new named pipe at the specified `addr` given a "reasonable
/// set" of initial configuration options.
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
- let pipe = pipe::NamedPipe::new(addr)?;
- // Safety: nothing actually unsafe about this. The trait fn includes
- // `unsafe`.
- Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) })
+ use std::os::windows::ffi::OsStrExt;
+ let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();
+
+ // Safety: syscall
+ let h = unsafe {
+ CreateNamedPipeW(
+ name.as_ptr(),
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ std::ptr::null_mut(),
+ )
+ };
+
+ if h == INVALID_HANDLE_VALUE {
+ Err(io::Error::last_os_error())
+ } else {
+ // Safety: nothing actually unsafe about this. The trait fn includes
+ // `unsafe`.
+ Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
+ }
}
/// Attempts to call `ConnectNamedPipe`, if possible.
@@ -167,7 +413,7 @@ impl NamedPipe {
// internal state accordingly.
let res = unsafe {
let overlapped = self.inner.connect.as_ptr() as *mut _;
- self.inner.handle.connect_overlapped(overlapped)
+ self.inner.connect_overlapped(overlapped)
};
match res {
@@ -219,7 +465,7 @@ impl NamedPipe {
/// After a `disconnect` is issued, then a `connect` may be called again to
/// connect to another client.
pub fn disconnect(&self) -> io::Result<()> {
- self.inner.handle.disconnect()
+ self.inner.disconnect()
}
}
@@ -227,10 +473,7 @@ impl FromRawHandle for NamedPipe {
unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
NamedPipe {
inner: Arc::new(Inner {
- // Safety: not really unsafe
- handle: pipe::NamedPipe::from_raw_handle(handle),
- // transmutes to straddle winapi versions (mio 0.6 is on an
- // older winapi)
+ handle: Handle::new(handle as HANDLE),
connect: Overlapped::new(connect_done),
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
@@ -239,9 +482,7 @@ impl FromRawHandle for NamedPipe {
cp: None,
token: None,
read: State::None,
- read_interest: false,
write: State::None,
- write_interest: false,
connect_error: None,
}),
pool: Mutex::new(BufferPool::with_capacity(2)),
@@ -356,12 +597,7 @@ impl<'a> Write for &'a NamedPipe {
}
impl Source for NamedPipe {
- fn register(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, false)?;
@@ -374,18 +610,15 @@ impl Source for NamedPipe {
}
if io.cp.is_none() {
- io.cp = Some(poll::selector(registry).clone_port());
+ let selector = registry.selector();
+
+ io.cp = Some(selector.clone_port());
let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
- poll::selector(registry)
- .inner
- .cp
- .add_handle(inner_token, &self.inner.handle)?;
+ selector.inner.cp.add_handle(inner_token, self)?;
}
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -393,19 +626,12 @@ impl Source for NamedPipe {
Ok(())
}
- fn reregister(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, true)?;
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -432,7 +658,7 @@ impl Source for NamedPipe {
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
- self.inner.handle.as_raw_handle()
+ self.inner.handle.raw() as RawHandle
}
}
@@ -452,12 +678,8 @@ impl Drop for NamedPipe {
}
let io = self.inner.io.lock().unwrap();
-
- match io.read {
- State::Pending(..) => {
- drop(cancel(&self.inner.handle, &self.inner.read));
- }
- _ => {}
+ if let State::Pending(..) = io.read {
+ drop(cancel(&self.inner.handle, &self.inner.read));
}
}
}
@@ -483,7 +705,7 @@ impl Inner {
let e = unsafe {
let overlapped = me.read.as_ptr() as *mut _;
let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
- me.handle.read_overlapped(slice, overlapped)
+ me.read_overlapped(slice, overlapped)
};
match e {
@@ -522,7 +744,7 @@ impl Inner {
// Very similar to `schedule_read` above, just done for the write half.
let e = unsafe {
let overlapped = me.write.as_ptr() as *mut _;
- me.handle.write_overlapped(&buf[pos..], overlapped)
+ me.write_overlapped(&buf[pos..], overlapped)
};
// See `connect` above for the rationale behind `forget`
@@ -572,7 +794,8 @@ impl Inner {
fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
let mut io = me.io.lock().unwrap();
- if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) {
+ #[allow(clippy::needless_option_as_deref)]
+ if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
}
@@ -588,8 +811,8 @@ impl Inner {
}
}
-unsafe fn cancel<T: AsRawHandle>(handle: &T, overlapped: &Overlapped) -> io::Result<()> {
- let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _);
+unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
+ let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
// `CancelIoEx` returns 0 on error:
// https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
if ret == 0 {
@@ -605,7 +828,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `connect` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };
// Flag ourselves as no longer using the `connect` overlapped instances.
let prev = me.connecting.swap(false, SeqCst);
@@ -614,7 +837,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Stash away our connect error if one happened
debug_assert_eq!(status.bytes_transferred(), 0);
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => debug_assert_eq!(n, 0),
Err(e) => me.io.lock().unwrap().connect_error = Some(e),
}
@@ -631,7 +854,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_read` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };
// Move from the `Pending` to `Ok` state.
let mut io = me.io.lock().unwrap();
@@ -640,7 +863,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
_ => unreachable!(),
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
buf.set_len(status.bytes_transferred() as usize);
@@ -663,7 +886,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };
// Make the state change out of `Pending`. If we wrote the entire buffer
// then we're writable again and otherwise we schedule another write.
@@ -680,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
let new_pos = pos + (status.bytes_transferred() as usize);
@@ -703,7 +926,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
- Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
+ Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"I/O source already registered with a different `Registry`",
)),