aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Wailes <chriswailes@google.com>2023-01-18 22:41:32 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-01-18 22:41:32 +0000
commit38cf5f8e60957f3112fbd30182585f899754c5b6 (patch)
tree8f8ce659f4ab5452d5d9ac13e45aa4dab999a642
parent2abaf1825fcdc674d2849b33338a201735afc6f5 (diff)
parent42496b42d24933003414a55554b68de4bc8b4a35 (diff)
downloadmio-38cf5f8e60957f3112fbd30182585f899754c5b6.tar.gz
Merge "Upgrade mio to 0.8.5" am: 6e75afe357 am: dc3b61d71f am: 42496b42d2
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/mio/+/2351542 Change-Id: Ie48c21a960ce105b590612f5bc6afd49b5ca92c8 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp12
-rw-r--r--CHANGELOG.md112
-rw-r--r--Cargo.lock140
-rw-r--r--Cargo.toml120
-rw-r--r--Cargo.toml.orig50
-rw-r--r--METADATA14
-rw-r--r--README.md22
-rw-r--r--cargo2android.json2
-rw-r--r--examples/tcp_listenfd_server.rs209
-rw-r--r--examples/tcp_server.rs14
-rw-r--r--examples/udp_server.rs12
-rw-r--r--src/event/event.rs19
-rw-r--r--src/event/source.rs10
-rw-r--r--src/interest.rs8
-rw-r--r--src/io_source.rs58
-rw-r--r--src/lib.rs13
-rw-r--r--src/net/mod.rs22
-rw-r--r--src/net/tcp/listener.rs49
-rw-r--r--src/net/tcp/mod.rs3
-rw-r--r--src/net/tcp/socket.rs490
-rw-r--r--src/net/tcp/stream.rs156
-rw-r--r--src/net/udp.rs85
-rw-r--r--src/net/uds/datagram.rs75
-rw-r--r--src/net/uds/stream.rs91
-rw-r--r--src/poll.rs206
-rw-r--r--src/sys/mod.rs6
-rw-r--r--src/sys/shell/mod.rs2
-rw-r--r--src/sys/shell/selector.rs21
-rw-r--r--src/sys/shell/tcp.rs118
-rw-r--r--src/sys/shell/udp.rs1
-rw-r--r--src/sys/unix/net.rs7
-rw-r--r--src/sys/unix/pipe.rs161
-rw-r--r--src/sys/unix/selector/epoll.rs54
-rw-r--r--src/sys/unix/selector/kqueue.rs30
-rw-r--r--src/sys/unix/selector/mod.rs4
-rw-r--r--src/sys/unix/sourcefd.rs22
-rw-r--r--src/sys/unix/tcp.rs432
-rw-r--r--src/sys/unix/uds/listener.rs13
-rw-r--r--src/sys/unix/uds/mod.rs10
-rw-r--r--src/sys/unix/uds/socketaddr.rs20
-rw-r--r--src/sys/unix/uds/stream.rs2
-rw-r--r--src/sys/unix/waker.rs4
-rw-r--r--src/sys/wasi/mod.rs370
-rw-r--r--src/sys/windows/afd.rs108
-rw-r--r--src/sys/windows/event.rs3
-rw-r--r--src/sys/windows/handle.rs30
-rw-r--r--src/sys/windows/io_status_block.rs6
-rw-r--r--src/sys/windows/iocp.rs275
-rw-r--r--src/sys/windows/mod.rs19
-rw-r--r--src/sys/windows/named_pipe.rs405
-rw-r--r--src/sys/windows/net.rs52
-rw-r--r--src/sys/windows/overlapped.rs10
-rw-r--r--src/sys/windows/selector.rs68
-rw-r--r--src/sys/windows/tcp.rs320
-rw-r--r--src/sys/windows/udp.rs20
-rw-r--r--src/sys/windows/waker.rs2
-rw-r--r--src/token.rs4
-rw-r--r--src/waker.rs8
59 files changed, 2720 insertions, 1886 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index a4887c5..c60d385 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "75f41fb304d299dfbc07679d15193e03273c4597"
- }
-}
+ "sha1": "0accf7dc22f197245e6a1aa84096262cd6f6e4d4"
+ },
+ "path_in_vcs": ""
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index f545e40..660a924 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,17 +23,13 @@ rust_library {
host_supported: true,
crate_name: "mio",
cargo_env_compat: true,
- cargo_pkg_version: "0.7.13",
+ cargo_pkg_version: "0.8.5",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
"net",
"os-ext",
"os-poll",
- "os-util",
- "tcp",
- "udp",
- "uds",
],
rustlibs: [
"liblibc",
@@ -54,7 +50,7 @@ rust_test {
host_supported: true,
crate_name: "mio",
cargo_env_compat: true,
- cargo_pkg_version: "0.7.13",
+ cargo_pkg_version: "0.8.5",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -66,10 +62,6 @@ rust_test {
"net",
"os-ext",
"os-poll",
- "os-util",
- "tcp",
- "udp",
- "uds",
],
rustlibs: [
"libenv_logger",
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e12fc3..10d9154 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,112 @@
+# 0.8.5
+
+## Changed
+
+* Updated `windows-sys` to 0.42.0
+ (https://github.com/tokio-rs/mio/pull/1624).
+* Officially document Wine as not supported, some people claimed it worked,
+ other claims it doesn't, but nobody stepped up to fix the problem
+ (https://github.com/tokio-rs/mio/pull/1596).
+* Switch to GitHub Actions
+ (https://github.com/tokio-rs/mio/pull/1598, https://github.com/tokio-rs/mio/pull/1601).
+* Documented the current Poll::poll time behaviour
+ (https://github.com/tokio-rs/mio/pull/1603).
+
+## Fixed
+
+* Timeout less than one millisecond becoming zero millsiconds
+ (https://github.com/tokio-rs/mio/pull/1615, https://github.com/tokio-rs/mio/pull/1616)
+* Undefined reference to `epoll\_create1` on Android API level < 21.
+ (https://github.com/tokio-rs/mio/pull/1590).
+
+# 0.8.4
+
+## Added
+
+* Support `Registery::try_clone` on `wasm32-wasi`
+ (https://github.com/tokio-rs/mio/pull/1576).
+* Add docs about polling without registering event sources
+ (https://github.com/tokio-rs/mio/pull/1585).
+
+# 0.8.3
+
+## Changed
+
+* Replace `winapi` dependency with `windows-sys`.
+ (https://github.com/tokio-rs/mio/pull/1556).
+* Future proofed the kevent ABI for FreeBSD
+ (https://github.com/tokio-rs/mio/pull/1572).
+
+## Fixed
+
+* Improved support for Redox, making it possible to run on stable Rust
+ (https://github.com/tokio-rs/mio/pull/1555).
+* Don't ignore EAGAIN in UDS connect call
+ (https://github.com/tokio-rs/mio/pull/1564).
+* Documentation of `TcpStream::connect`
+ (https://github.com/tokio-rs/mio/pull/1565).
+
+# 0.8.2
+
+## Added
+
+* Experimental support for Redox.
+
+# 0.8.1
+
+## Added
+
+* Add `try_io` method to all I/O types (#1551). This execute a user defined I/O
+ closure while updating Mio's internal state ensuring that the I/O type
+ receives more events if it hits a WouldBlock error. This is added to the
+ following types:
+ * `TcpStream`
+ * `UdpSocket`
+ * `UnixDatagram`
+ * `UnixStream`
+ * `unix::pipe::Sender`
+ * `unix::pipe::Receiver`
+* Basic, experimental support for `wasm32-wasi` target (#1549). Note that a lot
+ of time type are still missing, e.g. the `Waker`, and may never be possible to
+ implement.
+
+# 0.8.0
+
+## Removed
+
+* Deprecated features (https://github.com/tokio-rs/mio/commit/105f8f2afb57b01ddea716a0aa9720f226c520e3):
+ * extra-docs (always enabled)
+ * tcp (replaced with "net" feature).
+ * udp (replaced with "net" feature).
+ * uds (replaced with "net" feature).
+ * pipe (replaced with "os-ext" feature).
+ * os-util (replaced with "os-ext" feature).
+* `TcpSocket` type
+ (https://github.com/tokio-rs/mio/commit/02e9be41f27daf822575444fdd2b3067433a5996).
+ The socket2 crate provides all the functionality and more.
+* Support for Solaris, it never really worked anyway
+ (https://github.com/tokio-rs/mio/pull/1528).
+
+## Changes
+
+* Update minimum Rustc version (MSVR) to 1.46.0
+ (https://github.com/tokio-rs/mio/commit/5c577efecd23750a9a3e0f6ad080ab98f14a255d).
+
+## Added
+
+* `UdpSocket::peer_addr`
+ (https://github.com/tokio-rs/mio/commit/5fc104d08e0e74c8a19247f7cba0f058699fc438).
+
+# 0.7.14
+
+## Fixes
+
+* Remove use unsound internal macro (#1519).
+
+## Added
+
+* `sys::unix::SocketAddr::as_abstract_namespace()` (#1520).
+
# 0.7.13
## Fixes
@@ -27,6 +136,8 @@
* Fix an instance of not doc(cfg(.*))
(https://github.com/tokio-rs/mio/commit/25e8f911357c740034f10a170dfa4ea1b28234ce).
+# 0.7.9
+
## Fixes
* Fix error handling in `NamedPipe::write`
@@ -61,7 +172,6 @@
themselves already
(https://github.com/tokio-rs/mio/commit/1be481dcbbcb6906364008b5d61e7f53cddc3eb3).
-
## Fixes
* Underflow in `SocketAddr::address`
diff --git a/Cargo.lock b/Cargo.lock
index 8a0bb77..76cbc55 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10,132 +10,146 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "env_logger"
-version = "0.6.2"
+version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3"
+checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"log",
]
[[package]]
-name = "fuchsia-cprng"
-version = "0.1.1"
+name = "getrandom"
+version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
+checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "wasi",
+]
[[package]]
name = "libc"
-version = "0.2.86"
+version = "0.2.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7282d924be3275cec7f6756ff4121987bc6481325397dde6ba3e7802b1a8b1c"
+checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
[[package]]
name = "log"
-version = "0.4.14"
+version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
+checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
]
[[package]]
name = "mio"
-version = "0.7.13"
+version = "0.8.5"
dependencies = [
"env_logger",
"libc",
"log",
- "miow",
- "ntapi",
"rand",
- "winapi",
+ "wasi",
+ "windows-sys",
]
[[package]]
-name = "miow"
-version = "0.3.6"
+name = "ppv-lite86"
+version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
-dependencies = [
- "socket2",
- "winapi",
-]
+checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
-name = "ntapi"
-version = "0.3.6"
+name = "rand"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
+checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
- "winapi",
+ "libc",
+ "rand_chacha",
+ "rand_core",
]
[[package]]
-name = "rand"
-version = "0.4.6"
+name = "rand_chacha"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
+checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
- "fuchsia-cprng",
- "libc",
- "rand_core 0.3.1",
- "rdrand",
- "winapi",
+ "ppv-lite86",
+ "rand_core",
]
[[package]]
name = "rand_core"
-version = "0.3.1"
+version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
+checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
- "rand_core 0.4.2",
+ "getrandom",
]
[[package]]
-name = "rand_core"
-version = "0.4.2"
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
-name = "rdrand"
-version = "0.4.0"
+name = "windows-sys"
+version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
+checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
- "rand_core 0.3.1",
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc",
+ "windows_i686_gnu",
+ "windows_i686_msvc",
+ "windows_x86_64_gnu",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc",
]
[[package]]
-name = "socket2"
-version = "0.3.19"
+name = "windows_aarch64_gnullvm"
+version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
-dependencies = [
- "cfg-if",
- "libc",
- "winapi",
-]
+checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e"
[[package]]
-name = "winapi"
-version = "0.3.9"
+name = "windows_aarch64_msvc"
+version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
-dependencies = [
- "winapi-i686-pc-windows-gnu",
- "winapi-x86_64-pc-windows-gnu",
-]
+checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4"
+
+[[package]]
+name = "windows_i686_gnu"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7"
+
+[[package]]
+name = "windows_i686_msvc"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246"
+
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed"
[[package]]
-name = "winapi-i686-pc-windows-gnu"
-version = "0.4.0"
+name = "windows_x86_64_gnullvm"
+version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028"
[[package]]
-name = "winapi-x86_64-pc-windows-gnu"
-version = "0.4.0"
+name = "windows_x86_64_msvc"
+version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
diff --git a/Cargo.toml b/Cargo.toml
index 3a103b3..7460010 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,70 +3,124 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
name = "mio"
-version = "0.7.13"
-authors = ["Carl Lerche <me@carllerche.com>", "Thomas de Zeeuw <thomasdezeeuw@gmail.com>", "Tokio Contributors <team@tokio.rs>"]
-include = ["Cargo.toml", "LICENSE", "README.md", "CHANGELOG.md", "src/**/*.rs", "examples/**/*.rs"]
+version = "0.8.5"
+authors = [
+ "Carl Lerche <me@carllerche.com>",
+ "Thomas de Zeeuw <thomasdezeeuw@gmail.com>",
+ "Tokio Contributors <team@tokio.rs>",
+]
+include = [
+ "Cargo.toml",
+ "LICENSE",
+ "README.md",
+ "CHANGELOG.md",
+ "src/**/*.rs",
+ "examples/**/*.rs",
+]
description = "Lightweight non-blocking IO"
homepage = "https://github.com/tokio-rs/mio"
-documentation = "https://docs.rs/mio/0.7.13"
readme = "README.md"
-keywords = ["io", "async", "non-blocking"]
+keywords = [
+ "io",
+ "async",
+ "non-blocking",
+]
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/mio"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
-targets = ["aarch64-apple-ios", "aarch64-linux-android", "x86_64-apple-darwin", "x86_64-pc-windows-msvc", "x86_64-sun-solaris", "x86_64-unknown-dragonfly", "x86_64-unknown-freebsd", "x86_64-unknown-linux-gnu", "x86_64-unknown-netbsd", "x86_64-unknown-openbsd"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+targets = [
+ "aarch64-apple-ios",
+ "aarch64-linux-android",
+ "wasm32-wasi",
+ "x86_64-apple-darwin",
+ "x86_64-pc-windows-msvc",
+ "x86_64-unknown-dragonfly",
+ "x86_64-unknown-freebsd",
+ "x86_64-unknown-illumos",
+ "x86_64-unknown-linux-gnu",
+ "x86_64-unknown-netbsd",
+ "x86_64-unknown-openbsd",
+]
[package.metadata.playground]
-features = ["os-poll", "os-ext", "net"]
+features = [
+ "os-poll",
+ "os-ext",
+ "net",
+]
[[example]]
name = "tcp_server"
-required-features = ["os-poll", "net"]
+required-features = [
+ "os-poll",
+ "net",
+]
+
+[[example]]
+name = "tcp_listenfd_server"
+required-features = [
+ "os-poll",
+ "net",
+]
[[example]]
name = "udp_server"
-required-features = ["os-poll", "net"]
+required-features = [
+ "os-poll",
+ "net",
+]
+
[dependencies.log]
version = "0.4.8"
+
[dev-dependencies.env_logger]
-version = "0.6.2"
+version = "0.8.4"
default-features = false
[dev-dependencies.rand]
-version = "0.4"
+version = "0.8"
[features]
default = []
-extra-docs = []
net = []
-os-ext = ["os-poll"]
+os-ext = [
+ "os-poll",
+ "windows-sys/Win32_System_Pipes",
+ "windows-sys/Win32_Security",
+]
os-poll = []
-os-util = ["os-ext"]
-pipe = ["os-ext"]
-tcp = ["net"]
-udp = ["net"]
-uds = ["net"]
-[target."cfg(unix)".dependencies.libc]
-version = "0.2.86"
-[target."cfg(windows)".dependencies.miow]
-version = "0.3.6"
-[target."cfg(windows)".dependencies.ntapi]
-version = "0.3"
+[target."cfg(target_os = \"wasi\")".dependencies.libc]
+version = "0.2.121"
+
+[target."cfg(target_os = \"wasi\")".dependencies.wasi]
+version = "0.11.0"
+
+[target."cfg(unix)".dependencies.libc]
+version = "0.2.121"
-[target."cfg(windows)".dependencies.winapi]
-version = "0.3"
-features = ["winsock2", "mswsock", "mstcpip"]
+[target."cfg(windows)".dependencies.windows-sys]
+version = "0.42"
+features = [
+ "Win32_Foundation",
+ "Win32_Networking_WinSock",
+ "Win32_Storage_FileSystem",
+ "Win32_System_IO",
+ "Win32_System_WindowsProgramming",
+]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index aa2e677..c5052a7 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -2,11 +2,9 @@
edition = "2018"
name = "mio"
# When releasing to crates.io:
-# - Update html_root_url.
# - Update CHANGELOG.md.
-# - Update doc URL.
# - Create git tag
-version = "0.7.13"
+version = "0.8.5"
license = "MIT"
authors = [
"Carl Lerche <me@carllerche.com>",
@@ -14,7 +12,6 @@ authors = [
"Tokio Contributors <team@tokio.rs>",
]
description = "Lightweight non-blocking IO"
-documentation = "https://docs.rs/mio/0.7.13"
homepage = "https://github.com/tokio-rs/mio"
repository = "https://github.com/tokio-rs/mio"
readme = "README.md"
@@ -37,32 +34,37 @@ default = []
# Enables the `Poll` and `Registry` types.
os-poll = []
# Enables additional OS specific extensions, e.g. Unix `pipe(2)`.
-os-ext = ["os-poll"]
+os-ext = [
+ "os-poll",
+ "windows-sys/Win32_System_Pipes",
+ "windows-sys/Win32_Security",
+]
# Enables `mio::net` module containing networking primitives.
net = []
-# Deprecated features, will be removed in a future version.
-extra-docs = [] # Docs are now always present.
-tcp = ["net"] # Replaced with "net" feature.
-udp = ["net"] # Replaced with "net" feature.
-uds = ["net"] # Replaced with "net" feature.
-pipe = ["os-ext"] # Replaced with "os-ext" feature.
-os-util = ["os-ext"]# Replaced with "os-ext" feature.
-
[dependencies]
log = "0.4.8"
[target.'cfg(unix)'.dependencies]
-libc = "0.2.86"
+libc = "0.2.121"
+
+[target.'cfg(windows)'.dependencies.windows-sys]
+version = "0.42"
+features = [
+ "Win32_Foundation", # Basic types eg HANDLE
+ "Win32_Networking_WinSock", # winsock2 types/functions
+ "Win32_Storage_FileSystem", # Enables NtCreateFile
+ "Win32_System_IO", # IO types like OVERLAPPED etc
+ "Win32_System_WindowsProgramming", # General future used for various types/funcs
+]
-[target.'cfg(windows)'.dependencies]
-miow = "0.3.6"
-winapi = { version = "0.3", features = ["winsock2", "mswsock", "mstcpip"] }
-ntapi = "0.3"
+[target.'cfg(target_os = "wasi")'.dependencies]
+wasi = "0.11.0"
+libc = "0.2.121"
[dev-dependencies]
-env_logger = { version = "0.6.2", default-features = false }
-rand = "0.4"
+env_logger = { version = "0.8.4", default-features = false }
+rand = "0.8"
[package.metadata.docs.rs]
all-features = true
@@ -70,17 +72,17 @@ rustdoc-args = ["--cfg", "docsrs"]
targets = [
"aarch64-apple-ios",
"aarch64-linux-android",
+ "wasm32-wasi",
"x86_64-apple-darwin",
"x86_64-pc-windows-msvc",
- "x86_64-sun-solaris",
"x86_64-unknown-dragonfly",
"x86_64-unknown-freebsd",
+ "x86_64-unknown-illumos",
"x86_64-unknown-linux-gnu",
"x86_64-unknown-netbsd",
"x86_64-unknown-openbsd",
]
-
[package.metadata.playground]
features = ["os-poll", "os-ext", "net"]
@@ -89,5 +91,9 @@ name = "tcp_server"
required-features = ["os-poll", "net"]
[[example]]
+name = "tcp_listenfd_server"
+required-features = ["os-poll", "net"]
+
+[[example]]
name = "udp_server"
required-features = ["os-poll", "net"]
diff --git a/METADATA b/METADATA
index 79077c2..5d0b7d4 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/mio
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "mio"
description: "Lightweight non-blocking IO"
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/mio/mio-0.7.13.crate"
+ value: "https://static.crates.io/crates/mio/mio-0.8.5.crate"
}
- version: "0.7.13"
+ version: "0.8.5"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 6
- day: 21
+ year: 2022
+ month: 12
+ day: 12
}
}
diff --git a/README.md b/README.md
index c8653f8..c17f3d3 100644
--- a/README.md
+++ b/README.md
@@ -6,21 +6,22 @@ overhead as possible over the OS abstractions.
[![Crates.io][crates-badge]][crates-url]
[![MIT licensed][mit-badge]][mit-url]
-[![Build Status][azure-badge]][azure-url]
+[![Build Status][actions-badge]][actions-url]
[![Build Status][cirrus-badge]][cirrus-url]
[crates-badge]: https://img.shields.io/crates/v/mio.svg
[crates-url]: https://crates.io/crates/mio
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: LICENSE
-[azure-badge]: https://dev.azure.com/tokio-rs/Tokio/_apis/build/status/tokio-rs.mio?branchName=master
-[azure-url]: https://dev.azure.com/tokio-rs/Tokio/_build/latest?definitionId=2&branchName=master
+[actions-badge]: https://github.com/tokio-rs/mio/workflows/CI/badge.svg
+[actions-url]: https://github.com/tokio-rs/mio/actions?query=workflow%3ACI+branch%3Amaster
[cirrus-badge]: https://api.cirrus-ci.com/github/tokio-rs/mio.svg
[cirrus-url]: https://cirrus-ci.com/github/tokio-rs/mio
**API documentation**
* [master](https://tokio-rs.github.io/mio/doc/mio/)
+* [v0.8](https://docs.rs/mio/^0.8)
* [v0.7](https://docs.rs/mio/^0.7)
* [v0.6](https://docs.rs/mio/^0.6)
@@ -33,7 +34,7 @@ To use `mio`, first add this to your `Cargo.toml`:
```toml
[dependencies]
-mio = "0.7"
+mio = "0.8"
```
Next we can start using Mio. The following is quick introduction using
@@ -129,13 +130,12 @@ or higher-level libraries.
Currently supported platforms:
-* Android
+* Android (API level 21)
* DragonFly BSD
* FreeBSD
* Linux
* NetBSD
* OpenBSD
-* Solaris
* Windows
* iOS
* macOS
@@ -152,6 +152,16 @@ This uses the Windows AFD system to access socket readiness events.
[wepoll]: https://github.com/piscisaureus/wepoll
+### Unsupported
+
+* Haiku, see [issue #1472]
+* Solaris, see [issue #1152]
+* Wine, see [issue #1444]
+
+[issue #1472]: https://github.com/tokio-rs/mio/issues/1472
+[issue #1152]: https://github.com/tokio-rs/mio/issues/1152
+[issue #1444]: https://github.com/tokio-rs/mio/issues/1444
+
## Community
A group of Mio users hang out on [Discord], this can be a good place to go for
diff --git a/cargo2android.json b/cargo2android.json
index 1652767..93f98c6 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -7,7 +7,7 @@
],
"dependencies": true,
"device": true,
- "features": "os-poll,tcp,udp,uds,os-util",
+ "features": "net,os-ext,os-poll",
"min-sdk-version": "29",
"run": true,
"vendor-available": true,
diff --git a/examples/tcp_listenfd_server.rs b/examples/tcp_listenfd_server.rs
new file mode 100644
index 0000000..941d7f0
--- /dev/null
+++ b/examples/tcp_listenfd_server.rs
@@ -0,0 +1,209 @@
+// You can run this example from the root of the mio repo:
+// cargo run --example tcp_listenfd_server --features="os-poll net"
+// or with wasi:
+// cargo +nightly build --target wasm32-wasi --example tcp_listenfd_server --features="os-poll net"
+// wasmtime run --tcplisten 127.0.0.1:9000 --env 'LISTEN_FDS=1' target/wasm32-wasi/debug/examples/tcp_listenfd_server.wasm
+
+use mio::event::Event;
+use mio::net::{TcpListener, TcpStream};
+use mio::{Events, Interest, Poll, Registry, Token};
+use std::collections::HashMap;
+use std::io::{self, Read, Write};
+use std::str::from_utf8;
+
+// Setup some tokens to allow us to identify which event is for which socket.
+const SERVER: Token = Token(0);
+
+// Some data we'll send over the connection.
+const DATA: &[u8] = b"Hello world!\n";
+
+#[cfg(not(windows))]
+fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
+ #[cfg(unix)]
+ use std::os::unix::io::FromRawFd;
+ #[cfg(target_os = "wasi")]
+ use std::os::wasi::io::FromRawFd;
+
+ let stdlistener = unsafe { std::net::TcpListener::from_raw_fd(3) };
+ stdlistener.set_nonblocking(true).unwrap();
+ Some(stdlistener)
+}
+
+#[cfg(windows)]
+fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
+ // Windows does not support `LISTEN_FDS`
+ None
+}
+
+fn main() -> io::Result<()> {
+ env_logger::init();
+
+ std::env::var("LISTEN_FDS").expect("LISTEN_FDS environment variable unset");
+
+ // Create a poll instance.
+ let mut poll = Poll::new()?;
+ // Create storage for events.
+ let mut events = Events::with_capacity(128);
+
+ // Setup the TCP server socket.
+ let mut server = {
+ let stdlistener = get_first_listen_fd_listener().unwrap();
+ println!("Using preopened socket FD 3");
+ println!("You can connect to the server using `nc`:");
+ match stdlistener.local_addr() {
+ Ok(a) => println!(" $ nc {} {}", a.ip(), a.port()),
+ Err(_) => println!(" $ nc <IP> <PORT>"),
+ }
+ println!("You'll see our welcome message and anything you type will be printed here.");
+ TcpListener::from_std(stdlistener)
+ };
+
+ // Register the server with poll we can receive events for it.
+ poll.registry()
+ .register(&mut server, SERVER, Interest::READABLE)?;
+
+ // Map of `Token` -> `TcpStream`.
+ let mut connections = HashMap::new();
+ // Unique token for each incoming connection.
+ let mut unique_token = Token(SERVER.0 + 1);
+
+ loop {
+ poll.poll(&mut events, None)?;
+
+ for event in events.iter() {
+ match event.token() {
+ SERVER => loop {
+ // Received an event for the TCP server socket, which
+ // indicates we can accept an connection.
+ let (mut connection, address) = match server.accept() {
+ Ok((connection, address)) => (connection, address),
+ Err(ref e) if would_block(e) => {
+ // If we get a `WouldBlock` error we know our
+ // listener has no more incoming connections queued,
+ // so we can return to polling and wait for some
+ // more.
+ break;
+ }
+ Err(e) => {
+ // If it was any other kind of error, something went
+ // wrong and we terminate with an error.
+ return Err(e);
+ }
+ };
+
+ println!("Accepted connection from: {}", address);
+
+ let token = next(&mut unique_token);
+ poll.registry()
+ .register(&mut connection, token, Interest::WRITABLE)?;
+
+ connections.insert(token, connection);
+ },
+ token => {
+ // Maybe received an event for a TCP connection.
+ let done = if let Some(connection) = connections.get_mut(&token) {
+ handle_connection_event(poll.registry(), connection, event)?
+ } else {
+ // Sporadic events happen, we can safely ignore them.
+ false
+ };
+ if done {
+ if let Some(mut connection) = connections.remove(&token) {
+ poll.registry().deregister(&mut connection)?;
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+fn next(current: &mut Token) -> Token {
+ let next = current.0;
+ current.0 += 1;
+ Token(next)
+}
+
+/// Returns `true` if the connection is done.
+fn handle_connection_event(
+ registry: &Registry,
+ connection: &mut TcpStream,
+ event: &Event,
+) -> io::Result<bool> {
+ if event.is_writable() {
+ // We can (maybe) write to the connection.
+ match connection.write(DATA) {
+ // We want to write the entire `DATA` buffer in a single go. If we
+ // write less we'll return a short write error (same as
+ // `io::Write::write_all` does).
+ Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
+ Ok(_) => {
+ // After we've written something we'll reregister the connection
+ // to only respond to readable events.
+ registry.reregister(connection, event.token(), Interest::READABLE)?
+ }
+ // Would block "errors" are the OS's way of saying that the
+ // connection is not actually ready to perform this I/O operation.
+ Err(ref err) if would_block(err) => {}
+ // Got interrupted (how rude!), we'll try again.
+ Err(ref err) if interrupted(err) => {
+ return handle_connection_event(registry, connection, event)
+ }
+ // Other errors we'll consider fatal.
+ Err(err) => return Err(err),
+ }
+ }
+
+ if event.is_readable() {
+ let mut connection_closed = false;
+ let mut received_data = vec![0; 4096];
+ let mut bytes_read = 0;
+ // We can (maybe) read from the connection.
+ loop {
+ match connection.read(&mut received_data[bytes_read..]) {
+ Ok(0) => {
+ // Reading 0 bytes means the other side has closed the
+ // connection or is done writing, then so are we.
+ connection_closed = true;
+ break;
+ }
+ Ok(n) => {
+ bytes_read += n;
+ if bytes_read == received_data.len() {
+ received_data.resize(received_data.len() + 1024, 0);
+ }
+ }
+ // Would block "errors" are the OS's way of saying that the
+ // connection is not actually ready to perform this I/O operation.
+ Err(ref err) if would_block(err) => break,
+ Err(ref err) if interrupted(err) => continue,
+ // Other errors we'll consider fatal.
+ Err(err) => return Err(err),
+ }
+ }
+
+ if bytes_read != 0 {
+ let received_data = &received_data[..bytes_read];
+ if let Ok(str_buf) = from_utf8(received_data) {
+ println!("Received data: {}", str_buf.trim_end());
+ } else {
+ println!("Received (none UTF-8) data: {:?}", received_data);
+ }
+ }
+
+ if connection_closed {
+ println!("Connection closed");
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+}
+
+fn would_block(err: &io::Error) -> bool {
+ err.kind() == io::ErrorKind::WouldBlock
+}
+
+fn interrupted(err: &io::Error) -> bool {
+ err.kind() == io::ErrorKind::Interrupted
+}
diff --git a/examples/tcp_server.rs b/examples/tcp_server.rs
index 42426ee..cc611ca 100644
--- a/examples/tcp_server.rs
+++ b/examples/tcp_server.rs
@@ -1,5 +1,5 @@
// You can run this example from the root of the mio repo:
-// cargo run --example tcp_server --features="os-poll tcp"
+// cargo run --example tcp_server --features="os-poll net"
use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Registry, Token};
@@ -13,6 +13,7 @@ const SERVER: Token = Token(0);
// Some data we'll send over the connection.
const DATA: &[u8] = b"Hello world!\n";
+#[cfg(not(target_os = "wasi"))]
fn main() -> io::Result<()> {
env_logger::init();
@@ -36,7 +37,7 @@ fn main() -> io::Result<()> {
println!("You can connect to the server using `nc`:");
println!(" $ nc 127.0.0.1 9000");
- println!("You'll see our welcome message and anything you type we'll be printed here.");
+ println!("You'll see our welcome message and anything you type will be printed here.");
loop {
poll.poll(&mut events, None)?;
@@ -82,7 +83,9 @@ fn main() -> io::Result<()> {
false
};
if done {
- connections.remove(&token);
+ if let Some(mut connection) = connections.remove(&token) {
+ poll.registry().deregister(&mut connection)?;
+ }
}
}
}
@@ -179,3 +182,8 @@ fn would_block(err: &io::Error) -> bool {
fn interrupted(err: &io::Error) -> bool {
err.kind() == io::ErrorKind::Interrupted
}
+
+#[cfg(target_os = "wasi")]
+fn main() {
+ panic!("can't bind to an address with wasi")
+}
diff --git a/examples/udp_server.rs b/examples/udp_server.rs
index febb662..95f8a83 100644
--- a/examples/udp_server.rs
+++ b/examples/udp_server.rs
@@ -1,14 +1,16 @@
// You can run this example from the root of the mio repo:
-// cargo run --example udp_server --features="os-poll udp"
+// cargo run --example udp_server --features="os-poll net"
use log::warn;
-use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
use std::io;
// A token to allow us to identify which event is for the `UdpSocket`.
const UDP_SOCKET: Token = Token(0);
+#[cfg(not(target_os = "wasi"))]
fn main() -> io::Result<()> {
+ use mio::net::UdpSocket;
+
env_logger::init();
// Create a poll instance.
@@ -19,6 +21,7 @@ fn main() -> io::Result<()> {
// Setup the UDP socket.
let addr = "127.0.0.1:9000".parse().unwrap();
+
let mut socket = UdpSocket::bind(addr)?;
// Register our socket with the token defined above and an interest in being
@@ -75,3 +78,8 @@ fn main() -> io::Result<()> {
}
}
}
+
+#[cfg(target_os = "wasi")]
+fn main() {
+ panic!("can't bind to an address with wasi")
+}
diff --git a/src/event/event.rs b/src/event/event.rs
index 9e4a95e..2d85742 100644
--- a/src/event/event.rs
+++ b/src/event/event.rs
@@ -25,6 +25,15 @@ impl Event {
}
/// Returns true if the event contains readable readiness.
+ ///
+ /// # Notes
+ ///
+ /// Out-of-band (OOB) data also triggers readable events. But must
+ /// application don't actually read OOB data, this could leave an
+ /// application open to a Denial-of-Service (Dos) attack, see
+ /// <https://github.com/sandstorm-io/sandstorm-website/blob/58f93346028c0576e8147627667328eaaf4be9fa/_posts/2015-04-08-osx-security-bug.md>.
+ /// However because Mio uses edge-triggers it will not result in an infinite
+ /// loop as described in the article above.
pub fn is_readable(&self) -> bool {
sys::event::is_readable(&self.inner)
}
@@ -53,7 +62,7 @@ impl Event {
/// | [kqueue] | `EV_ERROR` and `EV_EOF` with `fflags` set to `0`. |
///
/// [OS selector]: ../struct.Poll.html#implementation-notes
- /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+ /// [epoll]: https://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
pub fn is_error(&self) -> bool {
sys::event::is_error(&self.inner)
@@ -84,7 +93,7 @@ impl Event {
/// | [kqueue] | `EV_EOF` |
///
/// [OS selector]: ../struct.Poll.html#implementation-notes
- /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+ /// [epoll]: https://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
pub fn is_read_closed(&self) -> bool {
sys::event::is_read_closed(&self.inner)
@@ -114,7 +123,7 @@ impl Event {
/// | [kqueue] | `EV_EOF` |
///
/// [OS selector]: ../struct.Poll.html#implementation-notes
- /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+ /// [epoll]: https://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
pub fn is_write_closed(&self) -> bool {
sys::event::is_write_closed(&self.inner)
@@ -135,7 +144,7 @@ impl Event {
/// | [kqueue] | *Not supported* |
///
/// [OS selector]: ../struct.Poll.html#implementation-notes
- /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+ /// [epoll]: https://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
#[inline]
pub fn is_priority(&self) -> bool {
@@ -158,7 +167,7 @@ impl Event {
/// 1: Only supported on DragonFly BSD, FreeBSD, iOS and macOS.
///
/// [OS selector]: ../struct.Poll.html#implementation-notes
- /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+ /// [epoll]: https://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
pub fn is_aio(&self) -> bool {
sys::event::is_aio(&self.inner)
diff --git a/src/event/source.rs b/src/event/source.rs
index f38268a..619f72d 100644
--- a/src/event/source.rs
+++ b/src/event/source.rs
@@ -38,8 +38,8 @@ use std::io;
///
/// Implementing `Source` on a struct containing a socket:
///
-#[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+#[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+#[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// use mio::{Interest, Registry, Token};
/// use mio::event::Source;
/// use mio::net::TcpStream;
@@ -121,7 +121,7 @@ where
token: Token,
interests: Interest,
) -> io::Result<()> {
- (&mut **self).register(registry, token, interests)
+ (**self).register(registry, token, interests)
}
fn reregister(
@@ -130,10 +130,10 @@ where
token: Token,
interests: Interest,
) -> io::Result<()> {
- (&mut **self).reregister(registry, token, interests)
+ (**self).reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
- (&mut **self).deregister(registry)
+ (**self).deregister(registry)
}
}
diff --git a/src/interest.rs b/src/interest.rs
index ee5158a..0aa0bda 100644
--- a/src/interest.rs
+++ b/src/interest.rs
@@ -17,8 +17,8 @@ use std::{fmt, ops};
pub struct Interest(NonZeroU8);
// These must be unique.
-const READABLE: u8 = 0b0_001;
-const WRITABLE: u8 = 0b0_010;
+const READABLE: u8 = 0b0001;
+const WRITABLE: u8 = 0b0010;
// The following are not available on all platforms.
#[cfg_attr(
not(any(
@@ -29,9 +29,9 @@ const WRITABLE: u8 = 0b0_010;
)),
allow(dead_code)
)]
-const AIO: u8 = 0b0_100;
+const AIO: u8 = 0b0100;
#[cfg_attr(not(target_os = "freebsd"), allow(dead_code))]
-const LIO: u8 = 0b1_000;
+const LIO: u8 = 0b1000;
impl Interest {
/// Returns a `Interest` set representing readable interests.
diff --git a/src/io_source.rs b/src/io_source.rs
index 6939c0d..99623c1 100644
--- a/src/io_source.rs
+++ b/src/io_source.rs
@@ -1,14 +1,14 @@
use std::ops::{Deref, DerefMut};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
+#[cfg(target_os = "wasi")]
+use std::os::wasi::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{fmt, io};
-#[cfg(any(unix, debug_assertions))]
-use crate::poll;
use crate::sys::IoSourceState;
use crate::{event, Interest, Registry, Token};
@@ -142,7 +142,9 @@ where
) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate(registry)?;
- poll::selector(registry).register(self.inner.as_raw_fd(), token, interests)
+ registry
+ .selector()
+ .register(self.inner.as_raw_fd(), token, interests)
}
fn reregister(
@@ -153,13 +155,15 @@ where
) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.check_association(registry)?;
- poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests)
+ registry
+ .selector()
+ .reregister(self.inner.as_raw_fd(), token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.remove_association(registry)?;
- poll::selector(registry).deregister(self.inner.as_raw_fd())
+ registry.selector().deregister(self.inner.as_raw_fd())
}
}
@@ -198,6 +202,44 @@ where
}
}
+#[cfg(target_os = "wasi")]
+impl<T> event::Source for IoSource<T>
+where
+ T: AsRawFd,
+{
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.associate(registry)?;
+ registry
+ .selector()
+ .register(self.inner.as_raw_fd() as _, token, interests)
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.check_association(registry)?;
+ registry
+ .selector()
+ .reregister(self.inner.as_raw_fd() as _, token, interests)
+ }
+
+ fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+ #[cfg(debug_assertions)]
+ self.selector_id.remove_association(registry)?;
+ registry.selector().deregister(self.inner.as_raw_fd() as _)
+ }
+}
+
impl<T> fmt::Debug for IoSource<T>
where
T: fmt::Debug,
@@ -230,7 +272,7 @@ impl SelectorId {
/// Associate an I/O source with `registry`, returning an error if its
/// already registered.
fn associate(&self, registry: &Registry) -> io::Result<()> {
- let registry_id = poll::selector(&registry).id();
+ let registry_id = registry.selector().id();
let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
if previous_id == Self::UNASSOCIATED {
@@ -247,7 +289,7 @@ impl SelectorId {
/// error if its registered with a different `Registry` or not registered at
/// all.
fn check_association(&self, registry: &Registry) -> io::Result<()> {
- let registry_id = poll::selector(&registry).id();
+ let registry_id = registry.selector().id();
let id = self.id.load(Ordering::Acquire);
if id == registry_id {
@@ -268,7 +310,7 @@ impl SelectorId {
/// Remove a previously made association from `registry`, returns an error
/// if it was not previously associated with `registry`.
fn remove_association(&self, registry: &Registry) -> io::Result<()> {
- let registry_id = poll::selector(&registry).id();
+ let registry_id = registry.selector().id();
let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
if previous_id == registry_id {
diff --git a/src/lib.rs b/src/lib.rs
index 165a340..56a7160 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,3 @@
-#![doc(html_root_url = "https://docs.rs/mio/0.7.13")]
#![deny(
missing_docs,
missing_debug_implementations,
@@ -49,6 +48,7 @@ mod interest;
mod poll;
mod sys;
mod token;
+#[cfg(not(target_os = "wasi"))]
mod waker;
pub mod event;
@@ -66,6 +66,7 @@ pub use event::Events;
pub use interest::Interest;
pub use poll::{Poll, Registry};
pub use token::Token;
+#[cfg(not(target_os = "wasi"))]
pub use waker::Waker;
#[cfg(all(unix, feature = "os-ext"))]
@@ -100,7 +101,7 @@ pub mod features {
#![cfg_attr(feature = "os-poll", doc = "## `os-poll` (enabled)")]
#![cfg_attr(not(feature = "os-poll"), doc = "## `os-poll` (disabled)")]
//!
- //! Mio by default provides only a shell implementation, that `panic!`s the
+ //! Mio by default provides only a shell implementation that `panic!`s the
//! moment it is actually run. To run it requires OS support, this is
//! enabled by activating the `os-poll` feature.
//!
@@ -174,8 +175,8 @@ pub mod guide {
//!
//! [event source]: ../event/trait.Source.html
//!
- #![cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
- #![cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+ #![cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+ #![cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
//! # use mio::net::TcpListener;
//! # use mio::{Poll, Token, Interest};
//! # fn main() -> std::io::Result<()> {
@@ -213,8 +214,8 @@ pub mod guide {
//! [poll]: ../struct.Poll.html#method.poll
//! [event sources]: ../event/trait.Source.html
//!
- #![cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
- #![cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+ #![cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+ #![cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
//! # use std::io;
//! # use std::time::Duration;
//! # use mio::net::TcpListener;
diff --git a/src/net/mod.rs b/src/net/mod.rs
index 4df701d..7d714ca 100644
--- a/src/net/mod.rs
+++ b/src/net/mod.rs
@@ -6,11 +6,31 @@
//! matter the target platform.
//!
//! [portability guidelines]: ../struct.Poll.html#portability
+//!
+//! # Notes
+//!
+//! When using a datagram based socket, i.e. [`UdpSocket`] or [`UnixDatagram`],
+//! its only possible to receive a packet once. This means that if you provide a
+//! buffer that is too small you won't be able to receive the data anymore. How
+//! OSs deal with this situation is different for each OS:
+//! * Unixes, such as Linux, FreeBSD and macOS, will simply fill the buffer and
+//! return the amount of bytes written. This means that if the returned value
+//! is equal to the size of the buffer it may have only written a part of the
+//! packet (or the packet has the same size as the buffer).
+//! * Windows returns an `WSAEMSGSIZE` error.
+//!
+//! Mio does not change the value (either ok or error) returned by the OS, it's
+//! up to the user handle this. How to deal with these difference is still up
+//! for debate, specifically in
+//! <https://github.com/rust-lang/rust/issues/55794>. The best advice we can
+//! give is to always call receive with a large enough buffer.
mod tcp;
-pub use self::tcp::{TcpListener, TcpSocket, TcpStream, TcpKeepalive};
+pub use self::tcp::{TcpListener, TcpStream};
+#[cfg(not(target_os = "wasi"))]
mod udp;
+#[cfg(not(target_os = "wasi"))]
pub use self::udp::UdpSocket;
#[cfg(unix)]
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index da276f3..df51d57 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -1,12 +1,18 @@
use std::net::{self, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+#[cfg(target_os = "wasi")]
+use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use std::{fmt, io};
-use super::{TcpSocket, TcpStream};
use crate::io_source::IoSource;
+use crate::net::TcpStream;
+#[cfg(unix)]
+use crate::sys::tcp::set_reuseaddr;
+#[cfg(not(target_os = "wasi"))]
+use crate::sys::tcp::{bind, listen, new_for_addr};
use crate::{event, sys, Interest, Registry, Token};
/// A structure representing a socket server
@@ -49,8 +55,13 @@ impl TcpListener {
/// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
/// 3. Bind the socket to the specified address.
/// 4. Calls `listen` on the socket to prepare it to receive new connections.
+ #[cfg(not(target_os = "wasi"))]
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
- let socket = TcpSocket::new_for_addr(addr)?;
+ let socket = new_for_addr(addr)?;
+ #[cfg(unix)]
+ let listener = unsafe { TcpListener::from_raw_fd(socket) };
+ #[cfg(windows)]
+ let listener = unsafe { TcpListener::from_raw_socket(socket as _) };
// On platforms with Berkeley-derived sockets, this allows to quickly
// rebind a socket, without needing to wait for the OS to clean up the
@@ -60,10 +71,11 @@ impl TcpListener {
// which allows “socket hijacking”, so we explicitly don't set it here.
// https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
#[cfg(not(windows))]
- socket.set_reuseaddr(true)?;
+ set_reuseaddr(&listener.inner, true)?;
- socket.bind(addr)?;
- socket.listen(1024)
+ bind(&listener.inner, addr)?;
+ listen(&listener.inner, 1024)?;
+ Ok(listener)
}
/// Creates a new `TcpListener` from a standard `net::TcpListener`.
@@ -207,3 +219,30 @@ impl FromRawSocket for TcpListener {
TcpListener::from_std(FromRawSocket::from_raw_socket(socket))
}
}
+
+#[cfg(target_os = "wasi")]
+impl IntoRawFd for TcpListener {
+ fn into_raw_fd(self) -> RawFd {
+ self.inner.into_inner().into_raw_fd()
+ }
+}
+
+#[cfg(target_os = "wasi")]
+impl AsRawFd for TcpListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+}
+
+#[cfg(target_os = "wasi")]
+impl FromRawFd for TcpListener {
+ /// Converts a `RawFd` to a `TcpListener`.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode.
+ unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
+ TcpListener::from_std(FromRawFd::from_raw_fd(fd))
+ }
+}
diff --git a/src/net/tcp/mod.rs b/src/net/tcp/mod.rs
index 4e47aee..94af5c1 100644
--- a/src/net/tcp/mod.rs
+++ b/src/net/tcp/mod.rs
@@ -1,8 +1,5 @@
mod listener;
pub use self::listener::TcpListener;
-mod socket;
-pub use self::socket::{TcpSocket, TcpKeepalive};
-
mod stream;
pub use self::stream::TcpStream;
diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs
deleted file mode 100644
index 69fbacf..0000000
--- a/src/net/tcp/socket.rs
+++ /dev/null
@@ -1,490 +0,0 @@
-use std::io;
-use std::mem;
-use std::net::SocketAddr;
-#[cfg(unix)]
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-#[cfg(windows)]
-use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
-use std::time::Duration;
-
-use crate::net::{TcpListener, TcpStream};
-use crate::sys;
-
-/// A non-blocking TCP socket used to configure a stream or listener.
-///
-/// The `TcpSocket` type wraps the operating-system's socket handle. This type
-/// is used to configure the socket before establishing a connection or start
-/// listening for inbound connections.
-///
-/// The socket will be closed when the value is dropped.
-#[derive(Debug)]
-pub struct TcpSocket {
- sys: sys::tcp::TcpSocket,
-}
-
-/// Configures a socket's TCP keepalive parameters.
-#[derive(Debug, Default, Clone)]
-pub struct TcpKeepalive {
- pub(crate) time: Option<Duration>,
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- target_os = "windows",
- ))]
- pub(crate) interval: Option<Duration>,
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- ))]
- pub(crate) retries: Option<u32>,
-}
-
-impl TcpSocket {
- /// Create a new IPv4 TCP socket.
- ///
- /// This calls `socket(2)`.
- pub fn new_v4() -> io::Result<TcpSocket> {
- sys::tcp::new_v4_socket().map(|sys| TcpSocket { sys })
- }
-
- /// Create a new IPv6 TCP socket.
- ///
- /// This calls `socket(2)`.
- pub fn new_v6() -> io::Result<TcpSocket> {
- sys::tcp::new_v6_socket().map(|sys| TcpSocket { sys })
- }
-
- pub(crate) fn new_for_addr(addr: SocketAddr) -> io::Result<TcpSocket> {
- if addr.is_ipv4() {
- TcpSocket::new_v4()
- } else {
- TcpSocket::new_v6()
- }
- }
-
- /// Bind `addr` to the TCP socket.
- pub fn bind(&self, addr: SocketAddr) -> io::Result<()> {
- sys::tcp::bind(self.sys, addr)
- }
-
- /// Connect the socket to `addr`.
- ///
- /// This consumes the socket and performs the connect operation. Once the
- /// connection completes, the socket is now a non-blocking `TcpStream` and
- /// can be used as such.
- pub fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
- let stream = sys::tcp::connect(self.sys, addr)?;
-
- // Don't close the socket
- mem::forget(self);
- Ok(TcpStream::from_std(stream))
- }
-
- /// Listen for inbound connections, converting the socket to a
- /// `TcpListener`.
- pub fn listen(self, backlog: u32) -> io::Result<TcpListener> {
- let listener = sys::tcp::listen(self.sys, backlog)?;
-
- // Don't close the socket
- mem::forget(self);
- Ok(TcpListener::from_std(listener))
- }
-
- /// Sets the value of `SO_REUSEADDR` on this socket.
- pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
- sys::tcp::set_reuseaddr(self.sys, reuseaddr)
- }
-
- /// Get the value of `SO_REUSEADDR` set on this socket.
- pub fn get_reuseaddr(&self) -> io::Result<bool> {
- sys::tcp::get_reuseaddr(self.sys)
- }
-
- /// Sets the value of `SO_REUSEPORT` on this socket.
- /// Only supported available in unix
- #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
- pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
- sys::tcp::set_reuseport(self.sys, reuseport)
- }
-
- /// Get the value of `SO_REUSEPORT` set on this socket.
- /// Only supported available in unix
- #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
- pub fn get_reuseport(&self) -> io::Result<bool> {
- sys::tcp::get_reuseport(self.sys)
- }
-
- /// Sets the value of `SO_LINGER` on this socket.
- pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
- sys::tcp::set_linger(self.sys, dur)
- }
-
- /// Gets the value of `SO_LINGER` on this socket
- pub fn get_linger(&self) -> io::Result<Option<Duration>> {
- sys::tcp::get_linger(self.sys)
- }
-
- /// Sets the value of `SO_RCVBUF` on this socket.
- pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
- sys::tcp::set_recv_buffer_size(self.sys, size)
- }
-
- /// Get the value of `SO_RCVBUF` set on this socket.
- ///
- /// Note that if [`set_recv_buffer_size`] has been called on this socket
- /// previously, the value returned by this function may not be the same as
- /// the argument provided to `set_recv_buffer_size`. This is for the
- /// following reasons:
- ///
- /// * Most operating systems have minimum and maximum allowed sizes for the
- /// receive buffer, and will clamp the provided value if it is below the
- /// minimum or above the maximum. The minimum and maximum buffer sizes are
- /// OS-dependent.
- /// * Linux will double the buffer size to account for internal bookkeeping
- /// data, and returns the doubled value from `getsockopt(2)`. As per `man
- /// 7 socket`:
- /// > Sets or gets the maximum socket receive buffer in bytes. The
- /// > kernel doubles this value (to allow space for bookkeeping
- /// > overhead) when it is set using `setsockopt(2)`, and this doubled
- /// > value is returned by `getsockopt(2)`.
- ///
- /// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
- pub fn get_recv_buffer_size(&self) -> io::Result<u32> {
- sys::tcp::get_recv_buffer_size(self.sys)
- }
-
- /// Sets the value of `SO_SNDBUF` on this socket.
- pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
- sys::tcp::set_send_buffer_size(self.sys, size)
- }
-
- /// Get the value of `SO_SNDBUF` set on this socket.
- ///
- /// Note that if [`set_send_buffer_size`] has been called on this socket
- /// previously, the value returned by this function may not be the same as
- /// the argument provided to `set_send_buffer_size`. This is for the
- /// following reasons:
- ///
- /// * Most operating systems have minimum and maximum allowed sizes for the
- /// receive buffer, and will clamp the provided value if it is below the
- /// minimum or above the maximum. The minimum and maximum buffer sizes are
- /// OS-dependent.
- /// * Linux will double the buffer size to account for internal bookkeeping
- /// data, and returns the doubled value from `getsockopt(2)`. As per `man
- /// 7 socket`:
- /// > Sets or gets the maximum socket send buffer in bytes. The
- /// > kernel doubles this value (to allow space for bookkeeping
- /// > overhead) when it is set using `setsockopt(2)`, and this doubled
- /// > value is returned by `getsockopt(2)`.
- ///
- /// [`set_send_buffer_size`]: #method.set_send_buffer_size
- pub fn get_send_buffer_size(&self) -> io::Result<u32> {
- sys::tcp::get_send_buffer_size(self.sys)
- }
-
- /// Sets whether keepalive messages are enabled to be sent on this socket.
- ///
- /// This will set the `SO_KEEPALIVE` option on this socket.
- pub fn set_keepalive(&self, keepalive: bool) -> io::Result<()> {
- sys::tcp::set_keepalive(self.sys, keepalive)
- }
-
- /// Returns whether or not TCP keepalive probes will be sent by this socket.
- pub fn get_keepalive(&self) -> io::Result<bool> {
- sys::tcp::get_keepalive(self.sys)
- }
-
- /// Sets parameters configuring TCP keepalive probes for this socket.
- ///
- /// The supported parameters depend on the operating system, and are
- /// configured using the [`TcpKeepalive`] struct. At a minimum, all systems
- /// support configuring the [keepalive time]: the time after which the OS
- /// will start sending keepalive messages on an idle connection.
- ///
- /// # Notes
- ///
- /// * This will enable TCP keepalive on this socket, if it is not already
- /// enabled.
- /// * On some platforms, such as Windows, any keepalive parameters *not*
- /// configured by the `TcpKeepalive` struct passed to this function may be
- /// overwritten with their default values. Therefore, this function should
- /// either only be called once per socket, or the same parameters should
- /// be passed every time it is called.
- ///
- /// # Examples
- #[cfg_attr(feature = "os-poll", doc = "```")]
- #[cfg_attr(not(feature = "os-poll"), doc = "```ignore")]
- /// use mio::net::{TcpSocket, TcpKeepalive};
- /// use std::time::Duration;
- ///
- /// # fn main() -> Result<(), std::io::Error> {
- /// let socket = TcpSocket::new_v6()?;
- /// let keepalive = TcpKeepalive::default()
- /// .with_time(Duration::from_secs(4));
- /// // Depending on the target operating system, we may also be able to
- /// // configure the keepalive probe interval and/or the number of retries
- /// // here as well.
- ///
- /// socket.set_keepalive_params(keepalive)?;
- /// # Ok(()) }
- /// ```
- ///
- /// [`TcpKeepalive`]: ../struct.TcpKeepalive.html
- /// [keepalive time]: ../struct.TcpKeepalive.html#method.with_time
- pub fn set_keepalive_params(&self, keepalive: TcpKeepalive) -> io::Result<()> {
- self.set_keepalive(true)?;
- sys::tcp::set_keepalive_params(self.sys, keepalive)
- }
-
- /// Returns the amount of time after which TCP keepalive probes will be sent
- /// on idle connections.
- ///
- /// If `None`, then keepalive messages are disabled.
- ///
- /// This returns the value of `SO_KEEPALIVE` + `IPPROTO_TCP` on OpenBSD,
- /// NetBSD, and Haiku, `TCP_KEEPALIVE` on macOS and iOS, and `TCP_KEEPIDLE`
- /// on all other Unix operating systems. On Windows, it is not possible to
- /// access the value of TCP keepalive parameters after they have been set.
- ///
- /// Some platforms specify this value in seconds, so sub-second
- /// specifications may be omitted.
- #[cfg_attr(docsrs, doc(cfg(not(target_os = "windows"))))]
- #[cfg(not(target_os = "windows"))]
- pub fn get_keepalive_time(&self) -> io::Result<Option<Duration>> {
- sys::tcp::get_keepalive_time(self.sys)
- }
-
- /// Returns the time interval between TCP keepalive probes, if TCP keepalive is
- /// enabled on this socket.
- ///
- /// If `None`, then keepalive messages are disabled.
- ///
- /// This returns the value of `TCP_KEEPINTVL` on supported Unix operating
- /// systems. On Windows, it is not possible to access the value of TCP
- /// keepalive parameters after they have been set..
- ///
- /// Some platforms specify this value in seconds, so sub-second
- /// specifications may be omitted.
- #[cfg_attr(
- docsrs,
- doc(cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- )))
- )]
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- ))]
- pub fn get_keepalive_interval(&self) -> io::Result<Option<Duration>> {
- sys::tcp::get_keepalive_interval(self.sys)
- }
-
- /// Returns the maximum number of TCP keepalive probes that will be sent before
- /// dropping a connection, if TCP keepalive is enabled on this socket.
- ///
- /// If `None`, then keepalive messages are disabled.
- ///
- /// This returns the value of `TCP_KEEPCNT` on Unix operating systems that
- /// support this option. On Windows, it is not possible to access the value
- /// of TCP keepalive parameters after they have been set.
- #[cfg_attr(
- docsrs,
- doc(cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- )))
- )]
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- ))]
- pub fn get_keepalive_retries(&self) -> io::Result<Option<u32>> {
- sys::tcp::get_keepalive_retries(self.sys)
- }
-
- /// Returns the local address of this socket
- ///
- /// Will return `Err` result in windows if called before calling `bind`
- pub fn get_localaddr(&self) -> io::Result<SocketAddr> {
- sys::tcp::get_localaddr(self.sys)
- }
-}
-
-impl Drop for TcpSocket {
- fn drop(&mut self) {
- sys::tcp::close(self.sys);
- }
-}
-
-#[cfg(unix)]
-impl IntoRawFd for TcpSocket {
- fn into_raw_fd(self) -> RawFd {
- let ret = self.sys;
- // Avoid closing the socket
- mem::forget(self);
- ret
- }
-}
-
-#[cfg(unix)]
-impl AsRawFd for TcpSocket {
- fn as_raw_fd(&self) -> RawFd {
- self.sys
- }
-}
-
-#[cfg(unix)]
-impl FromRawFd for TcpSocket {
- /// Converts a `RawFd` to a `TcpSocket`.
- ///
- /// # Notes
- ///
- /// The caller is responsible for ensuring that the socket is in
- /// non-blocking mode.
- unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket {
- TcpSocket { sys: fd }
- }
-}
-
-#[cfg(windows)]
-impl IntoRawSocket for TcpSocket {
- fn into_raw_socket(self) -> RawSocket {
- // The winapi crate defines `SOCKET` as `usize`. The Rust std
- // conditionally defines `RawSocket` as a fixed size unsigned integer
- // matching the pointer width. These end up being the same type but we
- // must cast between them.
- let ret = self.sys as RawSocket;
-
- // Avoid closing the socket
- mem::forget(self);
-
- ret
- }
-}
-
-#[cfg(windows)]
-impl AsRawSocket for TcpSocket {
- fn as_raw_socket(&self) -> RawSocket {
- self.sys as RawSocket
- }
-}
-
-#[cfg(windows)]
-impl FromRawSocket for TcpSocket {
- /// Converts a `RawSocket` to a `TcpSocket`.
- ///
- /// # Notes
- ///
- /// The caller is responsible for ensuring that the socket is in
- /// non-blocking mode.
- unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket {
- TcpSocket {
- sys: socket as sys::tcp::TcpSocket,
- }
- }
-}
-
-impl TcpKeepalive {
- // Sets the amount of time after which TCP keepalive probes will be sent
- /// on idle connections.
- ///
- /// This will set the value of `SO_KEEPALIVE` + `IPPROTO_TCP` on OpenBSD,
- /// NetBSD, and Haiku, `TCP_KEEPALIVE` on macOS and iOS, and `TCP_KEEPIDLE`
- /// on all other Unix operating systems. On Windows, this sets the value of
- /// the `tcp_keepalive` struct's `keepalivetime` field.
- ///
- /// Some platforms specify this value in seconds, so sub-second
- /// specifications may be omitted.
- pub fn with_time(self, time: Duration) -> Self {
- Self {
- time: Some(time),
- ..self
- }
- }
-
- /// Sets the time interval between TCP keepalive probes.
- /// This sets the value of `TCP_KEEPINTVL` on supported Unix operating
- /// systems. On Windows, this sets the value of the `tcp_keepalive` struct's
- /// `keepaliveinterval` field.
- ///
- /// Some platforms specify this value in seconds, so sub-second
- /// specifications may be omitted.
- #[cfg_attr(
- docsrs,
- doc(cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- target_os = "windows"
- )))
- )]
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- target_os = "windows"
- ))]
- pub fn with_interval(self, interval: Duration) -> Self {
- Self {
- interval: Some(interval),
- ..self
- }
- }
-
- /// Sets the maximum number of TCP keepalive probes that will be sent before
- /// dropping a connection, if TCP keepalive is enabled on this socket.
- ///
- /// This will set the value of `TCP_KEEPCNT` on Unix operating systems that
- /// support this option.
- #[cfg_attr(
- docsrs,
- doc(cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- )))
- )]
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- ))]
- pub fn with_retries(self, retries: u32) -> Self {
- Self {
- retries: Some(retries),
- ..self
- }
- }
-
- /// Returns a new, empty set of TCP keepalive parameters.
- pub fn new() -> Self {
- Self::default()
- }
-}
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index cdbd46a..a7a9aa1 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -3,11 +3,14 @@ use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{self, Shutdown, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+#[cfg(target_os = "wasi")]
+use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use crate::io_source::IoSource;
-use crate::net::TcpSocket;
+#[cfg(not(target_os = "wasi"))]
+use crate::sys::tcp::{connect, new_for_addr};
use crate::{event, Interest, Registry, Token};
/// A non-blocking TCP stream between a local socket and a remote socket.
@@ -49,9 +52,43 @@ pub struct TcpStream {
impl TcpStream {
/// Create a new TCP stream and issue a non-blocking connect to the
/// specified address.
+ ///
+ /// # Notes
+ ///
+ /// The returned `TcpStream` may not be connected (and thus usable), unlike
+ /// the API found in `std::net::TcpStream`. Because Mio issues a
+ /// *non-blocking* connect it will not block the thread and instead return
+ /// an unconnected `TcpStream`.
+ ///
+ /// Ensuring the returned stream is connected is surprisingly complex when
+ /// considering cross-platform support. Doing this properly should follow
+ /// the steps below, an example implementation can be found
+ /// [here](https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622).
+ ///
+ /// 1. Call `TcpStream::connect`
+ /// 2. Register the returned stream with at least [write interest].
+ /// 3. Wait for a (writable) event.
+ /// 4. Check `TcpStream::peer_addr`. If it returns `libc::EINPROGRESS` or
+ /// `ErrorKind::NotConnected` it means the stream is not yet connected,
+ /// go back to step 3. If it returns an address it means the stream is
+ /// connected, go to step 5. If another error is returned something
+ /// whent wrong.
+ /// 5. Now the stream can be used.
+ ///
+ /// This may return a `WouldBlock` in which case the socket connection
+ /// cannot be completed immediately, it usually means there are insufficient
+ /// entries in the routing cache.
+ ///
+ /// [write interest]: Interest::WRITABLE
+ #[cfg(not(target_os = "wasi"))]
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
- let socket = TcpSocket::new_for_addr(addr)?;
- socket.connect(addr)
+ let socket = new_for_addr(addr)?;
+ #[cfg(unix)]
+ let stream = unsafe { TcpStream::from_raw_fd(socket) };
+ #[cfg(windows)]
+ let stream = unsafe { TcpStream::from_raw_socket(socket as _) };
+ connect(&stream.inner, addr)?;
+ Ok(stream)
}
/// Creates a new `TcpStream` from a standard `net::TcpStream`.
@@ -103,7 +140,7 @@ impl TcpStream {
///
/// On Windows make sure the stream is connected before calling this method,
/// by receiving an (writable) event. Trying to set `nodelay` on an
- /// unconnected `TcpStream` is undefined behavior.
+ /// unconnected `TcpStream` is unspecified behavior.
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
self.inner.set_nodelay(nodelay)
}
@@ -118,7 +155,7 @@ impl TcpStream {
///
/// On Windows make sure the stream is connected before calling this method,
/// by receiving an (writable) event. Trying to get `nodelay` on an
- /// unconnected `TcpStream` is undefined behavior.
+ /// unconnected `TcpStream` is unspecified behavior.
pub fn nodelay(&self) -> io::Result<bool> {
self.inner.nodelay()
}
@@ -132,7 +169,7 @@ impl TcpStream {
///
/// On Windows make sure the stream is connected before calling this method,
/// by receiving an (writable) event. Trying to set `ttl` on an
- /// unconnected `TcpStream` is undefined behavior.
+ /// unconnected `TcpStream` is unspecified behavior.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.inner.set_ttl(ttl)
}
@@ -145,7 +182,7 @@ impl TcpStream {
///
/// On Windows make sure the stream is connected before calling this method,
/// by receiving an (writable) event. Trying to get `ttl` on an
- /// unconnected `TcpStream` is undefined behavior.
+ /// unconnected `TcpStream` is unspecified behavior.
///
/// [link]: #method.set_ttl
pub fn ttl(&self) -> io::Result<u32> {
@@ -170,53 +207,111 @@ impl TcpStream {
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.peek(buf)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ #[cfg_attr(unix, doc = "```no_run")]
+ #[cfg_attr(windows, doc = "```ignore")]
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// #[cfg(unix)]
+ /// use std::os::unix::io::AsRawFd;
+ /// #[cfg(windows)]
+ /// use std::os::windows::io::AsRawSocket;
+ /// use mio::net::TcpStream;
+ ///
+ /// let address = "127.0.0.1:8080".parse().unwrap();
+ /// let stream = TcpStream::connect(address)?;
+ ///
+ /// // Wait until the stream is readable...
+ ///
+ /// // Read from the stream using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = stream.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// #[cfg(unix)]
+ /// let res = unsafe { libc::recv(stream.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// #[cfg(windows)]
+ /// let res = unsafe { libc::recvfrom(stream.as_raw_socket() as usize, buf_ptr, buf.len() as i32, 0, std::ptr::null_mut(), std::ptr::null_mut()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read(buf))
+ self.inner.do_io(|mut inner| inner.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.read_vectored(bufs))
}
}
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read(buf))
+ self.inner.do_io(|mut inner| inner.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.read_vectored(bufs))
}
}
impl Write for TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write(buf))
+ self.inner.do_io(|mut inner| inner.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|inner| (&*inner).flush())
+ self.inner.do_io(|mut inner| inner.flush())
}
}
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write(buf))
+ self.inner.do_io(|mut inner| inner.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|inner| (&*inner).flush())
+ self.inner.do_io(|mut inner| inner.flush())
}
}
@@ -303,3 +398,30 @@ impl FromRawSocket for TcpStream {
TcpStream::from_std(FromRawSocket::from_raw_socket(socket))
}
}
+
+#[cfg(target_os = "wasi")]
+impl IntoRawFd for TcpStream {
+ fn into_raw_fd(self) -> RawFd {
+ self.inner.into_inner().into_raw_fd()
+ }
+}
+
+#[cfg(target_os = "wasi")]
+impl AsRawFd for TcpStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+}
+
+#[cfg(target_os = "wasi")]
+impl FromRawFd for TcpStream {
+ /// Converts a `RawFd` to a `TcpStream`.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode.
+ unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
+ TcpStream::from_std(FromRawFd::from_raw_fd(fd))
+ }
+}
diff --git a/src/net/udp.rs b/src/net/udp.rs
index c5c3ba9..5abe12e 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -161,6 +161,29 @@ impl UdpSocket {
self.inner.local_addr()
}
+ /// Returns the socket address of the remote peer this socket was connected to.
+ ///
+ /// # Examples
+ ///
+ #[cfg_attr(feature = "os-poll", doc = "```")]
+ #[cfg_attr(not(feature = "os-poll"), doc = "```ignore")]
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use mio::net::UdpSocket;
+ ///
+ /// let addr = "127.0.0.1:0".parse()?;
+ /// let peer_addr = "127.0.0.1:11100".parse()?;
+ /// let socket = UdpSocket::bind(addr)?;
+ /// socket.connect(peer_addr)?;
+ /// assert_eq!(socket.peer_addr()?.ip(), peer_addr.ip());
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
@@ -297,6 +320,10 @@ impl UdpSocket {
/// Connects the UDP socket setting the default destination for `send()`
/// and limiting packets that are read via `recv` from the address specified
/// in `addr`.
+ ///
+ /// This may return a `WouldBlock` in which case the socket connection
+ /// cannot be completed immediately, it usually means there are insufficient
+ /// entries in the routing cache.
pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
self.inner.connect(addr)
}
@@ -525,6 +552,64 @@ impl UdpSocket {
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ #[cfg_attr(unix, doc = "```no_run")]
+ #[cfg_attr(windows, doc = "```ignore")]
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// #[cfg(unix)]
+ /// use std::os::unix::io::AsRawFd;
+ /// #[cfg(windows)]
+ /// use std::os::windows::io::AsRawSocket;
+ /// use mio::net::UdpSocket;
+ ///
+ /// let address = "127.0.0.1:8080".parse().unwrap();
+ /// let dgram = UdpSocket::bind(address)?;
+ ///
+ /// // Wait until the dgram is readable...
+ ///
+ /// // Read from the dgram using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = dgram.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// #[cfg(unix)]
+ /// let res = unsafe { libc::recv(dgram.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// #[cfg(windows)]
+ /// let res = unsafe { libc::recvfrom(dgram.as_raw_socket() as usize, buf_ptr, buf.len() as i32, 0, std::ptr::null_mut(), std::ptr::null_mut()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for UdpSocket {
diff --git a/src/net/uds/datagram.rs b/src/net/uds/datagram.rs
index 0c8f5ff..e963d6e 100644
--- a/src/net/uds/datagram.rs
+++ b/src/net/uds/datagram.rs
@@ -22,8 +22,8 @@ impl UnixDatagram {
///
/// This function is intended to be used to wrap a Unix datagram from the
/// standard library in the Mio equivalent. The conversion assumes nothing
- /// about the underlying datagram; ; it is left up to the user to set it
- /// in non-blocking mode.
+ /// about the underlying datagram; it is left up to the user to set it in
+ /// non-blocking mode.
pub fn from_std(socket: net::UnixDatagram) -> UnixDatagram {
UnixDatagram {
inner: IoSource::new(socket),
@@ -31,6 +31,9 @@ impl UnixDatagram {
}
/// Connects the socket to the specified address.
+ ///
+ /// This may return a `WouldBlock` in which case the socket connection
+ /// cannot be completed immediately.
pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
self.inner.connect(path)
}
@@ -108,6 +111,74 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.inner.shutdown(how)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::net::UnixDatagram;
+ ///
+ /// let (dgram1, dgram2) = UnixDatagram::pair()?;
+ ///
+ /// // Wait until the dgram is writable...
+ ///
+ /// // Write to the dgram using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = dgram1.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::send(dgram1.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the dgram is readable...
+ ///
+ /// // Read from the dgram using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = dgram2.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::recv(dgram2.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for UnixDatagram {
diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs
index f21d9e7..b38812e 100644
--- a/src/net/uds/stream.rs
+++ b/src/net/uds/stream.rs
@@ -15,6 +15,9 @@ pub struct UnixStream {
impl UnixStream {
/// Connects to the socket named by `path`.
+ ///
+ /// This may return a `WouldBlock` in which case the socket connection
+ /// cannot be completed immediately. Usually it means the backlog is full.
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std)
}
@@ -69,53 +72,121 @@ impl UnixStream {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.inner.shutdown(how)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::net::UnixStream;
+ ///
+ /// let (stream1, stream2) = UnixStream::pair()?;
+ ///
+ /// // Wait until the stream is writable...
+ ///
+ /// // Write to the stream using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = stream1.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the stream is readable...
+ ///
+ /// // Read from the stream using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = stream2.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl Read for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read(buf))
+ self.inner.do_io(|mut inner| inner.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.read_vectored(bufs))
}
}
impl<'a> Read for &'a UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read(buf))
+ self.inner.do_io(|mut inner| inner.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.read_vectored(bufs))
}
}
impl Write for UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write(buf))
+ self.inner.do_io(|mut inner| inner.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|inner| (&*inner).flush())
+ self.inner.do_io(|mut inner| inner.flush())
}
}
impl<'a> Write for &'a UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write(buf))
+ self.inner.do_io(|mut inner| inner.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|inner| (&*inner).flush())
+ self.inner.do_io(|mut inner| inner.flush())
}
}
diff --git a/src/poll.rs b/src/poll.rs
index a6f4ab0..289d668 100644
--- a/src/poll.rs
+++ b/src/poll.rs
@@ -30,8 +30,8 @@ use std::{fmt, io};
///
/// A basic example -- establishing a `TcpStream` connection.
///
-#[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+#[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+#[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Events, Poll, Interest, Token};
@@ -127,8 +127,8 @@ use std::{fmt, io};
///
/// For example:
///
-#[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+#[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+#[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # use std::net;
/// # fn main() -> Result<(), Box<dyn Error>> {
@@ -163,6 +163,52 @@ use std::{fmt, io};
///
/// [event sources]: ./event/trait.Source.html
///
+/// ### Accessing raw fd/socket/handle
+///
+/// Mio makes it possible for many types to be converted into a raw file
+/// descriptor (fd, Unix), socket (Windows) or handle (Windows). This makes it
+/// possible to support more operations on the type than Mio supports, for
+/// example it makes [mio-aio] possible. However accessing the raw fd is not
+/// without it's pitfalls.
+///
+/// Specifically performing I/O operations outside of Mio on these types (via
+/// the raw fd) has unspecified behaviour. It could cause no more events to be
+/// generated for the type even though it returned `WouldBlock` (in an operation
+/// directly accessing the fd). The behaviour is OS specific and Mio can only
+/// guarantee cross-platform behaviour if it can control the I/O.
+///
+/// [mio-aio]: https://github.com/asomers/mio-aio
+///
+/// *The following is **not** guaranteed, just a description of the current
+/// situation!* Mio is allowed to change the following without it being considered
+/// a breaking change, don't depend on this, it's just here to inform the user.
+/// Currently the kqueue and epoll implementation support direct I/O operations
+/// on the fd without Mio's knowledge. Windows however needs **all** I/O
+/// operations to go through Mio otherwise it is not able to update it's
+/// internal state properly and won't generate events.
+///
+/// ### Polling without registering event sources
+///
+///
+/// *The following is **not** guaranteed, just a description of the current
+/// situation!* Mio is allowed to change the following without it being
+/// considered a breaking change, don't depend on this, it's just here to inform
+/// the user. On platforms that use epoll, kqueue or IOCP (see implementation
+/// notes below) polling without previously registering [event sources] will
+/// result in sleeping forever, only a process signal will be able to wake up
+/// the thread.
+///
+/// On WASM/WASI this is different as it doesn't support process signals,
+/// furthermore the WASI specification doesn't specify a behaviour in this
+/// situation, thus it's up to the implementation what to do here. As an
+/// example, the wasmtime runtime will return `EINVAL` in this situation, but
+/// different runtimes may return different results. If you have further
+/// insights or thoughts about this situation (and/or how Mio should handle it)
+/// please add you comment to [pull request#1580].
+///
+/// [event sources]: crate::event::Source
+/// [pull request#1580]: https://github.com/tokio-rs/mio/pull/1580
+///
/// # Implementation notes
///
/// `Poll` is backed by the selector provided by the operating system.
@@ -172,13 +218,12 @@ use std::{fmt, io};
/// | Android | [epoll] |
/// | DragonFly BSD | [kqueue] |
/// | FreeBSD | [kqueue] |
+/// | iOS | [kqueue] |
+/// | illumos | [epoll] |
/// | Linux | [epoll] |
/// | NetBSD | [kqueue] |
/// | OpenBSD | [kqueue] |
-/// | Solaris | [epoll] |
-/// | illumos | [epoll] |
/// | Windows | [IOCP] |
-/// | iOS | [kqueue] |
/// | macOS | [kqueue] |
///
/// On all supported platforms, socket operations are handled by using the
@@ -195,10 +240,10 @@ use std::{fmt, io};
/// data to be copied into an intermediate buffer before it is passed to the
/// kernel.
///
-/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+/// [epoll]: https://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
-/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
-/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
+/// [IOCP]: https://docs.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
+/// [`signalfd`]: https://man7.org/linux/man-pages/man2/signalfd.2.html
/// [`SourceFd`]: unix/struct.SourceFd.html
/// [`Poll::poll`]: struct.Poll.html#method.poll
pub struct Poll {
@@ -211,6 +256,54 @@ pub struct Registry {
}
impl Poll {
+ cfg_os_poll! {
+ /// Return a new `Poll` handle.
+ ///
+ /// This function will make a syscall to the operating system to create
+ /// the system selector. If this syscall fails, `Poll::new` will return
+ /// with the error.
+ ///
+ /// close-on-exec flag is set on the file descriptors used by the selector to prevent
+ /// leaking it to executed processes. However, on some systems such as
+ /// old Linux systems that don't support `epoll_create1` syscall it is done
+ /// non-atomically, so a separate thread executing in parallel to this
+ /// function may accidentally leak the file descriptor if it executes a
+ /// new process before this function returns.
+ ///
+ /// See [struct] level docs for more details.
+ ///
+ /// [struct]: struct.Poll.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use mio::{Poll, Events};
+ /// use std::time::Duration;
+ ///
+ /// let mut poll = match Poll::new() {
+ /// Ok(poll) => poll,
+ /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
+ /// };
+ ///
+ /// // Create a structure to receive polled events
+ /// let mut events = Events::with_capacity(1024);
+ ///
+ /// // Wait for events, but none will be received because no
+ /// // `event::Source`s have been registered with this `Poll` instance.
+ /// poll.poll(&mut events, Some(Duration::from_millis(500)))?;
+ /// assert!(events.is_empty());
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn new() -> io::Result<Poll> {
+ sys::Selector::new().map(|selector| Poll {
+ registry: Registry { selector },
+ })
+ }
+ }
+
/// Create a separate `Registry` which can be used to register
/// `event::Source`s.
pub fn registry(&self) -> &Registry {
@@ -256,12 +349,16 @@ impl Poll {
/// of Mio would automatically retry the poll call if it was interrupted
/// (if `EINTR` was returned).
///
+ /// Currently if the `timeout` elapses without any readiness events
+ /// triggering this will return `Ok(())`. However we're not guaranteeing
+ /// this behaviour as this depends on the OS.
+ ///
/// # Examples
///
/// A basic example -- establishing a `TcpStream` connection.
///
- #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
- #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+ #[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+ #[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Events, Poll, Interest, Token};
@@ -315,49 +412,6 @@ impl Poll {
}
}
-cfg_os_poll! {
- impl Poll {
- /// Return a new `Poll` handle.
- ///
- /// This function will make a syscall to the operating system to create
- /// the system selector. If this syscall fails, `Poll::new` will return
- /// with the error.
- ///
- /// See [struct] level docs for more details.
- ///
- /// [struct]: struct.Poll.html
- ///
- /// # Examples
- ///
- /// ```
- /// # use std::error::Error;
- /// # fn main() -> Result<(), Box<dyn Error>> {
- /// use mio::{Poll, Events};
- /// use std::time::Duration;
- ///
- /// let mut poll = match Poll::new() {
- /// Ok(poll) => poll,
- /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
- /// };
- ///
- /// // Create a structure to receive polled events
- /// let mut events = Events::with_capacity(1024);
- ///
- /// // Wait for events, but none will be received because no
- /// // `event::Source`s have been registered with this `Poll` instance.
- /// poll.poll(&mut events, Some(Duration::from_millis(500)))?;
- /// assert!(events.is_empty());
- /// # Ok(())
- /// # }
- /// ```
- pub fn new() -> io::Result<Poll> {
- sys::Selector::new().map(|selector| Poll {
- registry: Registry { selector },
- })
- }
- }
-}
-
#[cfg(unix)]
impl AsRawFd for Poll {
fn as_raw_fd(&self) -> RawFd {
@@ -382,7 +436,7 @@ impl Registry {
///
/// # Arguments
///
- /// `source: &S: event::Source`: This is the source of events that the
+ /// `source: &mut S: event::Source`: This is the source of events that the
/// `Poll` instance should monitor for readiness state changes.
///
/// `token: Token`: The caller picks a token to associate with the socket.
@@ -409,7 +463,7 @@ impl Registry {
/// Callers must ensure that if a source being registered with a `Poll`
/// instance was previously registered with that `Poll` instance, then a
/// call to [`deregister`] has already occurred. Consecutive calls to
- /// `register` is undefined behavior.
+ /// `register` is unspecified behavior.
///
/// Unless otherwise specified, the caller should assume that once an event
/// source is registered with a `Poll` instance, it is bound to that `Poll`
@@ -425,8 +479,8 @@ impl Registry {
///
/// # Examples
///
- #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
- #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+ #[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+ #[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # use std::net;
/// # fn main() -> Result<(), Box<dyn Error>> {
@@ -495,7 +549,7 @@ impl Registry {
/// requested for the handle.
///
/// The event source must have previously been registered with this instance
- /// of `Poll`, otherwise the behavior is undefined.
+ /// of `Poll`, otherwise the behavior is unspecified.
///
/// See the [`register`] documentation for details about the function
/// arguments and see the [`struct`] docs for a high level overview of
@@ -503,8 +557,8 @@ impl Registry {
///
/// # Examples
///
- #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
- #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+ #[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+ #[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # use std::net;
/// # fn main() -> Result<(), Box<dyn Error>> {
@@ -562,16 +616,16 @@ impl Registry {
/// the poll.
///
/// The event source must have previously been registered with this instance
- /// of `Poll`, otherwise the behavior is undefined.
+ /// of `Poll`, otherwise the behavior is unspecified.
///
/// A handle can be passed back to `register` after it has been
/// deregistered; however, it must be passed back to the **same** `Poll`
- /// instance, otherwise the behavior is undefined.
+ /// instance, otherwise the behavior is unspecified.
///
/// # Examples
///
- #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
- #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+ #[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+ #[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # use std::net;
/// # fn main() -> Result<(), Box<dyn Error>> {
@@ -622,11 +676,18 @@ impl Registry {
/// Internal check to ensure only a single `Waker` is active per [`Poll`]
/// instance.
- #[cfg(debug_assertions)]
+ #[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub(crate) fn register_waker(&self) {
- if self.selector.register_waker() {
- panic!("Only a single `Waker` can be active per `Poll` instance");
- }
+ assert!(
+ !self.selector.register_waker(),
+ "Only a single `Waker` can be active per `Poll` instance"
+ );
+ }
+
+ /// Get access to the `sys::Selector`.
+ #[cfg(any(not(target_os = "wasi"), feature = "net"))]
+ pub(crate) fn selector(&self) -> &sys::Selector {
+ &self.selector
}
}
@@ -643,11 +704,6 @@ impl AsRawFd for Registry {
}
}
-/// Get access to the `sys::Selector` from `Registry`.
-pub(crate) fn selector(registry: &Registry) -> &sys::Selector {
- &registry.selector
-}
-
cfg_os_poll! {
#[cfg(unix)]
#[test]
diff --git a/src/sys/mod.rs b/src/sys/mod.rs
index 81ae6d2..2a968b2 100644
--- a/src/sys/mod.rs
+++ b/src/sys/mod.rs
@@ -63,6 +63,12 @@ cfg_os_poll! {
pub use self::windows::*;
}
+#[cfg(target_os = "wasi")]
+cfg_os_poll! {
+ mod wasi;
+ pub(crate) use self::wasi::*;
+}
+
cfg_not_os_poll! {
mod shell;
pub(crate) use self::shell::*;
diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs
index 7e1533f..8a3175f 100644
--- a/src/sys/shell/mod.rs
+++ b/src/sys/shell/mod.rs
@@ -7,7 +7,9 @@ macro_rules! os_required {
mod selector;
pub(crate) use self::selector::{event, Event, Events, Selector};
+#[cfg(not(target_os = "wasi"))]
mod waker;
+#[cfg(not(target_os = "wasi"))]
pub(crate) use self::waker::Waker;
cfg_net! {
diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs
index 91fc0bf..bfc598a 100644
--- a/src/sys/shell/selector.rs
+++ b/src/sys/shell/selector.rs
@@ -19,7 +19,7 @@ impl Selector {
os_required!();
}
- #[cfg(debug_assertions)]
+ #[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub fn register_waker(&self) -> bool {
os_required!();
}
@@ -44,6 +44,25 @@ cfg_any_os_ext! {
}
}
+#[cfg(target_os = "wasi")]
+cfg_any_os_ext! {
+ use crate::{Interest, Token};
+
+ impl Selector {
+ pub fn register(&self, _: wasi::Fd, _: Token, _: Interest) -> io::Result<()> {
+ os_required!();
+ }
+
+ pub fn reregister(&self, _: wasi::Fd, _: Token, _: Interest) -> io::Result<()> {
+ os_required!();
+ }
+
+ pub fn deregister(&self, _: wasi::Fd) -> io::Result<()> {
+ os_required!();
+ }
+ }
+}
+
cfg_io_source! {
#[cfg(debug_assertions)]
impl Selector {
diff --git a/src/sys/shell/tcp.rs b/src/sys/shell/tcp.rs
index 0ed225f..260763a 100644
--- a/src/sys/shell/tcp.rs
+++ b/src/sys/shell/tcp.rs
@@ -1,127 +1,31 @@
-use crate::net::TcpKeepalive;
use std::io;
use std::net::{self, SocketAddr};
-use std::time::Duration;
-pub(crate) type TcpSocket = i32;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- os_required!();
-}
-
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
- os_required!();
-}
-
-pub(crate) fn bind(_socket: TcpSocket, _addr: SocketAddr) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn connect(_: TcpSocket, _addr: SocketAddr) -> io::Result<net::TcpStream> {
- os_required!();
-}
-
-pub(crate) fn listen(_: TcpSocket, _: u32) -> io::Result<net::TcpListener> {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn new_for_addr(_: SocketAddr) -> io::Result<i32> {
os_required!();
}
-pub(crate) fn close(_: TcpSocket) {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn bind(_: &net::TcpListener, _: SocketAddr) -> io::Result<()> {
os_required!();
}
-pub(crate) fn set_reuseaddr(_: TcpSocket, _: bool) -> io::Result<()> {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn connect(_: &net::TcpStream, _: SocketAddr) -> io::Result<()> {
os_required!();
}
-pub(crate) fn get_reuseaddr(_: TcpSocket) -> io::Result<bool> {
+#[cfg(not(target_os = "wasi"))]
+pub(crate) fn listen(_: &net::TcpListener, _: u32) -> io::Result<()> {
os_required!();
}
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn set_reuseport(_: TcpSocket, _: bool) -> io::Result<()> {
- os_required!();
-}
-
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn get_reuseport(_: TcpSocket) -> io::Result<bool> {
- os_required!();
-}
-
-pub(crate) fn set_linger(_: TcpSocket, _: Option<Duration>) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_linger(_: TcpSocket) -> io::Result<Option<Duration>> {
- os_required!();
-}
-
-pub(crate) fn set_recv_buffer_size(_: TcpSocket, _: u32) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_recv_buffer_size(_: TcpSocket) -> io::Result<u32> {
- os_required!();
-}
-
-pub(crate) fn set_send_buffer_size(_: TcpSocket, _: u32) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_send_buffer_size(_: TcpSocket) -> io::Result<u32> {
- os_required!();
-}
-
-pub(crate) fn set_keepalive(_: TcpSocket, _: bool) -> io::Result<()> {
- os_required!();
-}
-
-pub(crate) fn get_keepalive(_: TcpSocket) -> io::Result<bool> {
- os_required!();
-}
-
-pub(crate) fn set_keepalive_params(_: TcpSocket, _: TcpKeepalive) -> io::Result<()> {
- os_required!()
-}
-
-#[cfg(any(
- target_os = "android",
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- target_os = "solaris",
-))]
-pub(crate) fn get_keepalive_time(_: TcpSocket) -> io::Result<Option<Duration>> {
- os_required!()
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_interval(_: TcpSocket) -> io::Result<Option<Duration>> {
- os_required!()
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_retries(_: TcpSocket) -> io::Result<Option<u32>> {
- os_required!()
-}
-
-pub fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+#[cfg(unix)]
+pub(crate) fn set_reuseaddr(_: &net::TcpListener, _: bool) -> io::Result<()> {
os_required!();
}
-pub(crate) fn get_localaddr(_: TcpSocket) -> io::Result<SocketAddr> {
+pub(crate) fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
os_required!();
}
diff --git a/src/sys/shell/udp.rs b/src/sys/shell/udp.rs
index 48ccac7..6a48b69 100644
--- a/src/sys/shell/udp.rs
+++ b/src/sys/shell/udp.rs
@@ -1,3 +1,4 @@
+#![cfg(not(target_os = "wasi"))]
use std::io;
use std::net::{self, SocketAddr};
diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs
index 2f8d618..78f1387 100644
--- a/src/sys/unix/net.rs
+++ b/src/sys/unix/net.rs
@@ -41,9 +41,8 @@ pub(crate) fn new_socket(domain: libc::c_int, socket_type: libc::c_int) -> io::R
.map(|_| socket)
});
- // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
- // Solaris, couldn't find anything online.
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC.
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
let socket = socket.and_then(|socket| {
// For platforms that don't support flags in socket, we need to
// set the flags ourselves.
@@ -124,7 +123,7 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
target_os = "openbsd"
))]
sin6_len: 0,
- #[cfg(any(target_os = "solaris", target_os = "illumos"))]
+ #[cfg(target_os = "illumos")]
__sin6_src_id: 0,
};
diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs
index ccf5252..7a95b96 100644
--- a/src/sys/unix/pipe.rs
+++ b/src/sys/unix/pipe.rs
@@ -155,6 +155,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "netbsd",
target_os = "openbsd",
target_os = "illumos",
+ target_os = "redox",
))]
unsafe {
if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
@@ -162,7 +163,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
}
}
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
unsafe {
// For platforms that don't have `pipe2(2)` we need to manually set the
// correct flags on the file descriptor.
@@ -192,8 +193,8 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "openbsd",
target_os = "ios",
target_os = "macos",
- target_os = "solaris",
target_os = "illumos",
+ target_os = "redox",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");
@@ -216,6 +217,74 @@ impl Sender {
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
set_nonblocking(self.inner.as_raw_fd(), nonblocking)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::unix::pipe;
+ ///
+ /// let (sender, receiver) = pipe::new()?;
+ ///
+ /// // Wait until the sender is writable...
+ ///
+ /// // Write to the sender using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = sender.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the receiver is readable...
+ ///
+ /// // Read from the receiver using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = receiver.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for Sender {
@@ -244,29 +313,29 @@ impl event::Source for Sender {
impl Write for Sender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write(buf))
+ self.inner.do_io(|mut sender| sender.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|sender| (&*sender).flush())
+ self.inner.do_io(|mut sender| sender.flush())
}
}
impl Write for &Sender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write(buf))
+ self.inner.do_io(|mut sender| sender.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|sender| (&*sender).flush())
+ self.inner.do_io(|mut sender| sender.flush())
}
}
@@ -313,6 +382,74 @@ impl Receiver {
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
set_nonblocking(self.inner.as_raw_fd(), nonblocking)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::unix::pipe;
+ ///
+ /// let (sender, receiver) = pipe::new()?;
+ ///
+ /// // Wait until the sender is writable...
+ ///
+ /// // Write to the sender using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = sender.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the receiver is readable...
+ ///
+ /// // Read from the receiver using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = receiver.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for Receiver {
@@ -341,21 +478,21 @@ impl event::Source for Receiver {
impl Read for Receiver {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read(buf))
+ self.inner.do_io(|mut sender| sender.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.read_vectored(bufs))
}
}
impl Read for &Receiver {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read(buf))
+ self.inner.do_io(|mut sender| sender.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.read_vectored(bufs))
}
}
diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs
index 38667d6..1256663 100644
--- a/src/sys/unix/selector/epoll.rs
+++ b/src/sys/unix/selector/epoll.rs
@@ -23,15 +23,41 @@ pub struct Selector {
impl Selector {
pub fn new() -> io::Result<Selector> {
+ #[cfg(not(target_os = "android"))]
+ let res = syscall!(epoll_create1(libc::EPOLL_CLOEXEC));
+
+ // On Android < API level 16 `epoll_create1` is not defined, so use a
+ // raw system call.
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
// 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
// so we use it instead.
#[cfg(target_os = "android")]
- let flag = libc::O_CLOEXEC;
- #[cfg(not(target_os = "android"))]
- let flag = libc::EPOLL_CLOEXEC;
+ let res = syscall!(syscall(libc::SYS_epoll_create1, libc::O_CLOEXEC));
+
+ let ep = match res {
+ Ok(ep) => ep as RawFd,
+ Err(err) => {
+ // When `epoll_create1` is not available fall back to use
+ // `epoll_create` followed by `fcntl`.
+ if let Some(libc::ENOSYS) = err.raw_os_error() {
+ match syscall!(epoll_create(1024)) {
+ Ok(ep) => match syscall!(fcntl(ep, libc::F_SETFD, libc::FD_CLOEXEC)) {
+ Ok(ep) => ep as RawFd,
+ Err(err) => {
+ // `fcntl` failed, cleanup `ep`.
+ let _ = unsafe { libc::close(ep) };
+ return Err(err);
+ }
+ },
+ Err(err) => return Err(err),
+ }
+ } else {
+ return Err(err);
+ }
+ }
+ };
- syscall!(epoll_create1(flag)).map(|ep| Selector {
+ Ok(Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ep,
@@ -61,7 +87,19 @@ impl Selector {
const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
let timeout = timeout
- .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int)
+ .map(|to| {
+ let to_ms = to.as_millis();
+ // as_millis() truncates, so round up to 1 ms as the documentation says can happen.
+ // This avoids turning submillisecond timeouts into immediate returns unless the
+ // caller explicitly requests that by specifying a zero timeout.
+ let to_ms = to_ms
+ + if to_ms == 0 && to.subsec_nanos() != 0 {
+ 1
+ } else {
+ 0
+ };
+ cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
+ })
.unwrap_or(-1);
events.clear();
@@ -82,6 +120,8 @@ impl Selector {
let mut event = libc::epoll_event {
events: interests_to_epoll(interests),
u64: usize::from(token) as u64,
+ #[cfg(target_os = "redox")]
+ _pad: 0,
};
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
@@ -91,6 +131,8 @@ impl Selector {
let mut event = libc::epoll_event {
events: interests_to_epoll(interests),
u64: usize::from(token) as u64,
+ #[cfg(target_os = "redox")]
+ _pad: 0,
};
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
@@ -222,7 +264,7 @@ pub mod event {
libc::EPOLLET,
libc::EPOLLRDHUP,
libc::EPOLLONESHOT,
- #[cfg(any(target_os = "linux", target_os = "solaris"))]
+ #[cfg(target_os = "linux")]
libc::EPOLLEXCLUSIVE,
#[cfg(any(target_os = "android", target_os = "linux"))]
libc::EPOLLWAKEUP,
diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs
index b36a537..0be4281 100644
--- a/src/sys/unix/selector/kqueue.rs
+++ b/src/sys/unix/selector/kqueue.rs
@@ -1,6 +1,6 @@
use crate::{Interest, Token};
use log::error;
-use std::mem::MaybeUninit;
+use std::mem::{self, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
@@ -34,17 +34,6 @@ type Flags = u16;
#[cfg(target_os = "netbsd")]
type Flags = u32;
-// Type of the `data` field in the `kevent` structure.
-#[cfg(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "ios",
- target_os = "macos"
-))]
-type Data = libc::intptr_t;
-#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
-type Data = i64;
-
// Type of the `udata` field in the `kevent` structure.
#[cfg(not(target_os = "netbsd"))]
type UData = *mut libc::c_void;
@@ -57,9 +46,8 @@ macro_rules! kevent {
ident: $id as libc::uintptr_t,
filter: $filter as Filter,
flags: $flags,
- fflags: 0,
- data: 0,
udata: $data as UData,
+ ..unsafe { mem::zeroed() }
}
};
}
@@ -163,7 +151,7 @@ impl Selector {
// the array.
slice::from_raw_parts_mut(changes[0].as_mut_ptr(), n_changes)
};
- kevent_register(self.kq, changes, &[libc::EPIPE as Data])
+ kevent_register(self.kq, changes, &[libc::EPIPE as i64])
}
pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
@@ -195,7 +183,7 @@ impl Selector {
kevent_register(
self.kq,
&mut changes,
- &[libc::ENOENT as Data, libc::EPIPE as Data],
+ &[libc::ENOENT as i64, libc::EPIPE as i64],
)
}
@@ -211,7 +199,7 @@ impl Selector {
// the ENOENT error when it comes up. The ENOENT error informs us that
// the filter wasn't there in first place, but we don't really care
// about that since our goal is to remove it.
- kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
+ kevent_register(self.kq, &mut changes, &[libc::ENOENT as i64])
}
#[cfg(debug_assertions)]
@@ -264,7 +252,7 @@ impl Selector {
fn kevent_register(
kq: RawFd,
changes: &mut [libc::kevent],
- ignored_errors: &[Data],
+ ignored_errors: &[i64],
) -> io::Result<()> {
syscall!(kevent(
kq,
@@ -285,15 +273,15 @@ fn kevent_register(
Err(err)
}
})
- .and_then(|()| check_errors(&changes, ignored_errors))
+ .and_then(|()| check_errors(changes, ignored_errors))
}
/// Check all events for possible errors, it returns the first error found.
-fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<()> {
+fn check_errors(events: &[libc::kevent], ignored_errors: &[i64]) -> io::Result<()> {
for event in events {
// We can't use references to packed structures (in checking the ignored
// errors), so we need copy the data out before use.
- let data = event.data;
+ let data = event.data as _;
// Check for the error flag, the actual error will be in the `data`
// field.
if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored_errors.contains(&data) {
diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs
index b73d645..9ae4c14 100644
--- a/src/sys/unix/selector/mod.rs
+++ b/src/sys/unix/selector/mod.rs
@@ -2,7 +2,7 @@
target_os = "android",
target_os = "illumos",
target_os = "linux",
- target_os = "solaris"
+ target_os = "redox",
))]
mod epoll;
@@ -10,7 +10,7 @@ mod epoll;
target_os = "android",
target_os = "illumos",
target_os = "linux",
- target_os = "solaris"
+ target_os = "redox",
))]
pub(crate) use self::epoll::{event, Event, Events, Selector};
diff --git a/src/sys/unix/sourcefd.rs b/src/sys/unix/sourcefd.rs
index ba52b38..84e776d 100644
--- a/src/sys/unix/sourcefd.rs
+++ b/src/sys/unix/sourcefd.rs
@@ -1,4 +1,4 @@
-use crate::{event, poll, Interest, Registry, Token};
+use crate::{event, Interest, Registry, Token};
use std::io;
use std::os::unix::io::RawFd;
@@ -25,8 +25,14 @@ use std::os::unix::io::RawFd;
///
/// Basic usage.
///
-#[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+#[cfg_attr(
+ all(feature = "os-poll", feature = "net", feature = "os-ext"),
+ doc = "```"
+)]
+#[cfg_attr(
+ not(all(feature = "os-poll", feature = "net", feature = "os-ext")),
+ doc = "```ignore"
+)]
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Interest, Poll, Token};
@@ -51,8 +57,8 @@ use std::os::unix::io::RawFd;
///
/// Implementing [`event::Source`] for a custom type backed by a [`RawFd`].
///
-#[cfg_attr(all(feature = "os-poll", features = "os-ext"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "os-ext")), doc = "```ignore")]
+#[cfg_attr(all(feature = "os-poll", feature = "os-ext"), doc = "```")]
+#[cfg_attr(not(all(feature = "os-poll", feature = "os-ext")), doc = "```ignore")]
/// use mio::{event, Interest, Registry, Token};
/// use mio::unix::SourceFd;
///
@@ -92,7 +98,7 @@ impl<'a> event::Source for SourceFd<'a> {
token: Token,
interests: Interest,
) -> io::Result<()> {
- poll::selector(registry).register(*self.0, token, interests)
+ registry.selector().register(*self.0, token, interests)
}
fn reregister(
@@ -101,10 +107,10 @@ impl<'a> event::Source for SourceFd<'a> {
token: Token,
interests: Interest,
) -> io::Result<()> {
- poll::selector(registry).reregister(*self.0, token, interests)
+ registry.selector().reregister(*self.0, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
- poll::selector(registry).deregister(*self.0)
+ registry.selector().deregister(*self.0)
}
}
diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs
index 59642c6..c4d7e94 100644
--- a/src/sys/unix/tcp.rs
+++ b/src/sys/unix/tcp.rs
@@ -1,428 +1,57 @@
use std::convert::TryInto;
use std::io;
-use std::mem;
use std::mem::{size_of, MaybeUninit};
use std::net::{self, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd};
-use std::time::Duration;
use crate::sys::unix::net::{new_socket, socket_addr, to_socket_addr};
-use crate::net::TcpKeepalive;
-#[cfg(any(target_os = "openbsd", target_os = "netbsd", target_os = "haiku"))]
-use libc::SO_KEEPALIVE as KEEPALIVE_TIME;
-#[cfg(any(target_os = "macos", target_os = "ios"))]
-use libc::TCP_KEEPALIVE as KEEPALIVE_TIME;
-#[cfg(not(any(
- target_os = "macos",
- target_os = "ios",
- target_os = "openbsd",
- target_os = "netbsd",
- target_os = "haiku"
-)))]
-use libc::TCP_KEEPIDLE as KEEPALIVE_TIME;
-pub type TcpSocket = libc::c_int;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- new_socket(libc::AF_INET, libc::SOCK_STREAM)
-}
-
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
- new_socket(libc::AF_INET6, libc::SOCK_STREAM)
+pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result<libc::c_int> {
+ let domain = match address {
+ SocketAddr::V4(_) => libc::AF_INET,
+ SocketAddr::V6(_) => libc::AF_INET6,
+ };
+ new_socket(domain, libc::SOCK_STREAM)
}
-pub(crate) fn bind(socket: TcpSocket, addr: SocketAddr) -> io::Result<()> {
+pub(crate) fn bind(socket: &net::TcpListener, addr: SocketAddr) -> io::Result<()> {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
- syscall!(bind(socket, raw_addr.as_ptr(), raw_addr_length))?;
+ syscall!(bind(socket.as_raw_fd(), raw_addr.as_ptr(), raw_addr_length))?;
Ok(())
}
-pub(crate) fn connect(socket: TcpSocket, addr: SocketAddr) -> io::Result<net::TcpStream> {
+pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
- match syscall!(connect(socket, raw_addr.as_ptr(), raw_addr_length)) {
- Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => {
- Err(err)
- }
- _ => {
- Ok(unsafe { net::TcpStream::from_raw_fd(socket) })
- }
+ match syscall!(connect(
+ socket.as_raw_fd(),
+ raw_addr.as_ptr(),
+ raw_addr_length
+ )) {
+ Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
+ _ => Ok(()),
}
}
-pub(crate) fn listen(socket: TcpSocket, backlog: u32) -> io::Result<net::TcpListener> {
+pub(crate) fn listen(socket: &net::TcpListener, backlog: u32) -> io::Result<()> {
let backlog = backlog.try_into().unwrap_or(i32::max_value());
- syscall!(listen(socket, backlog))?;
- Ok(unsafe { net::TcpListener::from_raw_fd(socket) })
-}
-
-pub(crate) fn close(socket: TcpSocket) {
- let _ = unsafe { net::TcpStream::from_raw_fd(socket) };
+ syscall!(listen(socket.as_raw_fd(), backlog))?;
+ Ok(())
}
-pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()> {
+pub(crate) fn set_reuseaddr(socket: &net::TcpListener, reuseaddr: bool) -> io::Result<()> {
let val: libc::c_int = if reuseaddr { 1 } else { 0 };
syscall!(setsockopt(
- socket,
+ socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&val as *const libc::c_int as *const libc::c_void,
size_of::<libc::c_int>() as libc::socklen_t,
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_REUSEADDR,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval != 0)
-}
-
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn set_reuseport(socket: TcpSocket, reuseport: bool) -> io::Result<()> {
- let val: libc::c_int = if reuseport { 1 } else { 0 };
-
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_REUSEPORT,
- &val as *const libc::c_int as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t,
- ))
- .map(|_| ())
-}
-
-#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
-pub(crate) fn get_reuseport(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_REUSEPORT,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval != 0)
-}
-
-pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
- let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
- let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
-
- syscall!(getsockname(
- socket,
- &mut addr as *mut _ as *mut _,
- &mut length
))?;
-
- unsafe { to_socket_addr(&addr) }
-}
-
-pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
- let val: libc::linger = libc::linger {
- l_onoff: if dur.is_some() { 1 } else { 0 },
- l_linger: dur
- .map(|dur| dur.as_secs() as libc::c_int)
- .unwrap_or_default(),
- };
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- #[cfg(target_vendor = "apple")]
- libc::SO_LINGER_SEC,
- #[cfg(not(target_vendor = "apple"))]
- libc::SO_LINGER,
- &val as *const libc::linger as *const libc::c_void,
- size_of::<libc::linger>() as libc::socklen_t,
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_linger(socket: TcpSocket) -> io::Result<Option<Duration>> {
- let mut val: libc::linger = unsafe { std::mem::zeroed() };
- let mut len = mem::size_of::<libc::linger>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- #[cfg(target_vendor = "apple")]
- libc::SO_LINGER_SEC,
- #[cfg(not(target_vendor = "apple"))]
- libc::SO_LINGER,
- &mut val as *mut _ as *mut _,
- &mut len,
- ))?;
-
- if val.l_onoff == 0 {
- Ok(None)
- } else {
- Ok(Some(Duration::from_secs(val.l_linger as u64)))
- }
-}
-
-pub(crate) fn set_recv_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_RCVBUF,
- &size as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_recv_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: libc::c_int = 0;
- let mut optlen = size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_RCVBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval as u32)
-}
-
-pub(crate) fn set_send_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_SNDBUF,
- &size as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_send_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: libc::c_int = 0;
- let mut optlen = size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_SNDBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval as u32)
-}
-
-pub(crate) fn set_keepalive(socket: TcpSocket, keepalive: bool) -> io::Result<()> {
- let val: libc::c_int = if keepalive { 1 } else { 0 };
- syscall!(setsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_KEEPALIVE,
- &val as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_keepalive(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
-
- syscall!(getsockopt(
- socket,
- libc::SOL_SOCKET,
- libc::SO_KEEPALIVE,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(optval != 0)
-}
-
-pub(crate) fn set_keepalive_params(socket: TcpSocket, keepalive: TcpKeepalive) -> io::Result<()> {
- if let Some(dur) = keepalive.time {
- set_keepalive_time(socket, dur)?;
- }
-
- #[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
- ))]
- {
- if let Some(dur) = keepalive.interval {
- set_keepalive_interval(socket, dur)?;
- }
-
- if let Some(retries) = keepalive.retries {
- set_keepalive_retries(socket, retries)?;
- }
- }
-
-
Ok(())
}
-fn set_keepalive_time(socket: TcpSocket, time: Duration) -> io::Result<()> {
- let time_secs = time
- .as_secs()
- .try_into()
- .ok()
- .unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::IPPROTO_TCP,
- KEEPALIVE_TIME,
- &(time_secs as libc::c_int) as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-pub(crate) fn get_keepalive_time(socket: TcpSocket) -> io::Result<Option<Duration>> {
- if !get_keepalive(socket)? {
- return Ok(None);
- }
-
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::IPPROTO_TCP,
- KEEPALIVE_TIME,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(Some(Duration::from_secs(optval as u64)))
-}
-
-/// Linux, FreeBSD, and NetBSD support setting the keepalive interval via
-/// `TCP_KEEPINTVL`.
-/// See:
-/// - https://man7.org/linux/man-pages/man7/tcp.7.html
-/// - https://www.freebsd.org/cgi/man.cgi?query=tcp#end
-/// - http://man.netbsd.org/tcp.4#DESCRIPTION
-///
-/// OpenBSD does not:
-/// https://man.openbsd.org/tcp
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-fn set_keepalive_interval(socket: TcpSocket, interval: Duration) -> io::Result<()> {
- let interval_secs = interval
- .as_secs()
- .try_into()
- .ok()
- .unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPINTVL,
- &(interval_secs as libc::c_int) as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_interval(socket: TcpSocket) -> io::Result<Option<Duration>> {
- if !get_keepalive(socket)? {
- return Ok(None);
- }
-
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPINTVL,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(Some(Duration::from_secs(optval as u64)))
-}
-
-/// Linux, macOS/iOS, FreeBSD, and NetBSD support setting the number of TCP
-/// keepalive retries via `TCP_KEEPCNT`.
-/// See:
-/// - https://man7.org/linux/man-pages/man7/tcp.7.html
-/// - https://www.freebsd.org/cgi/man.cgi?query=tcp#end
-/// - http://man.netbsd.org/tcp.4#DESCRIPTION
-///
-/// OpenBSD does not:
-/// https://man.openbsd.org/tcp
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-fn set_keepalive_retries(socket: TcpSocket, retries: u32) -> io::Result<()> {
- let retries = retries.try_into().ok().unwrap_or_else(i32::max_value);
- syscall!(setsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPCNT,
- &(retries as libc::c_int) as *const _ as *const libc::c_void,
- size_of::<libc::c_int>() as libc::socklen_t
- ))
- .map(|_| ())
-}
-
-#[cfg(any(
- target_os = "linux",
- target_os = "macos",
- target_os = "ios",
- target_os = "freebsd",
- target_os = "netbsd",
-))]
-pub(crate) fn get_keepalive_retries(socket: TcpSocket) -> io::Result<Option<u32>> {
- if !get_keepalive(socket)? {
- return Ok(None);
- }
-
- let mut optval: libc::c_int = 0;
- let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
- syscall!(getsockopt(
- socket,
- libc::IPPROTO_TCP,
- libc::TCP_KEEPCNT,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ))?;
-
- Ok(Some(optval as u32))
-}
-
-pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
let mut addr: MaybeUninit<libc::sockaddr_storage> = MaybeUninit::uninit();
let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
@@ -456,13 +85,10 @@ pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, Socket
// OSes inherit the non-blocking flag from the listener, so we just have to
// set `CLOEXEC`.
#[cfg(any(
- all(
- target_arch = "x86",
- target_os = "android"
- ),
- target_os = "ios",
- target_os = "macos",
- target_os = "solaris"
+ all(target_arch = "x86", target_os = "android"),
+ target_os = "ios",
+ target_os = "macos",
+ target_os = "redox"
))]
let stream = {
syscall!(accept(
@@ -473,11 +99,11 @@ pub fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, Socket
.map(|socket| unsafe { net::TcpStream::from_raw_fd(socket) })
.and_then(|s| {
syscall!(fcntl(s.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?;
-
+
// See https://github.com/tokio-rs/mio/issues/1450
- #[cfg(all(target_arch = "x86",target_os = "android"))]
+ #[cfg(all(target_arch = "x86", target_os = "android"))]
syscall!(fcntl(s.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK))?;
-
+
Ok(s)
})
}?;
diff --git a/src/sys/unix/uds/listener.rs b/src/sys/unix/uds/listener.rs
index 547ff57..79bd14e 100644
--- a/src/sys/unix/uds/listener.rs
+++ b/src/sys/unix/uds/listener.rs
@@ -42,7 +42,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
- target_os = "solaris",
+ target_os = "redox",
// Android x86's seccomp profile forbids calls to `accept4(2)`
// See https://github.com/tokio-rs/mio/issues/1445 for details
all(
@@ -65,11 +65,8 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
- target_os = "solaris",
- all(
- target_arch = "x86",
- target_os = "android"
- )
+ target_os = "redox",
+ all(target_arch = "x86", target_os = "android")
))]
let socket = syscall!(accept(
listener.as_raw_fd(),
@@ -83,9 +80,9 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC))?;
// See https://github.com/tokio-rs/mio/issues/1450
- #[cfg(all(target_arch = "x86",target_os = "android"))]
+ #[cfg(all(target_arch = "x86", target_os = "android"))]
syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))?;
-
+
Ok(s)
});
diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs
index 3ec829f..526bbdf 100644
--- a/src/sys/unix/uds/mod.rs
+++ b/src/sys/unix/uds/mod.rs
@@ -40,7 +40,7 @@ cfg_os_poll! {
sockaddr.sun_family = libc::AF_UNIX as libc::sa_family_t;
let bytes = path.as_os_str().as_bytes();
- match (bytes.get(0), bytes.len().cmp(&sockaddr.sun_path.len())) {
+ match (bytes.first(), bytes.len().cmp(&sockaddr.sun_path.len())) {
// Abstract paths don't need a null terminator
(Some(&0), Ordering::Greater) => {
return Err(io::Error::new(
@@ -64,7 +64,7 @@ cfg_os_poll! {
let offset = path_offset(&sockaddr);
let mut socklen = offset + bytes.len();
- match bytes.get(0) {
+ match bytes.first() {
// The struct has already been zeroes so the null byte for pathname
// addresses is already there.
Some(&0) | None => {}
@@ -77,20 +77,20 @@ cfg_os_poll! {
fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)>
where T: FromRawFd,
{
- #[cfg(not(any(target_os = "ios", target_os = "macos", target_os = "solaris")))]
+ #[cfg(not(any(target_os = "ios", target_os = "macos")))]
let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
let mut fds = [-1; 2];
syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?;
let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) };
- // Darwin and Solaris do not have SOCK_NONBLOCK or SOCK_CLOEXEC.
+ // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC.
//
// In order to set those flags, additional `fcntl` sys calls must be
// performed. If a `fnctl` fails after the sockets have been created,
// the file descriptors will leak. Creating `pair` above ensures that if
// there is an error, the file descriptors are closed.
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
{
syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?;
syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?;
diff --git a/src/sys/unix/uds/socketaddr.rs b/src/sys/unix/uds/socketaddr.rs
index a9f9ea9..4c7c411 100644
--- a/src/sys/unix/uds/socketaddr.rs
+++ b/src/sys/unix/uds/socketaddr.rs
@@ -78,14 +78,8 @@ cfg_os_poll! {
/// Documentation reflected in [`SocketAddr`]
///
/// [`SocketAddr`]: std::os::unix::net::SocketAddr
- // FIXME: The matches macro requires rust 1.42.0 and we still support 1.39.0
- #[allow(clippy::match_like_matches_macro)]
pub fn is_unnamed(&self) -> bool {
- if let AddressKind::Unnamed = self.address() {
- true
- } else {
- false
- }
+ matches!(self.address(), AddressKind::Unnamed)
}
/// Returns the contents of this address if it is a `pathname` address.
@@ -100,6 +94,18 @@ cfg_os_poll! {
None
}
}
+
+ /// Returns the contents of this address if it is an abstract namespace
+ /// without the leading null byte.
+ // Link to std::os::unix::net::SocketAddr pending
+ // https://github.com/rust-lang/rust/issues/85410.
+ pub fn as_abstract_namespace(&self) -> Option<&[u8]> {
+ if let AddressKind::Abstract(path) = self.address() {
+ Some(path)
+ } else {
+ None
+ }
+ }
}
}
diff --git a/src/sys/unix/uds/stream.rs b/src/sys/unix/uds/stream.rs
index 149dd14..9ae4867 100644
--- a/src/sys/unix/uds/stream.rs
+++ b/src/sys/unix/uds/stream.rs
@@ -13,7 +13,7 @@ pub(crate) fn connect(path: &Path) -> io::Result<net::UnixStream> {
match syscall!(connect(socket, sockaddr, socklen)) {
Ok(_) => {}
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ Err(ref err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(e) => {
// Close the socket if we hit an error, ignoring the error
// from closing since we can't pass back two errors.
diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs
index a7cf484..de88e31 100644
--- a/src/sys/unix/waker.rs
+++ b/src/sys/unix/waker.rs
@@ -103,7 +103,7 @@ pub use self::kqueue::Waker;
target_os = "illumos",
target_os = "netbsd",
target_os = "openbsd",
- target_os = "solaris"
+ target_os = "redox",
))]
mod pipe {
use crate::sys::unix::Selector;
@@ -175,6 +175,6 @@ mod pipe {
target_os = "illumos",
target_os = "netbsd",
target_os = "openbsd",
- target_os = "solaris"
+ target_os = "redox",
))]
pub use self::pipe::Waker;
diff --git a/src/sys/wasi/mod.rs b/src/sys/wasi/mod.rs
new file mode 100644
index 0000000..eeea0f2
--- /dev/null
+++ b/src/sys/wasi/mod.rs
@@ -0,0 +1,370 @@
+//! # Notes
+//!
+//! The current implementation is somewhat limited. The `Waker` is not
+//! implemented, as at the time of writing there is no way to support to wake-up
+//! a thread from calling `poll_oneoff`.
+//!
+//! Furthermore the (re/de)register functions also don't work while concurrently
+//! polling as both registering and polling requires a lock on the
+//! `subscriptions`.
+//!
+//! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't
+//! work. However this could be implemented by use of an `Arc`.
+//!
+//! In summary, this only (barely) works using a single thread.
+
+use std::cmp::min;
+use std::io;
+#[cfg(all(feature = "net", debug_assertions))]
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+#[cfg(feature = "net")]
+use crate::{Interest, Token};
+
+cfg_net! {
+ pub(crate) mod tcp {
+ use std::io;
+ use std::net::{self, SocketAddr};
+
+ pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
+ let (stream, addr) = listener.accept()?;
+ stream.set_nonblocking(true)?;
+ Ok((stream, addr))
+ }
+ }
+}
+
+/// Unique id for use as `SelectorId`.
+#[cfg(all(debug_assertions, feature = "net"))]
+static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
+
+pub(crate) struct Selector {
+ #[cfg(all(debug_assertions, feature = "net"))]
+ id: usize,
+ /// Subscriptions (reads events) we're interested in.
+ subscriptions: Arc<Mutex<Vec<wasi::Subscription>>>,
+}
+
+impl Selector {
+ pub(crate) fn new() -> io::Result<Selector> {
+ Ok(Selector {
+ #[cfg(all(debug_assertions, feature = "net"))]
+ id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
+ subscriptions: Arc::new(Mutex::new(Vec::new())),
+ })
+ }
+
+ #[cfg(all(debug_assertions, feature = "net"))]
+ pub(crate) fn id(&self) -> usize {
+ self.id
+ }
+
+ pub(crate) fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
+ events.clear();
+
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+
+ // If we want to a use a timeout in the `wasi_poll_oneoff()` function
+ // we need another subscription to the list.
+ if let Some(timeout) = timeout {
+ subscriptions.push(timeout_subscription(timeout));
+ }
+
+ // `poll_oneoff` needs the same number of events as subscriptions.
+ let length = subscriptions.len();
+ events.reserve(length);
+
+ debug_assert!(events.capacity() >= length);
+ #[cfg(debug_assertions)]
+ if length == 0 {
+ log::warn!(
+ "calling mio::Poll::poll with empty subscriptions, this likely not what you want"
+ );
+ }
+
+ let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) };
+
+ // Remove the timeout subscription we possibly added above.
+ if timeout.is_some() {
+ let timeout_sub = subscriptions.pop();
+ debug_assert_eq!(
+ timeout_sub.unwrap().u.tag,
+ wasi::EVENTTYPE_CLOCK.raw(),
+ "failed to remove timeout subscription"
+ );
+ }
+
+ drop(subscriptions); // Unlock.
+
+ match res {
+ Ok(n_events) => {
+ // Safety: `poll_oneoff` initialises the `events` for us.
+ unsafe { events.set_len(n_events) };
+
+ // Remove the timeout event.
+ if timeout.is_some() {
+ if let Some(index) = events.iter().position(is_timeout_event) {
+ events.swap_remove(index);
+ }
+ }
+
+ check_errors(&events)
+ }
+ Err(err) => Err(io_err(err)),
+ }
+ }
+
+ pub(crate) fn try_clone(&self) -> io::Result<Selector> {
+ Ok(Selector {
+ #[cfg(all(debug_assertions, feature = "net"))]
+ id: self.id,
+ subscriptions: self.subscriptions.clone(),
+ })
+ }
+
+ #[cfg(feature = "net")]
+ pub(crate) fn register(
+ &self,
+ fd: wasi::Fd,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+
+ if interests.is_writable() {
+ let subscription = wasi::Subscription {
+ userdata: token.0 as wasi::Userdata,
+ u: wasi::SubscriptionU {
+ tag: wasi::EVENTTYPE_FD_WRITE.raw(),
+ u: wasi::SubscriptionUU {
+ fd_write: wasi::SubscriptionFdReadwrite {
+ file_descriptor: fd,
+ },
+ },
+ },
+ };
+ subscriptions.push(subscription);
+ }
+
+ if interests.is_readable() {
+ let subscription = wasi::Subscription {
+ userdata: token.0 as wasi::Userdata,
+ u: wasi::SubscriptionU {
+ tag: wasi::EVENTTYPE_FD_READ.raw(),
+ u: wasi::SubscriptionUU {
+ fd_read: wasi::SubscriptionFdReadwrite {
+ file_descriptor: fd,
+ },
+ },
+ },
+ };
+ subscriptions.push(subscription);
+ }
+
+ Ok(())
+ }
+
+ #[cfg(feature = "net")]
+ pub(crate) fn reregister(
+ &self,
+ fd: wasi::Fd,
+ token: Token,
+ interests: Interest,
+ ) -> io::Result<()> {
+ self.deregister(fd)
+ .and_then(|()| self.register(fd, token, interests))
+ }
+
+ #[cfg(feature = "net")]
+ pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> {
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+
+ let predicate = |subscription: &wasi::Subscription| {
+ // Safety: `subscription.u.tag` defines the type of the union in
+ // `subscription.u.u`.
+ match subscription.u.tag {
+ t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe {
+ subscription.u.u.fd_write.file_descriptor == fd
+ },
+ t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe {
+ subscription.u.u.fd_read.file_descriptor == fd
+ },
+ _ => false,
+ }
+ };
+
+ let mut ret = Err(io::ErrorKind::NotFound.into());
+
+ while let Some(index) = subscriptions.iter().position(predicate) {
+ subscriptions.swap_remove(index);
+ ret = Ok(())
+ }
+
+ ret
+ }
+}
+
+/// Token used to a add a timeout subscription, also used in removing it again.
+const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::max_value();
+
+/// Returns a `wasi::Subscription` for `timeout`.
+fn timeout_subscription(timeout: Duration) -> wasi::Subscription {
+ wasi::Subscription {
+ userdata: TIMEOUT_TOKEN,
+ u: wasi::SubscriptionU {
+ tag: wasi::EVENTTYPE_CLOCK.raw(),
+ u: wasi::SubscriptionUU {
+ clock: wasi::SubscriptionClock {
+ id: wasi::CLOCKID_MONOTONIC,
+ // Timestamp is in nanoseconds.
+ timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos())
+ as wasi::Timestamp,
+ // Give the implementation another millisecond to coalesce
+ // events.
+ precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp,
+ // Zero means the `timeout` is considered relative to the
+ // current time.
+ flags: 0,
+ },
+ },
+ },
+ }
+}
+
+fn is_timeout_event(event: &wasi::Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN
+}
+
+/// Check all events for possible errors, it returns the first error found.
+fn check_errors(events: &[Event]) -> io::Result<()> {
+ for event in events {
+ if event.error != wasi::ERRNO_SUCCESS {
+ return Err(io_err(event.error));
+ }
+ }
+ Ok(())
+}
+
+/// Convert `wasi::Errno` into an `io::Error`.
+fn io_err(errno: wasi::Errno) -> io::Error {
+ // TODO: check if this is valid.
+ io::Error::from_raw_os_error(errno.raw() as i32)
+}
+
+pub(crate) type Events = Vec<Event>;
+
+pub(crate) type Event = wasi::Event;
+
+pub(crate) mod event {
+ use std::fmt;
+
+ use crate::sys::Event;
+ use crate::Token;
+
+ pub(crate) fn token(event: &Event) -> Token {
+ Token(event.userdata as usize)
+ }
+
+ pub(crate) fn is_readable(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_READ
+ }
+
+ pub(crate) fn is_writable(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_WRITE
+ }
+
+ pub(crate) fn is_error(_: &Event) -> bool {
+ // Not supported? It could be that `wasi::Event.error` could be used for
+ // this, but the docs say `error that occurred while processing the
+ // subscription request`, so it's checked in `Select::select` already.
+ false
+ }
+
+ pub(crate) fn is_read_closed(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_READ
+ // Safety: checked the type of the union above.
+ && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
+ }
+
+ pub(crate) fn is_write_closed(event: &Event) -> bool {
+ event.type_ == wasi::EVENTTYPE_FD_WRITE
+ // Safety: checked the type of the union above.
+ && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
+ }
+
+ pub(crate) fn is_priority(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub(crate) fn is_aio(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub(crate) fn is_lio(_: &Event) -> bool {
+ // Not supported.
+ false
+ }
+
+ pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
+ debug_detail!(
+ TypeDetails(wasi::Eventtype),
+ PartialEq::eq,
+ wasi::EVENTTYPE_CLOCK,
+ wasi::EVENTTYPE_FD_READ,
+ wasi::EVENTTYPE_FD_WRITE,
+ );
+
+ #[allow(clippy::trivially_copy_pass_by_ref)]
+ fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool {
+ (got & want) != 0
+ }
+ debug_detail!(
+ EventrwflagsDetails(wasi::Eventrwflags),
+ check_flag,
+ wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP,
+ );
+
+ struct EventFdReadwriteDetails(wasi::EventFdReadwrite);
+
+ impl fmt::Debug for EventFdReadwriteDetails {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("EventFdReadwrite")
+ .field("nbytes", &self.0.nbytes)
+ .field("flags", &self.0.flags)
+ .finish()
+ }
+ }
+
+ f.debug_struct("Event")
+ .field("userdata", &event.userdata)
+ .field("error", &event.error)
+ .field("type", &TypeDetails(event.type_))
+ .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite))
+ .finish()
+ }
+}
+
+cfg_os_poll! {
+ cfg_io_source! {
+ pub(crate) struct IoSourceState;
+
+ impl IoSourceState {
+ pub(crate) fn new() -> IoSourceState {
+ IoSourceState
+ }
+
+ pub(crate) fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
+ where
+ F: FnOnce(&T) -> io::Result<R>,
+ {
+ // We don't hold state, so we can just call the function and
+ // return.
+ f(io)
+ }
+ }
+ }
+}
diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs
index 6241a45..0308e2f 100644
--- a/src/sys/windows/afd.rs
+++ b/src/sys/windows/afd.rs
@@ -1,17 +1,32 @@
-use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
-use ntapi::ntioapi::{NtCancelIoFileEx, NtDeviceIoControlFile};
-use ntapi::ntrtl::RtlNtStatusToDosError;
+use std::ffi::c_void;
use std::fmt;
use std::fs::File;
use std::io;
use std::mem::size_of;
use std::os::windows::io::AsRawHandle;
-use std::ptr::null_mut;
-use winapi::shared::ntdef::{HANDLE, LARGE_INTEGER, NTSTATUS, PVOID, ULONG};
-use winapi::shared::ntstatus::{STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS};
-const IOCTL_AFD_POLL: ULONG = 0x00012024;
+use windows_sys::Win32::Foundation::{
+ RtlNtStatusToDosError, HANDLE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
+};
+use windows_sys::Win32::System::WindowsProgramming::{
+ NtDeviceIoControlFile, IO_STATUS_BLOCK, IO_STATUS_BLOCK_0,
+};
+const IOCTL_AFD_POLL: u32 = 0x00012024;
+
+#[link(name = "ntdll")]
+extern "system" {
+ /// See <https://processhacker.sourceforge.io/doc/ntioapi_8h.html#a0d4d550cad4d62d75b76961e25f6550c>
+ ///
+ /// This is an undocumented API and as such not part of <https://github.com/microsoft/win32metadata>
+ /// from which `windows-sys` is generated, and also unlikely to be added, so
+ /// we manually declare it here
+ fn NtCancelIoFileEx(
+ FileHandle: HANDLE,
+ IoRequestToCancel: *mut IO_STATUS_BLOCK,
+ IoStatusBlock: *mut IO_STATUS_BLOCK,
+ ) -> NTSTATUS;
+}
/// Winsock2 AFD driver instance.
///
/// All operations are unsafe due to IO_STATUS_BLOCK parameter are being used by Afd driver during STATUS_PENDING before I/O Completion Port returns its result.
@@ -24,7 +39,7 @@ pub struct Afd {
#[derive(Debug)]
pub struct AfdPollHandleInfo {
pub handle: HANDLE,
- pub events: ULONG,
+ pub events: u32,
pub status: NTSTATUS,
}
@@ -32,10 +47,10 @@ unsafe impl Send for AfdPollHandleInfo {}
#[repr(C)]
pub struct AfdPollInfo {
- pub timeout: LARGE_INTEGER,
+ pub timeout: i64,
// Can have only value 1.
- pub number_of_handles: ULONG,
- pub exclusive: ULONG,
+ pub number_of_handles: u32,
+ pub exclusive: u32,
pub handles: [AfdPollHandleInfo; 1],
}
@@ -58,13 +73,13 @@ impl Afd {
&self,
info: &mut AfdPollInfo,
iosb: *mut IO_STATUS_BLOCK,
- overlapped: PVOID,
+ overlapped: *mut c_void,
) -> io::Result<bool> {
- let info_ptr: PVOID = info as *mut _ as PVOID;
- (*iosb).u.Status = STATUS_PENDING;
+ let info_ptr = info as *mut _ as *mut c_void;
+ (*iosb).Anonymous.Status = STATUS_PENDING;
let status = NtDeviceIoControlFile(
- self.fd.as_raw_handle(),
- null_mut(),
+ self.fd.as_raw_handle() as HANDLE,
+ 0,
None,
overlapped,
iosb,
@@ -93,15 +108,15 @@ impl Afd {
/// Use it only with request is still being polled so that you have valid `IO_STATUS_BLOCK` to use.
/// User should NOT deallocate there overlapped value after the `cancel` to prevent double free.
pub unsafe fn cancel(&self, iosb: *mut IO_STATUS_BLOCK) -> io::Result<()> {
- if (*iosb).u.Status != STATUS_PENDING {
+ if (*iosb).Anonymous.Status != STATUS_PENDING {
return Ok(());
}
let mut cancel_iosb = IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
};
- let status = NtCancelIoFileEx(self.fd.as_raw_handle(), iosb, &mut cancel_iosb);
+ let status = NtCancelIoFileEx(self.fd.as_raw_handle() as HANDLE, iosb, &mut cancel_iosb);
if status == STATUS_SUCCESS || status == STATUS_NOT_FOUND {
return Ok(());
}
@@ -114,18 +129,21 @@ impl Afd {
cfg_io_source! {
use std::mem::zeroed;
use std::os::windows::io::{FromRawHandle, RawHandle};
+ use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
- use miow::iocp::CompletionPort;
- use ntapi::ntioapi::{NtCreateFile, FILE_OPEN};
- use winapi::shared::ntdef::{OBJECT_ATTRIBUTES, UNICODE_STRING, USHORT, WCHAR};
- use winapi::um::handleapi::INVALID_HANDLE_VALUE;
- use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVENT_ON_HANDLE};
- use winapi::um::winnt::{SYNCHRONIZE, FILE_SHARE_READ, FILE_SHARE_WRITE};
+ use super::iocp::CompletionPort;
+ use windows_sys::Win32::{
+ Foundation::{UNICODE_STRING, INVALID_HANDLE_VALUE},
+ System::WindowsProgramming::{
+ OBJECT_ATTRIBUTES, FILE_SKIP_SET_EVENT_ON_HANDLE,
+ },
+ Storage::FileSystem::{FILE_OPEN, NtCreateFile, SetFileCompletionNotificationModes, SYNCHRONIZE, FILE_SHARE_READ, FILE_SHARE_WRITE},
+ };
const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES {
- Length: size_of::<OBJECT_ATTRIBUTES>() as ULONG,
- RootDirectory: null_mut(),
+ Length: size_of::<OBJECT_ATTRIBUTES>() as u32,
+ RootDirectory: 0,
ObjectName: &AFD_OBJ_NAME as *const _ as *mut _,
Attributes: 0,
SecurityDescriptor: null_mut(),
@@ -133,12 +151,12 @@ cfg_io_source! {
};
const AFD_OBJ_NAME: UNICODE_STRING = UNICODE_STRING {
- Length: (AFD_HELPER_NAME.len() * size_of::<WCHAR>()) as USHORT,
- MaximumLength: (AFD_HELPER_NAME.len() * size_of::<WCHAR>()) as USHORT,
+ Length: (AFD_HELPER_NAME.len() * size_of::<u16>()) as u16,
+ MaximumLength: (AFD_HELPER_NAME.len() * size_of::<u16>()) as u16,
Buffer: AFD_HELPER_NAME.as_ptr() as *mut _,
};
- const AFD_HELPER_NAME: &[WCHAR] = &[
+ const AFD_HELPER_NAME: &[u16] = &[
'\\' as _,
'D' as _,
'e' as _,
@@ -166,10 +184,10 @@ cfg_io_source! {
impl Afd {
/// Create new Afd instance.
- pub fn new(cp: &CompletionPort) -> io::Result<Afd> {
+ pub(crate) fn new(cp: &CompletionPort) -> io::Result<Afd> {
let mut afd_helper_handle: HANDLE = INVALID_HANDLE_VALUE;
let mut iosb = IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
};
@@ -180,12 +198,12 @@ cfg_io_source! {
&AFD_HELPER_ATTRIBUTES as *const _ as *mut _,
&mut iosb,
null_mut(),
- 0 as ULONG,
+ 0,
FILE_SHARE_READ | FILE_SHARE_WRITE,
FILE_OPEN,
- 0 as ULONG,
+ 0,
null_mut(),
- 0 as ULONG,
+ 0,
);
if status != STATUS_SUCCESS {
let raw_err = io::Error::from_raw_os_error(
@@ -204,7 +222,7 @@ cfg_io_source! {
cp.add_handle(token, &afd.fd)?;
match SetFileCompletionNotificationModes(
afd_helper_handle,
- FILE_SKIP_SET_EVENT_ON_HANDLE,
+ FILE_SKIP_SET_EVENT_ON_HANDLE as u8 // This is just 2, so fits in u8
) {
0 => Err(io::Error::last_os_error()),
_ => Ok(afd),
@@ -214,18 +232,18 @@ cfg_io_source! {
}
}
-pub const POLL_RECEIVE: u32 = 0b000_000_001;
-pub const POLL_RECEIVE_EXPEDITED: u32 = 0b000_000_010;
-pub const POLL_SEND: u32 = 0b000_000_100;
-pub const POLL_DISCONNECT: u32 = 0b000_001_000;
-pub const POLL_ABORT: u32 = 0b000_010_000;
-pub const POLL_LOCAL_CLOSE: u32 = 0b000_100_000;
+pub const POLL_RECEIVE: u32 = 0b0_0000_0001;
+pub const POLL_RECEIVE_EXPEDITED: u32 = 0b0_0000_0010;
+pub const POLL_SEND: u32 = 0b0_0000_0100;
+pub const POLL_DISCONNECT: u32 = 0b0_0000_1000;
+pub const POLL_ABORT: u32 = 0b0_0001_0000;
+pub const POLL_LOCAL_CLOSE: u32 = 0b0_0010_0000;
// Not used as it indicated in each event where a connection is connected, not
// just the first time a connection is established.
// Also see https://github.com/piscisaureus/wepoll/commit/8b7b340610f88af3d83f40fb728e7b850b090ece.
-pub const POLL_CONNECT: u32 = 0b001_000_000;
-pub const POLL_ACCEPT: u32 = 0b010_000_000;
-pub const POLL_CONNECT_FAIL: u32 = 0b100_000_000;
+pub const POLL_CONNECT: u32 = 0b0_0100_0000;
+pub const POLL_ACCEPT: u32 = 0b0_1000_0000;
+pub const POLL_CONNECT_FAIL: u32 = 0b1_0000_0000;
pub const KNOWN_EVENTS: u32 = POLL_RECEIVE
| POLL_RECEIVE_EXPEDITED
diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs
index a49252a..731bd60 100644
--- a/src/sys/windows/event.rs
+++ b/src/sys/windows/event.rs
@@ -1,8 +1,7 @@
use std::fmt;
-use miow::iocp::CompletionStatus;
-
use super::afd;
+use super::iocp::CompletionStatus;
use crate::Token;
#[derive(Clone)]
diff --git a/src/sys/windows/handle.rs b/src/sys/windows/handle.rs
new file mode 100644
index 0000000..5b9ac0b
--- /dev/null
+++ b/src/sys/windows/handle.rs
@@ -0,0 +1,30 @@
+use std::os::windows::io::RawHandle;
+use windows_sys::Win32::Foundation::{CloseHandle, HANDLE};
+
+/// Wrapper around a Windows HANDLE so that we close it upon drop in all scenarios
+#[derive(Debug)]
+pub struct Handle(HANDLE);
+
+impl Handle {
+ #[inline]
+ pub fn new(handle: HANDLE) -> Self {
+ Self(handle)
+ }
+
+ pub fn raw(&self) -> HANDLE {
+ self.0
+ }
+
+ pub fn into_raw(self) -> RawHandle {
+ let ret = self.0;
+ // This is super important so that drop is not called!
+ std::mem::forget(self);
+ ret as RawHandle
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ unsafe { CloseHandle(self.0) };
+ }
+}
diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs
index 3e60334..d7eda6a 100644
--- a/src/sys/windows/io_status_block.rs
+++ b/src/sys/windows/io_status_block.rs
@@ -1,17 +1,17 @@
use std::fmt;
use std::ops::{Deref, DerefMut};
-use ntapi::ntioapi::IO_STATUS_BLOCK;
+use windows_sys::Win32::System::WindowsProgramming::IO_STATUS_BLOCK;
pub struct IoStatusBlock(IO_STATUS_BLOCK);
cfg_io_source! {
- use ntapi::ntioapi::IO_STATUS_BLOCK_u;
+ use windows_sys::Win32::System::WindowsProgramming::{IO_STATUS_BLOCK_0};
impl IoStatusBlock {
pub fn zeroed() -> Self {
Self(IO_STATUS_BLOCK {
- u: IO_STATUS_BLOCK_u { Status: 0 },
+ Anonymous: IO_STATUS_BLOCK_0 { Status: 0 },
Information: 0,
})
}
diff --git a/src/sys/windows/iocp.rs b/src/sys/windows/iocp.rs
new file mode 100644
index 0000000..142b6fc
--- /dev/null
+++ b/src/sys/windows/iocp.rs
@@ -0,0 +1,275 @@
+//! Bindings to IOCP, I/O Completion Ports
+
+use super::{Handle, Overlapped};
+use std::cmp;
+use std::fmt;
+use std::io;
+use std::mem;
+use std::os::windows::io::*;
+use std::time::Duration;
+
+use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
+use windows_sys::Win32::System::IO::{
+ CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus, OVERLAPPED,
+ OVERLAPPED_ENTRY,
+};
+
+/// A handle to an Windows I/O Completion Port.
+#[derive(Debug)]
+pub(crate) struct CompletionPort {
+ handle: Handle,
+}
+
+/// A status message received from an I/O completion port.
+///
+/// These statuses can be created via the `new` or `empty` constructors and then
+/// provided to a completion port, or they are read out of a completion port.
+/// The fields of each status are read through its accessor methods.
+#[derive(Clone, Copy)]
+#[repr(transparent)]
+pub struct CompletionStatus(OVERLAPPED_ENTRY);
+
+impl fmt::Debug for CompletionStatus {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
+ }
+}
+
+unsafe impl Send for CompletionStatus {}
+unsafe impl Sync for CompletionStatus {}
+
+impl CompletionPort {
+ /// Creates a new I/O completion port with the specified concurrency value.
+ ///
+ /// The number of threads given corresponds to the level of concurrency
+ /// allowed for threads associated with this port. Consult the Windows
+ /// documentation for more information about this value.
+ pub fn new(threads: u32) -> io::Result<CompletionPort> {
+ let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, threads) };
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(CompletionPort {
+ handle: Handle::new(ret),
+ })
+ }
+ }
+
+ /// Associates a new `HANDLE` to this I/O completion port.
+ ///
+ /// This function will associate the given handle to this port with the
+ /// given `token` to be returned in status messages whenever it receives a
+ /// notification.
+ ///
+ /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
+ /// trait can be provided to this function, such as `std::fs::File` and
+ /// friends.
+ #[cfg(any(feature = "net", feature = "os-ext"))]
+ pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
+ let ret = unsafe {
+ CreateIoCompletionPort(t.as_raw_handle() as HANDLE, self.handle.raw(), token, 0)
+ };
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Dequeues a number of completion statuses from this I/O completion port.
+ ///
+ /// This function is the same as `get` except that it may return more than
+ /// one status. A buffer of "zero" statuses is provided (the contents are
+ /// not read) and then on success this function will return a sub-slice of
+ /// statuses which represent those which were dequeued from this port. This
+ /// function does not wait to fill up the entire list of statuses provided.
+ ///
+ /// Like with `get`, a timeout may be specified for this operation.
+ pub fn get_many<'a>(
+ &self,
+ list: &'a mut [CompletionStatus],
+ timeout: Option<Duration>,
+ ) -> io::Result<&'a mut [CompletionStatus]> {
+ debug_assert_eq!(
+ mem::size_of::<CompletionStatus>(),
+ mem::size_of::<OVERLAPPED_ENTRY>()
+ );
+ let mut removed = 0;
+ let timeout = duration_millis(timeout);
+ let len = cmp::min(list.len(), <u32>::max_value() as usize) as u32;
+ let ret = unsafe {
+ GetQueuedCompletionStatusEx(
+ self.handle.raw(),
+ list.as_ptr() as *mut _,
+ len,
+ &mut removed,
+ timeout,
+ 0,
+ )
+ };
+
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(&mut list[..removed as usize])
+ }
+ }
+
+ /// Posts a new completion status onto this I/O completion port.
+ ///
+ /// This function will post the given status, with custom parameters, to the
+ /// port. Threads blocked in `get` or `get_many` will eventually receive
+ /// this status.
+ pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
+ let ret = unsafe {
+ PostQueuedCompletionStatus(
+ self.handle.raw(),
+ status.0.dwNumberOfBytesTransferred,
+ status.0.lpCompletionKey,
+ status.0.lpOverlapped,
+ )
+ };
+
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl AsRawHandle for CompletionPort {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.handle.raw() as RawHandle
+ }
+}
+
+impl FromRawHandle for CompletionPort {
+ unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
+ CompletionPort {
+ handle: Handle::new(handle as HANDLE),
+ }
+ }
+}
+
+impl IntoRawHandle for CompletionPort {
+ fn into_raw_handle(self) -> RawHandle {
+ self.handle.into_raw()
+ }
+}
+
+impl CompletionStatus {
+ /// Creates a new completion status with the provided parameters.
+ ///
+ /// This function is useful when creating a status to send to a port with
+ /// the `post` method. The parameters are opaquely passed through and not
+ /// interpreted by the system at all.
+ pub(crate) fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> Self {
+ CompletionStatus(OVERLAPPED_ENTRY {
+ dwNumberOfBytesTransferred: bytes,
+ lpCompletionKey: token,
+ lpOverlapped: overlapped as *mut _,
+ Internal: 0,
+ })
+ }
+
+ /// Creates a new borrowed completion status from the borrowed
+ /// `OVERLAPPED_ENTRY` argument provided.
+ ///
+ /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
+ /// returning the wrapped structure.
+ #[cfg(feature = "os-ext")]
+ pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &Self {
+ // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
+ // a reference to one is guaranteed to be layout compatible with the
+ // reference to another.
+ unsafe { &*(entry as *const _ as *const _) }
+ }
+
+ /// Creates a new "zero" completion status.
+ ///
+ /// This function is useful when creating a stack buffer or vector of
+ /// completion statuses to be passed to the `get_many` function.
+ pub fn zero() -> Self {
+ Self::new(0, 0, std::ptr::null_mut())
+ }
+
+ /// Returns the number of bytes that were transferred for the I/O operation
+ /// associated with this completion status.
+ pub fn bytes_transferred(&self) -> u32 {
+ self.0.dwNumberOfBytesTransferred
+ }
+
+ /// Returns the completion key value associated with the file handle whose
+ /// I/O operation has completed.
+ ///
+ /// A completion key is a per-handle key that is specified when it is added
+ /// to an I/O completion port via `add_handle` or `add_socket`.
+ pub fn token(&self) -> usize {
+ self.0.lpCompletionKey as usize
+ }
+
+ /// Returns a pointer to the `Overlapped` structure that was specified when
+ /// the I/O operation was started.
+ pub fn overlapped(&self) -> *mut OVERLAPPED {
+ self.0.lpOverlapped
+ }
+
+ /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
+ pub fn entry(&self) -> &OVERLAPPED_ENTRY {
+ &self.0
+ }
+}
+
+#[inline]
+fn duration_millis(dur: Option<Duration>) -> u32 {
+ if let Some(dur) = dur {
+ let dur_ms = dur.as_millis();
+ // as_millis() truncates, so round nonzero <1ms timeouts up to 1ms. This avoids turning
+ // submillisecond timeouts into immediate reutrns unless the caller explictly requests that
+ // by specifiying a zero timeout.
+ let dur_ms = dur_ms
+ + if dur_ms == 0 && dur.subsec_nanos() != 0 {
+ 1
+ } else {
+ 0
+ };
+ std::cmp::min(dur_ms, u32::MAX as u128) as u32
+ } else {
+ u32::MAX
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{CompletionPort, CompletionStatus};
+
+ #[test]
+ fn is_send_sync() {
+ fn is_send_sync<T: Send + Sync>() {}
+ is_send_sync::<CompletionPort>();
+ }
+
+ #[test]
+ fn get_many() {
+ let c = CompletionPort::new(1).unwrap();
+
+ c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
+ c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
+
+ let mut s = vec![CompletionStatus::zero(); 4];
+ {
+ let s = c.get_many(&mut s, None).unwrap();
+ assert_eq!(s.len(), 2);
+ assert_eq!(s[0].bytes_transferred(), 1);
+ assert_eq!(s[0].token(), 2);
+ assert_eq!(s[0].overlapped(), 3 as *mut _);
+ assert_eq!(s[1].bytes_transferred(), 4);
+ assert_eq!(s[1].token(), 5);
+ assert_eq!(s[1].overlapped(), 6 as *mut _);
+ }
+ assert_eq!(s[2].bytes_transferred(), 0);
+ assert_eq!(s[2].token(), 0);
+ assert_eq!(s[2].overlapped(), 0 as *mut _);
+ }
+}
diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs
index 98b6fc6..f8b72fc 100644
--- a/src/sys/windows/mod.rs
+++ b/src/sys/windows/mod.rs
@@ -1,15 +1,20 @@
mod afd;
-mod io_status_block;
pub mod event;
pub use event::{Event, Events};
-mod selector;
-pub use selector::{Selector, SelectorInner, SockState};
+mod handle;
+use handle::Handle;
+
+mod io_status_block;
+mod iocp;
mod overlapped;
use overlapped::Overlapped;
+mod selector;
+pub use selector::{Selector, SelectorInner, SockState};
+
// Macros must be defined before the modules that use them
cfg_net! {
/// Helper macro to execute a system call that returns an `io::Result`.
@@ -45,7 +50,7 @@ cfg_io_source! {
use std::pin::Pin;
use std::sync::{Arc, Mutex};
- use crate::{poll, Interest, Registry, Token};
+ use crate::{Interest, Registry, Token};
struct InternalState {
selector: Arc<SelectorInner>,
@@ -101,7 +106,8 @@ cfg_io_source! {
if self.inner.is_some() {
Err(io::ErrorKind::AlreadyExists.into())
} else {
- poll::selector(registry)
+ registry
+ .selector()
.register(socket, token, interests)
.map(|state| {
self.inner = Some(Box::new(state));
@@ -117,7 +123,8 @@ cfg_io_source! {
) -> io::Result<()> {
match self.inner.as_mut() {
Some(state) => {
- poll::selector(registry)
+ registry
+ .selector()
.reregister(state.sock_state.clone(), token, interests)
.map(|()| {
state.token = token;
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs
index 8c81f38..23f85d1 100644
--- a/src/sys/windows/named_pipe.rs
+++ b/src/sys/windows/named_pipe.rs
@@ -1,41 +1,31 @@
-use crate::event::Source;
-use crate::sys::windows::{Event, Overlapped};
-use crate::{poll, Registry};
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
-
use std::ffi::OsStr;
-use std::fmt;
use std::io::{self, Read, Write};
-use std::mem;
-use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
-use std::slice;
+use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
+use std::{fmt, mem, slice};
+
+use windows_sys::Win32::Foundation::{
+ ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
+ ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
+};
+use windows_sys::Win32::Storage::FileSystem::{
+ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
+};
+use windows_sys::Win32::System::Pipes::{
+ ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+};
+use windows_sys::Win32::System::IO::{
+ CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
+};
+use crate::event::Source;
+use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
+use crate::sys::windows::{Event, Handle, Overlapped};
+use crate::Registry;
use crate::{Interest, Token};
-use miow::iocp::{CompletionPort, CompletionStatus};
-use miow::pipe;
-use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING};
-use winapi::um::ioapiset::CancelIoEx;
-
-/// # Safety
-///
-/// Only valid if the strict is annotated with `#[repr(C)]`. This is only used
-/// with `Overlapped` and `Inner`, which are correctly annotated.
-macro_rules! offset_of {
- ($t:ty, $($field:ident).+) => (
- &(*(0 as *const $t)).$($field).+ as *const _ as usize
- )
-}
-
-macro_rules! overlapped2arc {
- ($e:expr, $t:ty, $($field:ident).+) => ({
- let offset = offset_of!($t, $($field).+);
- debug_assert!(offset < mem::size_of::<$t>());
- Arc::from_raw(($e as usize - offset) as *mut $t)
- })
-}
/// Non-blocking windows named pipe.
///
@@ -83,30 +73,266 @@ pub struct NamedPipe {
inner: Arc<Inner>,
}
+/// # Notes
+///
+/// The memory layout of this structure must be fixed as the
+/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
#[repr(C)]
struct Inner {
- handle: pipe::NamedPipe,
-
+ // NOTE: careful modifying the order of these three fields, the `ptr_from_*`
+ // methods depend on the layout!
connect: Overlapped,
- connecting: AtomicBool,
-
read: Overlapped,
write: Overlapped,
-
+ // END NOTE.
+ handle: Handle,
+ connecting: AtomicBool,
io: Mutex<Io>,
-
pool: Mutex<BufferPool>,
}
+impl Inner {
+ /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
+ ///
+ /// # Unsafety
+ ///
+ /// Caller must ensure `ptr` is pointing to `Inner.connect`.
+ unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `connect` is the first field, so the pointer are the same.
+ ptr.cast()
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
+ unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
+ }
+
+ /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
+ unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
+ // `read` is after `connect: Overlapped` and `read: Overlapped`.
+ (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
+ }
+
+ /// Issue a connection request with the specified overlapped operation.
+ ///
+ /// This function will issue a request to connect a client to this server,
+ /// returning immediately after starting the overlapped operation.
+ ///
+ /// If this function immediately succeeds then `Ok(true)` is returned. If
+ /// the overlapped operation is enqueued and pending, then `Ok(false)` is
+ /// returned. Otherwise an error is returned indicating what went wrong.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `overlapped` pointer is valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that this pointer is
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
+ if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
+ return Ok(true);
+ }
+
+ let err = io::Error::last_os_error();
+
+ match err.raw_os_error().map(|e| e as u32) {
+ Some(ERROR_PIPE_CONNECTED) => Ok(true),
+ Some(ERROR_NO_DATA) => Ok(true),
+ Some(ERROR_IO_PENDING) => Ok(false),
+ _ => Err(err),
+ }
+ }
+
+ /// Disconnects this named pipe from any connected client.
+ pub fn disconnect(&self) -> io::Result<()> {
+ if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Issues an overlapped read operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous read to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes read. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn read_overlapped(
+ &self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = ReadFile(
+ self.handle.raw(),
+ buf.as_mut_ptr() as *mut _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Issues an overlapped write operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous write to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes written. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn write_overlapped(
+ &self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED,
+ ) -> io::Result<Option<usize>> {
+ let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
+ let res = WriteFile(
+ self.handle.raw(),
+ buf.as_ptr() as *const _,
+ len,
+ std::ptr::null_mut(),
+ overlapped,
+ );
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
+ return Err(err);
+ }
+ }
+
+ let mut bytes = 0;
+ let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
+ if res == 0 {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
+ Ok(None)
+ } else {
+ Err(err)
+ }
+ } else {
+ Ok(Some(bytes as usize))
+ }
+ }
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `Overlapped` instance.
+ #[inline]
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
+ let mut transferred = 0;
+ let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
+ if r == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(transferred as usize)
+ }
+ }
+}
+
+#[test]
+fn ptr_from() {
+ use std::mem::ManuallyDrop;
+ use std::ptr;
+
+ let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
+ let inner: &Inner = &pipe.inner;
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_conn_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_read_overlapped` incorrect"
+ );
+ assert_eq!(
+ inner as *const Inner,
+ unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) },
+ "`ptr_from_write_overlapped` incorrect"
+ );
+}
+
struct Io {
// Uniquely identifies the selector associated with this named pipe
cp: Option<Arc<CompletionPort>>,
// Token used to identify events
token: Option<Token>,
read: State,
- read_interest: bool,
write: State,
- write_interest: bool,
connect_error: Option<io::Error>,
}
@@ -129,10 +355,30 @@ impl NamedPipe {
/// Creates a new named pipe at the specified `addr` given a "reasonable
/// set" of initial configuration options.
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
- let pipe = pipe::NamedPipe::new(addr)?;
- // Safety: nothing actually unsafe about this. The trait fn includes
- // `unsafe`.
- Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) })
+ use std::os::windows::ffi::OsStrExt;
+ let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();
+
+ // Safety: syscall
+ let h = unsafe {
+ CreateNamedPipeW(
+ name.as_ptr(),
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE,
+ PIPE_UNLIMITED_INSTANCES,
+ 65536,
+ 65536,
+ 0,
+ std::ptr::null_mut(),
+ )
+ };
+
+ if h == INVALID_HANDLE_VALUE {
+ Err(io::Error::last_os_error())
+ } else {
+ // Safety: nothing actually unsafe about this. The trait fn includes
+ // `unsafe`.
+ Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
+ }
}
/// Attempts to call `ConnectNamedPipe`, if possible.
@@ -167,7 +413,7 @@ impl NamedPipe {
// internal state accordingly.
let res = unsafe {
let overlapped = self.inner.connect.as_ptr() as *mut _;
- self.inner.handle.connect_overlapped(overlapped)
+ self.inner.connect_overlapped(overlapped)
};
match res {
@@ -219,7 +465,7 @@ impl NamedPipe {
/// After a `disconnect` is issued, then a `connect` may be called again to
/// connect to another client.
pub fn disconnect(&self) -> io::Result<()> {
- self.inner.handle.disconnect()
+ self.inner.disconnect()
}
}
@@ -227,10 +473,7 @@ impl FromRawHandle for NamedPipe {
unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
NamedPipe {
inner: Arc::new(Inner {
- // Safety: not really unsafe
- handle: pipe::NamedPipe::from_raw_handle(handle),
- // transmutes to straddle winapi versions (mio 0.6 is on an
- // older winapi)
+ handle: Handle::new(handle as HANDLE),
connect: Overlapped::new(connect_done),
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
@@ -239,9 +482,7 @@ impl FromRawHandle for NamedPipe {
cp: None,
token: None,
read: State::None,
- read_interest: false,
write: State::None,
- write_interest: false,
connect_error: None,
}),
pool: Mutex::new(BufferPool::with_capacity(2)),
@@ -356,12 +597,7 @@ impl<'a> Write for &'a NamedPipe {
}
impl Source for NamedPipe {
- fn register(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, false)?;
@@ -374,18 +610,15 @@ impl Source for NamedPipe {
}
if io.cp.is_none() {
- io.cp = Some(poll::selector(registry).clone_port());
+ let selector = registry.selector();
+
+ io.cp = Some(selector.clone_port());
let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
- poll::selector(registry)
- .inner
- .cp
- .add_handle(inner_token, &self.inner.handle)?;
+ selector.inner.cp.add_handle(inner_token, self)?;
}
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -393,19 +626,12 @@ impl Source for NamedPipe {
Ok(())
}
- fn reregister(
- &mut self,
- registry: &Registry,
- token: Token,
- interest: Interest,
- ) -> io::Result<()> {
+ fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();
io.check_association(registry, true)?;
io.token = Some(token);
- io.read_interest = interest.is_readable();
- io.write_interest = interest.is_writable();
drop(io);
Inner::post_register(&self.inner, None);
@@ -432,7 +658,7 @@ impl Source for NamedPipe {
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
- self.inner.handle.as_raw_handle()
+ self.inner.handle.raw() as RawHandle
}
}
@@ -452,12 +678,8 @@ impl Drop for NamedPipe {
}
let io = self.inner.io.lock().unwrap();
-
- match io.read {
- State::Pending(..) => {
- drop(cancel(&self.inner.handle, &self.inner.read));
- }
- _ => {}
+ if let State::Pending(..) = io.read {
+ drop(cancel(&self.inner.handle, &self.inner.read));
}
}
}
@@ -483,7 +705,7 @@ impl Inner {
let e = unsafe {
let overlapped = me.read.as_ptr() as *mut _;
let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
- me.handle.read_overlapped(slice, overlapped)
+ me.read_overlapped(slice, overlapped)
};
match e {
@@ -522,7 +744,7 @@ impl Inner {
// Very similar to `schedule_read` above, just done for the write half.
let e = unsafe {
let overlapped = me.write.as_ptr() as *mut _;
- me.handle.write_overlapped(&buf[pos..], overlapped)
+ me.write_overlapped(&buf[pos..], overlapped)
};
// See `connect` above for the rationale behind `forget`
@@ -572,7 +794,8 @@ impl Inner {
fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
let mut io = me.io.lock().unwrap();
- if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) {
+ #[allow(clippy::needless_option_as_deref)]
+ if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
}
@@ -588,8 +811,8 @@ impl Inner {
}
}
-unsafe fn cancel<T: AsRawHandle>(handle: &T, overlapped: &Overlapped) -> io::Result<()> {
- let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _);
+unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
+ let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
// `CancelIoEx` returns 0 on error:
// https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
if ret == 0 {
@@ -605,7 +828,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `connect` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };
// Flag ourselves as no longer using the `connect` overlapped instances.
let prev = me.connecting.swap(false, SeqCst);
@@ -614,7 +837,7 @@ fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Stash away our connect error if one happened
debug_assert_eq!(status.bytes_transferred(), 0);
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => debug_assert_eq!(n, 0),
Err(e) => me.io.lock().unwrap().connect_error = Some(e),
}
@@ -631,7 +854,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_read` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };
// Move from the `Pending` to `Ok` state.
let mut io = me.io.lock().unwrap();
@@ -640,7 +863,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
_ => unreachable!(),
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
buf.set_len(status.bytes_transferred() as usize);
@@ -663,7 +886,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
- let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) };
+ let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };
// Make the state change out of `Pending`. If we wrote the entire buffer
// then we're writable again and otherwise we schedule another write.
@@ -680,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
};
unsafe {
- match me.handle.result(status.overlapped()) {
+ match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
let new_pos = pos + (status.bytes_transferred() as usize);
@@ -703,7 +926,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
- Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
+ Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"I/O source already registered with a different `Registry`",
)),
diff --git a/src/sys/windows/net.rs b/src/sys/windows/net.rs
index 2de98fa..102ba79 100644
--- a/src/sys/windows/net.rs
+++ b/src/sys/windows/net.rs
@@ -3,12 +3,10 @@ use std::mem;
use std::net::SocketAddr;
use std::sync::Once;
-use winapi::ctypes::c_int;
-use winapi::shared::inaddr::{in_addr_S_un, IN_ADDR};
-use winapi::shared::in6addr::{in6_addr_u, IN6_ADDR};
-use winapi::shared::ws2def::{AF_INET, AF_INET6, ADDRESS_FAMILY, SOCKADDR, SOCKADDR_IN};
-use winapi::shared::ws2ipdef::{SOCKADDR_IN6_LH, SOCKADDR_IN6_LH_u};
-use winapi::um::winsock2::{ioctlsocket, socket, FIONBIO, INVALID_SOCKET, SOCKET};
+use windows_sys::Win32::Networking::WinSock::{
+ ioctlsocket, socket, AF_INET, AF_INET6, FIONBIO, IN6_ADDR, IN6_ADDR_0, INVALID_SOCKET, IN_ADDR,
+ IN_ADDR_0, SOCKADDR, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0, SOCKET,
+};
/// Initialise the network stack for Windows.
pub(crate) fn init() {
@@ -22,20 +20,18 @@ pub(crate) fn init() {
}
/// Create a new non-blocking socket.
-pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: c_int) -> io::Result<SOCKET> {
- use winapi::um::winsock2::{PF_INET, PF_INET6};
-
+pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: u16) -> io::Result<SOCKET> {
let domain = match addr {
- SocketAddr::V4(..) => PF_INET,
- SocketAddr::V6(..) => PF_INET6,
+ SocketAddr::V4(..) => AF_INET,
+ SocketAddr::V6(..) => AF_INET6,
};
new_socket(domain, socket_type)
}
-pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result<SOCKET> {
+pub(crate) fn new_socket(domain: u32, socket_type: u16) -> io::Result<SOCKET> {
syscall!(
- socket(domain, socket_type, 0),
+ socket(domain as i32, socket_type as i32, 0),
PartialEq::eq,
INVALID_SOCKET
)
@@ -51,7 +47,7 @@ pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result<SOCKET
#[repr(C)]
pub(crate) union SocketAddrCRepr {
v4: SOCKADDR_IN,
- v6: SOCKADDR_IN6_LH,
+ v6: SOCKADDR_IN6,
}
impl SocketAddrCRepr {
@@ -60,49 +56,49 @@ impl SocketAddrCRepr {
}
}
-pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, c_int) {
+pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, i32) {
match addr {
SocketAddr::V4(ref addr) => {
// `s_addr` is stored as BE on all machine and the array is in BE order.
// So the native endian conversion method is used so that it's never swapped.
let sin_addr = unsafe {
- let mut s_un = mem::zeroed::<in_addr_S_un>();
- *s_un.S_addr_mut() = u32::from_ne_bytes(addr.ip().octets());
+ let mut s_un = mem::zeroed::<IN_ADDR_0>();
+ s_un.S_addr = u32::from_ne_bytes(addr.ip().octets());
IN_ADDR { S_un: s_un }
};
let sockaddr_in = SOCKADDR_IN {
- sin_family: AF_INET as ADDRESS_FAMILY,
+ sin_family: AF_INET as u16, // 1
sin_port: addr.port().to_be(),
sin_addr,
sin_zero: [0; 8],
};
let sockaddr = SocketAddrCRepr { v4: sockaddr_in };
- (sockaddr, mem::size_of::<SOCKADDR_IN>() as c_int)
- },
+ (sockaddr, mem::size_of::<SOCKADDR_IN>() as i32)
+ }
SocketAddr::V6(ref addr) => {
let sin6_addr = unsafe {
- let mut u = mem::zeroed::<in6_addr_u>();
- *u.Byte_mut() = addr.ip().octets();
+ let mut u = mem::zeroed::<IN6_ADDR_0>();
+ u.Byte = addr.ip().octets();
IN6_ADDR { u }
};
let u = unsafe {
- let mut u = mem::zeroed::<SOCKADDR_IN6_LH_u>();
- *u.sin6_scope_id_mut() = addr.scope_id();
+ let mut u = mem::zeroed::<SOCKADDR_IN6_0>();
+ u.sin6_scope_id = addr.scope_id();
u
};
- let sockaddr_in6 = SOCKADDR_IN6_LH {
- sin6_family: AF_INET6 as ADDRESS_FAMILY,
+ let sockaddr_in6 = SOCKADDR_IN6 {
+ sin6_family: AF_INET6 as u16, // 23
sin6_port: addr.port().to_be(),
sin6_addr,
sin6_flowinfo: addr.flowinfo(),
- u,
+ Anonymous: u,
};
let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 };
- (sockaddr, mem::size_of::<SOCKADDR_IN6_LH>() as c_int)
+ (sockaddr, mem::size_of::<SOCKADDR_IN6>() as i32)
}
}
}
diff --git a/src/sys/windows/overlapped.rs b/src/sys/windows/overlapped.rs
index 837b78b..d1456de 100644
--- a/src/sys/windows/overlapped.rs
+++ b/src/sys/windows/overlapped.rs
@@ -3,13 +3,11 @@ use crate::sys::windows::Event;
use std::cell::UnsafeCell;
use std::fmt;
-#[cfg(feature = "os-ext")]
-use winapi::um::minwinbase::OVERLAPPED;
-use winapi::um::minwinbase::OVERLAPPED_ENTRY;
+use windows_sys::Win32::System::IO::{OVERLAPPED, OVERLAPPED_ENTRY};
#[repr(C)]
pub(crate) struct Overlapped {
- inner: UnsafeCell<miow::Overlapped>,
+ inner: UnsafeCell<OVERLAPPED>,
pub(crate) callback: fn(&OVERLAPPED_ENTRY, Option<&mut Vec<Event>>),
}
@@ -17,13 +15,13 @@ pub(crate) struct Overlapped {
impl Overlapped {
pub(crate) fn new(cb: fn(&OVERLAPPED_ENTRY, Option<&mut Vec<Event>>)) -> Overlapped {
Overlapped {
- inner: UnsafeCell::new(miow::Overlapped::zero()),
+ inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
callback: cb,
}
}
pub(crate) fn as_ptr(&self) -> *const OVERLAPPED {
- unsafe { (*self.inner.get()).raw() }
+ self.inner.get()
}
}
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 572a9a9..9f3cf68 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -10,8 +10,9 @@ cfg_net! {
use crate::Interest;
}
-use miow::iocp::{CompletionPort, CompletionStatus};
+use super::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
+use std::ffi::c_void;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
@@ -21,14 +22,15 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use winapi::shared::ntdef::NT_SUCCESS;
-use winapi::shared::ntdef::{HANDLE, PVOID};
-use winapi::shared::ntstatus::STATUS_CANCELLED;
-use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
-use winapi::um::minwinbase::OVERLAPPED;
+
+use windows_sys::Win32::Foundation::{
+ ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED, WAIT_TIMEOUT,
+};
+use windows_sys::Win32::System::IO::OVERLAPPED;
#[derive(Debug)]
struct AfdGroup {
+ #[cfg_attr(not(feature = "net"), allow(dead_code))]
cp: Arc<CompletionPort>,
afd_group: Mutex<Vec<Arc<Afd>>>,
}
@@ -43,7 +45,7 @@ impl AfdGroup {
pub fn release_unused_afd(&self) {
let mut afd_group = self.afd_group.lock().unwrap();
- afd_group.retain(|g| Arc::strong_count(&g) > 1);
+ afd_group.retain(|g| Arc::strong_count(g) > 1);
}
}
@@ -57,7 +59,7 @@ cfg_io_source! {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
- if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
+ if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP__MAX_GROUP_SIZE {
self._alloc_afd_group(&mut afd_group)?;
}
}
@@ -93,7 +95,6 @@ pub struct SockState {
poll_info: AfdPollInfo,
afd: Arc<Afd>,
- raw_socket: RawSocket,
base_socket: RawSocket,
user_evts: u32,
@@ -107,7 +108,7 @@ pub struct SockState {
// last raw os error
error: Option<i32>,
- pinned: PhantomPinned,
+ _pinned: PhantomPinned,
}
impl SockState {
@@ -141,7 +142,7 @@ impl SockState {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
- *unsafe { self.poll_info.timeout.QuadPart_mut() } = std::i64::MAX;
+ self.poll_info.timeout = i64::MAX;
self.poll_info.handles[0].handle = self.base_socket as HANDLE;
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;
@@ -204,9 +205,9 @@ impl SockState {
unsafe {
if self.delete_pending {
return None;
- } else if self.iosb.u.Status == STATUS_CANCELLED {
+ } else if self.iosb.Anonymous.Status == STATUS_CANCELLED {
/* The poll request was cancelled by CancelIoEx. */
- } else if !NT_SUCCESS(self.iosb.u.Status) {
+ } else if self.iosb.Anonymous.Status < 0 {
/* The overlapped request itself failed in an unexpected way. */
afd_events = afd::POLL_CONNECT_FAIL;
} else if self.poll_info.number_of_handles < 1 {
@@ -263,7 +264,6 @@ cfg_io_source! {
iosb: IoStatusBlock::zeroed(),
poll_info: AfdPollInfo::zeroed(),
afd,
- raw_socket,
base_socket: get_base_socket(raw_socket)?,
user_evts: 0,
pending_evts: 0,
@@ -271,7 +271,7 @@ cfg_io_source! {
poll_status: SockPollStatus::Idle,
delete_pending: false,
error: None,
- pinned: PhantomPinned,
+ _pinned: PhantomPinned,
})
}
@@ -296,7 +296,7 @@ impl Drop for SockState {
/// Converts the pointer to a `SockState` into a raw pointer.
/// To revert see `from_overlapped`.
-fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> PVOID {
+fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void {
let overlapped_ptr: *const Mutex<SockState> =
unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
overlapped_ptr as *mut _
@@ -448,11 +448,11 @@ impl SelectorInner {
if len == 0 {
continue;
}
- return Ok(());
+ break Ok(());
}
} else {
self.select2(&mut events.statuses, &mut events.events, timeout)?;
- return Ok(());
+ Ok(())
}
}
@@ -462,7 +462,7 @@ impl SelectorInner {
events: &mut Vec<Event>,
timeout: Option<Duration>,
) -> io::Result<usize> {
- assert_eq!(self.is_polling.swap(true, Ordering::AcqRel), false);
+ assert!(!self.is_polling.swap(true, Ordering::AcqRel));
unsafe { self.update_sockets_events() }?;
@@ -482,7 +482,7 @@ impl SelectorInner {
for sock in update_queue.iter_mut() {
let mut sock_internal = sock.lock().unwrap();
if !sock_internal.is_pending_deletion() {
- sock_internal.update(&sock)?;
+ sock_internal.update(sock)?;
}
}
@@ -518,12 +518,9 @@ impl SelectorInner {
let sock_state = from_overlapped(iocp_event.overlapped());
let mut sock_guard = sock_state.lock().unwrap();
- match sock_guard.feed_event() {
- Some(e) => {
- events.push(e);
- n += 1;
- }
- None => {}
+ if let Some(e) = sock_guard.feed_event() {
+ events.push(e);
+ n += 1;
}
if !sock_guard.is_pending_deletion() {
@@ -538,9 +535,12 @@ impl SelectorInner {
cfg_io_source! {
use std::mem::size_of;
use std::ptr::null_mut;
- use winapi::um::mswsock;
- use winapi::um::winsock2::WSAGetLastError;
- use winapi::um::winsock2::{WSAIoctl, SOCKET_ERROR};
+
+ use windows_sys::Win32::Networking::WinSock::{
+ WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE,
+ SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE_SELECT, SOCKET_ERROR,
+ };
+
impl SelectorInner {
fn register(
@@ -644,7 +644,7 @@ cfg_io_source! {
ioctl,
null_mut(),
0,
- &mut base_socket as *mut _ as PVOID,
+ &mut base_socket as *mut _ as *mut c_void,
size_of::<RawSocket>() as u32,
&mut bytes,
null_mut(),
@@ -659,7 +659,7 @@ cfg_io_source! {
}
fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
- let res = try_get_base_socket(raw_socket, mswsock::SIO_BASE_HANDLE);
+ let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE);
if let Ok(base_socket) = res {
return Ok(base_socket);
}
@@ -670,9 +670,9 @@ cfg_io_source! {
// However, at least one known LSP deliberately breaks it, so we try
// some alternative IOCTLs, starting with the most appropriate one.
for &ioctl in &[
- mswsock::SIO_BSP_HANDLE_SELECT,
- mswsock::SIO_BSP_HANDLE_POLL,
- mswsock::SIO_BSP_HANDLE,
+ SIO_BSP_HANDLE_SELECT,
+ SIO_BSP_HANDLE_POLL,
+ SIO_BSP_HANDLE,
] {
if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
// Since we know now that we're dealing with an LSP (otherwise
diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs
index 6757b44..533074b 100644
--- a/src/sys/windows/tcp.rs
+++ b/src/sys/windows/tcp.rs
@@ -1,321 +1,69 @@
use std::io;
-use std::convert::TryInto;
-use std::mem::size_of;
-use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
-use std::time::Duration;
-use std::ptr;
-use std::os::windows::io::FromRawSocket;
-use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
+use std::net::{self, SocketAddr};
+use std::os::windows::io::AsRawSocket;
-use winapi::ctypes::{c_char, c_int, c_ushort, c_ulong};
-use winapi::shared::ws2def::{SOCKADDR_STORAGE, AF_INET, AF_INET6, SOCKADDR_IN};
-use winapi::shared::ws2ipdef::SOCKADDR_IN6_LH;
-use winapi::shared::mstcpip;
-
-use winapi::shared::minwindef::{BOOL, TRUE, FALSE, DWORD, LPVOID, LPDWORD};
-use winapi::um::winsock2::{
- self, closesocket, linger, setsockopt, getsockopt, getsockname, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
- SOCK_STREAM, SOL_SOCKET, SO_LINGER, SO_REUSEADDR, SO_RCVBUF, SO_SNDBUF, SO_KEEPALIVE, WSAIoctl, LPWSAOVERLAPPED,
+use windows_sys::Win32::Networking::WinSock::{
+ self, AF_INET, AF_INET6, SOCKET, SOCKET_ERROR, SOCK_STREAM,
};
use crate::sys::windows::net::{init, new_socket, socket_addr};
-use crate::net::TcpKeepalive;
-
-pub(crate) type TcpSocket = SOCKET;
-
-pub(crate) fn new_v4_socket() -> io::Result<TcpSocket> {
- init();
- new_socket(PF_INET, SOCK_STREAM)
-}
-pub(crate) fn new_v6_socket() -> io::Result<TcpSocket> {
+pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result<SOCKET> {
init();
- new_socket(PF_INET6, SOCK_STREAM)
+ let domain = match address {
+ SocketAddr::V4(_) => AF_INET,
+ SocketAddr::V6(_) => AF_INET6,
+ };
+ new_socket(domain, SOCK_STREAM)
}
-pub(crate) fn bind(socket: TcpSocket, addr: SocketAddr) -> io::Result<()> {
- use winsock2::bind;
+pub(crate) fn bind(socket: &net::TcpListener, addr: SocketAddr) -> io::Result<()> {
+ use WinSock::bind;
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(
- bind(socket, raw_addr.as_ptr(), raw_addr_length),
+ bind(
+ socket.as_raw_socket() as _,
+ raw_addr.as_ptr(),
+ raw_addr_length
+ ),
PartialEq::eq,
SOCKET_ERROR
)?;
Ok(())
}
-pub(crate) fn connect(socket: TcpSocket, addr: SocketAddr) -> io::Result<net::TcpStream> {
- use winsock2::connect;
+pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> {
+ use WinSock::connect;
let (raw_addr, raw_addr_length) = socket_addr(&addr);
-
let res = syscall!(
- connect(socket, raw_addr.as_ptr(), raw_addr_length),
+ connect(
+ socket.as_raw_socket() as _,
+ raw_addr.as_ptr(),
+ raw_addr_length
+ ),
PartialEq::eq,
SOCKET_ERROR
);
match res {
- Err(err) if err.kind() != io::ErrorKind::WouldBlock => {
- Err(err)
- }
- _ => {
- Ok(unsafe { net::TcpStream::from_raw_socket(socket as StdSocket) })
- }
+ Err(err) if err.kind() != io::ErrorKind::WouldBlock => Err(err),
+ _ => Ok(()),
}
}
-pub(crate) fn listen(socket: TcpSocket, backlog: u32) -> io::Result<net::TcpListener> {
- use winsock2::listen;
+pub(crate) fn listen(socket: &net::TcpListener, backlog: u32) -> io::Result<()> {
use std::convert::TryInto;
+ use WinSock::listen;
let backlog = backlog.try_into().unwrap_or(i32::max_value());
- syscall!(listen(socket, backlog), PartialEq::eq, SOCKET_ERROR)?;
- Ok(unsafe { net::TcpListener::from_raw_socket(socket as StdSocket) })
-}
-
-pub(crate) fn close(socket: TcpSocket) {
- let _ = unsafe { closesocket(socket) };
-}
-
-pub(crate) fn set_reuseaddr(socket: TcpSocket, reuseaddr: bool) -> io::Result<()> {
- let val: BOOL = if reuseaddr { TRUE } else { FALSE };
-
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &val as *const _ as *const c_char,
- size_of::<BOOL>() as c_int,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: c_char = 0;
- let mut optlen = size_of::<BOOL>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval != 0),
- }
-}
-
-pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
- let mut storage: SOCKADDR_STORAGE = unsafe { std::mem::zeroed() };
- let mut length = std::mem::size_of_val(&storage) as c_int;
-
- match unsafe { getsockname(
- socket,
- &mut storage as *mut _ as *mut _,
- &mut length
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => {
- if storage.ss_family as c_int == AF_INET {
- // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in.
- let addr: &SOCKADDR_IN = unsafe { &*(&storage as *const _ as *const SOCKADDR_IN) };
- let ip_bytes = unsafe { addr.sin_addr.S_un.S_un_b() };
- let ip = Ipv4Addr::from([ip_bytes.s_b1, ip_bytes.s_b2, ip_bytes.s_b3, ip_bytes.s_b4]);
- let port = u16::from_be(addr.sin_port);
- Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
- } else if storage.ss_family as c_int == AF_INET6 {
- // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6.
- let addr: &SOCKADDR_IN6_LH = unsafe { &*(&storage as *const _ as *const SOCKADDR_IN6_LH) };
- let ip = Ipv6Addr::from(*unsafe { addr.sin6_addr.u.Byte() });
- let port = u16::from_be(addr.sin6_port);
- let scope_id = unsafe { *addr.u.sin6_scope_id() };
- Ok(SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)))
- } else {
- Err(std::io::ErrorKind::InvalidInput.into())
- }
- },
- }
-}
-
-pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
- let val: linger = linger {
- l_onoff: if dur.is_some() { 1 } else { 0 },
- l_linger: dur.map(|dur| dur.as_secs() as c_ushort).unwrap_or_default(),
- };
-
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_LINGER,
- &val as *const _ as *const c_char,
- size_of::<linger>() as c_int,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_linger(socket: TcpSocket) -> io::Result<Option<Duration>> {
- let mut val: linger = unsafe { std::mem::zeroed() };
- let mut len = size_of::<linger>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_LINGER,
- &mut val as *mut _ as *mut _,
- &mut len,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => {
- if val.l_onoff == 0 {
- Ok(None)
- } else {
- Ok(Some(Duration::from_secs(val.l_linger as u64)))
- }
- },
- }
-}
-
-
-pub(crate) fn set_recv_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_RCVBUF,
- &size as *const _ as *const c_char,
- size_of::<c_int>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_recv_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: c_int = 0;
- let mut optlen = size_of::<c_int>() as c_int;
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_RCVBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen as *mut _,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval as u32),
- }
-}
-
-pub(crate) fn set_send_buffer_size(socket: TcpSocket, size: u32) -> io::Result<()> {
- let size = size.try_into().ok().unwrap_or_else(i32::max_value);
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_SNDBUF,
- &size as *const _ as *const c_char,
- size_of::<c_int>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_send_buffer_size(socket: TcpSocket) -> io::Result<u32> {
- let mut optval: c_int = 0;
- let mut optlen = size_of::<c_int>() as c_int;
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_SNDBUF,
- &mut optval as *mut _ as *mut _,
- &mut optlen as *mut _,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval as u32),
- }
-}
-
-pub(crate) fn set_keepalive(socket: TcpSocket, keepalive: bool) -> io::Result<()> {
- let val: BOOL = if keepalive { TRUE } else { FALSE };
- match unsafe { setsockopt(
- socket,
- SOL_SOCKET,
- SO_KEEPALIVE,
- &val as *const _ as *const c_char,
- size_of::<BOOL>() as c_int
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(()),
- }
-}
-
-pub(crate) fn get_keepalive(socket: TcpSocket) -> io::Result<bool> {
- let mut optval: c_char = 0;
- let mut optlen = size_of::<BOOL>() as c_int;
-
- match unsafe { getsockopt(
- socket,
- SOL_SOCKET,
- SO_KEEPALIVE,
- &mut optval as *mut _ as *mut _,
- &mut optlen,
- ) } {
- SOCKET_ERROR => Err(io::Error::last_os_error()),
- _ => Ok(optval != FALSE as c_char),
- }
-}
-
-pub(crate) fn set_keepalive_params(socket: TcpSocket, keepalive: TcpKeepalive) -> io::Result<()> {
- /// Windows configures keepalive time/interval in a u32 of milliseconds.
- fn dur_to_ulong_ms(dur: Duration) -> c_ulong {
- dur.as_millis().try_into().ok().unwrap_or_else(u32::max_value)
- }
-
- // If any of the fields on the `tcp_keepalive` struct were not provided by
- // the user, just leaving them zero will clobber any existing value.
- // Unfortunately, we can't access the current value, so we will use the
- // defaults if a value for the time or interval was not not provided.
- let time = keepalive.time.unwrap_or_else(|| {
- // The default value is two hours, as per
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-keepalive-vals
- let two_hours = 2 * 60 * 60;
- Duration::from_secs(two_hours)
- });
-
- let interval = keepalive.interval.unwrap_or_else(|| {
- // The default value is one second, as per
- // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-keepalive-vals
- Duration::from_secs(1)
- });
-
- let mut keepalive = mstcpip::tcp_keepalive {
- // Enable keepalive
- onoff: 1,
- keepalivetime: dur_to_ulong_ms(time),
- keepaliveinterval: dur_to_ulong_ms(interval),
- };
-
- let mut out = 0;
- match unsafe { WSAIoctl(
- socket,
- mstcpip::SIO_KEEPALIVE_VALS,
- &mut keepalive as *mut _ as LPVOID,
- size_of::<mstcpip::tcp_keepalive>() as DWORD,
- ptr::null_mut() as LPVOID,
- 0 as DWORD,
- &mut out as *mut _ as LPDWORD,
- 0 as LPWSAOVERLAPPED,
- None,
- ) } {
- 0 => Ok(()),
- _ => Err(io::Error::last_os_error())
- }
+ syscall!(
+ listen(socket.as_raw_socket() as _, backlog),
+ PartialEq::eq,
+ SOCKET_ERROR
+ )?;
+ Ok(())
}
pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
diff --git a/src/sys/windows/udp.rs b/src/sys/windows/udp.rs
index 825eccc..91516cc 100644
--- a/src/sys/windows/udp.rs
+++ b/src/sys/windows/udp.rs
@@ -2,14 +2,12 @@ use std::io;
use std::mem::{self, MaybeUninit};
use std::net::{self, SocketAddr};
use std::os::windows::io::{AsRawSocket, FromRawSocket};
-use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
-
-use winapi::ctypes::c_int;
-use winapi::shared::ws2def::IPPROTO_IPV6;
-use winapi::shared::ws2ipdef::IPV6_V6ONLY;
-use winapi::um::winsock2::{bind as win_bind, closesocket, getsockopt, SOCKET_ERROR, SOCK_DGRAM};
+use std::os::windows::raw::SOCKET as StdSocket; // windows-sys uses usize, stdlib uses u32/u64.
use crate::sys::windows::net::{init, new_ip_socket, socket_addr};
+use windows_sys::Win32::Networking::WinSock::{
+ bind as win_bind, closesocket, getsockopt, IPPROTO_IPV6, IPV6_V6ONLY, SOCKET_ERROR, SOCK_DGRAM,
+};
pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> {
init();
@@ -31,14 +29,14 @@ pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> {
}
pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
- let mut optval: MaybeUninit<c_int> = MaybeUninit::uninit();
- let mut optlen = mem::size_of::<c_int>() as c_int;
+ let mut optval: MaybeUninit<i32> = MaybeUninit::uninit();
+ let mut optlen = mem::size_of::<i32>() as i32;
syscall!(
getsockopt(
socket.as_raw_socket() as usize,
- IPPROTO_IPV6 as c_int,
- IPV6_V6ONLY as c_int,
+ IPPROTO_IPV6 as i32,
+ IPV6_V6ONLY as i32,
optval.as_mut_ptr().cast(),
&mut optlen,
),
@@ -46,7 +44,7 @@ pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> {
SOCKET_ERROR
)?;
- debug_assert_eq!(optlen as usize, mem::size_of::<c_int>());
+ debug_assert_eq!(optlen as usize, mem::size_of::<i32>());
// Safety: `getsockopt` initialised `optval` for us.
let optval = unsafe { optval.assume_init() };
Ok(optval != 0)
diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs
index ab12c3c..103aa01 100644
--- a/src/sys/windows/waker.rs
+++ b/src/sys/windows/waker.rs
@@ -2,7 +2,7 @@ use crate::sys::windows::Event;
use crate::sys::windows::Selector;
use crate::Token;
-use miow::iocp::CompletionPort;
+use super::iocp::CompletionPort;
use std::io;
use std::sync::Arc;
diff --git a/src/token.rs b/src/token.rs
index d8a1fd1..91601cd 100644
--- a/src/token.rs
+++ b/src/token.rs
@@ -17,8 +17,8 @@
///
/// [`slab`]: https://crates.io/crates/slab
///
-#[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")]
-#[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")]
+#[cfg_attr(all(feature = "os-poll", feature = "net"), doc = "```")]
+#[cfg_attr(not(all(feature = "os-poll", feature = "net")), doc = "```ignore")]
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Events, Interest, Poll, Token};
diff --git a/src/waker.rs b/src/waker.rs
index bc73029..92fdb4c 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -1,4 +1,4 @@
-use crate::{poll, sys, Registry, Token};
+use crate::{sys, Registry, Token};
use std::io;
@@ -19,7 +19,7 @@ use std::io;
/// Only a single `Waker` can be active per [`Poll`], if multiple threads need
/// access to the `Waker` it can be shared via for example an `Arc`. What
/// happens if multiple `Waker`s are registered with the same `Poll` is
-/// undefined.
+/// unspecified.
///
/// # Implementation notes
///
@@ -28,7 +28,7 @@ use std::io;
/// kqueue. On Linux it uses [eventfd].
///
/// [implementation notes of `Poll`]: struct.Poll.html#implementation-notes
-/// [eventfd]: http://man7.org/linux/man-pages/man2/eventfd.2.html
+/// [eventfd]: https://man7.org/linux/man-pages/man2/eventfd.2.html
///
/// # Examples
///
@@ -84,7 +84,7 @@ impl Waker {
pub fn new(registry: &Registry, token: Token) -> io::Result<Waker> {
#[cfg(debug_assertions)]
registry.register_waker();
- sys::Waker::new(poll::selector(&registry), token).map(|inner| Waker { inner })
+ sys::Waker::new(registry.selector(), token).map(|inner| Waker { inner })
}
/// Wake up the [`Poll`] associated with this `Waker`.