diff options
Diffstat (limited to 'src/sys/windows/named_pipe.rs')
-rw-r--r-- | src/sys/windows/named_pipe.rs | 405 |
1 files changed, 314 insertions, 91 deletions
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index 8c81f38..23f85d1 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -1,41 +1,31 @@ -use crate::event::Source; -use crate::sys::windows::{Event, Overlapped}; -use crate::{poll, Registry}; -use winapi::um::minwinbase::OVERLAPPED_ENTRY; - use std::ffi::OsStr; -use std::fmt; use std::io::{self, Read, Write}; -use std::mem; -use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; -use std::slice; +use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Mutex}; +use std::{fmt, mem, slice}; + +use windows_sys::Win32::Foundation::{ + ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, + ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE, +}; +use windows_sys::Win32::Storage::FileSystem::{ + ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, +}; +use windows_sys::Win32::System::Pipes::{ + ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE, + PIPE_UNLIMITED_INSTANCES, +}; +use windows_sys::Win32::System::IO::{ + CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY, +}; +use crate::event::Source; +use crate::sys::windows::iocp::{CompletionPort, CompletionStatus}; +use crate::sys::windows::{Event, Handle, Overlapped}; +use crate::Registry; use crate::{Interest, Token}; -use miow::iocp::{CompletionPort, CompletionStatus}; -use miow::pipe; -use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING}; -use winapi::um::ioapiset::CancelIoEx; - -/// # Safety -/// -/// Only valid if the strict is annotated with `#[repr(C)]`. This is only used -/// with `Overlapped` and `Inner`, which are correctly annotated. -macro_rules! offset_of { - ($t:ty, $($field:ident).+) => ( - &(*(0 as *const $t)).$($field).+ as *const _ as usize - ) -} - -macro_rules! overlapped2arc { - ($e:expr, $t:ty, $($field:ident).+) => ({ - let offset = offset_of!($t, $($field).+); - debug_assert!(offset < mem::size_of::<$t>()); - Arc::from_raw(($e as usize - offset) as *mut $t) - }) -} /// Non-blocking windows named pipe. /// @@ -83,30 +73,266 @@ pub struct NamedPipe { inner: Arc<Inner>, } +/// # Notes +/// +/// The memory layout of this structure must be fixed as the +/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test. #[repr(C)] struct Inner { - handle: pipe::NamedPipe, - + // NOTE: careful modifying the order of these three fields, the `ptr_from_*` + // methods depend on the layout! connect: Overlapped, - connecting: AtomicBool, - read: Overlapped, write: Overlapped, - + // END NOTE. + handle: Handle, + connecting: AtomicBool, io: Mutex<Io>, - pool: Mutex<BufferPool>, } +impl Inner { + /// Converts a pointer to `Inner.connect` to a pointer to `Inner`. + /// + /// # Unsafety + /// + /// Caller must ensure `ptr` is pointing to `Inner.connect`. + unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner { + // `connect` is the first field, so the pointer are the same. + ptr.cast() + } + + /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`. + unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner { + // `read` is after `connect: Overlapped`. + (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner + } + + /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`. + unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner { + // `read` is after `connect: Overlapped` and `read: Overlapped`. + (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner + } + + /// Issue a connection request with the specified overlapped operation. + /// + /// This function will issue a request to connect a client to this server, + /// returning immediately after starting the overlapped operation. + /// + /// If this function immediately succeeds then `Ok(true)` is returned. If + /// the overlapped operation is enqueued and pending, then `Ok(false)` is + /// returned. Otherwise an error is returned indicating what went wrong. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` pointer is valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> { + if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 { + return Ok(true); + } + + let err = io::Error::last_os_error(); + + match err.raw_os_error().map(|e| e as u32) { + Some(ERROR_PIPE_CONNECTED) => Ok(true), + Some(ERROR_NO_DATA) => Ok(true), + Some(ERROR_IO_PENDING) => Ok(false), + _ => Err(err), + } + } + + /// Disconnects this named pipe from any connected client. + pub fn disconnect(&self) -> io::Result<()> { + if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } + + /// Issues an overlapped read operation to occur on this pipe. + /// + /// This function will issue an asynchronous read to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes read. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; + let res = ReadFile( + self.handle.raw(), + buf.as_mut_ptr() as *mut _, + len, + std::ptr::null_mut(), + overlapped, + ); + if res == 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) { + return Err(err); + } + } + + let mut bytes = 0; + let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0); + if res == 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) { + Ok(None) + } else { + Err(err) + } + } else { + Ok(Some(bytes as usize)) + } + } + + /// Issues an overlapped write operation to occur on this pipe. + /// + /// This function will issue an asynchronous write to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes written. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; + let res = WriteFile( + self.handle.raw(), + buf.as_ptr() as *const _, + len, + std::ptr::null_mut(), + overlapped, + ); + if res == 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) { + return Err(err); + } + } + + let mut bytes = 0; + let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0); + if res == 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) { + Ok(None) + } else { + Err(err) + } + } else { + Ok(Some(bytes as usize)) + } + } + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `Overlapped` instance. + #[inline] + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> { + let mut transferred = 0; + let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(transferred as usize) + } + } +} + +#[test] +fn ptr_from() { + use std::mem::ManuallyDrop; + use std::ptr; + + let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) }; + let inner: &Inner = &pipe.inner; + assert_eq!( + inner as *const Inner, + unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) }, + "`ptr_from_conn_overlapped` incorrect" + ); + assert_eq!( + inner as *const Inner, + unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) }, + "`ptr_from_read_overlapped` incorrect" + ); + assert_eq!( + inner as *const Inner, + unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) }, + "`ptr_from_write_overlapped` incorrect" + ); +} + struct Io { // Uniquely identifies the selector associated with this named pipe cp: Option<Arc<CompletionPort>>, // Token used to identify events token: Option<Token>, read: State, - read_interest: bool, write: State, - write_interest: bool, connect_error: Option<io::Error>, } @@ -129,10 +355,30 @@ impl NamedPipe { /// Creates a new named pipe at the specified `addr` given a "reasonable /// set" of initial configuration options. pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> { - let pipe = pipe::NamedPipe::new(addr)?; - // Safety: nothing actually unsafe about this. The trait fn includes - // `unsafe`. - Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) }) + use std::os::windows::ffi::OsStrExt; + let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect(); + + // Safety: syscall + let h = unsafe { + CreateNamedPipeW( + name.as_ptr(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ) + }; + + if h == INVALID_HANDLE_VALUE { + Err(io::Error::last_os_error()) + } else { + // Safety: nothing actually unsafe about this. The trait fn includes + // `unsafe`. + Ok(unsafe { Self::from_raw_handle(h as RawHandle) }) + } } /// Attempts to call `ConnectNamedPipe`, if possible. @@ -167,7 +413,7 @@ impl NamedPipe { // internal state accordingly. let res = unsafe { let overlapped = self.inner.connect.as_ptr() as *mut _; - self.inner.handle.connect_overlapped(overlapped) + self.inner.connect_overlapped(overlapped) }; match res { @@ -219,7 +465,7 @@ impl NamedPipe { /// After a `disconnect` is issued, then a `connect` may be called again to /// connect to another client. pub fn disconnect(&self) -> io::Result<()> { - self.inner.handle.disconnect() + self.inner.disconnect() } } @@ -227,10 +473,7 @@ impl FromRawHandle for NamedPipe { unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe { NamedPipe { inner: Arc::new(Inner { - // Safety: not really unsafe - handle: pipe::NamedPipe::from_raw_handle(handle), - // transmutes to straddle winapi versions (mio 0.6 is on an - // older winapi) + handle: Handle::new(handle as HANDLE), connect: Overlapped::new(connect_done), connecting: AtomicBool::new(false), read: Overlapped::new(read_done), @@ -239,9 +482,7 @@ impl FromRawHandle for NamedPipe { cp: None, token: None, read: State::None, - read_interest: false, write: State::None, - write_interest: false, connect_error: None, }), pool: Mutex::new(BufferPool::with_capacity(2)), @@ -356,12 +597,7 @@ impl<'a> Write for &'a NamedPipe { } impl Source for NamedPipe { - fn register( - &mut self, - registry: &Registry, - token: Token, - interest: Interest, - ) -> io::Result<()> { + fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> { let mut io = self.inner.io.lock().unwrap(); io.check_association(registry, false)?; @@ -374,18 +610,15 @@ impl Source for NamedPipe { } if io.cp.is_none() { - io.cp = Some(poll::selector(registry).clone_port()); + let selector = registry.selector(); + + io.cp = Some(selector.clone_port()); let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2; - poll::selector(registry) - .inner - .cp - .add_handle(inner_token, &self.inner.handle)?; + selector.inner.cp.add_handle(inner_token, self)?; } io.token = Some(token); - io.read_interest = interest.is_readable(); - io.write_interest = interest.is_writable(); drop(io); Inner::post_register(&self.inner, None); @@ -393,19 +626,12 @@ impl Source for NamedPipe { Ok(()) } - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interest: Interest, - ) -> io::Result<()> { + fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> { let mut io = self.inner.io.lock().unwrap(); io.check_association(registry, true)?; io.token = Some(token); - io.read_interest = interest.is_readable(); - io.write_interest = interest.is_writable(); drop(io); Inner::post_register(&self.inner, None); @@ -432,7 +658,7 @@ impl Source for NamedPipe { impl AsRawHandle for NamedPipe { fn as_raw_handle(&self) -> RawHandle { - self.inner.handle.as_raw_handle() + self.inner.handle.raw() as RawHandle } } @@ -452,12 +678,8 @@ impl Drop for NamedPipe { } let io = self.inner.io.lock().unwrap(); - - match io.read { - State::Pending(..) => { - drop(cancel(&self.inner.handle, &self.inner.read)); - } - _ => {} + if let State::Pending(..) = io.read { + drop(cancel(&self.inner.handle, &self.inner.read)); } } } @@ -483,7 +705,7 @@ impl Inner { let e = unsafe { let overlapped = me.read.as_ptr() as *mut _; let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); - me.handle.read_overlapped(slice, overlapped) + me.read_overlapped(slice, overlapped) }; match e { @@ -522,7 +744,7 @@ impl Inner { // Very similar to `schedule_read` above, just done for the write half. let e = unsafe { let overlapped = me.write.as_ptr() as *mut _; - me.handle.write_overlapped(&buf[pos..], overlapped) + me.write_overlapped(&buf[pos..], overlapped) }; // See `connect` above for the rationale behind `forget` @@ -572,7 +794,8 @@ impl Inner { fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) { let mut io = me.io.lock().unwrap(); - if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) { + #[allow(clippy::needless_option_as_deref)] + if Inner::schedule_read(me, &mut io, events.as_deref_mut()) { if let State::None = io.write { io.notify_writable(events); } @@ -588,8 +811,8 @@ impl Inner { } } -unsafe fn cancel<T: AsRawHandle>(handle: &T, overlapped: &Overlapped) -> io::Result<()> { - let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _); +unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> { + let ret = CancelIoEx(handle.raw(), overlapped.as_ptr()); // `CancelIoEx` returns 0 on error: // https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func if ret == 0 { @@ -605,7 +828,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { // Acquire the `Arc<Inner>`. Note that we should be guaranteed that // the refcount is available to us due to the `mem::forget` in // `connect` above. - let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) }; + let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) }; // Flag ourselves as no longer using the `connect` overlapped instances. let prev = me.connecting.swap(false, SeqCst); @@ -614,7 +837,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { // Stash away our connect error if one happened debug_assert_eq!(status.bytes_transferred(), 0); unsafe { - match me.handle.result(status.overlapped()) { + match me.result(status.overlapped()) { Ok(n) => debug_assert_eq!(n, 0), Err(e) => me.io.lock().unwrap().connect_error = Some(e), } @@ -631,7 +854,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { // Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that // the refcount is available to us due to the `mem::forget` in // `schedule_read` above. - let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) }; + let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) }; // Move from the `Pending` to `Ok` state. let mut io = me.io.lock().unwrap(); @@ -640,7 +863,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { _ => unreachable!(), }; unsafe { - match me.handle.result(status.overlapped()) { + match me.result(status.overlapped()) { Ok(n) => { debug_assert_eq!(status.bytes_transferred() as usize, n); buf.set_len(status.bytes_transferred() as usize); @@ -663,7 +886,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { // Acquire the `Arc<Inner>`. Note that we should be guaranteed that // the refcount is available to us due to the `mem::forget` in // `schedule_write` above. - let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) }; + let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) }; // Make the state change out of `Pending`. If we wrote the entire buffer // then we're writable again and otherwise we schedule another write. @@ -680,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { }; unsafe { - match me.handle.result(status.overlapped()) { + match me.result(status.overlapped()) { Ok(n) => { debug_assert_eq!(status.bytes_transferred() as usize, n); let new_pos = pos + (status.bytes_transferred() as usize); @@ -703,7 +926,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) { impl Io { fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> { match self.cp { - Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new( + Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new( io::ErrorKind::AlreadyExists, "I/O source already registered with a different `Registry`", )), |