From 3a1a50620397b347db105e65b3cab650c2c7bb72 Mon Sep 17 00:00:00 2001 From: Haibo Huang Date: Mon, 2 Nov 2020 18:45:36 -0800 Subject: Upgrade rust/crates/mio to 0.7.5 Test: make Change-Id: I0b3424bb3d27a0db04cd91b8573fca44e311c491 --- .cargo_vcs_info.json | 2 +- CHANGELOG.md | 15 ++ Cargo.lock | 11 +- Cargo.toml | 8 +- Cargo.toml.orig | 6 +- METADATA | 8 +- examples/tcp_server.rs | 24 ++- src/interest.rs | 26 ++- src/lib.rs | 32 +++- src/macros/mod.rs | 32 +++- src/net/tcp/socket.rs | 26 +++ src/poll.rs | 9 + src/sys/mod.rs | 6 +- src/sys/shell/mod.rs | 2 +- src/sys/shell/selector.rs | 7 +- src/sys/shell/tcp.rs | 18 ++ src/sys/unix/mod.rs | 6 +- src/sys/unix/pipe.rs | 383 +++++++++++++++++++++++++++++++++++++ src/sys/unix/selector/epoll.rs | 15 +- src/sys/unix/selector/kqueue.rs | 15 +- src/sys/unix/tcp.rs | 58 ++++++ src/sys/windows/afd.rs | 2 +- src/sys/windows/io_status_block.rs | 2 +- src/sys/windows/mod.rs | 6 +- src/sys/windows/selector.rs | 26 ++- src/sys/windows/tcp.rs | 48 ++++- src/waker.rs | 4 +- 27 files changed, 738 insertions(+), 59 deletions(-) create mode 100644 src/sys/unix/pipe.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 24a92d4..3096345 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "d40494bf6a9d5437207671282aa1be6b93a964cd" + "sha1": "27fbd5f04bb5f52a4d1c358cf0c04c6074a3d46b" } } diff --git a/CHANGELOG.md b/CHANGELOG.md index 5827f0c..1ca5125 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +# 0.7.5 + +## Added + +* `TcpSocket::get_localaddr()` retrieves local address + (https://github.com/tokio-rs/mio/commit/b41a022b2242eef1969c70c8ba93e04c528dba47) +* `TcpSocket::set_reuseport()` & `TcpSocket::get_reuseport()` configures and reads SO_REUSEPORT + (https://github.com/tokio-rs/mio/commit/183bbe409ab69cbf9db41d0263b41ec86202d9a0) +* `unix:pipe()` a wrapper around pipe(2) sys call + (https://github.com/tokio-rs/mio/commit/2b7c0967a7362303946deb3d4ca2ae507af6c72d) +* Add a check that a single Waker is active per Poll instance (only in debug mode) + (https://github.com/tokio-rs/mio/commit/f4874f28b32efcf4841691884c65a89734d96a56) +* Added `Interest:remove()` + (https://github.com/tokio-rs/mio/commit/b8639c3d9ac07bb7e2e27685680c8a6510fa1357) + # 0.7.4 ## Fixes diff --git a/Cargo.lock b/Cargo.lock index df276dc..2a0e767 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,9 +23,9 @@ checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" [[package]] name = "libc" -version = "0.2.73" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9" +checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" [[package]] name = "log" @@ -38,7 +38,7 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.4" +version = "0.7.5" dependencies = [ "env_logger", "libc", @@ -46,7 +46,6 @@ dependencies = [ "miow", "ntapi", "rand", - "socket2", "winapi", ] @@ -62,9 +61,9 @@ dependencies = [ [[package]] name = "ntapi" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a31937dea023539c72ddae0e3571deadc1414b300483fa7aaec176168cfa9d2" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ "winapi", ] diff --git a/Cargo.toml b/Cargo.toml index 8045b6c..a490943 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,12 +13,12 @@ [package] edition = "2018" name = "mio" -version = "0.7.4" +version = "0.7.5" authors = ["Carl Lerche "] include = ["Cargo.toml", "LICENSE", "README.md", "CHANGELOG.md", "src/**/*.rs", "examples/**/*.rs"] description = "Lightweight non-blocking IO" homepage = "https://github.com/tokio-rs/mio" -documentation = "https://docs.rs/mio/0.7.4" +documentation = "https://docs.rs/mio/0.7.5" readme = "README.md" keywords = ["io", "async", "non-blocking"] categories = ["asynchronous"] @@ -47,14 +47,12 @@ default-features = false [dev-dependencies.rand] version = "0.4" -[dev-dependencies.socket2] -version = "0.3.15" - [features] default = [] extra-docs = [] os-poll = [] os-util = [] +pipe = ["os-poll"] tcp = [] udp = [] uds = [] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index ce99d1c..e01d453 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -6,11 +6,11 @@ name = "mio" # - Update CHANGELOG.md. # - Update doc URL. # - Create git tag -version = "0.7.4" +version = "0.7.5" license = "MIT" authors = ["Carl Lerche "] description = "Lightweight non-blocking IO" -documentation = "https://docs.rs/mio/0.7.4" +documentation = "https://docs.rs/mio/0.7.5" homepage = "https://github.com/tokio-rs/mio" repository = "https://github.com/tokio-rs/mio" readme = "README.md" @@ -29,6 +29,7 @@ include = [ default = [] os-poll = [] os-util = [] +pipe = ["os-poll"] tcp = [] udp = [] uds = [] @@ -48,7 +49,6 @@ ntapi = "0.3" [dev-dependencies] env_logger = { version = "0.6.2", default-features = false } rand = "0.4" -socket2 = "0.3.15" [package.metadata.docs.rs] all-features = true diff --git a/METADATA b/METADATA index 34c31cd..a63bcc3 100644 --- a/METADATA +++ b/METADATA @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/mio/mio-0.7.4.crate" + value: "https://static.crates.io/crates/mio/mio-0.7.5.crate" } - version: "0.7.4" + version: "0.7.5" license_type: NOTICE last_upgrade_date { year: 2020 - month: 10 - day: 26 + month: 11 + day: 2 } } diff --git a/examples/tcp_server.rs b/examples/tcp_server.rs index dc871f3..42426ee 100644 --- a/examples/tcp_server.rs +++ b/examples/tcp_server.rs @@ -128,18 +128,23 @@ fn handle_connection_event( if event.is_readable() { let mut connection_closed = false; - let mut received_data = Vec::with_capacity(4096); + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; // We can (maybe) read from the connection. loop { - let mut buf = [0; 256]; - match connection.read(&mut buf) { + match connection.read(&mut received_data[bytes_read..]) { Ok(0) => { // Reading 0 bytes means the other side has closed the // connection or is done writing, then so are we. connection_closed = true; break; } - Ok(n) => received_data.extend_from_slice(&buf[..n]), + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } // Would block "errors" are the OS's way of saying that the // connection is not actually ready to perform this I/O operation. Err(ref err) if would_block(err) => break, @@ -149,10 +154,13 @@ fn handle_connection_event( } } - if let Ok(str_buf) = from_utf8(&received_data) { - println!("Received data: {}", str_buf.trim_end()); - } else { - println!("Received (none UTF-8) data: {:?}", &received_data); + if bytes_read != 0 { + let received_data = &received_data[..bytes_read]; + if let Ok(str_buf) = from_utf8(received_data) { + println!("Received data: {}", str_buf.trim_end()); + } else { + println!("Received (none UTF-8) data: {:?}", received_data); + } } if connection_closed { diff --git a/src/interest.rs b/src/interest.rs index 6bc5929..ee5158a 100644 --- a/src/interest.rs +++ b/src/interest.rs @@ -8,7 +8,7 @@ use std::{fmt, ops}; /// registered with [readable] interests and the socket becomes writable, no /// event will be returned from a call to [`poll`]. /// -/// [registering]: struct.Registry.html#method.reregister +/// [registering]: struct.Registry.html#method.register /// [`event::Source`]: ./event/trait.Source.html /// [`Poll`]: struct.Poll.html /// [readable]: struct.Interest.html#associatedconstant.READABLE @@ -70,6 +70,30 @@ impl Interest { Interest(unsafe { NonZeroU8::new_unchecked(self.0.get() | other.0.get()) }) } + /// Removes `other` `Interest` from `self`. + /// + /// Returns `None` if the set would be empty after removing `other`. + /// + /// ``` + /// use mio::Interest; + /// + /// const RW_INTERESTS: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// + /// // As long a one interest remain this will return `Some`. + /// let w_interest = RW_INTERESTS.remove(Interest::READABLE).unwrap(); + /// assert!(!w_interest.is_readable()); + /// assert!(w_interest.is_writable()); + /// + /// // Removing all interests from the set will return `None`. + /// assert_eq!(w_interest.remove(Interest::WRITABLE), None); + /// + /// // Its also possible to remove multiple interests at once. + /// assert_eq!(RW_INTERESTS.remove(RW_INTERESTS), None); + /// ``` + pub fn remove(self, other: Interest) -> Option { + NonZeroU8::new(self.0.get() & !other.0.get()).map(Interest) + } + /// Returns true if the value includes readable readiness. pub const fn is_readable(self) -> bool { (self.0.get() & READABLE) != 0 diff --git a/src/lib.rs b/src/lib.rs index 6e98c87..332ee24 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/mio/0.7.4")] +#![doc(html_root_url = "https://docs.rs/mio/0.7.5")] #![deny( missing_docs, missing_debug_implementations, @@ -69,9 +69,11 @@ mod waker; pub mod event; -cfg_net! { +cfg_io_source! { mod io_source; +} +cfg_net! { pub mod net; } @@ -82,11 +84,27 @@ pub use poll::{Poll, Registry}; pub use token::Token; pub use waker::Waker; -#[cfg(all(unix, feature = "os-util"))] -#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "os-util"))))] +#[cfg(all(unix, any(feature = "os-util", feature = "pipe")))] +#[cfg_attr( + docsrs, + doc(cfg(all(unix, any(feature = "os-util", feature = "pipe")))) +)] pub mod unix { //! Unix only extensions. + + #[cfg(feature = "os-util")] + #[cfg_attr(docsrs, doc(cfg(all(unix, feature = "os-util"))))] pub use crate::sys::SourceFd; + + cfg_pipe! { + pub mod pipe { + //! Unix pipe. + //! + //! See the [`new`] function for documentation. + + pub use crate::sys::pipe::{new, Receiver, Sender}; + } + } } #[cfg(all(windows, feature = "os-util"))] @@ -121,6 +139,12 @@ pub mod features { //! `os-util` enables additional OS specific facilities. Currently this //! means the `unix` module (with `SourceFd`) becomes available. //! + #![cfg_attr(feature = "pipe", doc = "## `pipe` (enabled)")] + #![cfg_attr(not(feature = "pipe"), doc = "## `pipe` (disabled)")] + //! + //! The `pipe` feature adds `unix::pipe`, and related types, a non-blocking + //! wrapper around the `pipe(2)` system call. + //! //! ## Network types //! //! Mio provide three features to enable network types: diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 7db2579..2275ed9 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -36,6 +36,18 @@ macro_rules! cfg_net { } } +/// One of the features enabled that needs `IoSource`. That is `tcp`, or `udp`, +/// or on Unix `uds` or `pipe`. +macro_rules! cfg_io_source { + ($($item:item)*) => { + $( + #[cfg(any(feature = "tcp", feature = "udp", all(unix, any(feature = "uds", feature = "pipe"))))] + #[cfg_attr(docsrs, doc(any(feature = "tcp", feature = "udp", all(unix, any(feature = "uds", feature = "pipe")))))] + $item + )* + } +} + /// One of the `tcp`, `udp` features enabled. #[cfg(windows)] macro_rules! cfg_net { @@ -82,13 +94,25 @@ macro_rules! cfg_uds { } } +/// Feature `pipe` enabled. +#[cfg(unix)] +macro_rules! cfg_pipe { + ($($item:item)*) => { + $( + #[cfg(feature = "pipe")] + #[cfg_attr(docsrs, doc(cfg(feature = "pipe")))] + $item + )* + } +} + /// Feature `os-util` enabled, or one of the features that need `os-util`. #[cfg(unix)] macro_rules! cfg_any_os_util { ($($item:item)*) => { $( - #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds"))))] + #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds", feature = "pipe"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds", feature = "pipe"))))] $item )* } @@ -99,8 +123,8 @@ macro_rules! cfg_any_os_util { macro_rules! cfg_any_os_util { ($($item:item)*) => { $( - #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp"))))] + #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "pipe"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "pipe"))))] $item )* } diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs index a91f665..f3e27c3 100644 --- a/src/net/tcp/socket.rs +++ b/src/net/tcp/socket.rs @@ -82,10 +82,36 @@ impl TcpSocket { sys::tcp::set_reuseaddr(self.sys, reuseaddr) } + /// Get the value of `SO_REUSEADDR` set on this socket. + pub fn get_reuseaddr(&self) -> io::Result { + sys::tcp::get_reuseaddr(self.sys) + } + + /// Sets the value of `SO_REUSEPORT` on this socket. + /// Only supported available in unix + #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] + pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> { + sys::tcp::set_reuseport(self.sys, reuseport) + } + + /// Get the value of `SO_REUSEPORT` set on this socket. + /// Only supported available in unix + #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] + pub fn get_reuseport(&self) -> io::Result { + sys::tcp::get_reuseport(self.sys) + } + /// Sets the value of `SO_LINGER` on this socket. pub fn set_linger(&self, dur: Option) -> io::Result<()> { sys::tcp::set_linger(self.sys, dur) } + + /// Returns the local address of this socket + /// + /// Will return `Err` result in windows if called before calling `bind` + pub fn get_localaddr(&self) -> io::Result { + sys::tcp::get_localaddr(self.sys) + } } impl Drop for TcpSocket { diff --git a/src/poll.rs b/src/poll.rs index 6446281..7ff2038 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -613,6 +613,15 @@ impl Registry { .try_clone() .map(|selector| Registry { selector }) } + + /// Internal check to ensure only a single `Waker` is active per [`Poll`] + /// instance. + #[cfg(debug_assertions)] + pub(crate) fn register_waker(&self) { + if self.selector.register_waker() { + panic!("Only a single `Waker` can be active per `Poll` instance"); + } + } } impl fmt::Debug for Registry { diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 8852333..08bd271 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -72,7 +72,11 @@ cfg_os_poll! { pub(crate) use self::unix::uds; } - cfg_net! { + cfg_pipe! { + pub(crate) use self::unix::pipe; + } + + cfg_io_source! { pub(crate) use self::unix::IoSourceState; } } diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 8303797..a63760a 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -23,7 +23,7 @@ cfg_uds! { pub(crate) mod uds; } -cfg_net! { +cfg_io_source! { use std::io; #[cfg(windows)] use std::os::windows::io::RawSocket; diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs index 0e0c031..69be370 100644 --- a/src/sys/shell/selector.rs +++ b/src/sys/shell/selector.rs @@ -18,6 +18,11 @@ impl Selector { pub fn select(&self, _: &mut Events, _: Option) -> io::Result<()> { os_required!(); } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + os_required!(); + } } #[cfg(unix)] @@ -39,7 +44,7 @@ cfg_any_os_util! { } } -cfg_net! { +cfg_io_source! { #[cfg(debug_assertions)] impl Selector { pub fn id(&self) -> usize { diff --git a/src/sys/shell/tcp.rs b/src/sys/shell/tcp.rs index de1520b..3073d42 100644 --- a/src/sys/shell/tcp.rs +++ b/src/sys/shell/tcp.rs @@ -32,6 +32,20 @@ pub(crate) fn set_reuseaddr(_: TcpSocket, _: bool) -> io::Result<()> { os_required!(); } +pub(crate) fn get_reuseaddr(_: TcpSocket) -> io::Result { + os_required!(); +} + +#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] +pub(crate) fn set_reuseport(_: TcpSocket, _: bool) -> io::Result<()> { + os_required!(); +} + +#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] +pub(crate) fn get_reuseport(_: TcpSocket) -> io::Result { + os_required!(); +} + pub(crate) fn set_linger(_: TcpSocket, _: Option) -> io::Result<()> { os_required!(); } @@ -39,3 +53,7 @@ pub(crate) fn set_linger(_: TcpSocket, _: Option) -> io::Result<()> { pub fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { os_required!(); } + +pub(crate) fn get_localaddr(_: TcpSocket) -> io::Result { + os_required!(); +} diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 96d7f4d..f045fb5 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -38,7 +38,7 @@ cfg_os_poll! { pub use self::uds::SocketAddr; } - cfg_net! { + cfg_io_source! { use std::io; // Both `kqueue` and `epoll` don't need to hold any user space state. @@ -59,6 +59,10 @@ cfg_os_poll! { } } } + + cfg_pipe! { + pub(crate) mod pipe; + } } cfg_not_os_poll! { diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs new file mode 100644 index 0000000..d838ebc --- /dev/null +++ b/src/sys/unix/pipe.rs @@ -0,0 +1,383 @@ +//! Unix pipe. +//! +//! See the [`new`] function for documentation. + +use std::fs::File; +use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::process::{ChildStderr, ChildStdin, ChildStdout}; + +use crate::io_source::IoSource; +use crate::{event, Interest, Registry, Token}; + +/// Create a new non-blocking Unix pipe. +/// +/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as +/// inter-process or thread communication channel. +/// +/// This channel may be created before forking the process and then one end used +/// in each process, e.g. the parent process has the sending end to send command +/// to the child process. +/// +/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html +/// +/// # Events +/// +/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive +/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is +/// written to the `Sender` the `Receiver` will receive an [readable event]. +/// +/// In addition to those events, events will also be generated if the other side +/// is dropped. To check if the `Sender` is dropped you'll need to check +/// [`is_read_closed`] on events for the `Receiver`, if it returns true the +/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it +/// returns true the `Receiver` was dropped. Also see the second example below. +/// +/// [`WRITABLE`]: Interest::WRITABLE +/// [writable events]: event::Event::is_writable +/// [`READABLE`]: Interest::READABLE +/// [readable event]: event::Event::is_readable +/// [`is_read_closed`]: event::Event::is_read_closed +/// [`is_write_closed`]: event::Event::is_write_closed +/// +/// # Deregistering +/// +/// Both `Sender` and `Receiver` will deregister themselves when dropped, +/// **iff** the file descriptors are not duplicated (via [`dup(2)`]). +/// +/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html +/// +/// # Examples +/// +/// Simple example that writes data into the sending end and read it from the +/// receiving end. +/// +/// ``` +/// use std::io::{self, Read, Write}; +/// +/// use mio::{Poll, Events, Interest, Token}; +/// use mio::unix::pipe; +/// +/// // Unique tokens for the two ends of the channel. +/// const PIPE_RECV: Token = Token(0); +/// const PIPE_SEND: Token = Token(1); +/// +/// # fn main() -> io::Result<()> { +/// // Create our `Poll` instance and the `Events` container. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// // Create a new pipe. +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// // Register both ends of the channel. +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// const MSG: &[u8; 11] = b"Hello world"; +/// +/// loop { +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_SEND => sender.write(MSG) +/// .and_then(|n| if n != MSG.len() { +/// // We'll consider a short write an error in this +/// // example. NOTE: we can't use `write_all` with +/// // non-blocking I/O. +/// Err(io::ErrorKind::WriteZero.into()) +/// } else { +/// Ok(()) +/// })?, +/// PIPE_RECV => { +/// let mut buf = [0; 11]; +/// let n = receiver.read(&mut buf)?; +/// println!("received: {:?}", &buf[0..n]); +/// assert_eq!(n, MSG.len()); +/// assert_eq!(&buf, &*MSG); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// } +/// # } +/// ``` +/// +/// Example that receives an event once the `Sender` is dropped. +/// +/// ``` +/// # use std::io; +/// # +/// # use mio::{Poll, Events, Interest, Token}; +/// # use mio::unix::pipe; +/// # +/// # const PIPE_RECV: Token = Token(0); +/// # const PIPE_SEND: Token = Token(1); +/// # +/// # fn main() -> io::Result<()> { +/// // Same setup as in the example above. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// // Drop the sender. +/// drop(sender); +/// +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_RECV if event.is_read_closed() => { +/// // Detected that the sender was dropped. +/// println!("Sender dropped!"); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// # unreachable!(); +/// # } +/// ``` +pub fn new() -> io::Result<(Sender, Receiver)> { + let mut fds: [RawFd; 2] = [-1, -1]; + + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + ))] + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))] + unsafe { + // For platforms that don't have `pipe2(2)` we need to manually set the + // correct flags on the file descriptor. + if libc::pipe(fds.as_mut_ptr()) != 0 { + return Err(io::Error::last_os_error()); + } + + for fd in &fds { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 + || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + // Don't leak file descriptors. Can't handle error though. + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "ios", + target_os = "macos", + target_os = "solaris", + )))] + compile_error!("unsupported target for `mio::unix::pipe`"); + + // Safety: we just initialised the `fds` above. + let r = unsafe { Receiver::from_raw_fd(fds[0]) }; + let w = unsafe { Sender::from_raw_fd(fds[1]) }; + Ok((w, r)) +} + +/// Sending end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Sender { + inner: IoSource, +} + +impl Sender { + /// Set the `Sender` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Sender { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Write for Sender { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From for Sender { + fn from(stdin: ChildStdin) -> Sender { + // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. + unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } + } +} + +impl FromRawFd for Sender { + unsafe fn from_raw_fd(fd: RawFd) -> Sender { + Sender { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Sender { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +/// Receiving end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Receiver { + inner: IoSource, +} + +impl Receiver { + /// Set the `Receiver` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Read for Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From for Receiver { + fn from(stdout: ChildStdout) -> Receiver { + // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From for Receiver { + fn from(stderr: ChildStderr) -> Receiver { + // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } + } +} + +impl FromRawFd for Receiver { + unsafe fn from_raw_fd(fd: RawFd) -> Receiver { + Receiver { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Receiver { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let value = nonblocking as libc::c_int; + if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs index 13f1617..76ee7f9 100644 --- a/src/sys/unix/selector/epoll.rs +++ b/src/sys/unix/selector/epoll.rs @@ -4,7 +4,7 @@ use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP}; use log::error; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, i32, io, ptr}; @@ -17,6 +17,8 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, ep: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, } impl Selector { @@ -33,6 +35,8 @@ impl Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), ep, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), }) } @@ -42,6 +46,8 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, ep, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -93,9 +99,14 @@ impl Selector { pub fn deregister(&self, fd: RawFd) -> io::Result<()> { syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ()) } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } } -cfg_net! { +cfg_io_source! { impl Selector { #[cfg(debug_assertions)] pub fn id(&self) -> usize { diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs index 2ebac9a..454f47d 100644 --- a/src/sys/unix/selector/kqueue.rs +++ b/src/sys/unix/selector/kqueue.rs @@ -4,7 +4,7 @@ use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, io, ptr, slice}; @@ -69,6 +69,8 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, kq: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, } impl Selector { @@ -79,6 +81,8 @@ impl Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), kq, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), }) } @@ -88,6 +92,8 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, kq, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -208,6 +214,11 @@ impl Selector { kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data]) } + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + // Used by `Waker`. #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] pub fn setup_waker(&self, token: Token) -> io::Result<()> { @@ -292,7 +303,7 @@ fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result< Ok(()) } -cfg_net! { +cfg_io_source! { #[cfg(debug_assertions)] impl Selector { pub fn id(&self) -> usize { diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs index 81b371b..65b7400 100644 --- a/src/sys/unix/tcp.rs +++ b/src/sys/unix/tcp.rs @@ -1,4 +1,5 @@ use std::io; +use std::mem; use std::mem::{size_of, MaybeUninit}; use std::net::{self, SocketAddr}; use std::time::Duration; @@ -58,6 +59,63 @@ pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<() )).map(|_| ()) } +pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result { + let mut optval: libc::c_int = 0; + let mut optlen = mem::size_of::() as libc::socklen_t; + + syscall!(getsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_REUSEADDR, + &mut optval as *mut _ as *mut _, + &mut optlen, + ))?; + + Ok(optval != 0) +} + +#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] +pub(crate) fn set_reuseport(socket: TcpSocket, reuseport: bool) -> io::Result<()> { + let val: libc::c_int = if reuseport { 1 } else { 0 }; + + syscall!(setsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_REUSEPORT, + &val as *const libc::c_int as *const libc::c_void, + size_of::() as libc::socklen_t, + )).map(|_| ()) +} + +#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] +pub(crate) fn get_reuseport(socket: TcpSocket) -> io::Result { + let mut optval: libc::c_int = 0; + let mut optlen = mem::size_of::() as libc::socklen_t; + + syscall!(getsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_REUSEPORT, + &mut optval as *mut _ as *mut _, + &mut optlen, + ))?; + + Ok(optval != 0) +} + +pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result { + let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() }; + let mut length = size_of::() as libc::socklen_t; + + syscall!(getsockname( + socket, + &mut addr as *mut _ as *mut _, + &mut length + ))?; + + unsafe { to_socket_addr(&addr) } +} + pub(crate) fn set_linger(socket: TcpSocket, dur: Option) -> io::Result<()> { let val: libc::linger = libc::linger { l_onoff: if dur.is_some() { 1 } else { 0 }, diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs index 82c8e9e..b2e3b11 100644 --- a/src/sys/windows/afd.rs +++ b/src/sys/windows/afd.rs @@ -111,7 +111,7 @@ impl Afd { } } -cfg_net! { +cfg_io_source! { use miow::iocp::CompletionPort; use ntapi::ntioapi::FILE_OPEN; use ntapi::ntioapi::NtCreateFile; diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs index db6729c..9da5e7a 100644 --- a/src/sys/windows/io_status_block.rs +++ b/src/sys/windows/io_status_block.rs @@ -4,7 +4,7 @@ use std::ops::{Deref, DerefMut}; pub struct IoStatusBlock(IO_STATUS_BLOCK); -cfg_net! { +cfg_io_source! { use ntapi::ntioapi::IO_STATUS_BLOCK_u; impl IoStatusBlock { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 7bba6dd..25590c2 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -42,6 +42,10 @@ mod waker; pub(crate) use waker::Waker; cfg_net! { + mod net; +} + +cfg_io_source! { use std::io; use std::os::windows::io::RawSocket; use std::pin::Pin; @@ -49,8 +53,6 @@ cfg_net! { use crate::{poll, Interest, Registry, Token}; - mod net; - struct InternalState { selector: Arc, token: Token, diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index 4a38300..df2c3f0 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -12,6 +12,7 @@ cfg_net! { use miow::iocp::{CompletionPort, CompletionStatus}; use std::collections::VecDeque; +use std::io; use std::marker::PhantomPinned; use std::os::windows::io::RawSocket; use std::pin::Pin; @@ -20,7 +21,6 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use std::io; use winapi::shared::ntdef::NT_SUCCESS; use winapi::shared::ntdef::{HANDLE, PVOID}; use winapi::shared::ntstatus::STATUS_CANCELLED; @@ -47,7 +47,7 @@ impl AfdGroup { } } -cfg_net! { +cfg_io_source! { const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; impl AfdGroup { @@ -256,7 +256,7 @@ impl SockState { } } -cfg_net! { +cfg_io_source! { impl SockState { fn new(raw_socket: RawSocket, afd: Arc) -> io::Result { Ok(SockState { @@ -327,8 +327,9 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(0); pub struct Selector { #[cfg(debug_assertions)] id: usize, - pub(super) inner: Arc, + #[cfg(debug_assertions)] + has_waker: AtomicBool, } impl Selector { @@ -340,6 +341,8 @@ impl Selector { #[cfg(debug_assertions)] id, inner: Arc::new(inner), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), } }) } @@ -349,6 +352,8 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, inner: Arc::clone(&self.inner), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -360,6 +365,11 @@ impl Selector { self.inner.select(events, timeout) } + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + pub(super) fn clone_port(&self) -> Arc { self.inner.cp.clone() } @@ -370,7 +380,7 @@ impl Selector { } } -cfg_net! { +cfg_io_source! { use super::InternalState; use crate::Token; @@ -499,7 +509,7 @@ impl SelectorInner { } else if iocp_event.token() % 2 == 1 { // Handle is a named pipe. This could be extended to be any non-AFD event. let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback; - + let len = events.len(); callback(iocp_event.entry(), Some(events)); n += events.len() - len; @@ -525,7 +535,7 @@ impl SelectorInner { } } -cfg_net! { +cfg_io_source! { use std::mem::size_of; use std::ptr::null_mut; use winapi::um::mswsock; @@ -701,7 +711,7 @@ impl Drop for SelectorInner { let callback = unsafe { (*(iocp_event.overlapped() as *mut super::Overlapped)).callback }; - + callback(iocp_event.entry(), None); } else { // drain sock state to release memory of Arc reference diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs index 46ac1ac..b78d864 100644 --- a/src/sys/windows/tcp.rs +++ b/src/sys/windows/tcp.rs @@ -1,14 +1,17 @@ use std::io; use std::mem::size_of; -use std::net::{self, SocketAddr}; +use std::net::{self, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::time::Duration; use std::os::windows::io::FromRawSocket; use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64. use winapi::ctypes::{c_char, c_int, c_ushort}; +use winapi::shared::ws2def::{SOCKADDR_STORAGE, AF_INET, SOCKADDR_IN}; +use winapi::shared::ws2ipdef::SOCKADDR_IN6_LH; + use winapi::shared::minwindef::{BOOL, TRUE, FALSE}; use winapi::um::winsock2::{ - self, closesocket, linger, setsockopt, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR, + self, closesocket, linger, setsockopt, getsockopt, getsockname, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR, SOCK_STREAM, SOL_SOCKET, SO_LINGER, SO_REUSEADDR, }; @@ -87,6 +90,47 @@ pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<() } } +pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result { + let mut optval: c_char = 0; + let mut optlen = size_of::() as c_int; + + match unsafe { getsockopt( + socket, + SOL_SOCKET, + SO_REUSEADDR, + &mut optval as *mut _ as *mut _, + &mut optlen, + ) } { + SOCKET_ERROR => Err(io::Error::last_os_error()), + _ => Ok(optval != 0), + } +} + +pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result { + let mut addr: SOCKADDR_STORAGE = unsafe { std::mem::zeroed() }; + let mut length = std::mem::size_of_val(&addr) as c_int; + + match unsafe { getsockname( + socket, + &mut addr as *mut _ as *mut _, + &mut length + ) } { + SOCKET_ERROR => Err(io::Error::last_os_error()), + _ => { + let storage: *const SOCKADDR_STORAGE = (&addr) as *const _; + if addr.ss_family as c_int == AF_INET { + let sock_addr : SocketAddrV4 = unsafe { *(storage as *const SOCKADDR_IN as *const _) }; + Ok(sock_addr.into()) + } else { + let sock_addr : SocketAddrV6 = unsafe { *(storage as *const SOCKADDR_IN6_LH as *const _) }; + Ok(sock_addr.into()) + } + }, + } + + +} + pub(crate) fn set_linger(socket: TcpSocket, dur: Option) -> io::Result<()> { let val: linger = linger { l_onoff: if dur.is_some() { 1 } else { 0 }, diff --git a/src/waker.rs b/src/waker.rs index 44766ce..b8e4496 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -16,7 +16,7 @@ use std::io; /// `Waker` events are only guaranteed to be delivered while the `Waker` value /// is alive. /// -/// Only a single `Waker` should active per [`Poll`], if multiple threads need +/// Only a single `Waker` can be active per [`Poll`], if multiple threads need /// access to the `Waker` it can be shared via for example an `Arc`. What /// happens if multiple `Waker`s are registered with the same `Poll` is /// undefined. @@ -81,6 +81,8 @@ pub struct Waker { impl Waker { /// Create a new `Waker`. pub fn new(registry: &Registry, token: Token) -> io::Result { + #[cfg(debug_assertions)] + registry.register_waker(); sys::Waker::new(poll::selector(®istry), token).map(|inner| Waker { inner }) } -- cgit v1.2.3