aboutsummaryrefslogtreecommitdiff
path: root/src/sys/windows/named_pipe.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sys/windows/named_pipe.rs')
-rw-r--r--src/sys/windows/named_pipe.rs131
1 files changed, 96 insertions, 35 deletions
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs
index a5688ce..8c81f38 100644
--- a/src/sys/windows/named_pipe.rs
+++ b/src/sys/windows/named_pipe.rs
@@ -1,6 +1,6 @@
-use crate::{poll, Registry};
use crate::event::Source;
use crate::sys::windows::{Event, Overlapped};
+use crate::{poll, Registry};
use winapi::um::minwinbase::OVERLAPPED_ENTRY;
use std::ffi::OsStr;
@@ -9,8 +9,8 @@ use std::io::{self, Read, Write};
use std::mem;
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::slice;
-use std::sync::atomic::{AtomicUsize, AtomicBool};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
use crate::{Interest, Token};
@@ -128,9 +128,7 @@ fn would_block() -> io::Error {
impl NamedPipe {
/// Creates a new named pipe at the specified `addr` given a "reasonable
/// set" of initial configuration options.
- pub fn new<A: AsRef<OsStr>>(
- addr: A,
- ) -> io::Result<NamedPipe> {
+ pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
let pipe = pipe::NamedPipe::new(addr)?;
// Safety: nothing actually unsafe about this. The trait fn includes
// `unsafe`.
@@ -226,9 +224,7 @@ impl NamedPipe {
}
impl FromRawHandle for NamedPipe {
- unsafe fn from_raw_handle(
- handle: RawHandle,
- ) -> NamedPipe {
+ unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
NamedPipe {
inner: Arc::new(Inner {
// Safety: not really unsafe
@@ -281,9 +277,7 @@ impl<'a> Read for &'a NamedPipe {
match mem::replace(&mut state.read, State::None) {
// In theory not possible with `token` checked above,
// but return would block for now.
- State::None => {
- Err(would_block())
- }
+ State::None => Err(would_block()),
// A read is in flight, still waiting for it to finish
State::Pending(buf, amt) => {
@@ -324,7 +318,7 @@ impl<'a> Read for &'a NamedPipe {
}
impl<'a> Write for &'a NamedPipe {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// Make sure there's no writes pending
let mut io = self.inner.io.lock().unwrap();
@@ -334,6 +328,12 @@ impl<'a> Write for &'a NamedPipe {
match io.write {
State::None => {}
+ State::Err(_) => match mem::replace(&mut io.write, State::None) {
+ State::Err(e) => return Err(e),
+ // `io` is locked, so this branch is unreachable
+ _ => unreachable!(),
+ },
+ // any other state should be handled in `write_done`
_ => {
return Err(would_block());
}
@@ -342,17 +342,26 @@ impl<'a> Write for &'a NamedPipe {
// Move `buf` onto the heap and fire off the write
let mut owned_buf = self.inner.get_buffer();
owned_buf.extend(buf);
- Inner::schedule_write(&self.inner, owned_buf, 0, &mut io, None);
- Ok(buf.len())
+ match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
+ // Some bytes are written immediately
+ Some(n) => Ok(n),
+ // Write operation is anqueued for whole buffer
+ None => Ok(buf.len()),
+ }
}
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
}
}
impl Source for NamedPipe {
- fn register(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interest: Interest,
+ ) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, false)?;
@@ -368,7 +377,10 @@ impl Source for NamedPipe {
io.cp = Some(poll::selector(registry).clone_port());
let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
- poll::selector(registry).inner.cp.add_handle(inner_token, &self.inner.handle)?;
+ poll::selector(registry)
+ .inner
+ .cp
+ .add_handle(inner_token, &self.inner.handle)?;
}
io.token = Some(token);
@@ -381,7 +393,12 @@ impl Source for NamedPipe {
Ok(())
}
- fn reregister(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interest: Interest,
+ ) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, true)?;
@@ -491,19 +508,61 @@ impl Inner {
}
}
- fn schedule_write(me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, events: Option<&mut Vec<Event>>) {
+ /// Maybe schedules overlapped write operation.
+ ///
+ /// * `None` means that overlapped operation was enqueued
+ /// * `Some(n)` means that `n` bytes was immediately written.
+ /// Note, that `write_done` will fire anyway to clean up the state.
+ fn maybe_schedule_write(
+ me: &Arc<Inner>,
+ buf: Vec<u8>,
+ pos: usize,
+ io: &mut Io,
+ ) -> io::Result<Option<usize>> {
// Very similar to `schedule_read` above, just done for the write half.
let e = unsafe {
let overlapped = me.write.as_ptr() as *mut _;
me.handle.write_overlapped(&buf[pos..], overlapped)
};
+ // See `connect` above for the rationale behind `forget`
match e {
- // See `connect` above for the rationale behind `forget`
- Ok(_) => {
+ // `n` bytes are written immediately
+ Ok(Some(n)) => {
+ io.write = State::Ok(buf, pos);
+ mem::forget(me.clone());
+ Ok(Some(n))
+ }
+ // write operation is enqueued
+ Ok(None) => {
io.write = State::Pending(buf, pos);
- mem::forget(me.clone())
+ mem::forget(me.clone());
+ Ok(None)
}
+ Err(e) => Err(e),
+ }
+ }
+
+ fn schedule_write(
+ me: &Arc<Inner>,
+ buf: Vec<u8>,
+ pos: usize,
+ io: &mut Io,
+ events: Option<&mut Vec<Event>>,
+ ) {
+ match Inner::maybe_schedule_write(me, buf, pos, io) {
+ Ok(Some(_)) => {
+ // immediate result will be handled in `write_done`,
+ // so we'll reinterpret the `Ok` state
+ let state = mem::replace(&mut io.write, State::None);
+ io.write = match state {
+ State::Ok(buf, pos) => State::Pending(buf, pos),
+ // io is locked, so this branch is unreachable
+ _ => unreachable!(),
+ };
+ mem::forget(me.clone());
+ }
+ Ok(None) => (),
Err(e) => {
io.write = State::Err(e);
io.notify_writable(events);
@@ -610,6 +669,12 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// then we're writable again and otherwise we schedule another write.
let mut io = me.io.lock().unwrap();
let (buf, pos) = match mem::replace(&mut io.write, State::None) {
+ // `Ok` here means, that the operation was completed immediately
+ // `bytes_transferred` is already reported to a client
+ State::Ok(..) => {
+ io.notify_writable(events);
+ return;
+ }
State::Pending(buf, pos) => (buf, pos),
_ => unreachable!(),
};
@@ -638,18 +703,14 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
- Some(ref cp) if !poll::selector(registry).same_port(cp) => {
- Err(io::Error::new(
- io::ErrorKind::AlreadyExists,
- "I/O source already registered with a different `Registry`"
- ))
- }
- None if required => {
- Err(io::Error::new(
- io::ErrorKind::NotFound,
- "I/O source not registered with `Registry`"
- ))
- }
+ Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
+ io::ErrorKind::AlreadyExists,
+ "I/O source already registered with a different `Registry`",
+ )),
+ None if required => Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ "I/O source not registered with `Registry`",
+ )),
_ => Ok(()),
}
}