aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-03-05 09:56:07 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-03-05 09:56:07 +0000
commitea52fcfdd39d82bca937b5d0869110df501611ba (patch)
tree16d053e70d21e456d52f4a7762ee41441342b7a2
parenta661b1e5eafa6ea0f47736c93d9cfc79cbbc8846 (diff)
parente29ba45f952a1c193d6091f8aead991e88882126 (diff)
downloadtokio-ea52fcfdd39d82bca937b5d0869110df501611ba.tar.gz
Upgrade rust/crates/tokio to 1.2.0 am: e3d8d80d2d am: e29ba45f95
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1582273 MUST ONLY BE SUBMITTED BY AUTOMERGER Change-Id: I8ae7bf1fce010cd6135c0fbbc64662bba10f7c56
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp10
-rw-r--r--CHANGELOG.md1130
-rw-r--r--Cargo.toml9
-rw-r--r--Cargo.toml.orig7
-rw-r--r--METADATA8
-rw-r--r--src/fs/read_dir.rs13
-rw-r--r--src/io/async_fd.rs8
-rw-r--r--src/io/blocking.rs2
-rw-r--r--src/io/driver/mod.rs5
-rw-r--r--src/io/driver/registration.rs13
-rw-r--r--src/io/driver/scheduled_io.rs6
-rw-r--r--src/io/poll_evented.rs2
-rw-r--r--src/io/read_buf.rs19
-rw-r--r--src/io/split.rs6
-rw-r--r--src/io/util/lines.rs10
-rw-r--r--src/io/util/mod.rs2
-rw-r--r--src/io/util/read_to_end.rs67
-rw-r--r--src/io/util/read_to_string.rs19
-rw-r--r--src/io/util/split.rs6
-rw-r--r--src/io/util/vec_with_initialized.rs132
-rw-r--r--src/lib.rs4
-rw-r--r--src/loom/std/atomic_u64.rs31
-rw-r--r--src/loom/std/mod.rs5
-rw-r--r--src/macros/select.rs2
-rw-r--r--src/net/addr.rs2
-rw-r--r--src/net/tcp/listener.rs11
-rw-r--r--src/net/tcp/stream.rs102
-rw-r--r--src/net/udp.rs154
-rw-r--r--src/net/unix/datagram/socket.rs136
-rw-r--r--src/net/unix/listener.rs4
-rw-r--r--src/net/unix/stream.rs85
-rw-r--r--src/process/mod.rs129
-rw-r--r--src/process/unix/mod.rs53
-rw-r--r--src/process/windows.rs36
-rw-r--r--src/runtime/basic_scheduler.rs2
-rw-r--r--src/runtime/blocking/pool.rs5
-rw-r--r--src/runtime/builder.rs32
-rw-r--r--src/runtime/context.rs18
-rw-r--r--src/runtime/driver.rs10
-rw-r--r--src/runtime/handle.rs5
-rw-r--r--src/runtime/mod.rs4
-rw-r--r--src/runtime/queue.rs24
-rw-r--r--src/runtime/task/core.rs172
-rw-r--r--src/runtime/task/harness.rs493
-rw-r--r--src/runtime/thread_pool/idle.rs2
-rw-r--r--src/runtime/thread_pool/worker.rs2
-rw-r--r--src/signal/unix.rs36
-rw-r--r--src/sync/broadcast.rs59
-rw-r--r--src/sync/mpsc/block.rs18
-rw-r--r--src/sync/mpsc/bounded.rs115
-rw-r--r--src/sync/mpsc/list.rs12
-rw-r--r--src/sync/mpsc/unbounded.rs4
-rw-r--r--src/sync/mutex.rs17
-rw-r--r--src/sync/oneshot.rs120
-rw-r--r--src/sync/rwlock.rs115
-rw-r--r--src/sync/semaphore.rs5
-rw-r--r--src/sync/task/atomic_waker.rs8
-rw-r--r--src/sync/tests/loom_broadcast.rs27
-rw-r--r--src/sync/watch.rs5
-rw-r--r--src/task/local.rs110
-rw-r--r--src/task/spawn.rs3
-rw-r--r--src/time/clock.rs15
-rw-r--r--src/time/driver/entry.rs76
-rw-r--r--src/time/driver/handle.rs5
-rw-r--r--src/time/driver/mod.rs2
-rw-r--r--src/time/driver/sleep.rs109
-rw-r--r--src/time/driver/tests/mod.rs12
-rw-r--r--src/time/driver/wheel/level.rs4
-rw-r--r--src/time/driver/wheel/mod.rs17
-rw-r--r--src/time/interval.rs11
-rw-r--r--src/util/error.rs3
-rw-r--r--src/util/mod.rs13
-rw-r--r--tests/io_driver.rs13
-rw-r--r--tests/io_read_to_end.rs65
-rw-r--r--tests/macros_test.rs8
-rw-r--r--tests/no_rt.rs18
-rw-r--r--tests/rt_basic.rs17
-rw-r--r--tests/rt_common.rs30
-rw-r--r--tests/sync_mpsc.rs23
-rw-r--r--tests/sync_rwlock.rs33
-rw-r--r--tests/task_local_set.rs14
-rw-r--r--tests/tcp_stream.rs78
-rw-r--r--tests/time_pause.rs26
-rw-r--r--tests/udp.rs87
-rw-r--r--tests/uds_datagram.rs92
-rw-r--r--tests/uds_stream.rs82
87 files changed, 3431 insertions, 1075 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 0ccdcd3..9e33e70 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "5d35c907f693e25ba20c3cfb47e0cb1957679019"
+ "sha1": "572a897d43d5e4942f26b7a67bed862d642679e4"
}
}
diff --git a/Android.bp b/Android.bp
index 35ff0fd..727f8fb 100644
--- a/Android.bp
+++ b/Android.bp
@@ -56,15 +56,15 @@ rust_library {
// dependent_library ["feature_list"]
// autocfg-1.0.1
// bytes-1.0.1 "default,std"
-// cfg-if-0.1.10
-// libc-0.2.82 "align,default,extra_traits,std"
-// log-0.4.13
+// cfg-if-1.0.0
+// libc-0.2.86 "align,default,extra_traits,std"
+// log-0.4.14
// memchr-2.3.4 "default,std"
// mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
// num_cpus-1.13.0
// pin-project-lite-0.2.4
// proc-macro2-1.0.24 "default,proc-macro"
// quote-1.0.8 "default,proc-macro"
-// syn-1.0.58 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit,visit-mut"
-// tokio-macros-1.0.0
+// syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
+// tokio-macros-1.1.0
// unicode-xid-0.2.1 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a36212d..cc1a305 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,91 @@
-# 1.0.2 (January 14, 2020)
+# 1.2.0 (February 5, 2021)
+
+### Added
+
+- signal: make `Signal::poll_recv` method public ([#3383])
+
+### Fixed
+
+- time: make `test-util` paused time fully deterministic ([#3492])
+
+### Documented
+
+- sync: link to new broadcast and watch wrappers ([#3504])
+
+[#3383]: https://github.com/tokio-rs/tokio/pull/3383
+[#3492]: https://github.com/tokio-rs/tokio/pull/3492
+[#3504]: https://github.com/tokio-rs/tokio/pull/3504
+
+# 1.1.1 (January 29, 2021)
+
+Forward ports 1.0.3 fix.
+
+### Fixed
+- io: memory leak during shutdown ([#3477]).
+
+# 1.1.0 (January 22, 2021)
+
+### Added
+
+- net: add `try_read_buf` and `try_recv_buf` ([#3351])
+- mpsc: Add `Sender::try_reserve` function ([#3418])
+- sync: add `RwLock` `try_read` and `try_write` methods ([#3400])
+- io: add `ReadBuf::inner_mut` ([#3443])
+
+### Changed
+
+- macros: improve `select!` error message ([#3352])
+- io: keep track of initialized bytes in `read_to_end` ([#3426])
+- runtime: consolidate errors for context missing ([#3441])
+
+### Fixed
+
+- task: wake `LocalSet` on `spawn_local` ([#3369])
+- sync: fix panic in broadcast::Receiver drop ([#3434])
+
+### Documented
+- stream: link to new `Stream` wrappers in `tokio-stream` ([#3343])
+- docs: mention that `test-util` feature is not enabled with full ([#3397])
+- process: add documentation to process::Child fields ([#3437])
+- io: clarify `AsyncFd` docs about changes of the inner fd ([#3430])
+- net: update datagram docs on splitting ([#3448])
+- time: document that `Sleep` is not `Unpin` ([#3457])
+- sync: add link to `PollSemaphore` ([#3456])
+- task: add `LocalSet` example ([#3438])
+- sync: improve bounded `mpsc` documentation ([#3458])
+
+[#3343]: https://github.com/tokio-rs/tokio/pull/3343
+[#3351]: https://github.com/tokio-rs/tokio/pull/3351
+[#3352]: https://github.com/tokio-rs/tokio/pull/3352
+[#3369]: https://github.com/tokio-rs/tokio/pull/3369
+[#3397]: https://github.com/tokio-rs/tokio/pull/3397
+[#3400]: https://github.com/tokio-rs/tokio/pull/3400
+[#3418]: https://github.com/tokio-rs/tokio/pull/3418
+[#3426]: https://github.com/tokio-rs/tokio/pull/3426
+[#3430]: https://github.com/tokio-rs/tokio/pull/3430
+[#3434]: https://github.com/tokio-rs/tokio/pull/3434
+[#3437]: https://github.com/tokio-rs/tokio/pull/3437
+[#3438]: https://github.com/tokio-rs/tokio/pull/3438
+[#3441]: https://github.com/tokio-rs/tokio/pull/3441
+[#3443]: https://github.com/tokio-rs/tokio/pull/3443
+[#3448]: https://github.com/tokio-rs/tokio/pull/3448
+[#3456]: https://github.com/tokio-rs/tokio/pull/3456
+[#3457]: https://github.com/tokio-rs/tokio/pull/3457
+[#3458]: https://github.com/tokio-rs/tokio/pull/3458
+
+# 1.0.3 (January 28, 2021)
### Fixed
-- io: soundness in `read_to_end` (#3428).
+- io: memory leak during shutdown ([#3477]).
+
+[#3477]: https://github.com/tokio-rs/tokio/pull/3477
+
+# 1.0.2 (January 14, 2021)
+
+### Fixed
+- io: soundness in `read_to_end` ([#3428]).
+
+[#3428]: https://github.com/tokio-rs/tokio/pull/3428
# 1.0.1 (December 25, 2020)
@@ -17,91 +101,153 @@ Due to the soundness hole, we have also yanked Tokio version 1.0.0.
### Removed
- - sync: remove `RwLockWriteGuard::map` and `RwLockWriteGuard::try_map` (#3345)
+- sync: remove `RwLockWriteGuard::map` and `RwLockWriteGuard::try_map` ([#3345])
### Fixed
- - docs: remove stream feature from docs (#3335)
+- docs: remove stream feature from docs ([#3335])
[semver]: https://github.com/rust-lang/rfcs/blob/master/text/1122-language-semver.md#soundness-changes
+[#3335]: https://github.com/tokio-rs/tokio/pull/3335
+[#3345]: https://github.com/tokio-rs/tokio/pull/3345
# 1.0.0 (December 23, 2020)
Commit to the API and long-term support.
### Fixed
-- sync: spurious wakeup in `watch` (#3234).
+
+- sync: spurious wakeup in `watch` ([#3234]).
### Changed
-- io: rename `AsyncFd::with_io()` to `try_io()` (#3306)
-- fs: avoid OS specific `*Ext` traits in favor of conditionally defining the fn (#3264).
-- fs: `Sleep` is `!Unpin` (#3278).
-- net: pass `SocketAddr` by value (#3125).
-- net: `TcpStream::poll_peek` takes `ReadBuf` (#3259).
-- rt: rename `runtime::Builder::max_threads()` to `max_blocking_threads()` (#3287).
-- time: require `current_thread` runtime when calling `time::pause()` (#3289).
+
+- io: rename `AsyncFd::with_io()` to `try_io()` ([#3306])
+- fs: avoid OS specific `*Ext` traits in favor of conditionally defining the fn ([#3264]).
+- fs: `Sleep` is `!Unpin` ([#3278]).
+- net: pass `SocketAddr` by value ([#3125]).
+- net: `TcpStream::poll_peek` takes `ReadBuf` ([#3259]).
+- rt: rename `runtime::Builder::max_threads()` to `max_blocking_threads()` ([#3287]).
+- time: require `current_thread` runtime when calling `time::pause()` ([#3289]).
### Removed
-- remove `tokio::prelude` (#3299).
-- io: remove `AsyncFd::with_poll()` (#3306).
-- net: remove `{Tcp,Unix}Stream::shutdown()` in favor of `AsyncWrite::shutdown()` (#3298).
+
+- remove `tokio::prelude` ([#3299]).
+- io: remove `AsyncFd::with_poll()` ([#3306]).
+- net: remove `{Tcp,Unix}Stream::shutdown()` in favor of `AsyncWrite::shutdown()` ([#3298]).
- stream: move all stream utilities to `tokio-stream` until `Stream` is added to
- `std` (#3277).
-- sync: mpsc `try_recv()` due to unexpected behavior (#3263).
-- tracing: make unstable as `tracing-core` is not 1.0 yet (#3266).
+ `std` ([#3277]).
+- sync: mpsc `try_recv()` due to unexpected behavior ([#3263]).
+- tracing: make unstable as `tracing-core` is not 1.0 yet ([#3266]).
### Added
-- fs: `poll_*` fns to `DirEntry` (#3308).
-- io: `poll_*` fns to `io::Lines`, `io::Split` (#3308).
-- io: `_mut` method variants to `AsyncFd` (#3304).
-- net: `poll_*` fns to `UnixDatagram` (#3223).
-- net: `UnixStream` readiness and non-blocking ops (#3246).
-- sync: `UnboundedReceiver::blocking_recv()` (#3262).
-- sync: `watch::Sender::borrow()` (#3269).
-- sync: `Semaphore::close()` (#3065).
-- sync: `poll_recv` fns to `mpsc::Receiver`, `mpsc::UnboundedReceiver` (#3308).
-- time: `poll_tick` fn to `time::Interval` (#3316).
+
+- fs: `poll_*` fns to `DirEntry` ([#3308]).
+- io: `poll_*` fns to `io::Lines`, `io::Split` ([#3308]).
+- io: `_mut` method variants to `AsyncFd` ([#3304]).
+- net: `poll_*` fns to `UnixDatagram` ([#3223]).
+- net: `UnixStream` readiness and non-blocking ops ([#3246]).
+- sync: `UnboundedReceiver::blocking_recv()` ([#3262]).
+- sync: `watch::Sender::borrow()` ([#3269]).
+- sync: `Semaphore::close()` ([#3065]).
+- sync: `poll_recv` fns to `mpsc::Receiver`, `mpsc::UnboundedReceiver` ([#3308]).
+- time: `poll_tick` fn to `time::Interval` ([#3316]).
+
+[#3065]: https://github.com/tokio-rs/tokio/pull/3065
+[#3125]: https://github.com/tokio-rs/tokio/pull/3125
+[#3223]: https://github.com/tokio-rs/tokio/pull/3223
+[#3234]: https://github.com/tokio-rs/tokio/pull/3234
+[#3246]: https://github.com/tokio-rs/tokio/pull/3246
+[#3259]: https://github.com/tokio-rs/tokio/pull/3259
+[#3262]: https://github.com/tokio-rs/tokio/pull/3262
+[#3263]: https://github.com/tokio-rs/tokio/pull/3263
+[#3264]: https://github.com/tokio-rs/tokio/pull/3264
+[#3266]: https://github.com/tokio-rs/tokio/pull/3266
+[#3269]: https://github.com/tokio-rs/tokio/pull/3269
+[#3277]: https://github.com/tokio-rs/tokio/pull/3277
+[#3278]: https://github.com/tokio-rs/tokio/pull/3278
+[#3287]: https://github.com/tokio-rs/tokio/pull/3287
+[#3289]: https://github.com/tokio-rs/tokio/pull/3289
+[#3298]: https://github.com/tokio-rs/tokio/pull/3298
+[#3299]: https://github.com/tokio-rs/tokio/pull/3299
+[#3304]: https://github.com/tokio-rs/tokio/pull/3304
+[#3306]: https://github.com/tokio-rs/tokio/pull/3306
+[#3308]: https://github.com/tokio-rs/tokio/pull/3308
+[#3316]: https://github.com/tokio-rs/tokio/pull/3316
# 0.3.6 (December 14, 2020)
### Fixed
-- rt: fix deadlock in shutdown (#3228)
-- rt: fix panic in task abort when off rt (#3159)
-- sync: make `add_permits` panic with usize::MAX >> 3 permits (#3188)
-- time: Fix race condition in timer drop (#3229)
-- watch: fix spurious wakeup (#3244)
+
+- rt: fix deadlock in shutdown ([#3228])
+- rt: fix panic in task abort when off rt ([#3159])
+- sync: make `add_permits` panic with usize::MAX >> 3 permits ([#3188])
+- time: Fix race condition in timer drop ([#3229])
+- watch: fix spurious wakeup ([#3244])
### Added
-- example: add back udp-codec example (#3205)
-- net: add `TcpStream::into_std` (#3189)
+
+- example: add back udp-codec example ([#3205])
+- net: add `TcpStream::into_std` ([#3189])
+
+[#3159]: https://github.com/tokio-rs/tokio/pull/3159
+[#3188]: https://github.com/tokio-rs/tokio/pull/3188
+[#3189]: https://github.com/tokio-rs/tokio/pull/3189
+[#3205]: https://github.com/tokio-rs/tokio/pull/3205
+[#3228]: https://github.com/tokio-rs/tokio/pull/3228
+[#3229]: https://github.com/tokio-rs/tokio/pull/3229
+[#3244]: https://github.com/tokio-rs/tokio/pull/3244
# 0.3.5 (November 30, 2020)
### Fixed
-- rt: fix `shutdown_timeout(0)` (#3196).
-- time: fixed race condition with small sleeps (#3069).
+
+- rt: fix `shutdown_timeout(0)` ([#3196]).
+- time: fixed race condition with small sleeps ([#3069]).
### Added
-- io: `AsyncFd::with_interest()` (#3167).
-- signal: `CtrlC` stream on windows (#3186).
+
+- io: `AsyncFd::with_interest()` ([#3167]).
+- signal: `CtrlC` stream on windows ([#3186]).
+
+[#3069]: https://github.com/tokio-rs/tokio/pull/3069
+[#3167]: https://github.com/tokio-rs/tokio/pull/3167
+[#3186]: https://github.com/tokio-rs/tokio/pull/3186
+[#3196]: https://github.com/tokio-rs/tokio/pull/3196
# 0.3.4 (November 18, 2020)
### Fixed
-- stream: `StreamMap` `Default` impl bound (#3093).
-- io: `AsyncFd::into_inner()` should deregister the FD (#3104).
+
+- stream: `StreamMap` `Default` impl bound ([#3093]).
+- io: `AsyncFd::into_inner()` should deregister the FD ([#3104]).
### Changed
-- meta: `parking_lot` feature enabled with `full` (#3119).
+
+- meta: `parking_lot` feature enabled with `full` ([#3119]).
### Added
-- io: `AsyncWrite` vectored writes (#3149).
-- net: TCP/UDP readiness and non-blocking ops (#3130, #2743, #3138).
-- net: TCP socket option (linger, send/recv buf size) (#3145, #3143).
-- net: PID field in `UCred` with solaris/illumos (#3085).
-- rt: `runtime::Handle` allows spawning onto a runtime (#3079).
-- sync: `Notify::notify_waiters()` (#3098).
-- sync: `acquire_many()`, `try_acquire_many()` to `Semaphore` (#3067).
+
+- io: `AsyncWrite` vectored writes ([#3149]).
+- net: TCP/UDP readiness and non-blocking ops ([#3130], [#2743], [#3138]).
+- net: TCP socket option (linger, send/recv buf size) ([#3145], [#3143]).
+- net: PID field in `UCred` with solaris/illumos ([#3085]).
+- rt: `runtime::Handle` allows spawning onto a runtime ([#3079]).
+- sync: `Notify::notify_waiters()` ([#3098]).
+- sync: `acquire_many()`, `try_acquire_many()` to `Semaphore` ([#3067]).
+
+[#2743]: https://github.com/tokio-rs/tokio/pull/2743
+[#3067]: https://github.com/tokio-rs/tokio/pull/3067
+[#3079]: https://github.com/tokio-rs/tokio/pull/3079
+[#3085]: https://github.com/tokio-rs/tokio/pull/3085
+[#3093]: https://github.com/tokio-rs/tokio/pull/3093
+[#3098]: https://github.com/tokio-rs/tokio/pull/3098
+[#3104]: https://github.com/tokio-rs/tokio/pull/3104
+[#3119]: https://github.com/tokio-rs/tokio/pull/3119
+[#3130]: https://github.com/tokio-rs/tokio/pull/3130
+[#3138]: https://github.com/tokio-rs/tokio/pull/3138
+[#3143]: https://github.com/tokio-rs/tokio/pull/3143
+[#3145]: https://github.com/tokio-rs/tokio/pull/3145
+[#3149]: https://github.com/tokio-rs/tokio/pull/3149
# 0.3.3 (November 2, 2020)
@@ -109,28 +255,44 @@ Fixes a soundness hole by adding a missing `Send` bound to
`Runtime::spawn_blocking()`.
### Fixed
-- rt: include missing `Send`, fixing soundness hole (#3089).
-- tracing: avoid huge trace span names (#3074).
+
+- rt: include missing `Send`, fixing soundness hole ([#3089]).
+- tracing: avoid huge trace span names ([#3074]).
### Added
-- net: `TcpSocket::reuseport()`, `TcpSocket::set_reuseport()` (#3083).
-- net: `TcpSocket::reuseaddr()` (#3093).
-- net: `TcpSocket::local_addr()` (#3093).
-- net: add pid to `UCred` (#2633).
+
+- net: `TcpSocket::reuseport()`, `TcpSocket::set_reuseport()` ([#3083]).
+- net: `TcpSocket::reuseaddr()` ([#3093]).
+- net: `TcpSocket::local_addr()` ([#3093]).
+- net: add pid to `UCred` ([#2633]).
+
+[#2633]: https://github.com/tokio-rs/tokio/pull/2633
+[#3074]: https://github.com/tokio-rs/tokio/pull/3074
+[#3083]: https://github.com/tokio-rs/tokio/pull/3083
+[#3089]: https://github.com/tokio-rs/tokio/pull/3089
+[#3093]: https://github.com/tokio-rs/tokio/pull/3093
# 0.3.2 (October 27, 2020)
Adds `AsyncFd` as a replacement for v0.2's `PollEvented`.
### Fixed
-- io: fix a potential deadlock when shutting down the I/O driver (#2903).
-- sync: `RwLockWriteGuard::downgrade()` bug (#2957).
+
+- io: fix a potential deadlock when shutting down the I/O driver ([#2903]).
+- sync: `RwLockWriteGuard::downgrade()` bug ([#2957]).
### Added
-- io: `AsyncFd` for receiving readiness events on raw FDs (#2903).
-- net: `poll_*` function on `UdpSocket` (#2981).
-- net: `UdpSocket::take_error()` (#3051).
-- sync: `oneshot::Sender::poll_closed()` (#3032).
+
+- io: `AsyncFd` for receiving readiness events on raw FDs ([#2903]).
+- net: `poll_*` function on `UdpSocket` ([#2981]).
+- net: `UdpSocket::take_error()` ([#3051]).
+- sync: `oneshot::Sender::poll_closed()` ([#3032]).
+
+[#2903]: https://github.com/tokio-rs/tokio/pull/2903
+[#2957]: https://github.com/tokio-rs/tokio/pull/2957
+[#2981]: https://github.com/tokio-rs/tokio/pull/2981
+[#3032]: https://github.com/tokio-rs/tokio/pull/3032
+[#3051]: https://github.com/tokio-rs/tokio/pull/3051
# 0.3.1 (October 21, 2020)
@@ -139,15 +301,24 @@ and `write_buf` methods have been added back to the IO traits, as the bytes crat
is now on track to reach version 1.0 together with Tokio.
### Fixed
-- net: fix use-after-free (#3019).
-- fs: ensure buffered data is written on shutdown (#3009).
+
+- net: fix use-after-free ([#3019]).
+- fs: ensure buffered data is written on shutdown ([#3009]).
### Added
-- io: `copy_buf()` (#2884).
+
+- io: `copy_buf()` ([#2884]).
- io: `AsyncReadExt::read_buf()`, `AsyncReadExt::write_buf()` for working with
- `Buf`/`BufMut` (#3003).
-- rt: `Runtime::spawn_blocking()` (#2980).
-- sync: `watch::Sender::is_closed()` (#2991).
+ `Buf`/`BufMut` ([#3003]).
+- rt: `Runtime::spawn_blocking()` ([#2980]).
+- sync: `watch::Sender::is_closed()` ([#2991]).
+
+[#2884]: https://github.com/tokio-rs/tokio/pull/2884
+[#2980]: https://github.com/tokio-rs/tokio/pull/2980
+[#2991]: https://github.com/tokio-rs/tokio/pull/2991
+[#3003]: https://github.com/tokio-rs/tokio/pull/3003
+[#3009]: https://github.com/tokio-rs/tokio/pull/3009
+[#3019]: https://github.com/tokio-rs/tokio/pull/3019
# 0.3.0 (October 15, 2020)
@@ -167,136 +338,264 @@ Biggest changes are:
- `parking_lot` is included with `full`
### Changes
+
- meta: Minimum supported Rust version is now 1.45.
- io: `AsyncRead` trait now takes `ReadBuf` in order to safely handle reading
- into uninitialized memory (#2758).
-- io: Internal I/O driver storage is now able to compact (#2757).
-- rt: `Runtime::block_on` now takes `&self` (#2782).
+ into uninitialized memory ([#2758]).
+- io: Internal I/O driver storage is now able to compact ([#2757]).
+- rt: `Runtime::block_on` now takes `&self` ([#2782]).
- sync: `watch` reworked to decouple receiving a change notification from
- receiving the value (#2814, #2806).
-- sync: `Notify::notify` is renamed to `notify_one` (#2822).
-- process: `Child::kill` is now an `async fn` that cleans zombies (#2823).
-- sync: use `const fn` constructors as possible (#2833, #2790)
-- signal: reduce cross-thread notification (#2835).
-- net: tcp,udp,uds types support operations with `&self` (#2828, #2919, #2934).
-- sync: blocking `mpsc` channel supports `send` with `&self` (#2861).
-- time: rename `delay_for` and `delay_until` to `sleep` and `sleep_until` (#2826).
-- io: upgrade to `mio` 0.7 (#2893).
-- io: `AsyncSeek` trait is tweaked (#2885).
-- fs: `File` operations take `&self` (#2930).
-- rt: runtime API, and `#[tokio::main]` macro polish (#2876)
-- rt: `Runtime::enter` uses an RAII guard instead of a closure (#2954).
-- net: the `from_std` function on all sockets no longer sets socket into non-blocking mode (#2893)
+ receiving the value ([#2814], [#2806]).
+- sync: `Notify::notify` is renamed to `notify_one` ([#2822]).
+- process: `Child::kill` is now an `async fn` that cleans zombies ([#2823]).
+- sync: use `const fn` constructors as possible ([#2833], [#2790])
+- signal: reduce cross-thread notification ([#2835]).
+- net: tcp,udp,uds types support operations with `&self` ([#2828], [#2919], [#2934]).
+- sync: blocking `mpsc` channel supports `send` with `&self` ([#2861]).
+- time: rename `delay_for` and `delay_until` to `sleep` and `sleep_until` ([#2826]).
+- io: upgrade to `mio` 0.7 ([#2893]).
+- io: `AsyncSeek` trait is tweaked ([#2885]).
+- fs: `File` operations take `&self` ([#2930]).
+- rt: runtime API, and `#[tokio::main]` macro polish ([#2876])
+- rt: `Runtime::enter` uses an RAII guard instead of a closure ([#2954]).
+- net: the `from_std` function on all sockets no longer sets socket into non-blocking mode ([#2893])
### Added
-- sync: `map` function to lock guards (#2445).
-- sync: `blocking_recv` and `blocking_send` fns to `mpsc` for use outside of Tokio (#2685).
-- rt: `Builder::thread_name_fn` for configuring thread names (#1921).
-- fs: impl `FromRawFd` and `FromRawHandle` for `File` (#2792).
-- process: `Child::wait` and `Child::try_wait` (#2796).
-- rt: support configuring thread keep-alive duration (#2809).
-- rt: `task::JoinHandle::abort` forcibly cancels a spawned task (#2474).
-- sync: `RwLock` write guard to read guard downgrading (#2733).
-- net: add `poll_*` functions that take `&self` to all net types (#2845)
-- sync: `get_mut()` for `Mutex`, `RwLock` (#2856).
-- sync: `mpsc::Sender::closed()` waits for `Receiver` half to close (#2840).
-- sync: `mpsc::Sender::is_closed()` returns true if `Receiver` half is closed (#2726).
-- stream: `iter` and `iter_mut` to `StreamMap` (#2890).
-- net: implement `AsRawSocket` on windows (#2911).
-- net: `TcpSocket` creates a socket without binding or listening (#2920).
+
+- sync: `map` function to lock guards ([#2445]).
+- sync: `blocking_recv` and `blocking_send` fns to `mpsc` for use outside of Tokio ([#2685]).
+- rt: `Builder::thread_name_fn` for configuring thread names ([#1921]).
+- fs: impl `FromRawFd` and `FromRawHandle` for `File` ([#2792]).
+- process: `Child::wait` and `Child::try_wait` ([#2796]).
+- rt: support configuring thread keep-alive duration ([#2809]).
+- rt: `task::JoinHandle::abort` forcibly cancels a spawned task ([#2474]).
+- sync: `RwLock` write guard to read guard downgrading ([#2733]).
+- net: add `poll_*` functions that take `&self` to all net types ([#2845])
+- sync: `get_mut()` for `Mutex`, `RwLock` ([#2856]).
+- sync: `mpsc::Sender::closed()` waits for `Receiver` half to close ([#2840]).
+- sync: `mpsc::Sender::is_closed()` returns true if `Receiver` half is closed ([#2726]).
+- stream: `iter` and `iter_mut` to `StreamMap` ([#2890]).
+- net: implement `AsRawSocket` on windows ([#2911]).
+- net: `TcpSocket` creates a socket without binding or listening ([#2920]).
### Removed
-- io: vectored ops are removed from `AsyncRead`, `AsyncWrite` traits (#2882).
+
+- io: vectored ops are removed from `AsyncRead`, `AsyncWrite` traits ([#2882]).
- io: `mio` is removed from the public API. `PollEvented` and` Registration` are
- removed (#2893).
+ removed ([#2893]).
- io: remove `bytes` from public API. `Buf` and `BufMut` implementation are
- removed (#2908).
-- time: `DelayQueue` is moved to `tokio-util` (#2897).
+ removed ([#2908]).
+- time: `DelayQueue` is moved to `tokio-util` ([#2897]).
### Fixed
-- io: `stdout` and `stderr` buffering on windows (#2734).
+
+- io: `stdout` and `stderr` buffering on windows ([#2734]).
+
+[#1921]: https://github.com/tokio-rs/tokio/pull/1921
+[#2445]: https://github.com/tokio-rs/tokio/pull/2445
+[#2474]: https://github.com/tokio-rs/tokio/pull/2474
+[#2685]: https://github.com/tokio-rs/tokio/pull/2685
+[#2726]: https://github.com/tokio-rs/tokio/pull/2726
+[#2733]: https://github.com/tokio-rs/tokio/pull/2733
+[#2734]: https://github.com/tokio-rs/tokio/pull/2734
+[#2757]: https://github.com/tokio-rs/tokio/pull/2757
+[#2758]: https://github.com/tokio-rs/tokio/pull/2758
+[#2782]: https://github.com/tokio-rs/tokio/pull/2782
+[#2790]: https://github.com/tokio-rs/tokio/pull/2790
+[#2792]: https://github.com/tokio-rs/tokio/pull/2792
+[#2796]: https://github.com/tokio-rs/tokio/pull/2796
+[#2806]: https://github.com/tokio-rs/tokio/pull/2806
+[#2809]: https://github.com/tokio-rs/tokio/pull/2809
+[#2814]: https://github.com/tokio-rs/tokio/pull/2814
+[#2822]: https://github.com/tokio-rs/tokio/pull/2822
+[#2823]: https://github.com/tokio-rs/tokio/pull/2823
+[#2826]: https://github.com/tokio-rs/tokio/pull/2826
+[#2828]: https://github.com/tokio-rs/tokio/pull/2828
+[#2833]: https://github.com/tokio-rs/tokio/pull/2833
+[#2835]: https://github.com/tokio-rs/tokio/pull/2835
+[#2840]: https://github.com/tokio-rs/tokio/pull/2840
+[#2845]: https://github.com/tokio-rs/tokio/pull/2845
+[#2856]: https://github.com/tokio-rs/tokio/pull/2856
+[#2861]: https://github.com/tokio-rs/tokio/pull/2861
+[#2876]: https://github.com/tokio-rs/tokio/pull/2876
+[#2882]: https://github.com/tokio-rs/tokio/pull/2882
+[#2885]: https://github.com/tokio-rs/tokio/pull/2885
+[#2890]: https://github.com/tokio-rs/tokio/pull/2890
+[#2893]: https://github.com/tokio-rs/tokio/pull/2893
+[#2897]: https://github.com/tokio-rs/tokio/pull/2897
+[#2908]: https://github.com/tokio-rs/tokio/pull/2908
+[#2911]: https://github.com/tokio-rs/tokio/pull/2911
+[#2919]: https://github.com/tokio-rs/tokio/pull/2919
+[#2920]: https://github.com/tokio-rs/tokio/pull/2920
+[#2930]: https://github.com/tokio-rs/tokio/pull/2930
+[#2934]: https://github.com/tokio-rs/tokio/pull/2934
+[#2954]: https://github.com/tokio-rs/tokio/pull/2954
# 0.2.22 (July 21, 2020)
### Fixes
-- docs: misc improvements (#2572, #2658, #2663, #2656, #2647, #2630, #2487, #2621,
- #2624, #2600, #2623, #2622, #2577, #2569, #2589, #2575, #2540, #2564, #2567,
- #2520, #2521, #2493)
+
+- docs: misc improvements ([#2572], [#2658], [#2663], [#2656], [#2647], [#2630], [#2487], [#2621],
+ [#2624], [#2600], [#2623], [#2622], [#2577], [#2569], [#2589], [#2575], [#2540], [#2564], [#2567],
+ [#2520], [#2521], [#2493])
- rt: allow calls to `block_on` inside calls to `block_in_place` that are
- themselves inside `block_on` (#2645)
-- net: fix non-portable behavior when dropping `TcpStream` `OwnedWriteHalf` (#2597)
+ themselves inside `block_on` ([#2645])
+- net: fix non-portable behavior when dropping `TcpStream` `OwnedWriteHalf` ([#2597])
- io: improve stack usage by allocating large buffers on directly on the heap
- (#2634)
+ ([#2634])
- io: fix unsound pin projection in `AsyncReadExt::read_buf` and
- `AsyncWriteExt::write_buf` (#2612)
-- io: fix unnecessary zeroing for `AsyncRead` implementors (#2525)
-- io: Fix `BufReader` not correctly forwarding `poll_write_buf` (#2654)
-- io: fix panic in `AsyncReadExt::read_line` (#2541)
+ `AsyncWriteExt::write_buf` ([#2612])
+- io: fix unnecessary zeroing for `AsyncRead` implementors ([#2525])
+- io: Fix `BufReader` not correctly forwarding `poll_write_buf` ([#2654])
+- io: fix panic in `AsyncReadExt::read_line` ([#2541])
### Changes
-- coop: returning `Poll::Pending` no longer decrements the task budget (#2549)
+
+- coop: returning `Poll::Pending` no longer decrements the task budget ([#2549])
### Added
+
- io: little-endian variants of `AsyncReadExt` and `AsyncWriteExt` methods
- (#1915)
-- task: add [`tracing`] instrumentation to spawned tasks (#2655)
+ ([#1915])
+- task: add [`tracing`] instrumentation to spawned tasks ([#2655])
- sync: allow unsized types in `Mutex` and `RwLock` (via `default` constructors)
- (#2615)
-- net: add `ToSocketAddrs` implementation for `&[SocketAddr]` (#2604)
-- fs: add `OpenOptionsExt` for `OpenOptions` (#2515)
-- fs: add `DirBuilder` (#2524)
+ ([#2615])
+- net: add `ToSocketAddrs` implementation for `&[SocketAddr]` ([#2604])
+- fs: add `OpenOptionsExt` for `OpenOptions` ([#2515])
+- fs: add `DirBuilder` ([#2524])
[`tracing`]: https://crates.io/crates/tracing
+[#1915]: https://github.com/tokio-rs/tokio/pull/1915
+[#2487]: https://github.com/tokio-rs/tokio/pull/2487
+[#2493]: https://github.com/tokio-rs/tokio/pull/2493
+[#2515]: https://github.com/tokio-rs/tokio/pull/2515
+[#2520]: https://github.com/tokio-rs/tokio/pull/2520
+[#2521]: https://github.com/tokio-rs/tokio/pull/2521
+[#2524]: https://github.com/tokio-rs/tokio/pull/2524
+[#2525]: https://github.com/tokio-rs/tokio/pull/2525
+[#2540]: https://github.com/tokio-rs/tokio/pull/2540
+[#2541]: https://github.com/tokio-rs/tokio/pull/2541
+[#2549]: https://github.com/tokio-rs/tokio/pull/2549
+[#2564]: https://github.com/tokio-rs/tokio/pull/2564
+[#2567]: https://github.com/tokio-rs/tokio/pull/2567
+[#2569]: https://github.com/tokio-rs/tokio/pull/2569
+[#2572]: https://github.com/tokio-rs/tokio/pull/2572
+[#2575]: https://github.com/tokio-rs/tokio/pull/2575
+[#2577]: https://github.com/tokio-rs/tokio/pull/2577
+[#2589]: https://github.com/tokio-rs/tokio/pull/2589
+[#2597]: https://github.com/tokio-rs/tokio/pull/2597
+[#2600]: https://github.com/tokio-rs/tokio/pull/2600
+[#2604]: https://github.com/tokio-rs/tokio/pull/2604
+[#2612]: https://github.com/tokio-rs/tokio/pull/2612
+[#2615]: https://github.com/tokio-rs/tokio/pull/2615
+[#2621]: https://github.com/tokio-rs/tokio/pull/2621
+[#2622]: https://github.com/tokio-rs/tokio/pull/2622
+[#2623]: https://github.com/tokio-rs/tokio/pull/2623
+[#2624]: https://github.com/tokio-rs/tokio/pull/2624
+[#2630]: https://github.com/tokio-rs/tokio/pull/2630
+[#2634]: https://github.com/tokio-rs/tokio/pull/2634
+[#2645]: https://github.com/tokio-rs/tokio/pull/2645
+[#2647]: https://github.com/tokio-rs/tokio/pull/2647
+[#2654]: https://github.com/tokio-rs/tokio/pull/2654
+[#2655]: https://github.com/tokio-rs/tokio/pull/2655
+[#2656]: https://github.com/tokio-rs/tokio/pull/2656
+[#2658]: https://github.com/tokio-rs/tokio/pull/2658
+[#2663]: https://github.com/tokio-rs/tokio/pull/2663
# 0.2.21 (May 13, 2020)
### Fixes
-- macros: disambiguate built-in `#[test]` attribute in macro expansion (#2503)
-- rt: `LocalSet` and task budgeting (#2462).
-- rt: task budgeting with `block_in_place` (#2502).
-- sync: release `broadcast` channel memory without sending a value (#2509).
-- time: notify when resetting a `Delay` to a time in the past (#2290)
+- macros: disambiguate built-in `#[test]` attribute in macro expansion ([#2503])
+- rt: `LocalSet` and task budgeting ([#2462]).
+- rt: task budgeting with `block_in_place` ([#2502]).
+- sync: release `broadcast` channel memory without sending a value ([#2509]).
+- time: notify when resetting a `Delay` to a time in the past ([#2290])
### Added
-- io: `get_mut`, `get_ref`, and `into_inner` to `Lines` (#2450).
-- io: `mio::Ready` argument to `PollEvented` (#2419).
-- os: illumos support (#2486).
-- rt: `Handle::spawn_blocking` (#2501).
-- sync: `OwnedMutexGuard` for `Arc<Mutex<T>>` (#2455).
+
+- io: `get_mut`, `get_ref`, and `into_inner` to `Lines` ([#2450]).
+- io: `mio::Ready` argument to `PollEvented` ([#2419]).
+- os: illumos support ([#2486]).
+- rt: `Handle::spawn_blocking` ([#2501]).
+- sync: `OwnedMutexGuard` for `Arc<Mutex<T>>` ([#2455]).
+
+[#2290]: https://github.com/tokio-rs/tokio/pull/2290
+[#2419]: https://github.com/tokio-rs/tokio/pull/2419
+[#2450]: https://github.com/tokio-rs/tokio/pull/2450
+[#2455]: https://github.com/tokio-rs/tokio/pull/2455
+[#2462]: https://github.com/tokio-rs/tokio/pull/2462
+[#2486]: https://github.com/tokio-rs/tokio/pull/2486
+[#2501]: https://github.com/tokio-rs/tokio/pull/2501
+[#2502]: https://github.com/tokio-rs/tokio/pull/2502
+[#2503]: https://github.com/tokio-rs/tokio/pull/2503
+[#2509]: https://github.com/tokio-rs/tokio/pull/2509
# 0.2.20 (April 28, 2020)
### Fixes
-- sync: `broadcast` closing the channel no longer requires capacity (#2448).
-- rt: regression when configuring runtime with `max_threads` less than number of CPUs (#2457).
+
+- sync: `broadcast` closing the channel no longer requires capacity ([#2448]).
+- rt: regression when configuring runtime with `max_threads` less than number of CPUs ([#2457]).
+
+[#2448]: https://github.com/tokio-rs/tokio/pull/2448
+[#2457]: https://github.com/tokio-rs/tokio/pull/2457
# 0.2.19 (April 24, 2020)
### Fixes
-- docs: misc improvements (#2400, #2405, #2414, #2420, #2423, #2426, #2427, #2434, #2436, #2440).
-- rt: support `block_in_place` in more contexts (#2409, #2410).
-- stream: no panic in `merge()` and `chain()` when using `size_hint()` (#2430).
-- task: include visibility modifier when defining a task-local (#2416).
+
+- docs: misc improvements ([#2400], [#2405], [#2414], [#2420], [#2423], [#2426], [#2427], [#2434], [#2436], [#2440]).
+- rt: support `block_in_place` in more contexts ([#2409], [#2410]).
+- stream: no panic in `merge()` and `chain()` when using `size_hint()` ([#2430]).
+- task: include visibility modifier when defining a task-local ([#2416]).
### Added
-- rt: `runtime::Handle::block_on` (#2437).
-- sync: owned `Semaphore` permit (#2421).
-- tcp: owned split (#2270).
+
+- rt: `runtime::Handle::block_on` ([#2437]).
+- sync: owned `Semaphore` permit ([#2421]).
+- tcp: owned split ([#2270]).
+
+[#2270]: https://github.com/tokio-rs/tokio/pull/2270
+[#2400]: https://github.com/tokio-rs/tokio/pull/2400
+[#2405]: https://github.com/tokio-rs/tokio/pull/2405
+[#2409]: https://github.com/tokio-rs/tokio/pull/2409
+[#2410]: https://github.com/tokio-rs/tokio/pull/2410
+[#2414]: https://github.com/tokio-rs/tokio/pull/2414
+[#2416]: https://github.com/tokio-rs/tokio/pull/2416
+[#2420]: https://github.com/tokio-rs/tokio/pull/2420
+[#2421]: https://github.com/tokio-rs/tokio/pull/2421
+[#2423]: https://github.com/tokio-rs/tokio/pull/2423
+[#2426]: https://github.com/tokio-rs/tokio/pull/2426
+[#2427]: https://github.com/tokio-rs/tokio/pull/2427
+[#2430]: https://github.com/tokio-rs/tokio/pull/2430
+[#2434]: https://github.com/tokio-rs/tokio/pull/2434
+[#2436]: https://github.com/tokio-rs/tokio/pull/2436
+[#2437]: https://github.com/tokio-rs/tokio/pull/2437
+[#2440]: https://github.com/tokio-rs/tokio/pull/2440
# 0.2.18 (April 12, 2020)
### Fixes
-- task: `LocalSet` was incorrectly marked as `Send` (#2398)
-- io: correctly report `WriteZero` failure in `write_int` (#2334)
+
+- task: `LocalSet` was incorrectly marked as `Send` ([#2398])
+- io: correctly report `WriteZero` failure in `write_int` ([#2334])
+
+[#2334]: https://github.com/tokio-rs/tokio/pull/2334
+[#2398]: https://github.com/tokio-rs/tokio/pull/2398
# 0.2.17 (April 9, 2020)
### Fixes
-- rt: bug in work-stealing queue (#2387)
+
+- rt: bug in work-stealing queue ([#2387])
### Changes
-- rt: threadpool uses logical CPU count instead of physical by default (#2391)
+
+- rt: threadpool uses logical CPU count instead of physical by default ([#2391])
+
+[#2387]: https://github.com/tokio-rs/tokio/pull/2387
+[#2391]: https://github.com/tokio-rs/tokio/pull/2391
# 0.2.16 (April 3, 2020)
@@ -311,6 +610,11 @@ Biggest changes are:
- time: added `deadline` method to `delay_queue::Expired` ([#2300])
- io: added `StreamReader` ([#2052])
+[#2052]: https://github.com/tokio-rs/tokio/pull/2052
+[#2300]: https://github.com/tokio-rs/tokio/pull/2300
+[#2354]: https://github.com/tokio-rs/tokio/pull/2354
+[#2375]: https://github.com/tokio-rs/tokio/pull/2375
+
# 0.2.15 (April 2, 2020)
### Fixes
@@ -321,30 +625,51 @@ Biggest changes are:
- sync: Add disarm to `mpsc::Sender` ([#2358]).
+[#2358]: https://github.com/tokio-rs/tokio/pull/2358
+[#2362]: https://github.com/tokio-rs/tokio/pull/2362
+
# 0.2.14 (April 1, 2020)
### Fixes
+
- rt: concurrency bug in scheduler ([#2273]).
- rt: concurrency bug with shell runtime ([#2333]).
- test-util: correct pause/resume of time ([#2253]).
- time: `DelayQueue` correct wakeup after `insert` ([#2285]).
### Added
+
- io: impl `RawFd`, `AsRawHandle` for std io types ([#2335]).
-- rt: automatic cooperative task yielding (#2160, #2343, #2349).
+- rt: automatic cooperative task yielding ([#2160], [#2343], [#2349]).
- sync: `RwLock::into_inner` ([#2321]).
### Changed
+
- sync: semaphore, mutex internals rewritten to avoid allocations ([#2325]).
+[#2160]: https://github.com/tokio-rs/tokio/pull/2160
+[#2253]: https://github.com/tokio-rs/tokio/pull/2253
+[#2273]: https://github.com/tokio-rs/tokio/pull/2273
+[#2285]: https://github.com/tokio-rs/tokio/pull/2285
+[#2321]: https://github.com/tokio-rs/tokio/pull/2321
+[#2325]: https://github.com/tokio-rs/tokio/pull/2325
+[#2333]: https://github.com/tokio-rs/tokio/pull/2333
+[#2335]: https://github.com/tokio-rs/tokio/pull/2335
+[#2343]: https://github.com/tokio-rs/tokio/pull/2343
+[#2349]: https://github.com/tokio-rs/tokio/pull/2349
+
# 0.2.13 (February 28, 2020)
### Fixes
+
- macros: unresolved import in `pin!` ([#2281]).
+[#2281]: https://github.com/tokio-rs/tokio/pull/2281
+
# 0.2.12 (February 27, 2020)
### Fixes
+
- net: `UnixStream::poll_shutdown` should call `shutdown(Write)` ([#2245]).
- process: Wake up read and write on `EPOLLERR` ([#2218]).
- rt: potential deadlock when using `block_in_place` and shutting down the
@@ -355,6 +680,7 @@ Biggest changes are:
- time: avoid having to poll `DelayQueue` after inserting new delay ([#2217]).
### Added
+
- macros: `pin!` variant that assigns to identifier and pins ([#2274]).
- net: impl `Stream` for `Listener` types ([#2275]).
- rt: `Runtime::shutdown_timeout` waits for runtime to shutdown for specified
@@ -369,15 +695,35 @@ Biggest changes are:
for channel capacity ([#2227]).
- time: impl `Ord` and `Hash` for `Instant` ([#2239]).
+[#2119]: https://github.com/tokio-rs/tokio/pull/2119
+[#2184]: https://github.com/tokio-rs/tokio/pull/2184
+[#2185]: https://github.com/tokio-rs/tokio/pull/2185
+[#2186]: https://github.com/tokio-rs/tokio/pull/2186
+[#2191]: https://github.com/tokio-rs/tokio/pull/2191
+[#2204]: https://github.com/tokio-rs/tokio/pull/2204
+[#2205]: https://github.com/tokio-rs/tokio/pull/2205
+[#2210]: https://github.com/tokio-rs/tokio/pull/2210
+[#2217]: https://github.com/tokio-rs/tokio/pull/2217
+[#2218]: https://github.com/tokio-rs/tokio/pull/2218
+[#2227]: https://github.com/tokio-rs/tokio/pull/2227
+[#2238]: https://github.com/tokio-rs/tokio/pull/2238
+[#2239]: https://github.com/tokio-rs/tokio/pull/2239
+[#2245]: https://github.com/tokio-rs/tokio/pull/2245
+[#2250]: https://github.com/tokio-rs/tokio/pull/2250
+[#2274]: https://github.com/tokio-rs/tokio/pull/2274
+[#2275]: https://github.com/tokio-rs/tokio/pull/2275
+
# 0.2.11 (January 27, 2020)
### Fixes
-- docs: misc fixes and tweaks (#2155, #2103, #2027, #2167, #2175).
+
+- docs: misc fixes and tweaks ([#2155], [#2103], [#2027], [#2167], [#2175]).
- macros: handle generics in `#[tokio::main]` method ([#2177]).
- sync: `broadcast` potential lost notifications ([#2135]).
- rt: improve "no runtime" panic messages ([#2145]).
### Added
+
- optional support for using `parking_lot` internally ([#2164]).
- fs: `fs::copy`, an async version of `std::fs::copy` ([#2079]).
- macros: `select!` waits for the first branch to complete ([#2152]).
@@ -390,18 +736,40 @@ Biggest changes are:
- sync: impl `Eq`, `PartialEq` for `oneshot::RecvError` ([#2168]).
- task: methods for inspecting the `JoinError` cause ([#2051]).
+[#2027]: https://github.com/tokio-rs/tokio/pull/2027
+[#2051]: https://github.com/tokio-rs/tokio/pull/2051
+[#2079]: https://github.com/tokio-rs/tokio/pull/2079
+[#2103]: https://github.com/tokio-rs/tokio/pull/2103
+[#2122]: https://github.com/tokio-rs/tokio/pull/2122
+[#2135]: https://github.com/tokio-rs/tokio/pull/2135
+[#2145]: https://github.com/tokio-rs/tokio/pull/2145
+[#2149]: https://github.com/tokio-rs/tokio/pull/2149
+[#2151]: https://github.com/tokio-rs/tokio/pull/2151
+[#2152]: https://github.com/tokio-rs/tokio/pull/2152
+[#2155]: https://github.com/tokio-rs/tokio/pull/2155
+[#2158]: https://github.com/tokio-rs/tokio/pull/2158
+[#2163]: https://github.com/tokio-rs/tokio/pull/2163
+[#2164]: https://github.com/tokio-rs/tokio/pull/2164
+[#2167]: https://github.com/tokio-rs/tokio/pull/2167
+[#2168]: https://github.com/tokio-rs/tokio/pull/2168
+[#2169]: https://github.com/tokio-rs/tokio/pull/2169
+[#2175]: https://github.com/tokio-rs/tokio/pull/2175
+[#2177]: https://github.com/tokio-rs/tokio/pull/2177
+
# 0.2.10 (January 21, 2020)
### Fixes
+
- `#[tokio::main]` when `rt-core` feature flag is not enabled ([#2139]).
- remove `AsyncBufRead` from `BufStream` impl block ([#2108]).
- potential undefined behavior when implementing `AsyncRead` incorrectly ([#2030]).
### Added
+
- `BufStream::with_capacity` ([#2125]).
- impl `From` and `Default` for `RwLock` ([#2089]).
- `io::ReadHalf::is_pair_of` checks if provided `WriteHalf` is for the same
- underlying object (#1762, #2144).
+ underlying object ([#1762], [#2144]).
- `runtime::Handle::try_current()` returns a handle to the current runtime ([#2118]).
- `stream::empty()` returns an immediately ready empty stream ([#2092]).
- `stream::once(val)` returns a stream that yields a single value: `val` ([#2094]).
@@ -412,22 +780,46 @@ Biggest changes are:
- `StreamExt::merge` combines two streams, yielding values as they become ready ([#2091]).
- Task-local storage ([#2126]).
+[#1762]: https://github.com/tokio-rs/tokio/pull/1762
+[#2030]: https://github.com/tokio-rs/tokio/pull/2030
+[#2085]: https://github.com/tokio-rs/tokio/pull/2085
+[#2089]: https://github.com/tokio-rs/tokio/pull/2089
+[#2091]: https://github.com/tokio-rs/tokio/pull/2091
+[#2092]: https://github.com/tokio-rs/tokio/pull/2092
+[#2093]: https://github.com/tokio-rs/tokio/pull/2093
+[#2094]: https://github.com/tokio-rs/tokio/pull/2094
+[#2108]: https://github.com/tokio-rs/tokio/pull/2108
+[#2109]: https://github.com/tokio-rs/tokio/pull/2109
+[#2118]: https://github.com/tokio-rs/tokio/pull/2118
+[#2125]: https://github.com/tokio-rs/tokio/pull/2125
+[#2126]: https://github.com/tokio-rs/tokio/pull/2126
+[#2139]: https://github.com/tokio-rs/tokio/pull/2139
+[#2144]: https://github.com/tokio-rs/tokio/pull/2144
+
# 0.2.9 (January 9, 2020)
### Fixes
+
- `AsyncSeek` impl for `File` ([#1986]).
-- rt: shutdown deadlock in `threaded_scheduler` (#2074, #2082).
+- rt: shutdown deadlock in `threaded_scheduler` ([#2074], [#2082]).
- rt: memory ordering when dropping `JoinHandle` ([#2044]).
- docs: misc API documentation fixes and improvements.
+[#1986]: https://github.com/tokio-rs/tokio/pull/1986
+[#2044]: https://github.com/tokio-rs/tokio/pull/2044
+[#2074]: https://github.com/tokio-rs/tokio/pull/2074
+[#2082]: https://github.com/tokio-rs/tokio/pull/2082
+
# 0.2.8 (January 7, 2020)
### Fixes
+
- depend on new version of `tokio-macros`.
# 0.2.7 (January 7, 2020)
### Fixes
+
- potential deadlock when dropping `basic_scheduler` Runtime.
- calling `spawn_blocking` from within a `spawn_blocking` ([#2006]).
- storing a `Runtime` instance in a thread-local ([#2011]).
@@ -436,6 +828,7 @@ Biggest changes are:
- test-util: `time::advance` runs pending tasks before changing the time ([#2059]).
### Added
+
- `net::lookup_host` maps a `T: ToSocketAddrs` to a stream of `SocketAddrs` ([#1870]).
- `process::Child` fields are made public to match `std` ([#2014]).
- impl `Stream` for `sync::broadcast::Receiver` ([#2012]).
@@ -452,14 +845,37 @@ Biggest changes are:
- `time::DelayQueue::len` returns the number entries in the queue ([#1755]).
- expose runtime options from the `#[tokio::main]` and `#[tokio::test]` ([#2022]).
+[#1699]: https://github.com/tokio-rs/tokio/pull/1699
+[#1755]: https://github.com/tokio-rs/tokio/pull/1755
+[#1870]: https://github.com/tokio-rs/tokio/pull/1870
+[#1971]: https://github.com/tokio-rs/tokio/pull/1971
+[#2001]: https://github.com/tokio-rs/tokio/pull/2001
+[#2005]: https://github.com/tokio-rs/tokio/pull/2005
+[#2006]: https://github.com/tokio-rs/tokio/pull/2006
+[#2011]: https://github.com/tokio-rs/tokio/pull/2011
+[#2012]: https://github.com/tokio-rs/tokio/pull/2012
+[#2014]: https://github.com/tokio-rs/tokio/pull/2014
+[#2022]: https://github.com/tokio-rs/tokio/pull/2022
+[#2025]: https://github.com/tokio-rs/tokio/pull/2025
+[#2029]: https://github.com/tokio-rs/tokio/pull/2029
+[#2034]: https://github.com/tokio-rs/tokio/pull/2034
+[#2035]: https://github.com/tokio-rs/tokio/pull/2035
+[#2040]: https://github.com/tokio-rs/tokio/pull/2040
+[#2045]: https://github.com/tokio-rs/tokio/pull/2045
+[#2059]: https://github.com/tokio-rs/tokio/pull/2059
+
# 0.2.6 (December 19, 2019)
### Fixes
+
- `fs::File::seek` API regression ([#1991]).
+[#1991]: https://github.com/tokio-rs/tokio/pull/1991
+
# 0.2.5 (December 18, 2019)
### Added
+
- `io::AsyncSeek` trait ([#1924]).
- `Mutex::try_lock` ([#1939])
- `mpsc::Receiver::try_recv` and `mpsc::UnboundedReceiver::try_recv` ([#1939]).
@@ -471,24 +887,47 @@ Biggest changes are:
- `stream::StreamExt` provides stream utilities ([#1962]).
### Fixes
+
- deadlock risk while shutting down the runtime ([#1972]).
- panic while shutting down the runtime ([#1978]).
- `sync::MutexGuard` debug output ([#1961]).
-- misc doc improvements (#1933, #1934, #1940, #1942).
+- misc doc improvements ([#1933], [#1934], [#1940], [#1942]).
### Changes
+
- runtime threads are configured with `runtime::Builder::core_threads` and
`runtime::Builder::max_threads`. `runtime::Builder::num_threads` is
deprecated ([#1977]).
+[#1924]: https://github.com/tokio-rs/tokio/pull/1924
+[#1933]: https://github.com/tokio-rs/tokio/pull/1933
+[#1934]: https://github.com/tokio-rs/tokio/pull/1934
+[#1939]: https://github.com/tokio-rs/tokio/pull/1939
+[#1940]: https://github.com/tokio-rs/tokio/pull/1940
+[#1942]: https://github.com/tokio-rs/tokio/pull/1942
+[#1943]: https://github.com/tokio-rs/tokio/pull/1943
+[#1949]: https://github.com/tokio-rs/tokio/pull/1949
+[#1956]: https://github.com/tokio-rs/tokio/pull/1956
+[#1961]: https://github.com/tokio-rs/tokio/pull/1961
+[#1962]: https://github.com/tokio-rs/tokio/pull/1962
+[#1972]: https://github.com/tokio-rs/tokio/pull/1972
+[#1973]: https://github.com/tokio-rs/tokio/pull/1973
+[#1975]: https://github.com/tokio-rs/tokio/pull/1975
+[#1977]: https://github.com/tokio-rs/tokio/pull/1977
+[#1978]: https://github.com/tokio-rs/tokio/pull/1978
+
# 0.2.4 (December 6, 2019)
### Fixes
+
- `sync::Mutex` deadlock when `lock()` future is dropped early ([#1898]).
+[#1898]: https://github.com/tokio-rs/tokio/pull/1898
+
# 0.2.3 (December 6, 2019)
### Added
+
- read / write integers using `AsyncReadExt` and `AsyncWriteExt` ([#1863]).
- `read_buf` / `write_buf` for reading / writing `Buf` / `BufMut` ([#1881]).
- `TcpStream::poll_peek` - pollable API for performing TCP peek ([#1864]).
@@ -500,14 +939,32 @@ Biggest changes are:
`std::time::Instant` ([#1904]).
### Fixes
+
- calling `spawn_blocking` after runtime shutdown ([#1875]).
- `LocalSet` drop inifinite loop ([#1892]).
- `LocalSet` hang under load ([#1905]).
-- improved documentation (#1865, #1866, #1868, #1874, #1876, #1911).
+- improved documentation ([#1865], [#1866], [#1868], [#1874], [#1876], [#1911]).
+
+[#1863]: https://github.com/tokio-rs/tokio/pull/1863
+[#1864]: https://github.com/tokio-rs/tokio/pull/1864
+[#1865]: https://github.com/tokio-rs/tokio/pull/1865
+[#1866]: https://github.com/tokio-rs/tokio/pull/1866
+[#1868]: https://github.com/tokio-rs/tokio/pull/1868
+[#1874]: https://github.com/tokio-rs/tokio/pull/1874
+[#1875]: https://github.com/tokio-rs/tokio/pull/1875
+[#1876]: https://github.com/tokio-rs/tokio/pull/1876
+[#1881]: https://github.com/tokio-rs/tokio/pull/1881
+[#1882]: https://github.com/tokio-rs/tokio/pull/1882
+[#1888]: https://github.com/tokio-rs/tokio/pull/1888
+[#1892]: https://github.com/tokio-rs/tokio/pull/1892
+[#1904]: https://github.com/tokio-rs/tokio/pull/1904
+[#1905]: https://github.com/tokio-rs/tokio/pull/1905
+[#1911]: https://github.com/tokio-rs/tokio/pull/1911
# 0.2.2 (November 29, 2019)
### Fixes
+
- scheduling with `basic_scheduler` ([#1861]).
- update `spawn` panic message to specify that a task scheduler is required ([#1839]).
- API docs example for `runtime::Builder` to include a task scheduler ([#1841]).
@@ -517,27 +974,47 @@ Biggest changes are:
- API docs mention the required Cargo features for `Builder::{basic, threaded}_scheduler` ([#1858]).
### Added
+
- impl `Stream` for `signal::unix::Signal` ([#1849]).
- API docs for platform specific behavior of `signal::ctrl_c` and `signal::unix::Signal` ([#1854]).
- API docs for `signal::unix::Signal::{recv, poll_recv}` and `signal::windows::CtrlBreak::{recv, poll_recv}` ([#1854]).
- `File::into_std` and `File::try_into_std` methods ([#1856]).
+[#1772]: https://github.com/tokio-rs/tokio/pull/1772
+[#1834]: https://github.com/tokio-rs/tokio/pull/1834
+[#1839]: https://github.com/tokio-rs/tokio/pull/1839
+[#1841]: https://github.com/tokio-rs/tokio/pull/1841
+[#1843]: https://github.com/tokio-rs/tokio/pull/1843
+[#1849]: https://github.com/tokio-rs/tokio/pull/1849
+[#1854]: https://github.com/tokio-rs/tokio/pull/1854
+[#1856]: https://github.com/tokio-rs/tokio/pull/1856
+[#1858]: https://github.com/tokio-rs/tokio/pull/1858
+[#1861]: https://github.com/tokio-rs/tokio/pull/1861
+
# 0.2.1 (November 26, 2019)
### Fixes
+
- API docs for `TcpListener::incoming`, `UnixListener::incoming` ([#1831]).
### Added
+
- `tokio::task::LocalSet` provides a strategy for spawning `!Send` tasks ([#1733]).
- export `tokio::time::Elapsed` ([#1826]).
- impl `AsRawFd`, `AsRawHandle` for `tokio::fs::File` ([#1827]).
+[#1733]: https://github.com/tokio-rs/tokio/pull/1733
+[#1826]: https://github.com/tokio-rs/tokio/pull/1826
+[#1827]: https://github.com/tokio-rs/tokio/pull/1827
+[#1831]: https://github.com/tokio-rs/tokio/pull/1831
+
# 0.2.0 (November 26, 2019)
A major breaking change. Most implementation and APIs have changed one way or
another. This changelog entry contains a highlight
### Changed
+
- APIs are updated to use `async / await`.
- most `tokio-*` crates are collapsed into this crate.
- Scheduler is rewritten.
@@ -549,6 +1026,7 @@ another. This changelog entry contains a highlight
- `tokio::codec` is moved to `tokio-util`.
### Removed
+
- Standalone `timer` and `net` drivers are removed, use `Runtime` instead
- `current_thread` runtime is removed, use `tokio::runtime::Runtime` with
`basic_scheduler` instead.
@@ -556,311 +1034,221 @@ another. This changelog entry contains a highlight
# 0.1.21 (May 30, 2019)
### Changed
+
- Bump `tokio-trace-core` version to 0.2 ([#1111]).
+[#1111]: https://github.com/tokio-rs/tokio/pull/1111
+
# 0.1.20 (May 14, 2019)
### Added
+
- `tokio::runtime::Builder::panic_handler` allows configuring handling
panics on the runtime ([#1055]).
+[#1055]: https://github.com/tokio-rs/tokio/pull/1055
+
# 0.1.19 (April 22, 2019)
### Added
+
- Re-export `tokio::sync::Mutex` primitive ([#964]).
+[#964]: https://github.com/tokio-rs/tokio/pull/964
+
# 0.1.18 (March 22, 2019)
### Added
+
- `TypedExecutor` re-export and implementations ([#993]).
+[#993]: https://github.com/tokio-rs/tokio/pull/993
+
# 0.1.17 (March 13, 2019)
### Added
+
- Propagate trace subscriber in the runtime ([#966]).
+[#966]: https://github.com/tokio-rs/tokio/pull/966
+
# 0.1.16 (March 1, 2019)
### Fixed
+
- async-await: track latest nightly changes ([#940]).
### Added
+
- `sync::Watch`, a single value broadcast channel ([#922]).
- Async equivalent of read / write file helpers being added to `std` ([#896]).
+[#896]: https://github.com/tokio-rs/tokio/pull/896
+[#922]: https://github.com/tokio-rs/tokio/pull/922
+[#940]: https://github.com/tokio-rs/tokio/pull/940
+
# 0.1.15 (January 24, 2019)
### Added
+
- Re-export tokio-sync APIs ([#839]).
- Stream enumerate combinator ([#832]).
+[#832]: https://github.com/tokio-rs/tokio/pull/832
+[#839]: https://github.com/tokio-rs/tokio/pull/839
+
# 0.1.14 (January 6, 2019)
-* Use feature flags to break up the crate, allowing users to pick & choose
+- Use feature flags to break up the crate, allowing users to pick & choose
components ([#808]).
-* Export `UnixDatagram` and `UnixDatagramFramed` ([#772]).
+- Export `UnixDatagram` and `UnixDatagramFramed` ([#772]).
+
+[#772]: https://github.com/tokio-rs/tokio/pull/772
+[#808]: https://github.com/tokio-rs/tokio/pull/808
# 0.1.13 (November 21, 2018)
-* Fix `Runtime::reactor()` when no tasks are spawned ([#721]).
-* `runtime::Builder` no longer uses deprecated methods ([#749]).
-* Provide `after_start` and `before_stop` configuration settings for
+- Fix `Runtime::reactor()` when no tasks are spawned ([#721]).
+- `runtime::Builder` no longer uses deprecated methods ([#749]).
+- Provide `after_start` and `before_stop` configuration settings for
`Runtime` ([#756]).
-* Implement throttle stream combinator ([#736]).
+- Implement throttle stream combinator ([#736]).
+
+[#721]: https://github.com/tokio-rs/tokio/pull/721
+[#736]: https://github.com/tokio-rs/tokio/pull/736
+[#749]: https://github.com/tokio-rs/tokio/pull/749
+[#756]: https://github.com/tokio-rs/tokio/pull/756
# 0.1.12 (October 23, 2018)
-* runtime: expose `keep_alive` on runtime builder ([#676]).
-* runtime: create a reactor per worker thread ([#660]).
-* codec: fix panic in `LengthDelimitedCodec` ([#682]).
-* io: re-export `tokio_io::io::read` function ([#689]).
-* runtime: check for executor re-entry in more places ([#708]).
+- runtime: expose `keep_alive` on runtime builder ([#676]).
+- runtime: create a reactor per worker thread ([#660]).
+- codec: fix panic in `LengthDelimitedCodec` ([#682]).
+- io: re-export `tokio_io::io::read` function ([#689]).
+- runtime: check for executor re-entry in more places ([#708]).
+
+[#660]: https://github.com/tokio-rs/tokio/pull/660
+[#676]: https://github.com/tokio-rs/tokio/pull/676
+[#682]: https://github.com/tokio-rs/tokio/pull/682
+[#689]: https://github.com/tokio-rs/tokio/pull/689
+[#708]: https://github.com/tokio-rs/tokio/pull/708
# 0.1.11 (September 28, 2018)
-* Fix `tokio-async-await` dependency ([#675]).
+- Fix `tokio-async-await` dependency ([#675]).
+
+[#675]: https://github.com/tokio-rs/tokio/pull/675
# 0.1.10 (September 27, 2018)
-* Fix minimal versions
+- Fix minimal versions
# 0.1.9 (September 27, 2018)
-* Experimental async/await improvements ([#661]).
-* Re-export `TaskExecutor` from `tokio-current-thread` ([#652]).
-* Improve `Runtime` builder API ([#645]).
-* `tokio::run` panics when called from the context of an executor
+- Experimental async/await improvements ([#661]).
+- Re-export `TaskExecutor` from `tokio-current-thread` ([#652]).
+- Improve `Runtime` builder API ([#645]).
+- `tokio::run` panics when called from the context of an executor
([#646]).
-* Introduce `StreamExt` with a `timeout` helper ([#573]).
-* Move `length_delimited` into `tokio` ([#575]).
-* Re-organize `tokio::net` module ([#548]).
-* Re-export `tokio-current-thread::spawn` in current_thread runtime
+- Introduce `StreamExt` with a `timeout` helper ([#573]).
+- Move `length_delimited` into `tokio` ([#575]).
+- Re-organize `tokio::net` module ([#548]).
+- Re-export `tokio-current-thread::spawn` in current_thread runtime
([#579]).
+[#548]: https://github.com/tokio-rs/tokio/pull/548
+[#573]: https://github.com/tokio-rs/tokio/pull/573
+[#575]: https://github.com/tokio-rs/tokio/pull/575
+[#579]: https://github.com/tokio-rs/tokio/pull/579
+[#645]: https://github.com/tokio-rs/tokio/pull/645
+[#646]: https://github.com/tokio-rs/tokio/pull/646
+[#652]: https://github.com/tokio-rs/tokio/pull/652
+[#661]: https://github.com/tokio-rs/tokio/pull/661
+
# 0.1.8 (August 23, 2018)
-* Extract tokio::executor::current_thread to a sub crate ([#370])
-* Add `Runtime::block_on` ([#398])
-* Add `runtime::current_thread::block_on_all` ([#477])
-* Misc documentation improvements ([#450])
-* Implement `std::error::Error` for error types ([#501])
+- Extract tokio::executor::current_thread to a sub crate ([#370])
+- Add `Runtime::block_on` ([#398])
+- Add `runtime::current_thread::block_on_all` ([#477])
+- Misc documentation improvements ([#450])
+- Implement `std::error::Error` for error types ([#501])
+
+[#370]: https://github.com/tokio-rs/tokio/pull/370
+[#398]: https://github.com/tokio-rs/tokio/pull/398
+[#450]: https://github.com/tokio-rs/tokio/pull/450
+[#477]: https://github.com/tokio-rs/tokio/pull/477
+[#501]: https://github.com/tokio-rs/tokio/pull/501
# 0.1.7 (June 6, 2018)
-* Add `Runtime::block_on` for concurrent runtime ([#391]).
-* Provide handle to `current_thread::Runtime` that allows spawning tasks from
+- Add `Runtime::block_on` for concurrent runtime ([#391]).
+- Provide handle to `current_thread::Runtime` that allows spawning tasks from
other threads ([#340]).
-* Provide `clock::now()`, a configurable source of time ([#381]).
+- Provide `clock::now()`, a configurable source of time ([#381]).
+
+[#340]: https://github.com/tokio-rs/tokio/pull/340
+[#381]: https://github.com/tokio-rs/tokio/pull/381
+[#391]: https://github.com/tokio-rs/tokio/pull/391
# 0.1.6 (May 2, 2018)
-* Add asynchronous filesystem APIs ([#323]).
-* Add "current thread" runtime variant ([#308]).
-* `CurrentThread`: Expose inner `Park` instance.
-* Improve fairness of `CurrentThread` executor ([#313]).
+- Add asynchronous filesystem APIs ([#323]).
+- Add "current thread" runtime variant ([#308]).
+- `CurrentThread`: Expose inner `Park` instance.
+- Improve fairness of `CurrentThread` executor ([#313]).
+
+[#308]: https://github.com/tokio-rs/tokio/pull/308
+[#313]: https://github.com/tokio-rs/tokio/pull/313
+[#323]: https://github.com/tokio-rs/tokio/pull/323
# 0.1.5 (March 30, 2018)
-* Provide timer API ([#266])
+- Provide timer API ([#266])
+
+[#266]: https://github.com/tokio-rs/tokio/pull/266
# 0.1.4 (March 22, 2018)
-* Fix build on FreeBSD ([#218])
-* Shutdown the Runtime when the handle is dropped ([#214])
-* Set Runtime thread name prefix for worker threads ([#232])
-* Add builder for Runtime ([#234])
-* Extract TCP and UDP types into separate crates ([#224])
-* Optionally support futures 0.2.
+- Fix build on FreeBSD ([#218])
+- Shutdown the Runtime when the handle is dropped ([#214])
+- Set Runtime thread name prefix for worker threads ([#232])
+- Add builder for Runtime ([#234])
+- Extract TCP and UDP types into separate crates ([#224])
+- Optionally support futures 0.2.
+
+[#214]: https://github.com/tokio-rs/tokio/pull/214
+[#218]: https://github.com/tokio-rs/tokio/pull/218
+[#224]: https://github.com/tokio-rs/tokio/pull/224
+[#232]: https://github.com/tokio-rs/tokio/pull/232
+[#234]: https://github.com/tokio-rs/tokio/pull/234
# 0.1.3 (March 09, 2018)
-* Fix `CurrentThread::turn` to block on idle ([#212]).
+- Fix `CurrentThread::turn` to block on idle ([#212]).
+
+[#212]: https://github.com/tokio-rs/tokio/pull/212
# 0.1.2 (March 09, 2018)
-* Introduce Tokio Runtime ([#141])
-* Provide `CurrentThread` for more flexible usage of current thread executor ([#141]).
-* Add Lio for platforms that support it ([#142]).
-* I/O resources now lazily bind to the reactor ([#160]).
-* Extract Reactor to dedicated crate ([#169])
-* Add facade to sub crates and add prelude ([#166]).
-* Switch TCP/UDP fns to poll_ -> Poll<...> style ([#175])
+- Introduce Tokio Runtime ([#141])
+- Provide `CurrentThread` for more flexible usage of current thread executor ([#141]).
+- Add Lio for platforms that support it ([#142]).
+- I/O resources now lazily bind to the reactor ([#160]).
+- Extract Reactor to dedicated crate ([#169])
+- Add facade to sub crates and add prelude ([#166]).
+- Switch TCP/UDP fns to poll\_ -> Poll<...> style ([#175])
+
+[#141]: https://github.com/tokio-rs/tokio/pull/141
+[#142]: https://github.com/tokio-rs/tokio/pull/142
+[#160]: https://github.com/tokio-rs/tokio/pull/160
+[#166]: https://github.com/tokio-rs/tokio/pull/166
+[#169]: https://github.com/tokio-rs/tokio/pull/169
+[#175]: https://github.com/tokio-rs/tokio/pull/175
# 0.1.1 (February 09, 2018)
-* Doc fixes
+- Doc fixes
# 0.1.0 (February 07, 2018)
-* Initial crate released based on [RFC](https://github.com/tokio-rs/tokio-rfcs/pull/3).
-
-[#2375]: https://github.com/tokio-rs/tokio/pull/2375
-[#2362]: https://github.com/tokio-rs/tokio/pull/2362
-[#2358]: https://github.com/tokio-rs/tokio/pull/2358
-[#2354]: https://github.com/tokio-rs/tokio/pull/2354
-[#2335]: https://github.com/tokio-rs/tokio/pull/2335
-[#2333]: https://github.com/tokio-rs/tokio/pull/2333
-[#2325]: https://github.com/tokio-rs/tokio/pull/2325
-[#2321]: https://github.com/tokio-rs/tokio/pull/2321
-[#2300]: https://github.com/tokio-rs/tokio/pull/2300
-[#2285]: https://github.com/tokio-rs/tokio/pull/2285
-[#2281]: https://github.com/tokio-rs/tokio/pull/2281
-[#2275]: https://github.com/tokio-rs/tokio/pull/2275
-[#2274]: https://github.com/tokio-rs/tokio/pull/2274
-[#2273]: https://github.com/tokio-rs/tokio/pull/2273
-[#2253]: https://github.com/tokio-rs/tokio/pull/2253
-[#2250]: https://github.com/tokio-rs/tokio/pull/2250
-[#2245]: https://github.com/tokio-rs/tokio/pull/2245
-[#2239]: https://github.com/tokio-rs/tokio/pull/2239
-[#2238]: https://github.com/tokio-rs/tokio/pull/2238
-[#2227]: https://github.com/tokio-rs/tokio/pull/2227
-[#2218]: https://github.com/tokio-rs/tokio/pull/2218
-[#2217]: https://github.com/tokio-rs/tokio/pull/2217
-[#2210]: https://github.com/tokio-rs/tokio/pull/2210
-[#2205]: https://github.com/tokio-rs/tokio/pull/2205
-[#2204]: https://github.com/tokio-rs/tokio/pull/2204
-[#2191]: https://github.com/tokio-rs/tokio/pull/2191
-[#2186]: https://github.com/tokio-rs/tokio/pull/2186
-[#2185]: https://github.com/tokio-rs/tokio/pull/2185
-[#2184]: https://github.com/tokio-rs/tokio/pull/2184
-[#2177]: https://github.com/tokio-rs/tokio/pull/2177
-[#2169]: https://github.com/tokio-rs/tokio/pull/2169
-[#2168]: https://github.com/tokio-rs/tokio/pull/2168
-[#2164]: https://github.com/tokio-rs/tokio/pull/2164
-[#2163]: https://github.com/tokio-rs/tokio/pull/2163
-[#2158]: https://github.com/tokio-rs/tokio/pull/2158
-[#2152]: https://github.com/tokio-rs/tokio/pull/2152
-[#2151]: https://github.com/tokio-rs/tokio/pull/2151
-[#2149]: https://github.com/tokio-rs/tokio/pull/2149
-[#2145]: https://github.com/tokio-rs/tokio/pull/2145
-[#2139]: https://github.com/tokio-rs/tokio/pull/2139
-[#2135]: https://github.com/tokio-rs/tokio/pull/2135
-[#2126]: https://github.com/tokio-rs/tokio/pull/2126
-[#2125]: https://github.com/tokio-rs/tokio/pull/2125
-[#2122]: https://github.com/tokio-rs/tokio/pull/2122
-[#2119]: https://github.com/tokio-rs/tokio/pull/2119
-[#2118]: https://github.com/tokio-rs/tokio/pull/2118
-[#2109]: https://github.com/tokio-rs/tokio/pull/2109
-[#2108]: https://github.com/tokio-rs/tokio/pull/2108
-[#2094]: https://github.com/tokio-rs/tokio/pull/2094
-[#2093]: https://github.com/tokio-rs/tokio/pull/2093
-[#2092]: https://github.com/tokio-rs/tokio/pull/2092
-[#2091]: https://github.com/tokio-rs/tokio/pull/2091
-[#2089]: https://github.com/tokio-rs/tokio/pull/2089
-[#2085]: https://github.com/tokio-rs/tokio/pull/2085
-[#2079]: https://github.com/tokio-rs/tokio/pull/2079
-[#2059]: https://github.com/tokio-rs/tokio/pull/2059
-[#2052]: https://github.com/tokio-rs/tokio/pull/2052
-[#2051]: https://github.com/tokio-rs/tokio/pull/2051
-[#2045]: https://github.com/tokio-rs/tokio/pull/2045
-[#2044]: https://github.com/tokio-rs/tokio/pull/2044
-[#2040]: https://github.com/tokio-rs/tokio/pull/2040
-[#2035]: https://github.com/tokio-rs/tokio/pull/2035
-[#2034]: https://github.com/tokio-rs/tokio/pull/2034
-[#2030]: https://github.com/tokio-rs/tokio/pull/2030
-[#2029]: https://github.com/tokio-rs/tokio/pull/2029
-[#2025]: https://github.com/tokio-rs/tokio/pull/2025
-[#2022]: https://github.com/tokio-rs/tokio/pull/2022
-[#2014]: https://github.com/tokio-rs/tokio/pull/2014
-[#2012]: https://github.com/tokio-rs/tokio/pull/2012
-[#2011]: https://github.com/tokio-rs/tokio/pull/2011
-[#2006]: https://github.com/tokio-rs/tokio/pull/2006
-[#2005]: https://github.com/tokio-rs/tokio/pull/2005
-[#2001]: https://github.com/tokio-rs/tokio/pull/2001
-[#1991]: https://github.com/tokio-rs/tokio/pull/1991
-[#1986]: https://github.com/tokio-rs/tokio/pull/1986
-[#1978]: https://github.com/tokio-rs/tokio/pull/1978
-[#1977]: https://github.com/tokio-rs/tokio/pull/1977
-[#1975]: https://github.com/tokio-rs/tokio/pull/1975
-[#1973]: https://github.com/tokio-rs/tokio/pull/1973
-[#1972]: https://github.com/tokio-rs/tokio/pull/1972
-[#1971]: https://github.com/tokio-rs/tokio/pull/1971
-[#1962]: https://github.com/tokio-rs/tokio/pull/1962
-[#1961]: https://github.com/tokio-rs/tokio/pull/1961
-[#1956]: https://github.com/tokio-rs/tokio/pull/1956
-[#1949]: https://github.com/tokio-rs/tokio/pull/1949
-[#1943]: https://github.com/tokio-rs/tokio/pull/1943
-[#1939]: https://github.com/tokio-rs/tokio/pull/1939
-[#1924]: https://github.com/tokio-rs/tokio/pull/1924
-[#1905]: https://github.com/tokio-rs/tokio/pull/1905
-[#1904]: https://github.com/tokio-rs/tokio/pull/1904
-[#1898]: https://github.com/tokio-rs/tokio/pull/1898
-[#1892]: https://github.com/tokio-rs/tokio/pull/1892
-[#1888]: https://github.com/tokio-rs/tokio/pull/1888
-[#1882]: https://github.com/tokio-rs/tokio/pull/1882
-[#1881]: https://github.com/tokio-rs/tokio/pull/1881
-[#1875]: https://github.com/tokio-rs/tokio/pull/1875
-[#1874]: https://github.com/tokio-rs/tokio/pull/1874
-[#1870]: https://github.com/tokio-rs/tokio/pull/1870
-[#1864]: https://github.com/tokio-rs/tokio/pull/1864
-[#1863]: https://github.com/tokio-rs/tokio/pull/1863
-[#1861]: https://github.com/tokio-rs/tokio/pull/1861
-[#1858]: https://github.com/tokio-rs/tokio/pull/1858
-[#1856]: https://github.com/tokio-rs/tokio/pull/1856
-[#1854]: https://github.com/tokio-rs/tokio/pull/1854
-[#1849]: https://github.com/tokio-rs/tokio/pull/1849
-[#1843]: https://github.com/tokio-rs/tokio/pull/1843
-[#1841]: https://github.com/tokio-rs/tokio/pull/1841
-[#1839]: https://github.com/tokio-rs/tokio/pull/1839
-[#1834]: https://github.com/tokio-rs/tokio/pull/1834
-[#1831]: https://github.com/tokio-rs/tokio/pull/1831
-[#1827]: https://github.com/tokio-rs/tokio/pull/1827
-[#1826]: https://github.com/tokio-rs/tokio/pull/1826
-[#1772]: https://github.com/tokio-rs/tokio/pull/1772
-[#1755]: https://github.com/tokio-rs/tokio/pull/1755
-[#1733]: https://github.com/tokio-rs/tokio/pull/1733
-[#1699]: https://github.com/tokio-rs/tokio/pull/1699
-[#1111]: https://github.com/tokio-rs/tokio/pull/1111
-[#1055]: https://github.com/tokio-rs/tokio/pull/1055
-[#993]: https://github.com/tokio-rs/tokio/pull/993
-[#966]: https://github.com/tokio-rs/tokio/pull/966
-[#964]: https://github.com/tokio-rs/tokio/pull/964
-[#940]: https://github.com/tokio-rs/tokio/pull/940
-[#922]: https://github.com/tokio-rs/tokio/pull/922
-[#896]: https://github.com/tokio-rs/tokio/pull/896
-[#839]: https://github.com/tokio-rs/tokio/pull/839
-[#832]: https://github.com/tokio-rs/tokio/pull/832
-[#808]: https://github.com/tokio-rs/tokio/pull/808
-[#772]: https://github.com/tokio-rs/tokio/pull/772
-[#756]: https://github.com/tokio-rs/tokio/pull/756
-[#749]: https://github.com/tokio-rs/tokio/pull/749
-[#736]: https://github.com/tokio-rs/tokio/pull/736
-[#721]: https://github.com/tokio-rs/tokio/pull/721
-[#708]: https://github.com/tokio-rs/tokio/pull/708
-[#689]: https://github.com/tokio-rs/tokio/pull/689
-[#682]: https://github.com/tokio-rs/tokio/pull/682
-[#676]: https://github.com/tokio-rs/tokio/pull/676
-[#675]: https://github.com/tokio-rs/tokio/pull/675
-[#661]: https://github.com/tokio-rs/tokio/pull/661
-[#660]: https://github.com/tokio-rs/tokio/pull/660
-[#652]: https://github.com/tokio-rs/tokio/pull/652
-[#646]: https://github.com/tokio-rs/tokio/pull/646
-[#645]: https://github.com/tokio-rs/tokio/pull/645
-[#579]: https://github.com/tokio-rs/tokio/pull/579
-[#575]: https://github.com/tokio-rs/tokio/pull/575
-[#573]: https://github.com/tokio-rs/tokio/pull/573
-[#548]: https://github.com/tokio-rs/tokio/pull/548
-[#501]: https://github.com/tokio-rs/tokio/pull/501
-[#477]: https://github.com/tokio-rs/tokio/pull/477
-[#450]: https://github.com/tokio-rs/tokio/pull/450
-[#398]: https://github.com/tokio-rs/tokio/pull/398
-[#391]: https://github.com/tokio-rs/tokio/pull/391
-[#381]: https://github.com/tokio-rs/tokio/pull/381
-[#370]: https://github.com/tokio-rs/tokio/pull/370
-[#340]: https://github.com/tokio-rs/tokio/pull/340
-[#323]: https://github.com/tokio-rs/tokio/pull/323
-[#313]: https://github.com/tokio-rs/tokio/pull/313
-[#308]: https://github.com/tokio-rs/tokio/pull/308
-[#266]: https://github.com/tokio-rs/tokio/pull/266
-[#234]: https://github.com/tokio-rs/tokio/pull/234
-[#232]: https://github.com/tokio-rs/tokio/pull/232
-[#224]: https://github.com/tokio-rs/tokio/pull/224
-[#218]: https://github.com/tokio-rs/tokio/pull/218
-[#214]: https://github.com/tokio-rs/tokio/pull/214
-[#212]: https://github.com/tokio-rs/tokio/pull/212
-[#175]: https://github.com/tokio-rs/tokio/pull/175
-[#169]: https://github.com/tokio-rs/tokio/pull/169
-[#166]: https://github.com/tokio-rs/tokio/pull/166
-[#160]: https://github.com/tokio-rs/tokio/pull/160
-[#142]: https://github.com/tokio-rs/tokio/pull/142
-[#141]: https://github.com/tokio-rs/tokio/pull/141
+- Initial crate released based on [RFC](https://github.com/tokio-rs/tokio-rfcs/pull/3).
diff --git a/Cargo.toml b/Cargo.toml
index 8e9ebcb..9f23031 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio"
-version = "1.0.2"
+version = "1.2.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio/1.0.2/tokio/"
+documentation = "https://docs.rs/tokio/1.2.0/tokio/"
readme = "README.md"
keywords = ["io", "async", "non-blocking", "futures"]
categories = ["asynchronous", "network-programming"]
@@ -57,7 +57,7 @@ optional = true
version = "0.2.0"
[dependencies.tokio-macros]
-version = "1.0.0"
+version = "1.1.0"
optional = true
[dev-dependencies.async-stream]
version = "0.3"
@@ -69,6 +69,9 @@ features = ["async-await"]
[dev-dependencies.proptest]
version = "0.10.0"
+[dev-dependencies.rand]
+version = "0.8.0"
+
[dev-dependencies.tempfile]
version = "3.1.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index f950a28..33c371c 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -8,12 +8,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
-version = "1.0.2"
+version = "1.2.0"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
-documentation = "https://docs.rs/tokio/1.0.2/tokio/"
+documentation = "https://docs.rs/tokio/1.2.0/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
@@ -86,7 +86,7 @@ test-util = []
time = []
[dependencies]
-tokio-macros = { version = "1.0.0", path = "../tokio-macros", optional = true }
+tokio-macros = { version = "1.1.0", path = "../tokio-macros", optional = true }
pin-project-lite = "0.2.0"
@@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
futures = { version = "0.3.0", features = ["async-await"] }
proptest = "0.10.0"
+rand = "0.8.0"
tempfile = "3.1.0"
async-stream = "0.3"
diff --git a/METADATA b/METADATA
index b1878fb..3f041be 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio/tokio-1.0.2.crate"
+ value: "https://static.crates.io/crates/tokio/tokio-1.2.0.crate"
}
- version: "1.0.2"
+ version: "1.2.0"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 1
- day: 14
+ month: 2
+ day: 9
}
}
diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs
index 7b21c9c..aedaf7b 100644
--- a/src/fs/read_dir.rs
+++ b/src/fs/read_dir.rs
@@ -20,12 +20,15 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
Ok(ReadDir(State::Idle(Some(std))))
}
-/// Stream of the entries in a directory.
+/// Read the the entries in a directory.
///
-/// This stream is returned from the [`read_dir`] function of this module and
-/// will yield instances of [`DirEntry`]. Through a [`DirEntry`]
-/// information like the entry's path and possibly other metadata can be
-/// learned.
+/// This struct is returned from the [`read_dir`] function of this module and
+/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information
+/// like the entry's path and possibly other metadata can be learned.
+///
+/// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`].
+///
+/// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html
///
/// # Errors
///
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs
index 08d7b91..13f4f2d 100644
--- a/src/io/async_fd.rs
+++ b/src/io/async_fd.rs
@@ -23,9 +23,11 @@ use std::{task::Context, task::Poll};
/// to retake control from the tokio IO reactor.
///
/// The inner object is required to implement [`AsRawFd`]. This file descriptor
-/// must not change while [`AsyncFd`] owns the inner object. Changing the file
-/// descriptor results in unspecified behavior in the IO driver, which may
-/// include breaking notifications for other sockets/etc.
+/// must not change while [`AsyncFd`] owns the inner object, i.e. the
+/// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
+/// file descriptor when called multiple times. Failure to uphold this results
+/// in unspecified behavior in the IO driver, which may include breaking
+/// notifications for other sockets/etc.
///
/// Polling for readiness is done by calling the async functions [`readable`]
/// and [`writable`]. These functions complete when the associated readiness
diff --git a/src/io/blocking.rs b/src/io/blocking.rs
index 430801e..94a3484 100644
--- a/src/io/blocking.rs
+++ b/src/io/blocking.rs
@@ -175,7 +175,7 @@ where
}
}
-/// Repeates operations that are interrupted
+/// Repeats operations that are interrupted
macro_rules! uninterruptibly {
($e:expr) => {{
loop {
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index a1784df..fa2d420 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -259,8 +259,7 @@ cfg_rt! {
/// This function panics if there is no current reactor set and `rt` feature
/// flag is not enabled.
pub(super) fn current() -> Self {
- crate::runtime::context::io_handle()
- .expect("there is no reactor running, must be called from the context of Tokio runtime")
+ crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
}
}
}
@@ -274,7 +273,7 @@ cfg_not_rt! {
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
pub(super) fn current() -> Self {
- panic!("there is no reactor running, must be called from the context of Tokio runtime with `rt` enabled.")
+ panic!(crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs
index 9312581..1451224 100644
--- a/src/io/driver/registration.rs
+++ b/src/io/driver/registration.rs
@@ -205,6 +205,19 @@ impl Registration {
}
}
+impl Drop for Registration {
+ fn drop(&mut self) {
+ // It is possible for a cycle to be created between wakers stored in
+ // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
+ // cycle, wakers are cleared. This is an imperfect solution as it is
+ // possible to store a `Registration` in a waker. In this case, the
+ // cycle would remain.
+ //
+ // See tokio-rs/tokio#3481 for more details.
+ self.shared.clear_wakers();
+ }
+}
+
fn gone() -> io::Error {
io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
}
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
index 75d5623..71864b3 100644
--- a/src/io/driver/scheduled_io.rs
+++ b/src/io/driver/scheduled_io.rs
@@ -355,6 +355,12 @@ impl ScheduledIo {
// result isn't important
let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
}
+
+ pub(crate) fn clear_wakers(&self) {
+ let mut waiters = self.waiters.lock();
+ waiters.reader.take();
+ waiters.writer.take();
+ }
}
impl Drop for ScheduledIo {
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 0ecdb18..27a4cb7 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -126,7 +126,7 @@ impl<E: Source> PollEvented<E> {
}
/// Deregister the inner io from the registration and returns a Result containing the inner io
- #[cfg(feature = "net")]
+ #[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
self.registration.deregister(&mut inner)?;
diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs
index 486b69b..38e857d 100644
--- a/src/io/read_buf.rs
+++ b/src/io/read_buf.rs
@@ -113,6 +113,25 @@ impl<'a> ReadBuf<'a> {
unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
}
+ /// Returns a mutable reference to the entire buffer, without ensuring that it has been fully
+ /// initialized.
+ ///
+ /// The elements between 0 and `self.filled().len()` are filled, and those between 0 and
+ /// `self.initialized().len()` are initialized (and so can be transmuted to a `&mut [u8]`).
+ ///
+ /// The caller of this method must ensure that these invariants are upheld. For example, if the
+ /// caller initializes some of the uninitialized section of the buffer, it must call
+ /// [`assume_init`](Self::assume_init) with the number of bytes initialized.
+ ///
+ /// # Safety
+ ///
+ /// The caller must not de-initialize portions of the buffer that have already been initialized.
+ /// This includes any bytes in the region marked as uninitialized by `ReadBuf`.
+ #[inline]
+ pub unsafe fn inner_mut(&mut self) -> &mut [MaybeUninit<u8>] {
+ self.buf
+ }
+
/// Returns a mutable reference to the unfilled part of the buffer without ensuring that it has been fully
/// initialized.
///
diff --git a/src/io/split.rs b/src/io/split.rs
index fd3273e..732eb3b 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
impl<T> Inner<T> {
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
- if !self.locked.compare_and_swap(false, true, Acquire) {
+ if self
+ .locked
+ .compare_exchange(false, true, Acquire, Acquire)
+ .is_ok()
+ {
Poll::Ready(Guard { inner: self })
} else {
// Spin... but investigate a better strategy
diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs
index 25df78e..ed6a944 100644
--- a/src/io/util/lines.rs
+++ b/src/io/util/lines.rs
@@ -8,7 +8,15 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
- /// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method.
+ /// Read lines from an [`AsyncBufRead`].
+ ///
+ /// A `Lines` can be turned into a `Stream` with [`LinesStream`].
+ ///
+ /// This type is usually created using the [`lines`] method.
+ ///
+ /// [`AsyncBufRead`]: crate::io::AsyncBufRead
+ /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
+ /// [`lines`]: crate::io::AsyncBufReadExt::lines
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs
index e75ea03..e06e7e2 100644
--- a/src/io/util/mod.rs
+++ b/src/io/util/mod.rs
@@ -48,6 +48,7 @@ cfg_io_util! {
mod read_line;
mod read_to_end;
+ mod vec_with_initialized;
cfg_process! {
pub(crate) use read_to_end::read_to_end;
}
@@ -82,6 +83,7 @@ cfg_io_util! {
cfg_not_io_util! {
cfg_process! {
+ mod vec_with_initialized;
mod read_to_end;
// Used by process
pub(crate) use read_to_end::read_to_end;
diff --git a/src/io/util/read_to_end.rs b/src/io/util/read_to_end.rs
index 1aee681..f4a564d 100644
--- a/src/io/util/read_to_end.rs
+++ b/src/io/util/read_to_end.rs
@@ -1,10 +1,11 @@
-use crate::io::{AsyncRead, ReadBuf};
+use crate::io::util::vec_with_initialized::{into_read_buf_parts, VecU8, VecWithInitialized};
+use crate::io::AsyncRead;
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
-use std::mem::{self, MaybeUninit};
+use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -13,7 +14,7 @@ pin_project! {
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToEnd<'a, R: ?Sized> {
reader: &'a mut R,
- buf: &'a mut Vec<u8>,
+ buf: VecWithInitialized<&'a mut Vec<u8>>,
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
read: usize,
@@ -29,20 +30,19 @@ where
{
ReadToEnd {
reader,
- buf: buffer,
+ buf: VecWithInitialized::new(buffer),
read: 0,
_pin: PhantomPinned,
}
}
-pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
- buf: &mut Vec<u8>,
+pub(super) fn read_to_end_internal<V: VecU8, R: AsyncRead + ?Sized>(
+ buf: &mut VecWithInitialized<V>,
mut reader: Pin<&mut R>,
num_read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
loop {
- // safety: The caller promised to prepare the buffer.
let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx));
match ret {
Err(err) => return Poll::Ready(Err(err)),
@@ -57,8 +57,8 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
/// Tries to read from the provided AsyncRead.
///
/// The length of the buffer is increased by the number of bytes read.
-fn poll_read_to_end<R: AsyncRead + ?Sized>(
- buf: &mut Vec<u8>,
+fn poll_read_to_end<V: VecU8, R: AsyncRead + ?Sized>(
+ buf: &mut VecWithInitialized<V>,
read: Pin<&mut R>,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
@@ -68,37 +68,34 @@ fn poll_read_to_end<R: AsyncRead + ?Sized>(
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
- reserve(buf, 32);
+ buf.reserve(32);
- let mut unused_capacity = ReadBuf::uninit(get_unused_capacity(buf));
+ // Get a ReadBuf into the vector.
+ let mut read_buf = buf.get_read_buf();
- let ptr = unused_capacity.filled().as_ptr();
- ready!(read.poll_read(cx, &mut unused_capacity))?;
- assert_eq!(ptr, unused_capacity.filled().as_ptr());
+ let filled_before = read_buf.filled().len();
+ let poll_result = read.poll_read(cx, &mut read_buf);
+ let filled_after = read_buf.filled().len();
+ let n = filled_after - filled_before;
- let n = unused_capacity.filled().len();
- let new_len = buf.len() + n;
+ // Update the length of the vector using the result of poll_read.
+ let read_buf_parts = into_read_buf_parts(read_buf);
+ buf.apply_read_buf(read_buf_parts);
- assert!(new_len <= buf.capacity());
- unsafe {
- buf.set_len(new_len);
- }
- Poll::Ready(Ok(n))
-}
-
-/// Allocates more memory and ensures that the unused capacity is prepared for use
-/// with the `AsyncRead`.
-fn reserve(buf: &mut Vec<u8>, bytes: usize) {
- if buf.capacity() - buf.len() >= bytes {
- return;
+ match poll_result {
+ Poll::Pending => {
+ // In this case, nothing should have been read. However we still
+ // update the vector in case the poll_read call initialized parts of
+ // the vector's unused capacity.
+ debug_assert_eq!(filled_before, filled_after);
+ Poll::Pending
+ }
+ Poll::Ready(Err(err)) => {
+ debug_assert_eq!(filled_before, filled_after);
+ Poll::Ready(Err(err))
+ }
+ Poll::Ready(Ok(())) => Poll::Ready(Ok(n)),
}
- buf.reserve(bytes);
-}
-
-/// Returns the unused capacity of the provided vector.
-fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
- let uninit = bytes::BufMut::chunk_mut(buf);
- unsafe { &mut *(uninit as *mut _ as *mut [MaybeUninit<u8>]) }
}
impl<A> Future for ReadToEnd<'_, A>
diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs
index e463203..2c17383 100644
--- a/src/io/util/read_to_string.rs
+++ b/src/io/util/read_to_string.rs
@@ -1,5 +1,6 @@
use crate::io::util::read_line::finish_string_read;
use crate::io::util::read_to_end::read_to_end_internal;
+use crate::io::util::vec_with_initialized::VecWithInitialized;
use crate::io::AsyncRead;
use pin_project_lite::pin_project;
@@ -19,7 +20,7 @@ pin_project! {
// while reading to postpone utf-8 handling until after reading.
output: &'a mut String,
// The actual allocation of the string is moved into this vector instead.
- buf: Vec<u8>,
+ buf: VecWithInitialized<Vec<u8>>,
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
read: usize,
@@ -39,27 +40,22 @@ where
let buf = mem::replace(string, String::new()).into_bytes();
ReadToString {
reader,
- buf,
+ buf: VecWithInitialized::new(buf),
output: string,
read: 0,
_pin: PhantomPinned,
}
}
-/// # Safety
-///
-/// Before first calling this method, the unused capacity must have been
-/// prepared for use with the provided AsyncRead. This can be done using the
-/// `prepare_buffer` function in `read_to_end.rs`.
-unsafe fn read_to_string_internal<R: AsyncRead + ?Sized>(
+fn read_to_string_internal<R: AsyncRead + ?Sized>(
reader: Pin<&mut R>,
output: &mut String,
- buf: &mut Vec<u8>,
+ buf: &mut VecWithInitialized<Vec<u8>>,
read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_to_end_internal(buf, reader, read, cx));
- let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
+ let utf8_res = String::from_utf8(buf.take());
// At this point both buf and output are empty. The allocation is in utf8_res.
@@ -77,7 +73,6 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- // safety: The constructor of ReadToString called `prepare_buffer`.
- unsafe { read_to_string_internal(Pin::new(*me.reader), me.output, me.buf, me.read, cx) }
+ read_to_string_internal(Pin::new(*me.reader), me.output, me.buf, me.read, cx)
}
}
diff --git a/src/io/util/split.rs b/src/io/util/split.rs
index eb82865..4f3ce4e 100644
--- a/src/io/util/split.rs
+++ b/src/io/util/split.rs
@@ -8,7 +8,11 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
- /// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method.
+ /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method.
+ ///
+ /// A `Split` can be turned into a `Stream` with [`SplitStream`].
+ ///
+ /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
diff --git a/src/io/util/vec_with_initialized.rs b/src/io/util/vec_with_initialized.rs
new file mode 100644
index 0000000..208cc93
--- /dev/null
+++ b/src/io/util/vec_with_initialized.rs
@@ -0,0 +1,132 @@
+use crate::io::ReadBuf;
+use std::mem::MaybeUninit;
+
+mod private {
+ pub trait Sealed {}
+
+ impl Sealed for Vec<u8> {}
+ impl Sealed for &mut Vec<u8> {}
+}
+
+/// A sealed trait that constrains the generic type parameter in `VecWithInitialized<V>`. That struct's safety relies
+/// on certain invariants upheld by `Vec<u8>`.
+pub(crate) trait VecU8: AsMut<Vec<u8>> + private::Sealed {}
+
+impl VecU8 for Vec<u8> {}
+impl VecU8 for &mut Vec<u8> {}
+/// This struct wraps a `Vec<u8>` or `&mut Vec<u8>`, combining it with a
+/// `num_initialized`, which keeps track of the number of initialized bytes
+/// in the unused capacity.
+///
+/// The purpose of this struct is to remember how many bytes were initialized
+/// through a `ReadBuf` from call to call.
+///
+/// This struct has the safety invariant that the first `num_initialized` of the
+/// vector's allocation must be initialized at any time.
+#[derive(Debug)]
+pub(crate) struct VecWithInitialized<V> {
+ vec: V,
+ // The number of initialized bytes in the vector.
+ // Always between `vec.len()` and `vec.capacity()`.
+ num_initialized: usize,
+}
+
+impl VecWithInitialized<Vec<u8>> {
+ #[cfg(feature = "io-util")]
+ pub(crate) fn take(&mut self) -> Vec<u8> {
+ self.num_initialized = 0;
+ std::mem::take(&mut self.vec)
+ }
+}
+
+impl<V> VecWithInitialized<V>
+where
+ V: VecU8,
+{
+ pub(crate) fn new(mut vec: V) -> Self {
+ // SAFETY: The safety invariants of vector guarantee that the bytes up
+ // to its length are initialized.
+ Self {
+ num_initialized: vec.as_mut().len(),
+ vec,
+ }
+ }
+
+ pub(crate) fn reserve(&mut self, num_bytes: usize) {
+ let vec = self.vec.as_mut();
+ if vec.capacity() - vec.len() >= num_bytes {
+ return;
+ }
+ // SAFETY: Setting num_initialized to `vec.len()` is correct as
+ // `reserve` does not change the length of the vector.
+ self.num_initialized = vec.len();
+ vec.reserve(num_bytes);
+ }
+
+ #[cfg(feature = "io-util")]
+ pub(crate) fn is_empty(&mut self) -> bool {
+ self.vec.as_mut().is_empty()
+ }
+
+ pub(crate) fn get_read_buf<'a>(&'a mut self) -> ReadBuf<'a> {
+ let num_initialized = self.num_initialized;
+
+ // SAFETY: Creating the slice is safe because of the safety invariants
+ // on Vec<u8>. The safety invariants of `ReadBuf` will further guarantee
+ // that no bytes in the slice are de-initialized.
+ let vec = self.vec.as_mut();
+ let len = vec.len();
+ let cap = vec.capacity();
+ let ptr = vec.as_mut_ptr().cast::<MaybeUninit<u8>>();
+ let slice = unsafe { std::slice::from_raw_parts_mut::<'a, MaybeUninit<u8>>(ptr, cap) };
+
+ // SAFETY: This is safe because the safety invariants of
+ // VecWithInitialized say that the first num_initialized bytes must be
+ // initialized.
+ let mut read_buf = ReadBuf::uninit(slice);
+ unsafe {
+ read_buf.assume_init(num_initialized);
+ }
+ read_buf.set_filled(len);
+
+ read_buf
+ }
+
+ pub(crate) fn apply_read_buf(&mut self, parts: ReadBufParts) {
+ let vec = self.vec.as_mut();
+ assert_eq!(vec.as_ptr(), parts.ptr);
+
+ // SAFETY:
+ // The ReadBufParts really does point inside `self.vec` due to the above
+ // check, and the safety invariants of `ReadBuf` guarantee that the
+ // first `parts.initialized` bytes of `self.vec` really have been
+ // initialized. Additionally, `ReadBuf` guarantees that `parts.len` is
+ // at most `parts.initialized`, so the first `parts.len` bytes are also
+ // initialized.
+ //
+ // Note that this relies on the fact that `V` is either `Vec<u8>` or
+ // `&mut Vec<u8>`, so the vector returned by `self.vec.as_mut()` cannot
+ // change from call to call.
+ unsafe {
+ self.num_initialized = parts.initialized;
+ vec.set_len(parts.len);
+ }
+ }
+}
+
+pub(crate) struct ReadBufParts {
+ // Pointer is only used to check that the ReadBuf actually came from the
+ // right VecWithInitialized.
+ ptr: *const u8,
+ len: usize,
+ initialized: usize,
+}
+
+// This is needed to release the borrow on `VecWithInitialized<V>`.
+pub(crate) fn into_read_buf_parts(rb: ReadBuf<'_>) -> ReadBufParts {
+ ReadBufParts {
+ ptr: rb.filled().as_ptr(),
+ len: rb.filled().len(),
+ initialized: rb.initialized().len(),
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 7b098c7..46f1b84 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-#![doc(html_root_url = "https://docs.rs/tokio/1.0.2")]
+#![doc(html_root_url = "https://docs.rs/tokio/1.2.0")]
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
@@ -301,7 +301,7 @@
//! Beware though that this will pull in many extra dependencies that you may not
//! need.
//!
-//! - `full`: Enables all Tokio public API features listed below.
+//! - `full`: Enables all Tokio public API features listed below except `test-util`.
//! - `rt`: Enables `tokio::spawn`, the basic (current thread) scheduler,
//! and non-scheduler utilities.
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
diff --git a/src/loom/std/atomic_u64.rs b/src/loom/std/atomic_u64.rs
index 206954f..a86a195 100644
--- a/src/loom/std/atomic_u64.rs
+++ b/src/loom/std/atomic_u64.rs
@@ -24,8 +24,8 @@ mod imp {
}
impl AtomicU64 {
- pub(crate) fn new(val: u64) -> AtomicU64 {
- AtomicU64 {
+ pub(crate) fn new(val: u64) -> Self {
+ Self {
inner: Mutex::new(val),
}
}
@@ -45,16 +45,31 @@ mod imp {
prev
}
- pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
+ pub(crate) fn compare_exchange(
+ &self,
+ current: u64,
+ new: u64,
+ _success: Ordering,
+ _failure: Ordering,
+ ) -> Result<u64, u64> {
let mut lock = self.inner.lock().unwrap();
- let prev = *lock;
- if prev != old {
- return prev;
+ if *lock == current {
+ *lock = new;
+ Ok(current)
+ } else {
+ Err(*lock)
}
+ }
- *lock = new;
- prev
+ pub(crate) fn compare_exchange_weak(
+ &self,
+ current: u64,
+ new: u64,
+ success: Ordering,
+ failure: Ordering,
+ ) -> Result<u64, u64> {
+ self.compare_exchange(current, new, success, failure)
}
}
}
diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs
index c3f74ef..b29cbee 100644
--- a/src/loom/std/mod.rs
+++ b/src/loom/std/mod.rs
@@ -74,7 +74,10 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
- pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicBool, Ordering};
+ pub(crate) use std::sync::atomic::{fence, AtomicBool, Ordering};
+ // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead.
+ #[allow(deprecated)]
+ pub(crate) use std::sync::atomic::spin_loop_hint;
}
}
diff --git a/src/macros/select.rs b/src/macros/select.rs
index ca4f963..aa29105 100644
--- a/src/macros/select.rs
+++ b/src/macros/select.rs
@@ -446,7 +446,7 @@ macro_rules! select {
(@ { $($t:tt)* } ) => {
// No `else` branch
- $crate::select!(@{ $($t)*; unreachable!() })
+ $crate::select!(@{ $($t)*; panic!("all branches are disabled and there is no else branch") })
};
(@ { $($t:tt)* } else => $else:expr $(,)?) => {
$crate::select!(@{ $($t)*; $else })
diff --git a/src/net/addr.rs b/src/net/addr.rs
index 7cbe531..ec4fa19 100644
--- a/src/net/addr.rs
+++ b/src/net/addr.rs
@@ -8,8 +8,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV
/// # DNS
///
/// Implementations of `ToSocketAddrs` for string types require a DNS lookup.
-/// These implementations are only provided when Tokio is used with the
-/// **`net`** feature flag.
///
/// # Calling
///
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index a2a8637..1ff0949 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -14,6 +14,10 @@ cfg_net! {
/// You can accept a new connection by using the [`accept`](`TcpListener::accept`)
/// method.
///
+ /// A `TcpListener` can be turned into a `Stream` with [`TcpListenerStream`].
+ ///
+ /// [`TcpListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.TcpListenerStream.html
+ ///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
@@ -60,9 +64,6 @@ impl TcpListener {
/// method.
///
/// The address type can be any implementor of the [`ToSocketAddrs`] trait.
- /// Note that strings only implement this trait when the **`net`** feature
- /// is enabled, as strings may contain domain names that need to be resolved.
- ///
/// If `addr` yields multiple addresses, bind will be attempted with each of
/// the addresses until one succeeds and returns the listener. If none of
/// the addresses succeed in creating a listener, the error returned from
@@ -70,7 +71,11 @@ impl TcpListener {
///
/// This function sets the `SO_REUSEADDR` option on the socket.
///
+ /// To configure the socket before binding, you can use the [`TcpSocket`]
+ /// type.
+ ///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index d4bfba4..91e357f 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -17,11 +17,16 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
cfg_net! {
/// A TCP stream between a local and a remote socket.
///
/// A TCP stream can either be created by connecting to an endpoint, via the
- /// [`connect`] method, or by [accepting] a connection from a [listener].
+ /// [`connect`] method, or by [accepting] a connection from a [listener]. A
+ /// TCP stream can also be created via the [`TcpSocket`] type.
///
/// Reading and writing to a `TcpStream` is usually done using the
/// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
@@ -30,6 +35,7 @@ cfg_net! {
/// [`connect`]: method@TcpStream::connect
/// [accepting]: method@crate::net::TcpListener::accept
/// [listener]: struct@crate::net::TcpListener
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
///
@@ -72,16 +78,17 @@ impl TcpStream {
/// Opens a TCP connection to a remote host.
///
/// `addr` is an address of the remote host. Anything which implements the
- /// [`ToSocketAddrs`] trait can be supplied as the address. Note that
- /// strings only implement this trait when the **`net`** feature is enabled,
- /// as strings may contain domain names that need to be resolved.
+ /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
+ /// yields multiple addresses, connect will be attempted with each of the
+ /// addresses until a connection is successful. If none of the addresses
+ /// result in a successful connection, the error returned from the last
+ /// connection attempt (the last address) is returned.
///
- /// If `addr` yields multiple addresses, connect will be attempted with each
- /// of the addresses until a connection is successful. If none of the
- /// addresses result in a successful connection, the error returned from the
- /// last connection attempt (the last address) is returned.
+ /// To configure the socket before connecting, you can use the [`TcpSocket`]
+ /// type.
///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
@@ -559,6 +566,85 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
+ cfg_io_util! {
+ /// Try to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: TcpStream::readable()
+ /// [`ready()`]: TcpStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(4096);
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read_buf(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ use std::io::Read;
+
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).read(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+ }
+
/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
diff --git a/src/net/udp.rs b/src/net/udp.rs
index 23abe98..86b4fe9 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -7,6 +7,10 @@ use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
cfg_net! {
/// A UDP socket
///
@@ -18,8 +22,13 @@ cfg_net! {
/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
/// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
///
- /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks,
- /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task.
+ /// This type does not provide a `split` method, because this functionality
+ /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
+ /// not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>` is
+ /// enough. This is because all of the methods take `&self` instead of `&mut
+ /// self`.
+ ///
+ /// [`Arc`]: std::sync::Arc
///
/// # Streams
///
@@ -74,11 +83,12 @@ cfg_net! {
/// }
/// ```
///
- /// # Example: Sending/Receiving concurrently
+ /// # Example: Splitting with `Arc`
///
- /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>`
- /// and share the references to multiple tasks, in order to send/receive concurrently. Here is
- /// a similar "echo" example but that supports concurrent sending/receiving:
+ /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright
+ /// to use an `Arc<UdpSocket>` and share the references to multiple tasks.
+ /// Here is a similar "echo" example that supports concurrent
+ /// sending/receiving:
///
/// ```no_run
/// use tokio::{net::UdpSocket, sync::mpsc};
@@ -683,6 +693,137 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv(buf))
}
+ cfg_io_util! {
+ /// Try to receive data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).recv(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+
+ /// Try to receive a single datagram message on the socket. On success,
+ /// returns the number of bytes read and the origin.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf_from(&mut buf) {
+ /// Ok((n, _addr)) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
+ // buffer.
+ let (n, addr) = (&*self.io).recv_from(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok((n, addr))
+ })
+ }
+ }
+
/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
@@ -904,7 +1045,6 @@ impl UdpSocket {
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
- /// socket.connect("127.0.0.1:8081").await?;
///
/// loop {
/// // Wait for the socket to be readable
diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs
index fb5f602..126a243 100644
--- a/src/net/unix/datagram/socket.rs
+++ b/src/net/unix/datagram/socket.rs
@@ -10,17 +10,29 @@ use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
cfg_net_unix! {
/// An I/O object representing a Unix datagram socket.
///
/// A socket can be either named (associated with a filesystem path) or
/// unnamed.
///
+ /// This type does not provide a `split` method, because this functionality
+ /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
+ /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
+ /// is enough. This is because all of the methods take `&self` instead of
+ /// `&mut self`.
+ ///
/// **Note:** named sockets are persisted even after the object is dropped
/// and the program has exited, and cannot be reconnected. It is advised
/// that you either check for and unlink the existing socket if it exists,
/// or use a temporary file that is guaranteed to not already exist.
///
+ /// [`Arc`]: std::sync::Arc
+ ///
/// # Examples
/// Using named sockets, associated with a filesystem path:
/// ```
@@ -652,6 +664,130 @@ impl UnixDatagram {
.try_io(Interest::READABLE, || self.io.recv(buf))
}
+ cfg_io_util! {
+ /// Try to receive data from the socket without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf_from(&mut buf) {
+ /// Ok((n, _addr)) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
+ let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
+ // buffer.
+ let (n, addr) = (&*self.io).recv_from(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok((n, addr))
+ })?;
+
+ Ok((n, SocketAddr(addr)))
+ }
+
+ /// Try to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).recv(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+ }
+
/// Sends data on the socket to the specified address.
///
/// # Examples
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs
index 9ed4ce1..d1c063e 100644
--- a/src/net/unix/listener.rs
+++ b/src/net/unix/listener.rs
@@ -14,6 +14,10 @@ cfg_net_unix! {
///
/// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method.
///
+ /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`].
+ ///
+ /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html
+ ///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index dc929dc..a3e3487 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -15,6 +15,10 @@ use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
cfg_net_unix! {
/// A structure representing a connected Unix socket.
///
@@ -267,6 +271,87 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
+ cfg_io_util! {
+ /// Try to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixStream::readable()
+ /// [`ready()`]: UnixStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(4096);
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read_buf(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ use std::io::Read;
+
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).read(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+ }
+
/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
diff --git a/src/process/mod.rs b/src/process/mod.rs
index bd23e1f..7180d51 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -97,6 +97,51 @@
//! }
//! ```
//!
+//! With some coordination, we can also pipe the output of one command into
+//! another.
+//!
+//! ```no_run
+//! use tokio::join;
+//! use tokio::process::Command;
+//! use std::convert::TryInto;
+//! use std::process::Stdio;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let mut echo = Command::new("echo")
+//! .arg("hello world!")
+//! .stdout(Stdio::piped())
+//! .spawn()
+//! .expect("failed to spawn echo");
+//!
+//! let tr_stdin: Stdio = echo
+//! .stdout
+//! .take()
+//! .unwrap()
+//! .try_into()
+//! .expect("failed to convert to Stdio");
+//!
+//! let tr = Command::new("tr")
+//! .arg("a-z")
+//! .arg("A-Z")
+//! .stdin(tr_stdin)
+//! .stdout(Stdio::piped())
+//! .spawn()
+//! .expect("failed to spawn tr");
+//!
+//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());
+//!
+//! assert!(echo_result.unwrap().success());
+//!
+//! let tr_output = tr_output.expect("failed to await tr");
+//! assert!(tr_output.status.success());
+//!
+//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");
+//!
+//! Ok(())
+//! }
+//! ```
+//!
//! # Caveats
//!
//! ## Dropping/Cancellation
@@ -147,6 +192,7 @@ mod kill;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;
+use std::convert::TryInto;
use std::ffi::OsStr;
use std::future::Future;
use std::io;
@@ -839,15 +885,38 @@ pub struct Child {
child: FusedChild,
/// The handle for writing to the child's standard input (stdin), if it has
- /// been captured.
+ /// been captured. To avoid partially moving the `child` and thus blocking
+ /// yourself from calling functions on `child` while using `stdin`, you might
+ /// find it helpful to do:
+ ///
+ /// ```no_run
+ /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap();
+ /// let stdin = child.stdin.take().unwrap();
+ /// ```
pub stdin: Option<ChildStdin>,
/// The handle for reading from the child's standard output (stdout), if it
- /// has been captured.
+ /// has been captured. You might find it helpful to do
+ ///
+ /// ```no_run
+ /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap();
+ /// let stdout = child.stdout.take().unwrap();
+ /// ```
+ ///
+ /// to avoid partially moving the `child` and thus blocking yourself from calling
+ /// functions on `child` while using `stdout`.
pub stdout: Option<ChildStdout>,
/// The handle for reading from the child's standard error (stderr), if it
- /// has been captured.
+ /// has been captured. You might find it helpful to do
+ ///
+ /// ```no_run
+ /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap();
+ /// let stderr = child.stderr.take().unwrap();
+ /// ```
+ ///
+ /// to avoid partially moving the `child` and thus blocking yourself from calling
+ /// functions on `child` while using `stderr`.
pub stderr: Option<ChildStderr>,
}
@@ -921,7 +990,36 @@ impl Child {
/// before waiting. This helps avoid deadlock: it ensures that the
/// child does not block waiting for input from the parent, while
/// the parent waits for the child to exit.
+ ///
+ /// If the caller wishes to explicitly control when the child's stdin
+ /// handle is closed, they may `.take()` it before calling `.wait()`:
+ ///
+ /// ```no_run
+ /// use tokio::io::AsyncWriteExt;
+ /// use tokio::process::Command;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut child = Command::new("cat").spawn().unwrap();
+ ///
+ /// let mut stdin = child.stdin.take().unwrap();
+ /// tokio::spawn(async move {
+ /// // do something with stdin here...
+ /// stdin.write_all(b"hello world\n").await.unwrap();
+ ///
+ /// // then drop when finished
+ /// drop(stdin);
+ /// });
+ ///
+ /// // wait for the process to complete
+ /// let _ = child.wait().await;
+ /// }
+ /// ```
pub async fn wait(&mut self) -> io::Result<ExitStatus> {
+ // Ensure stdin is closed so the child isn't stuck waiting on
+ // input while the parent is waiting for it to exit.
+ drop(self.stdin.take());
+
match &mut self.child {
FusedChild::Done(exit) => Ok(*exit),
FusedChild::Child(child) => {
@@ -995,7 +1093,6 @@ impl Child {
Ok(vec)
}
- drop(self.stdin.take());
let stdout_fut = read_to_end(self.stdout.take());
let stderr_fut = read_to_end(self.stderr.take());
@@ -1076,6 +1173,30 @@ impl AsyncRead for ChildStderr {
}
}
+impl TryInto<Stdio> for ChildStdin {
+ type Error = io::Error;
+
+ fn try_into(self) -> Result<Stdio, Self::Error> {
+ imp::convert_to_stdio(self.inner)
+ }
+}
+
+impl TryInto<Stdio> for ChildStdout {
+ type Error = io::Error;
+
+ fn try_into(self) -> Result<Stdio, Self::Error> {
+ imp::convert_to_stdio(self.inner)
+ }
+}
+
+impl TryInto<Stdio> for ChildStderr {
+ type Error = io::Error;
+
+ fn try_into(self) -> Result<Stdio, Self::Error> {
+ imp::convert_to_stdio(self.inner)
+ }
+}
+
#[cfg(unix)]
mod sys {
use std::os::unix::io::{AsRawFd, RawFd};
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs
index 3608b9f..852a191 100644
--- a/src/process/unix/mod.rs
+++ b/src/process/unix/mod.rs
@@ -43,7 +43,7 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::pin::Pin;
-use std::process::{Child as StdChild, ExitStatus};
+use std::process::{Child as StdChild, ExitStatus, Stdio};
use std::task::Context;
use std::task::Poll;
@@ -176,6 +176,18 @@ impl AsRawFd for Pipe {
}
}
+pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
+ let mut fd = io.into_inner()?.fd;
+
+ // Ensure that the fd to be inherited is set to *blocking* mode, as this
+ // is the default that virtually all programs expect to have. Those
+ // programs that know how to work with nonblocking stdio will know how to
+ // change it to nonblocking mode.
+ set_nonblocking(&mut fd, false)?;
+
+ Ok(Stdio::from(fd))
+}
+
impl Source for Pipe {
fn register(
&mut self,
@@ -204,6 +216,29 @@ pub(crate) type ChildStdin = PollEvented<Pipe>;
pub(crate) type ChildStdout = PollEvented<Pipe>;
pub(crate) type ChildStderr = PollEvented<Pipe>;
+fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
+ unsafe {
+ let fd = fd.as_raw_fd();
+ let previous = libc::fcntl(fd, libc::F_GETFL);
+ if previous == -1 {
+ return Err(io::Error::last_os_error());
+ }
+
+ let new = if nonblocking {
+ previous | libc::O_NONBLOCK
+ } else {
+ previous & !libc::O_NONBLOCK
+ };
+
+ let r = libc::fcntl(fd, libc::F_SETFL, new);
+ if r == -1 {
+ return Err(io::Error::last_os_error());
+ }
+ }
+
+ Ok(())
+}
+
fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Pipe>>>
where
T: IntoRawFd,
@@ -214,20 +249,8 @@ where
};
// Set the fd to nonblocking before we pass it to the event loop
- let pipe = unsafe {
- let pipe = Pipe::from(io);
- let fd = pipe.as_raw_fd();
- let r = libc::fcntl(fd, libc::F_GETFL);
- if r == -1 {
- return Err(io::Error::last_os_error());
- }
- let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK);
- if r == -1 {
- return Err(io::Error::last_os_error());
- }
-
- pipe
- };
+ let mut pipe = Pipe::from(io);
+ set_nonblocking(&mut pipe, true)?;
Ok(Some(PollEvented::new(pipe)?))
}
diff --git a/src/process/windows.rs b/src/process/windows.rs
index 1aa6c89..7237525 100644
--- a/src/process/windows.rs
+++ b/src/process/windows.rs
@@ -26,14 +26,19 @@ use std::future::Future;
use std::io;
use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle};
use std::pin::Pin;
+use std::process::Stdio;
use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
-use winapi::um::handleapi::INVALID_HANDLE_VALUE;
+use winapi::shared::minwindef::{DWORD, FALSE};
+use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
+use winapi::um::processthreadsapi::GetCurrentProcess;
use winapi::um::threadpoollegacyapiset::UnregisterWaitEx;
use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE};
-use winapi::um::winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE};
+use winapi::um::winnt::{
+ BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
+};
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
@@ -171,3 +176,30 @@ where
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
PollEvented::new(pipe).ok()
}
+
+pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
+ let named_pipe = io.into_inner()?;
+
+ // Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
+ // duplicate the handle here...
+ unsafe {
+ let mut dup_handle = INVALID_HANDLE_VALUE;
+ let cur_proc = GetCurrentProcess();
+
+ let status = DuplicateHandle(
+ cur_proc,
+ named_pipe.as_raw_handle(),
+ cur_proc,
+ &mut dup_handle,
+ 0 as DWORD,
+ FALSE,
+ DUPLICATE_SAME_ACCESS,
+ );
+
+ if status == 0 {
+ return Err(io::Error::last_os_error());
+ }
+
+ Ok(Stdio::from_raw_handle(dup_handle))
+ }
+}
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs
index 860eab4..aeb0150 100644
--- a/src/runtime/basic_scheduler.rs
+++ b/src/runtime/basic_scheduler.rs
@@ -129,7 +129,7 @@ impl<P: Park> BasicScheduler<P> {
pin!(future);
// Attempt to steal the dedicated parker and block_on the future if we can there,
- // othwerwise, lets select on a notification that the parker is available
+ // otherwise, lets select on a notification that the parker is available
// or the future is complete.
loop {
if let Some(inner) = &mut self.take_inner() {
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 9f98d89..791e405 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -9,6 +9,7 @@ use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
+use crate::util::error::CONTEXT_MISSING_ERROR;
use std::collections::{HashMap, VecDeque};
use std::fmt;
@@ -81,7 +82,7 @@ where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- let rt = context::current().expect("not currently running on the Tokio runtime.");
+ let rt = context::current().expect(CONTEXT_MISSING_ERROR);
rt.spawn_blocking(func)
}
@@ -91,7 +92,7 @@ where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- let rt = context::current().expect("not currently running on the Tokio runtime.");
+ let rt = context::current().expect(CONTEXT_MISSING_ERROR);
let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index 1f8892e..e845192 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -47,6 +47,9 @@ pub struct Builder {
/// Whether or not to enable the time driver
enable_time: bool,
+ /// Whether or not the clock should start paused.
+ start_paused: bool,
+
/// The number of worker threads, used by Runtime.
///
/// Only used when not using the current-thread executor.
@@ -110,6 +113,9 @@ impl Builder {
// Time defaults to "off"
enable_time: false,
+ // The clock starts not-paused
+ start_paused: false,
+
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,
@@ -386,6 +392,7 @@ impl Builder {
},
enable_io: self.enable_io,
enable_time: self.enable_time,
+ start_paused: self.start_paused,
}
}
@@ -489,6 +496,31 @@ cfg_time! {
}
}
+cfg_test_util! {
+ impl Builder {
+ /// Controls if the runtime's clock starts paused or advancing.
+ ///
+ /// Pausing time requires the current-thread runtime; construction of
+ /// the runtime will panic otherwise.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new_current_thread()
+ /// .enable_time()
+ /// .start_paused(true)
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
+ self.start_paused = start_paused;
+ self
+ }
+ }
+}
+
cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
diff --git a/src/runtime/context.rs b/src/runtime/context.rs
index 0817019..6e4a016 100644
--- a/src/runtime/context.rs
+++ b/src/runtime/context.rs
@@ -13,9 +13,9 @@ pub(crate) fn current() -> Option<Handle> {
cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
- CONTEXT.with(|ctx| match *ctx.borrow() {
- Some(ref ctx) => ctx.io_handle.clone(),
- None => Default::default(),
+ CONTEXT.with(|ctx| {
+ let ctx = ctx.borrow();
+ ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone()
})
}
}
@@ -23,18 +23,18 @@ cfg_io_driver! {
cfg_signal_internal! {
#[cfg(unix)]
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
- CONTEXT.with(|ctx| match *ctx.borrow() {
- Some(ref ctx) => ctx.signal_handle.clone(),
- None => Default::default(),
+ CONTEXT.with(|ctx| {
+ let ctx = ctx.borrow();
+ ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone()
})
}
}
cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
- CONTEXT.with(|ctx| match *ctx.borrow() {
- Some(ref ctx) => ctx.time_handle.clone(),
- None => Default::default(),
+ CONTEXT.with(|ctx| {
+ let ctx = ctx.borrow();
+ ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone()
})
}
diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs
index b89fa4f..a0e8e23 100644
--- a/src/runtime/driver.rs
+++ b/src/runtime/driver.rs
@@ -103,8 +103,8 @@ cfg_time! {
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
- fn create_clock(enable_pausing: bool) -> Clock {
- crate::time::Clock::new(enable_pausing)
+ fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
+ crate::time::Clock::new(enable_pausing, start_paused)
}
fn create_time_driver(
@@ -131,7 +131,7 @@ cfg_not_time! {
pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();
- fn create_clock(_enable_pausing: bool) -> Clock {
+ fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
()
}
@@ -162,13 +162,15 @@ pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
+ pub(crate) start_paused: bool,
}
impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
- let clock = create_clock(cfg.enable_pause_time);
+ let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
+
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index 6ff3c39..76b28f2 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -1,6 +1,7 @@
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, context, driver, Spawner};
+use crate::util::error::CONTEXT_MISSING_ERROR;
use std::future::Future;
use std::{error, fmt};
@@ -97,7 +98,7 @@ impl Handle {
/// # }
/// ```
pub fn current() -> Self {
- context::current().expect("not currently running on the Tokio runtime.")
+ context::current().expect(CONTEXT_MISSING_ERROR)
}
/// Returns a Handle view over the currently running Runtime
@@ -213,7 +214,7 @@ impl fmt::Debug for TryCurrentError {
impl fmt::Display for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.write_str("no tokio Runtime has been initialized")
+ f.write_str(CONTEXT_MISSING_ERROR)
}
}
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 2c90acb..b138a66 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -10,7 +10,7 @@
//!
//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
//! them to be started, shut down, and configured together. However, often it is
-//! not required to configure a [`Runtime`] manually, and user may just use the
+//! not required to configure a [`Runtime`] manually, and a user may just use the
//! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood.
//!
//! # Usage
@@ -114,7 +114,7 @@
//!
//! The multi-thread scheduler executes futures on a _thread pool_, using a
//! work-stealing strategy. By default, it will start a worker thread for each
-//! CPU core available on the system. This tends to be the ideal configurations
+//! CPU core available on the system. This tends to be the ideal configuration
//! for most applications. The multi-thread scheduler requires the `rt-multi-thread`
//! feature flag, and is selected by default:
//! ```
diff --git a/src/runtime/queue.rs b/src/runtime/queue.rs
index cdf4009..1c7bb23 100644
--- a/src/runtime/queue.rs
+++ b/src/runtime/queue.rs
@@ -8,7 +8,7 @@ use crate::runtime::task;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
@@ -194,13 +194,17 @@ impl<T> Local<T> {
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
- let actual = self.inner.head.compare_and_swap(
- prev,
- pack(head.wrapping_add(n), head.wrapping_add(n)),
- Release,
- );
-
- if actual != prev {
+ if self
+ .inner
+ .head
+ .compare_exchange(
+ prev,
+ pack(head.wrapping_add(n), head.wrapping_add(n)),
+ Release,
+ Relaxed,
+ )
+ .is_err()
+ {
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
@@ -231,7 +235,7 @@ impl<T> Local<T> {
// tasks and we are the only producer.
self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
let ptr = (*ptr).as_ptr();
- (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next));
+ (*ptr).header().set_next(Some(next))
});
}
@@ -606,7 +610,7 @@ fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
unsafe {
- header.as_ref().queue_next.with_mut(|ptr| *ptr = val);
+ header.as_ref().set_next(val);
}
}
diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs
index dfa8764..9f7ff55 100644
--- a/src/runtime/task/core.rs
+++ b/src/runtime/task/core.rs
@@ -12,7 +12,6 @@
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
-use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::util::linked_list;
@@ -37,15 +36,23 @@ pub(super) struct Cell<T: Future, S> {
pub(super) trailer: Trailer,
}
+pub(super) struct Scheduler<S> {
+ scheduler: UnsafeCell<Option<S>>,
+}
+
+pub(super) struct CoreStage<T: Future> {
+ stage: UnsafeCell<Stage<T>>,
+}
+
/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
- pub(super) scheduler: UnsafeCell<Option<S>>,
+ pub(super) scheduler: Scheduler<S>,
/// Either the future or the output
- pub(super) stage: UnsafeCell<Stage<T>>,
+ pub(super) stage: CoreStage<T>,
}
/// Crate public as this is also needed by the pool.
@@ -95,8 +102,12 @@ impl<T: Future, S: Schedule> Cell<T, S> {
vtable: raw::vtable::<T, S>(),
},
core: Core {
- scheduler: UnsafeCell::new(None),
- stage: UnsafeCell::new(Stage::Running(future)),
+ scheduler: Scheduler {
+ scheduler: UnsafeCell::new(None),
+ },
+ stage: CoreStage {
+ stage: UnsafeCell::new(Stage::Running(future)),
+ },
},
trailer: Trailer {
waker: UnsafeCell::new(None),
@@ -105,7 +116,11 @@ impl<T: Future, S: Schedule> Cell<T, S> {
}
}
-impl<T: Future, S: Schedule> Core<T, S> {
+impl<S: Schedule> Scheduler<S> {
+ pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R {
+ self.scheduler.with_mut(f)
+ }
+
/// Bind a scheduler to the task.
///
/// This only happens on the first poll and must be preceeded by a call to
@@ -140,6 +155,58 @@ impl<T: Future, S: Schedule> Core<T, S> {
self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
}
+ /// Schedule the future for execution
+ pub(super) fn schedule(&self, task: Notified<S>) {
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.schedule(task),
+ None => panic!("no scheduler set"),
+ }
+ });
+ }
+
+ /// Schedule the future for execution in the near future, yielding the
+ /// thread to other tasks.
+ pub(super) fn yield_now(&self, task: Notified<S>) {
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.yield_now(task),
+ None => panic!("no scheduler set"),
+ }
+ });
+ }
+
+ /// Release the task
+ ///
+ /// If the `Scheduler` implementation is able to, it returns the `Task`
+ /// handle immediately. The caller of this function will batch a ref-dec
+ /// with a state change.
+ pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
+ use std::mem::ManuallyDrop;
+
+ let task = ManuallyDrop::new(task);
+
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.release(&*task),
+ // Task was never polled
+ None => None,
+ }
+ })
+ }
+}
+
+impl<T: Future> CoreStage<T> {
+ pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
+ self.stage.with_mut(f)
+ }
+
/// Poll the future
///
/// # Safety
@@ -153,7 +220,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
///
/// `self` must also be pinned. This is handled by storing the task on the
/// heap.
- pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> {
+ pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.with_mut(|ptr| {
// Safety: The caller ensures mutual exclusion to the field.
@@ -165,11 +232,6 @@ impl<T: Future, S: Schedule> Core<T, S> {
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };
- // The waker passed into the `poll` function does not require a ref
- // count increment.
- let waker_ref = waker_ref::<T, S>(header);
- let mut cx = Context::from_waker(&*waker_ref);
-
future.poll(&mut cx)
})
};
@@ -187,10 +249,10 @@ impl<T: Future, S: Schedule> Core<T, S> {
///
/// The caller must ensure it is safe to mutate the `stage` field.
pub(super) fn drop_future_or_output(&self) {
- self.stage.with_mut(|ptr| {
- // Safety: The caller ensures mutal exclusion to the field.
- unsafe { *ptr = Stage::Consumed };
- });
+ // Safety: the caller ensures mutual exclusion to the field.
+ unsafe {
+ self.set_stage(Stage::Consumed);
+ }
}
/// Store the task output
@@ -199,10 +261,10 @@ impl<T: Future, S: Schedule> Core<T, S> {
///
/// The caller must ensure it is safe to mutate the `stage` field.
pub(super) fn store_output(&self, output: super::Result<T::Output>) {
- self.stage.with_mut(|ptr| {
- // Safety: the caller ensures mutual exclusion to the field.
- unsafe { *ptr = Stage::Finished(output) };
- });
+ // Safety: the caller ensures mutual exclusion to the field.
+ unsafe {
+ self.set_stage(Stage::Finished(output));
+ }
}
/// Take the task output
@@ -222,50 +284,8 @@ impl<T: Future, S: Schedule> Core<T, S> {
})
}
- /// Schedule the future for execution
- pub(super) fn schedule(&self, task: Notified<S>) {
- self.scheduler.with(|ptr| {
- // Safety: Can only be called after initial `poll`, which is the
- // only time the field is mutated.
- match unsafe { &*ptr } {
- Some(scheduler) => scheduler.schedule(task),
- None => panic!("no scheduler set"),
- }
- });
- }
-
- /// Schedule the future for execution in the near future, yielding the
- /// thread to other tasks.
- pub(super) fn yield_now(&self, task: Notified<S>) {
- self.scheduler.with(|ptr| {
- // Safety: Can only be called after initial `poll`, which is the
- // only time the field is mutated.
- match unsafe { &*ptr } {
- Some(scheduler) => scheduler.yield_now(task),
- None => panic!("no scheduler set"),
- }
- });
- }
-
- /// Release the task
- ///
- /// If the `Scheduler` implementation is able to, it returns the `Task`
- /// handle immediately. The caller of this function will batch a ref-dec
- /// with a state change.
- pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
- use std::mem::ManuallyDrop;
-
- let task = ManuallyDrop::new(task);
-
- self.scheduler.with(|ptr| {
- // Safety: Can only be called after initial `poll`, which is the
- // only time the field is mutated.
- match unsafe { &*ptr } {
- Some(scheduler) => scheduler.release(&*task),
- // Task was never polled
- None => None,
- }
- })
+ unsafe fn set_stage(&self, stage: Stage<T>) {
+ self.stage.with_mut(|ptr| *ptr = stage)
}
}
@@ -277,6 +297,30 @@ cfg_rt_multi_thread! {
let task = unsafe { RawTask::from_raw(self.into()) };
task.shutdown();
}
+
+ pub(crate) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
+ self.queue_next.with_mut(|ptr| *ptr = next);
+ }
+ }
+}
+
+impl Trailer {
+ pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) {
+ self.waker.with_mut(|ptr| {
+ *ptr = waker;
+ });
+ }
+
+ pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool {
+ self.waker
+ .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
+ }
+
+ pub(crate) fn wake_join(&self) {
+ self.waker.with(|ptr| match unsafe { &*ptr } {
+ Some(waker) => waker.wake_by_ref(),
+ None => panic!("waker missing"),
+ });
}
}
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index 208d48c..7d596e3 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -1,12 +1,13 @@
-use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
use crate::runtime::task::state::Snapshot;
+use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};
use std::future::Future;
use std::mem;
use std::panic;
use std::ptr::NonNull;
-use std::task::{Poll, Waker};
+use std::task::{Context, Poll, Waker};
/// Typed raw task handle
pub(super) struct Harness<T: Future, S: 'static> {
@@ -35,6 +36,13 @@ where
fn core(&self) -> &Core<T, S> {
unsafe { &self.cell.as_ref().core }
}
+
+ fn scheduler_view(&self) -> SchedulerView<'_, S> {
+ SchedulerView {
+ header: self.header(),
+ scheduler: &self.core().scheduler,
+ }
+ }
}
impl<T, S> Harness<T, S>
@@ -48,102 +56,46 @@ where
///
/// Panics raised while polling the future are handled.
pub(super) fn poll(self) {
- // If this is the first time the task is polled, the task will be bound
- // to the scheduler, in which case the task ref count must be
- // incremented.
- let is_not_bound = !self.core().is_bound();
-
- // Transition the task to the running state.
- //
- // A failure to transition here indicates the task has been cancelled
- // while in the run queue pending execution.
- let snapshot = match self.header().state.transition_to_running(is_not_bound) {
- Ok(snapshot) => snapshot,
- Err(_) => {
- // The task was shutdown while in the run queue. At this point,
- // we just hold a ref counted reference. Drop it here.
+ match self.poll_inner() {
+ PollFuture::Notified => {
+ // Signal yield
+ self.core().scheduler.yield_now(Notified(self.to_task()));
+ // The ref-count was incremented as part of
+ // `transition_to_idle`.
self.drop_reference();
- return;
}
- };
-
- if is_not_bound {
- // Ensure the task is bound to a scheduler instance. Since this is
- // the first time polling the task, a scheduler instance is pulled
- // from the local context and assigned to the task.
- //
- // The scheduler maintains ownership of the task and responds to
- // `wake` calls.
- //
- // The task reference count has been incremented.
- //
- // Safety: Since we have unique access to the task so that we can
- // safely call `bind_scheduler`.
- self.core().bind_scheduler(self.to_task());
+ PollFuture::DropReference => {
+ self.drop_reference();
+ }
+ PollFuture::Complete(out, is_join_interested) => {
+ self.complete(out, is_join_interested);
+ }
+ PollFuture::None => (),
}
+ }
+
+ fn poll_inner(&self) -> PollFuture<T::Output> {
+ let snapshot = match self.scheduler_view().transition_to_running() {
+ TransitionToRunning::Ok(snapshot) => snapshot,
+ TransitionToRunning::DropReference => return PollFuture::DropReference,
+ };
// The transition to `Running` done above ensures that a lock on the
// future has been obtained. This also ensures the `*mut T` pointer
// contains the future (as opposed to the output) and is initialized.
- let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- struct Guard<'a, T: Future, S: Schedule> {
- core: &'a Core<T, S>,
- }
-
- impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> {
- fn drop(&mut self) {
- self.core.drop_future_or_output();
- }
- }
-
- let guard = Guard { core: self.core() };
-
- // If the task is cancelled, avoid polling it, instead signalling it
- // is complete.
- if snapshot.is_cancelled() {
- Poll::Ready(Err(JoinError::cancelled()))
- } else {
- let res = guard.core.poll(self.header());
-
- // prevent the guard from dropping the future
- mem::forget(guard);
-
- res.map(Ok)
- }
- }));
-
- match res {
- Ok(Poll::Ready(out)) => {
- self.complete(out, snapshot.is_join_interested());
- }
- Ok(Poll::Pending) => {
- match self.header().state.transition_to_idle() {
- Ok(snapshot) => {
- if snapshot.is_notified() {
- // Signal yield
- self.core().yield_now(Notified(self.to_task()));
- // The ref-count was incremented as part of
- // `transition_to_idle`.
- self.drop_reference();
- }
- }
- Err(_) => self.cancel_task(),
- }
- }
- Err(err) => {
- self.complete(Err(JoinError::panic(err)), snapshot.is_join_interested());
- }
- }
+ let waker_ref = waker_ref::<T, S>(self.header());
+ let cx = Context::from_waker(&*waker_ref);
+ poll_future(self.header(), &self.core().stage, snapshot, cx)
}
pub(super) fn dealloc(self) {
// Release the join waker, if there is one.
- self.trailer().waker.with_mut(|_| ());
+ self.trailer().waker.with_mut(drop);
// Check causality
- self.core().stage.with_mut(|_| {});
- self.core().scheduler.with_mut(|_| {});
+ self.core().stage.with_mut(drop);
+ self.core().scheduler.with_mut(drop);
unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
@@ -154,83 +106,9 @@ where
/// Read the task output into `dst`.
pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
- // Load a snapshot of the current task state
- let snapshot = self.header().state.load();
-
- debug_assert!(snapshot.is_join_interested());
-
- if !snapshot.is_complete() {
- // The waker must be stored in the task struct.
- let res = if snapshot.has_join_waker() {
- // There already is a waker stored in the struct. If it matches
- // the provided waker, then there is no further work to do.
- // Otherwise, the waker must be swapped.
- let will_wake = unsafe {
- // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
- // may mutate the `waker` field.
- self.trailer()
- .waker
- .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
- };
-
- if will_wake {
- // The task is not complete **and** the waker is up to date,
- // there is nothing further that needs to be done.
- return;
- }
-
- // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
- // field then update the field with the new join worker.
- //
- // This requires two atomic operations, unsetting the bit and
- // then resetting it. If the task transitions to complete
- // concurrently to either one of those operations, then setting
- // the join waker fails and we proceed to reading the task
- // output.
- self.header()
- .state
- .unset_waker()
- .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot))
- } else {
- self.set_join_waker(waker.clone(), snapshot)
- };
-
- match res {
- Ok(_) => return,
- Err(snapshot) => {
- assert!(snapshot.is_complete());
- }
- }
+ if can_read_output(self.header(), self.trailer(), waker) {
+ *dst = Poll::Ready(self.core().stage.take_output());
}
-
- *dst = Poll::Ready(self.core().take_output());
- }
-
- fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> {
- assert!(snapshot.is_join_interested());
- assert!(!snapshot.has_join_waker());
-
- // Safety: Only the `JoinHandle` may set the `waker` field. When
- // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
- unsafe {
- self.trailer().waker.with_mut(|ptr| {
- *ptr = Some(waker);
- });
- }
-
- // Update the `JoinWaker` state accordingly
- let res = self.header().state.set_join_waker();
-
- // If the state could not be updated, then clear the join waker
- if res.is_err() {
- unsafe {
- self.trailer().waker.with_mut(|ptr| {
- *ptr = None;
- });
- }
- }
-
- res
}
pub(super) fn drop_join_handle_slow(self) {
@@ -242,7 +120,7 @@ where
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
- self.core().drop_future_or_output();
+ self.core().stage.drop_future_or_output();
}
// Drop the `JoinHandle` reference, possibly deallocating the task
@@ -258,7 +136,7 @@ where
pub(super) fn wake_by_ref(&self) {
if self.header().state.transition_to_notified() {
- self.core().schedule(Notified(self.to_task()));
+ self.core().scheduler.schedule(Notified(self.to_task()));
}
}
@@ -282,44 +160,65 @@ where
// By transitioning the lifcycle to `Running`, we have permission to
// drop the future.
- self.cancel_task();
+ let err = cancel_task(&self.core().stage);
+ self.complete(Err(err), true)
}
// ====== internal ======
- fn cancel_task(self) {
- // Drop the future from a panic guard.
- let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- self.core().drop_future_or_output();
- }));
-
- if let Err(err) = res {
- // Dropping the future panicked, complete the join
- // handle with the panic to avoid dropping the panic
- // on the ground.
- self.complete(Err(JoinError::panic(err)), true);
- } else {
- self.complete(Err(JoinError::cancelled()), true);
- }
- }
-
- fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) {
+ fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) {
if is_join_interested {
// Store the output. The future has already been dropped
//
// Safety: Mutual exclusion is obtained by having transitioned the task
// state -> Running
- self.core().store_output(output);
+ let stage = &self.core().stage;
+ stage.store_output(output);
// Transition to `Complete`, notifying the `JoinHandle` if necessary.
- self.transition_to_complete();
+ transition_to_complete(self.header(), stage, &self.trailer());
}
// The task has completed execution and will no longer be scheduled.
//
// Attempts to batch a ref-dec with the state transition below.
- let ref_dec = if self.core().is_bound() {
- if let Some(task) = self.core().release(self.to_task()) {
+
+ if self
+ .scheduler_view()
+ .transition_to_terminal(is_join_interested)
+ {
+ self.dealloc()
+ }
+ }
+
+ fn to_task(&self) -> Task<S> {
+ self.scheduler_view().to_task()
+ }
+}
+
+enum TransitionToRunning {
+ Ok(Snapshot),
+ DropReference,
+}
+
+struct SchedulerView<'a, S> {
+ header: &'a Header,
+ scheduler: &'a Scheduler<S>,
+}
+
+impl<'a, S> SchedulerView<'a, S>
+where
+ S: Schedule,
+{
+ fn to_task(&self) -> Task<S> {
+ // SAFETY The header is from the same struct containing the scheduler `S` so the cast is safe
+ unsafe { Task::from_raw(self.header.into()) }
+ }
+
+ /// Returns true if the task should be deallocated.
+ fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
+ let ref_dec = if self.scheduler.is_bound() {
+ if let Some(task) = self.scheduler.release(self.to_task()) {
mem::forget(task);
true
} else {
@@ -331,41 +230,217 @@ where
// This might deallocate
let snapshot = self
- .header()
+ .header
.state
.transition_to_terminal(!is_join_interested, ref_dec);
- if snapshot.ref_count() == 0 {
- self.dealloc()
+ snapshot.ref_count() == 0
+ }
+
+ fn transition_to_running(&self) -> TransitionToRunning {
+ // If this is the first time the task is polled, the task will be bound
+ // to the scheduler, in which case the task ref count must be
+ // incremented.
+ let is_not_bound = !self.scheduler.is_bound();
+
+ // Transition the task to the running state.
+ //
+ // A failure to transition here indicates the task has been cancelled
+ // while in the run queue pending execution.
+ let snapshot = match self.header.state.transition_to_running(is_not_bound) {
+ Ok(snapshot) => snapshot,
+ Err(_) => {
+ // The task was shutdown while in the run queue. At this point,
+ // we just hold a ref counted reference. Since we do not have access to it here
+ // return `DropReference` so the caller drops it.
+ return TransitionToRunning::DropReference;
+ }
+ };
+
+ if is_not_bound {
+ // Ensure the task is bound to a scheduler instance. Since this is
+ // the first time polling the task, a scheduler instance is pulled
+ // from the local context and assigned to the task.
+ //
+ // The scheduler maintains ownership of the task and responds to
+ // `wake` calls.
+ //
+ // The task reference count has been incremented.
+ //
+ // Safety: Since we have unique access to the task so that we can
+ // safely call `bind_scheduler`.
+ self.scheduler.bind_scheduler(self.to_task());
}
+ TransitionToRunning::Ok(snapshot)
}
+}
- /// Transitions the task's lifecycle to `Complete`. Notifies the
- /// `JoinHandle` if it still has interest in the completion.
- fn transition_to_complete(&mut self) {
- // Transition the task's lifecycle to `Complete` and get a snapshot of
- // the task's sate.
- let snapshot = self.header().state.transition_to_complete();
-
- if !snapshot.is_join_interested() {
- // The `JoinHandle` is not interested in the output of this task. It
- // is our responsibility to drop the output.
- self.core().drop_future_or_output();
- } else if snapshot.has_join_waker() {
- // Notify the join handle. The previous transition obtains the
- // lock on the waker cell.
- self.wake_join();
+/// Transitions the task's lifecycle to `Complete`. Notifies the
+/// `JoinHandle` if it still has interest in the completion.
+fn transition_to_complete<T>(header: &Header, stage: &CoreStage<T>, trailer: &Trailer)
+where
+ T: Future,
+{
+ // Transition the task's lifecycle to `Complete` and get a snapshot of
+ // the task's sate.
+ let snapshot = header.state.transition_to_complete();
+
+ if !snapshot.is_join_interested() {
+ // The `JoinHandle` is not interested in the output of this task. It
+ // is our responsibility to drop the output.
+ stage.drop_future_or_output();
+ } else if snapshot.has_join_waker() {
+ // Notify the join handle. The previous transition obtains the
+ // lock on the waker cell.
+ trailer.wake_join();
+ }
+}
+
+fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
+ // Load a snapshot of the current task state
+ let snapshot = header.state.load();
+
+ debug_assert!(snapshot.is_join_interested());
+
+ if !snapshot.is_complete() {
+ // The waker must be stored in the task struct.
+ let res = if snapshot.has_join_waker() {
+ // There already is a waker stored in the struct. If it matches
+ // the provided waker, then there is no further work to do.
+ // Otherwise, the waker must be swapped.
+ let will_wake = unsafe {
+ // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
+ // may mutate the `waker` field.
+ trailer.will_wake(waker)
+ };
+
+ if will_wake {
+ // The task is not complete **and** the waker is up to date,
+ // there is nothing further that needs to be done.
+ return false;
+ }
+
+ // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
+ // field then update the field with the new join worker.
+ //
+ // This requires two atomic operations, unsetting the bit and
+ // then resetting it. If the task transitions to complete
+ // concurrently to either one of those operations, then setting
+ // the join waker fails and we proceed to reading the task
+ // output.
+ header
+ .state
+ .unset_waker()
+ .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
+ } else {
+ set_join_waker(header, trailer, waker.clone(), snapshot)
+ };
+
+ match res {
+ Ok(_) => return false,
+ Err(snapshot) => {
+ assert!(snapshot.is_complete());
+ }
}
}
+ true
+}
- fn wake_join(&self) {
- self.trailer().waker.with(|ptr| match unsafe { &*ptr } {
- Some(waker) => waker.wake_by_ref(),
- None => panic!("waker missing"),
- });
+fn set_join_waker(
+ header: &Header,
+ trailer: &Trailer,
+ waker: Waker,
+ snapshot: Snapshot,
+) -> Result<Snapshot, Snapshot> {
+ assert!(snapshot.is_join_interested());
+ assert!(!snapshot.has_join_waker());
+
+ // Safety: Only the `JoinHandle` may set the `waker` field. When
+ // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
+ unsafe {
+ trailer.set_waker(Some(waker));
}
- fn to_task(&self) -> Task<S> {
- unsafe { Task::from_raw(self.header().into()) }
+ // Update the `JoinWaker` state accordingly
+ let res = header.state.set_join_waker();
+
+ // If the state could not be updated, then clear the join waker
+ if res.is_err() {
+ unsafe {
+ trailer.set_waker(None);
+ }
+ }
+
+ res
+}
+
+enum PollFuture<T> {
+ Complete(Result<T, JoinError>, bool),
+ DropReference,
+ Notified,
+ None,
+}
+
+fn cancel_task<T: Future>(stage: &CoreStage<T>) -> JoinError {
+ // Drop the future from a panic guard.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ stage.drop_future_or_output();
+ }));
+
+ if let Err(err) = res {
+ // Dropping the future panicked, complete the join
+ // handle with the panic to avoid dropping the panic
+ // on the ground.
+ JoinError::panic(err)
+ } else {
+ JoinError::cancelled()
+ }
+}
+
+fn poll_future<T: Future>(
+ header: &Header,
+ core: &CoreStage<T>,
+ snapshot: Snapshot,
+ cx: Context<'_>,
+) -> PollFuture<T::Output> {
+ if snapshot.is_cancelled() {
+ PollFuture::Complete(Err(JoinError::cancelled()), snapshot.is_join_interested())
+ } else {
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a, T: Future> {
+ core: &'a CoreStage<T>,
+ }
+
+ impl<T: Future> Drop for Guard<'_, T> {
+ fn drop(&mut self) {
+ self.core.drop_future_or_output();
+ }
+ }
+
+ let guard = Guard { core };
+
+ let res = guard.core.poll(cx);
+
+ // prevent the guard from dropping the future
+ mem::forget(guard);
+
+ res
+ }));
+ match res {
+ Ok(Poll::Pending) => match header.state.transition_to_idle() {
+ Ok(snapshot) => {
+ if snapshot.is_notified() {
+ PollFuture::Notified
+ } else {
+ PollFuture::None
+ }
+ }
+ Err(_) => PollFuture::Complete(Err(cancel_task(core)), true),
+ },
+ Ok(Poll::Ready(ok)) => PollFuture::Complete(Ok(ok), snapshot.is_join_interested()),
+ Err(err) => {
+ PollFuture::Complete(Err(JoinError::panic(err)), snapshot.is_join_interested())
+ }
+ }
}
}
diff --git a/src/runtime/thread_pool/idle.rs b/src/runtime/thread_pool/idle.rs
index 6e692fd..b77cce5 100644
--- a/src/runtime/thread_pool/idle.rs
+++ b/src/runtime/thread_pool/idle.rs
@@ -118,7 +118,7 @@ impl Idle {
if sleepers[index] == worker_id {
sleepers.swap_remove(index);
- // Update the state accordingly whle the lock is held.
+ // Update the state accordingly while the lock is held.
State::unpark_one(&self.state);
return;
diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs
index 31712e4..dbd1aff 100644
--- a/src/runtime/thread_pool/worker.rs
+++ b/src/runtime/thread_pool/worker.rs
@@ -337,7 +337,7 @@ impl Context {
}
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
- // Make sure thew orker is not in the **searching** state. This enables
+ // Make sure the worker is not in the **searching** state. This enables
// another idle worker to try to steal work.
core.transition_from_searching(&self.worker);
diff --git a/src/signal/unix.rs b/src/signal/unix.rs
index fc0f16d..0de875a 100644
--- a/src/signal/unix.rs
+++ b/src/signal/unix.rs
@@ -397,7 +397,41 @@ impl Signal {
poll_fn(|cx| self.poll_recv(cx)).await
}
- pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ /// Polls to receive the next signal notification event, outside of an
+ /// `async` context.
+ ///
+ /// This method returns:
+ ///
+ /// * `Poll::Pending` if no signals are available but the channel is not
+ /// closed.
+ /// * `Poll::Ready(Some(()))` if a signal is available.
+ /// * `Poll::Ready(None)` if the channel has been closed and all signals
+ /// sent before it was closed have been received.
+ ///
+ /// # Examples
+ ///
+ /// Polling from a manually implemented future
+ ///
+ /// ```rust,no_run
+ /// use std::pin::Pin;
+ /// use std::future::Future;
+ /// use std::task::{Context, Poll};
+ /// use tokio::signal::unix::Signal;
+ ///
+ /// struct MyFuture {
+ /// signal: Signal,
+ /// }
+ ///
+ /// impl Future for MyFuture {
+ /// type Output = Option<()>;
+ ///
+ /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ /// println!("polling MyFuture");
+ /// self.signal.poll_recv(cx)
+ /// }
+ /// }
+ /// ```
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.rx.poll_recv(cx)
}
diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs
index 1b94600..3ef8f84 100644
--- a/src/sync/broadcast.rs
+++ b/src/sync/broadcast.rs
@@ -163,6 +163,11 @@ pub struct Sender<T> {
/// Must not be used concurrently. Messages may be retrieved using
/// [`recv`][Receiver::recv].
///
+/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
+/// wrapper.
+///
+/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
+///
/// # Examples
///
/// ```
@@ -361,38 +366,16 @@ struct RecvGuard<'a, T> {
}
/// Receive a value future
-struct Recv<R, T>
-where
- R: AsMut<Receiver<T>>,
-{
+struct Recv<'a, T> {
/// Receiver being waited on
- receiver: R,
+ receiver: &'a mut Receiver<T>,
/// Entry in the waiter `LinkedList`
waiter: UnsafeCell<Waiter>,
-
- _p: std::marker::PhantomData<T>,
-}
-
-/// `AsMut<T>` is not implemented for `T` (coherence). Explicitly implementing
-/// `AsMut` for `Receiver` would be included in the public API of the receiver
-/// type. Instead, `Borrow` is used internally to bridge the gap.
-struct Borrow<T>(T);
-
-impl<T> AsMut<Receiver<T>> for Borrow<Receiver<T>> {
- fn as_mut(&mut self) -> &mut Receiver<T> {
- &mut self.0
- }
-}
-
-impl<'a, T> AsMut<Receiver<T>> for Borrow<&'a mut Receiver<T>> {
- fn as_mut(&mut self) -> &mut Receiver<T> {
- &mut *self.0
- }
}
-unsafe impl<R: AsMut<Receiver<T>> + Send, T: Send> Send for Recv<R, T> {}
-unsafe impl<R: AsMut<Receiver<T>> + Sync, T: Send> Sync for Recv<R, T> {}
+unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
+unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
/// Max number of receivers. Reserve space to lock.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
@@ -892,7 +875,7 @@ impl<T: Clone> Receiver<T> {
/// }
/// ```
pub async fn recv(&mut self) -> Result<T, RecvError> {
- let fut = Recv::<_, T>::new(Borrow(self));
+ let fut = Recv::new(self);
fut.await
}
@@ -951,7 +934,7 @@ impl<T> Drop for Receiver<T> {
drop(tail);
- while self.next != until {
+ while self.next < until {
match self.recv_ref(None) {
Ok(_) => {}
// The channel is closed
@@ -965,11 +948,8 @@ impl<T> Drop for Receiver<T> {
}
}
-impl<R, T> Recv<R, T>
-where
- R: AsMut<Receiver<T>>,
-{
- fn new(receiver: R) -> Recv<R, T> {
+impl<'a, T> Recv<'a, T> {
+ fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
Recv {
receiver,
waiter: UnsafeCell::new(Waiter {
@@ -978,7 +958,6 @@ where
pointers: linked_list::Pointers::new(),
_p: PhantomPinned,
}),
- _p: std::marker::PhantomData,
}
}
@@ -990,14 +969,13 @@ where
is_unpin::<&mut Receiver<T>>();
let me = self.get_unchecked_mut();
- (me.receiver.as_mut(), &me.waiter)
+ (me.receiver, &me.waiter)
}
}
}
-impl<R, T> Future for Recv<R, T>
+impl<'a, T> Future for Recv<'a, T>
where
- R: AsMut<Receiver<T>>,
T: Clone,
{
type Output = Result<T, RecvError>;
@@ -1016,14 +994,11 @@ where
}
}
-impl<R, T> Drop for Recv<R, T>
-where
- R: AsMut<Receiver<T>>,
-{
+impl<'a, T> Drop for Recv<'a, T> {
fn drop(&mut self) {
// Acquire the tail lock. This is required for safety before accessing
// the waiter node.
- let mut tail = self.receiver.as_mut().shared.tail.lock();
+ let mut tail = self.receiver.shared.tail.lock();
// safety: tail lock is held
let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs
index e062f2b..6bef794 100644
--- a/src/sync/mpsc/block.rs
+++ b/src/sync/mpsc/block.rs
@@ -258,13 +258,15 @@ impl<T> Block<T> {
pub(crate) unsafe fn try_push(
&self,
block: &mut NonNull<Block<T>>,
- ordering: Ordering,
+ success: Ordering,
+ failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
let next_ptr = self
.next
- .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+ .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
+ .unwrap_or_else(|x| x);
match NonNull::new(next_ptr) {
Some(next_ptr) => Err(next_ptr),
@@ -306,11 +308,11 @@ impl<T> Block<T> {
//
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
- let next = NonNull::new(self.next.compare_and_swap(
- ptr::null_mut(),
- new_block.as_ptr(),
- AcqRel,
- ));
+ let next = NonNull::new(
+ self.next
+ .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
+ .unwrap_or_else(|x| x),
+ );
let next = match next {
Some(next) => next,
@@ -333,7 +335,7 @@ impl<T> Block<T> {
// TODO: Should this iteration be capped?
loop {
- let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+ let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };
curr = match actual {
Ok(_) => {
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index 2dae7e2..dfe4a74 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -22,10 +22,11 @@ pub struct Sender<T> {
/// Permit to send one value into the channel.
///
-/// `Permit` values are returned by [`Sender::reserve()`] and are used to
-/// guarantee channel capacity before generating a message to send.
+/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
+/// and are used to guarantee channel capacity before generating a message to send.
///
/// [`Sender::reserve()`]: Sender::reserve
+/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
chan: &'a chan::Tx<T, Semaphore>,
}
@@ -33,6 +34,10 @@ pub struct Permit<'a, T> {
/// Receive values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
+///
+/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
+///
+/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
/// The channel receiver
chan: chan::Rx<T, Semaphore>,
@@ -104,8 +109,21 @@ impl<T> Receiver<T> {
/// Receives the next value for this receiver.
///
- /// `None` is returned when all `Sender` halves have dropped, indicating
- /// that no further values can be sent on the channel.
+ /// This method returns `None` if the channel has been closed and there are
+ /// no remaining messages in the channel's buffer. This indicates that no
+ /// further values can ever be received from this `Receiver`. The channel is
+ /// closed when all senders have been dropped, or when [`close`] is called.
+ ///
+ /// If there are no messages in the channel's buffer, but the channel has
+ /// not yet been closed, this method will sleep until a message is sent or
+ /// the channel is closed.
+ ///
+ /// Note that if [`close`] is called, but there are still outstanding
+ /// [`Permits`] from before it was closed, the channel is not considered
+ /// closed by `recv` until the permits are released.
+ ///
+ /// [`close`]: Self::close
+ /// [`Permits`]: struct@crate::sync::mpsc::Permit
///
/// # Examples
///
@@ -148,6 +166,27 @@ impl<T> Receiver<T> {
/// Blocking receive to call outside of asynchronous contexts.
///
+ /// This method returns `None` if the channel has been closed and there are
+ /// no remaining messages in the channel's buffer. This indicates that no
+ /// further values can ever be received from this `Receiver`. The channel is
+ /// closed when all senders have been dropped, or when [`close`] is called.
+ ///
+ /// If there are no messages in the channel's buffer, but the channel has
+ /// not yet been closed, this method will block until a message is sent or
+ /// the channel is closed.
+ ///
+ /// This method is intended for use cases where you are sending from
+ /// asynchronous code to synchronous code, and will work even if the sender
+ /// is not using [`blocking_send`] to send the message.
+ ///
+ /// Note that if [`close`] is called, but there are still outstanding
+ /// [`Permits`] from before it was closed, the channel is not considered
+ /// closed by `blocking_recv` until the permits are released.
+ ///
+ /// [`close`]: Self::close
+ /// [`Permits`]: struct@crate::sync::mpsc::Permit
+ /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
+ ///
/// # Panics
///
/// This function panics if called within an asynchronous execution
@@ -197,14 +236,16 @@ impl<T> Receiver<T> {
self.chan.try_recv()
}
- /// Closes the receiving half of a channel, without dropping it.
+ /// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered. Any
/// outstanding [`Permit`] values will still be able to send messages.
///
- /// In order to guarantee no messages are dropped, after calling `close()`,
- /// `recv()` must be called until `None` is returned.
+ /// To guarantee that no messages are dropped, after calling `close()`,
+ /// `recv()` must be called until `None` is returned. If there are
+ /// outstanding [`Permit`] values, the `recv` method will not return `None`
+ /// until those are released.
///
/// [`Permit`]: Permit
///
@@ -356,7 +397,7 @@ impl<T> Sender<T> {
/// tx4.closed(),
/// tx5.closed()
/// );
- //// println!("Receiver dropped");
+ /// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&self) {
@@ -501,6 +542,12 @@ impl<T> Sender<T> {
/// Blocking send to call outside of asynchronous contexts.
///
+ /// This method is intended for use cases where you are sending from
+ /// synchronous code to asynchronous code, and will work even if the
+ /// receiver is not using [`blocking_recv`] to receive the message.
+ ///
+ /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
+ ///
/// # Panics
///
/// This function panics if called within an asynchronous execution
@@ -599,6 +646,58 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}
+
+ /// Try to acquire a slot in the channel without waiting for the slot to become
+ /// available.
+ ///
+ /// If the channel is full this function will return [`TrySendError`], otherwise
+ /// if there is a slot available it will return a [`Permit`] that will then allow you
+ /// to [`send`] on the channel with a guaranteed slot. This function is similar to
+ /// [`reserve`] except it does not await for the slot to become available.
+ ///
+ /// Dropping [`Permit`] without sending a message releases the capacity back
+ /// to the channel.
+ ///
+ /// [`Permit`]: Permit
+ /// [`send`]: Permit::send
+ /// [`reserve`]: Sender::reserve
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.try_reserve().unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Trying to reserve an additional slot on the `tx` will
+ /// // fail because there is no capacity.
+ /// assert!(tx.try_reserve().is_err());
+ ///
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// }
+ /// ```
+ pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(_) => return Err(TrySendError::Full(())),
+ }
+
+ Ok(Permit { chan: &self.chan })
+ }
}
impl<T> Clone for Sender<T> {
diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs
index 2f4c532..5dad2ba 100644
--- a/src/sync/mpsc/list.rs
+++ b/src/sync/mpsc/list.rs
@@ -140,11 +140,11 @@ impl<T> Tx<T> {
//
// Acquire is not needed as any "actual" value is not accessed.
// At this point, the linked list is walked to acquire blocks.
- let actual =
- self.block_tail
- .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
-
- if actual == block_ptr {
+ if self
+ .block_tail
+ .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
+ .is_ok()
+ {
// Synchronize with any senders
let tail_position = self.tail_position.fetch_add(0, Release);
@@ -191,7 +191,7 @@ impl<T> Tx<T> {
// TODO: Unify this logic with Block::grow
for _ in 0..3 {
- match curr.as_ref().try_push(&mut block, AcqRel) {
+ match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
Ok(_) => {
reused = true;
break;
diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs
index 38953b8..29a0a29 100644
--- a/src/sync/mpsc/unbounded.rs
+++ b/src/sync/mpsc/unbounded.rs
@@ -33,6 +33,10 @@ impl<T> fmt::Debug for UnboundedSender<T> {
///
/// Instances are created by the
/// [`unbounded_channel`](unbounded_channel) function.
+///
+/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
+///
+/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
pub struct UnboundedReceiver<T> {
/// The channel receiver
chan: chan::Rx<T, Semaphore>,
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 21e44ca..0283336 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -142,7 +142,7 @@ pub struct MutexGuard<'a, T: ?Sized> {
/// unlike `MutexGuard`, it will have the `'static` lifetime.
///
/// As long as you have this guard, you have exclusive access to the underlying
-/// `T`. The guard internally keeps a reference-couned pointer to the original
+/// `T`. The guard internally keeps a reference-counted pointer to the original
/// `Mutex`, so even if the lock goes away, the guard remains valid.
///
/// The lock is automatically released whenever the guard is dropped, at which
@@ -161,13 +161,22 @@ unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {}
unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
-/// Error returned from the [`Mutex::try_lock`] function.
+/// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and
+/// [`RwLock::try_write`] functions.
///
-/// A `try_lock` operation can only fail if the mutex is already locked.
+/// `Mutex::try_lock` operation will only fail if the mutex is already locked.
+///
+/// `RwLock::try_read` operation will only fail if the lock is currently held
+/// by an exclusive writer.
+///
+/// `RwLock::try_write` operation will if lock is held by any reader or by an
+/// exclusive writer.
///
/// [`Mutex::try_lock`]: Mutex::try_lock
+/// [`RwLock::try_read`]: fn@super::RwLock::try_read
+/// [`RwLock::try_write`]: fn@super::RwLock::try_write
#[derive(Debug)]
-pub struct TryLockError(());
+pub struct TryLockError(pub(super) ());
impl fmt::Display for TryLockError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs
index ece9aba..20d39dc 100644
--- a/src/sync/oneshot.rs
+++ b/src/sync/oneshot.rs
@@ -84,10 +84,42 @@ struct Inner<T> {
value: UnsafeCell<Option<T>>,
/// The task to notify when the receiver drops without consuming the value.
- tx_task: UnsafeCell<MaybeUninit<Waker>>,
+ tx_task: Task,
/// The task to notify when the value is sent.
- rx_task: UnsafeCell<MaybeUninit<Waker>>,
+ rx_task: Task,
+}
+
+struct Task(UnsafeCell<MaybeUninit<Waker>>);
+
+impl Task {
+ unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
+ self.with_task(|w| w.will_wake(cx.waker()))
+ }
+
+ unsafe fn with_task<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&Waker) -> R,
+ {
+ self.0.with(|ptr| {
+ let waker: *const Waker = (&*ptr).as_ptr();
+ f(&*waker)
+ })
+ }
+
+ unsafe fn drop_task(&self) {
+ self.0.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.drop_in_place();
+ });
+ }
+
+ unsafe fn set_task(&self, cx: &mut Context<'_>) {
+ self.0.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.write(cx.waker().clone());
+ });
+ }
}
#[derive(Clone, Copy)]
@@ -127,8 +159,8 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
state: AtomicUsize::new(State::new().as_usize()),
value: UnsafeCell::new(None),
- tx_task: UnsafeCell::new(MaybeUninit::uninit()),
- rx_task: UnsafeCell::new(MaybeUninit::uninit()),
+ tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
+ rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
});
let tx = Sender {
@@ -188,9 +220,9 @@ impl<T> Sender<T> {
});
if !inner.complete() {
- return Err(inner
- .value
- .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap()));
+ unsafe {
+ return Err(inner.consume_value().unwrap());
+ }
}
Ok(())
@@ -357,7 +389,7 @@ impl<T> Sender<T> {
}
if state.is_tx_task_set() {
- let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
+ let will_notify = unsafe { inner.tx_task.will_wake(cx) };
if !will_notify {
state = State::unset_tx_task(&inner.state);
@@ -368,7 +400,7 @@ impl<T> Sender<T> {
coop.made_progress();
return Ready(());
} else {
- unsafe { inner.drop_tx_task() };
+ unsafe { inner.tx_task.drop_task() };
}
}
}
@@ -376,7 +408,7 @@ impl<T> Sender<T> {
if !state.is_tx_task_set() {
// Attempt to set the task
unsafe {
- inner.set_tx_task(cx);
+ inner.tx_task.set_task(cx);
}
// Update the state
@@ -584,7 +616,7 @@ impl<T> Inner<T> {
if prev.is_rx_task_set() {
// TODO: Consume waker?
unsafe {
- self.with_rx_task(Waker::wake_by_ref);
+ self.rx_task.with_task(Waker::wake_by_ref);
}
}
@@ -609,7 +641,7 @@ impl<T> Inner<T> {
Ready(Err(RecvError(())))
} else {
if state.is_rx_task_set() {
- let will_notify = unsafe { self.with_rx_task(|w| w.will_wake(cx.waker())) };
+ let will_notify = unsafe { self.rx_task.will_wake(cx) };
// Check if the task is still the same
if !will_notify {
@@ -625,7 +657,7 @@ impl<T> Inner<T> {
None => Ready(Err(RecvError(()))),
};
} else {
- unsafe { self.drop_rx_task() };
+ unsafe { self.rx_task.drop_task() };
}
}
}
@@ -633,7 +665,7 @@ impl<T> Inner<T> {
if !state.is_rx_task_set() {
// Attempt to set the task
unsafe {
- self.set_rx_task(cx);
+ self.rx_task.set_task(cx);
}
// Update the state
@@ -660,7 +692,7 @@ impl<T> Inner<T> {
if prev.is_tx_task_set() && !prev.is_complete() {
unsafe {
- self.with_tx_task(Waker::wake_by_ref);
+ self.tx_task.with_task(Waker::wake_by_ref);
}
}
}
@@ -669,72 +701,28 @@ impl<T> Inner<T> {
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}
-
- unsafe fn with_rx_task<F, R>(&self, f: F) -> R
- where
- F: FnOnce(&Waker) -> R,
- {
- self.rx_task.with(|ptr| {
- let waker: *const Waker = (&*ptr).as_ptr();
- f(&*waker)
- })
- }
-
- unsafe fn with_tx_task<F, R>(&self, f: F) -> R
- where
- F: FnOnce(&Waker) -> R,
- {
- self.tx_task.with(|ptr| {
- let waker: *const Waker = (&*ptr).as_ptr();
- f(&*waker)
- })
- }
-
- unsafe fn drop_rx_task(&self) {
- self.rx_task.with_mut(|ptr| {
- let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
- ptr.drop_in_place();
- });
- }
-
- unsafe fn drop_tx_task(&self) {
- self.tx_task.with_mut(|ptr| {
- let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
- ptr.drop_in_place();
- });
- }
-
- unsafe fn set_rx_task(&self, cx: &mut Context<'_>) {
- self.rx_task.with_mut(|ptr| {
- let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
- ptr.write(cx.waker().clone());
- });
- }
-
- unsafe fn set_tx_task(&self, cx: &mut Context<'_>) {
- self.tx_task.with_mut(|ptr| {
- let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
- ptr.write(cx.waker().clone());
- });
- }
}
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
+fn mut_load(this: &mut AtomicUsize) -> usize {
+ this.with_mut(|v| *v)
+}
+
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
- let state = State(self.state.with_mut(|v| *v));
+ let state = State(mut_load(&mut self.state));
if state.is_rx_task_set() {
unsafe {
- self.drop_rx_task();
+ self.rx_task.drop_task();
}
}
if state.is_tx_task_set() {
unsafe {
- self.drop_tx_task();
+ self.tx_task.drop_task();
}
}
}
diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs
index 2e72cf7..b0777b2 100644
--- a/src/sync/rwlock.rs
+++ b/src/sync/rwlock.rs
@@ -1,4 +1,5 @@
-use crate::sync::batch_semaphore::Semaphore;
+use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
+use crate::sync::mutex::TryLockError;
use std::cell::UnsafeCell;
use std::fmt;
use std::marker;
@@ -244,7 +245,7 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
/// locks, since [`RwLock`] is fair and it is possible that a writer is next
/// in line.
///
- /// Returns an RAII guard which will drop the read access of this rwlock
+ /// Returns an RAII guard which will drop this read access of the `RwLock`
/// when dropped.
///
/// # Examples
@@ -397,12 +398,20 @@ impl<T: ?Sized> RwLock<T> {
}
}
- /// Locks this rwlock with shared read access, causing the current task
+ /// Locks this `RwLock` with shared read access, causing the current task
/// to yield until the lock has been acquired.
///
- /// The calling task will yield until there are no more writers which
- /// hold the lock. There may be other readers currently inside the lock when
- /// this method returns.
+ /// The calling task will yield until there are no writers which hold the
+ /// lock. There may be other readers inside the lock when the task resumes.
+ ///
+ /// Note that under the priority policy of [`RwLock`], read locks are not
+ /// granted until prior write locks, to prevent starvation. Therefore
+ /// deadlock may occur if a read lock is held by the current task, a write
+ /// lock attempt is made, and then a subsequent read lock attempt is made
+ /// by the current task.
+ ///
+ /// Returns an RAII guard which will drop this read access of the `RwLock`
+ /// when dropped.
///
/// # Examples
///
@@ -422,12 +431,13 @@ impl<T: ?Sized> RwLock<T> {
/// // While main has an active read lock, we acquire one too.
/// let r = c_lock.read().await;
/// assert_eq!(*r, 1);
- /// }).await.expect("The spawned task has paniced");
+ /// }).await.expect("The spawned task has panicked");
///
/// // Drop the guard after the spawned task finishes.
/// drop(n);
///}
/// ```
+ ///
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
self.s.acquire(1).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
@@ -441,13 +451,59 @@ impl<T: ?Sized> RwLock<T> {
}
}
- /// Locks this rwlock with exclusive write access, causing the current task
- /// to yield until the lock has been acquired.
+ /// Attempts to acquire this `RwLock` with shared read access.
+ ///
+ /// If the access couldn't be acquired immediately, returns [`TryLockError`].
+ /// Otherwise, an RAII guard is returned which will release read access
+ /// when dropped.
+ ///
+ /// [`TryLockError`]: TryLockError
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let v = lock.try_read().unwrap();
+ /// assert_eq!(*v, 1);
+ ///
+ /// tokio::spawn(async move {
+ /// // While main has an active read lock, we acquire one too.
+ /// let n = c_lock.read().await;
+ /// assert_eq!(*n, 1);
+ /// }).await.expect("The spawned task has panicked");
+ ///
+ /// // Drop the guard when spawned task finishes.
+ /// drop(v);
+ /// }
+ /// ```
+ pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> {
+ match self.s.try_acquire(1) {
+ Ok(permit) => permit,
+ Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
+ Err(TryAcquireError::Closed) => unreachable!(),
+ }
+
+ Ok(RwLockReadGuard {
+ s: &self.s,
+ data: self.c.get(),
+ marker: marker::PhantomData,
+ })
+ }
+
+ /// Locks this `RwLock` with exclusive write access, causing the current
+ /// task to yield until the lock has been acquired.
///
- /// This function will not return while other writers or other readers
+ /// The calling task will yield while other writers or readers
/// currently have access to the lock.
///
- /// Returns an RAII guard which will drop the write access of this rwlock
+ /// Returns an RAII guard which will drop the write access of this `RwLock`
/// when dropped.
///
/// # Examples
@@ -476,6 +532,43 @@ impl<T: ?Sized> RwLock<T> {
}
}
+ /// Attempts to acquire this `RwLock` with exclusive write access.
+ ///
+ /// If the access couldn't be acquired immediately, returns [`TryLockError`].
+ /// Otherwise, an RAII guard is returned which will release write access
+ /// when dropped.
+ ///
+ /// [`TryLockError`]: TryLockError
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::RwLock;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let rw = RwLock::new(1);
+ ///
+ /// let v = rw.read().await;
+ /// assert_eq!(*v, 1);
+ ///
+ /// assert!(rw.try_write().is_err());
+ /// }
+ /// ```
+ pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
+ match self.s.try_acquire(MAX_READS as u32) {
+ Ok(permit) => permit,
+ Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
+ Err(TryAcquireError::Closed) => unreachable!(),
+ }
+
+ Ok(RwLockWriteGuard {
+ s: &self.s,
+ data: self.c.get(),
+ marker: marker::PhantomData,
+ })
+ }
+
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `RwLock` mutably, no actual locking needs to
diff --git a/src/sync/semaphore.rs b/src/sync/semaphore.rs
index 5555bdf..bff5de9 100644
--- a/src/sync/semaphore.rs
+++ b/src/sync/semaphore.rs
@@ -13,6 +13,11 @@ use std::sync::Arc;
/// function immediately returns a permit. However, if no remaining permits are
/// available, `acquire` (asynchronously) waits until an outstanding permit is
/// dropped. At this point, the freed permit is assigned to the caller.
+///
+/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
+/// utility.
+///
+/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
#[derive(Debug)]
pub struct Semaphore {
/// The low level semaphore
diff --git a/src/sync/task/atomic_waker.rs b/src/sync/task/atomic_waker.rs
index ae4cac7..5917204 100644
--- a/src/sync/task/atomic_waker.rs
+++ b/src/sync/task/atomic_waker.rs
@@ -171,7 +171,11 @@ impl AtomicWaker {
where
W: WakerRef,
{
- match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
+ match self
+ .state
+ .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
+ .unwrap_or_else(|x| x)
+ {
WAITING => {
unsafe {
// Locked acquired, update the waker cell
@@ -219,6 +223,8 @@ impl AtomicWaker {
waker.wake();
// This is equivalent to a spin lock, so use a spin hint.
+ // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead.
+ #[allow(deprecated)]
atomic::spin_loop_hint();
}
state => {
diff --git a/src/sync/tests/loom_broadcast.rs b/src/sync/tests/loom_broadcast.rs
index 4b1f034..039b01b 100644
--- a/src/sync/tests/loom_broadcast.rs
+++ b/src/sync/tests/loom_broadcast.rs
@@ -178,3 +178,30 @@ fn drop_rx() {
assert_ok!(th2.join());
});
}
+
+#[test]
+fn drop_multiple_rx_with_overflow() {
+ loom::model(move || {
+ // It is essential to have multiple senders and receivers in this test case.
+ let (tx, mut rx) = broadcast::channel(1);
+ let _rx2 = tx.subscribe();
+
+ let _ = tx.send(());
+ let tx2 = tx.clone();
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ for _ in 0..100 {
+ let _ = tx2.send(());
+ }
+ });
+ });
+ let _ = tx.send(());
+
+ let th2 = thread::spawn(move || {
+ block_on(async { while let Ok(_) = rx.recv().await {} });
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
diff --git a/src/sync/watch.rs b/src/sync/watch.rs
index 6732d38..5590a75 100644
--- a/src/sync/watch.rs
+++ b/src/sync/watch.rs
@@ -61,6 +61,11 @@ use std::ops;
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
+///
+/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
+/// wrapper.
+///
+/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
#[derive(Debug)]
pub struct Receiver<T> {
/// Pointer to the shared state
diff --git a/src/task/local.rs b/src/task/local.rs
index 566b2f2..ee11511 100644
--- a/src/task/local.rs
+++ b/src/task/local.rs
@@ -43,10 +43,13 @@ cfg_rt! {
/// }).await.unwrap();
/// }
/// ```
- /// In order to spawn `!Send` futures, we can use a local task set to
- /// schedule them on the thread calling [`Runtime::block_on`]. When running
- /// inside of the local task set, we can use [`task::spawn_local`], which can
- /// spawn `!Send` futures. For example:
+ ///
+ /// # Use with `run_until`
+ ///
+ /// To spawn `!Send` futures, we can use a local task set to schedule them
+ /// on the thread calling [`Runtime::block_on`]. When running inside of the
+ /// local task set, we can use [`task::spawn_local`], which can spawn
+ /// `!Send` futures. For example:
///
/// ```rust
/// use std::rc::Rc;
@@ -71,6 +74,9 @@ cfg_rt! {
/// }).await;
/// }
/// ```
+ /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
+ /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
+ /// cannot be used inside a task spawned with `tokio::spawn`.
///
/// ## Awaiting a `LocalSet`
///
@@ -104,11 +110,106 @@ cfg_rt! {
/// local.await;
/// }
/// ```
+ /// **Note:** Awaiting a `LocalSet` can only be done inside
+ /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
+ /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
+ /// `tokio::spawn`.
+ ///
+ /// ## Use inside `tokio::spawn`
+ ///
+ /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
+ /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
+ /// something else. The solution is to create the `LocalSet` somewhere else,
+ /// and communicate with it using an [`mpsc`] channel.
+ ///
+ /// The following example puts the `LocalSet` inside a new thread.
+ /// ```
+ /// use tokio::runtime::Builder;
+ /// use tokio::sync::{mpsc, oneshot};
+ /// use tokio::task::LocalSet;
+ ///
+ /// // This struct describes the task you want to spawn. Here we include
+ /// // some simple examples. The oneshot channel allows sending a response
+ /// // to the spawner.
+ /// #[derive(Debug)]
+ /// enum Task {
+ /// PrintNumber(u32),
+ /// AddOne(u32, oneshot::Sender<u32>),
+ /// }
+ ///
+ /// #[derive(Clone)]
+ /// struct LocalSpawner {
+ /// send: mpsc::UnboundedSender<Task>,
+ /// }
+ ///
+ /// impl LocalSpawner {
+ /// pub fn new() -> Self {
+ /// let (send, mut recv) = mpsc::unbounded_channel();
+ ///
+ /// let rt = Builder::new_current_thread()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// std::thread::spawn(move || {
+ /// let local = LocalSet::new();
+ ///
+ /// local.spawn_local(async move {
+ /// while let Some(new_task) = recv.recv().await {
+ /// tokio::task::spawn_local(run_task(new_task));
+ /// }
+ /// // If the while loop returns, then all the LocalSpawner
+ /// // objects have have been dropped.
+ /// });
+ ///
+ /// // This will return once all senders are dropped and all
+ /// // spawned tasks have returned.
+ /// rt.block_on(local);
+ /// });
+ ///
+ /// Self {
+ /// send,
+ /// }
+ /// }
+ ///
+ /// pub fn spawn(&self, task: Task) {
+ /// self.send.send(task).expect("Thread with LocalSet has shut down.");
+ /// }
+ /// }
+ ///
+ /// // This task may do !Send stuff. We use printing a number as an example,
+ /// // but it could be anything.
+ /// //
+ /// // The Task struct is an enum to support spawning many different kinds
+ /// // of operations.
+ /// async fn run_task(task: Task) {
+ /// match task {
+ /// Task::PrintNumber(n) => {
+ /// println!("{}", n);
+ /// },
+ /// Task::AddOne(n, response) => {
+ /// // We ignore failures to send the response.
+ /// let _ = response.send(n + 1);
+ /// },
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let spawner = LocalSpawner::new();
+ ///
+ /// let (send, response) = oneshot::channel();
+ /// spawner.spawn(Task::AddOne(10, send));
+ /// let eleven = response.await.unwrap();
+ /// assert_eq!(eleven, 11);
+ /// }
+ /// ```
///
/// [`Send`]: trait@std::marker::Send
/// [local task set]: struct@LocalSet
/// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
/// [`task::spawn_local`]: fn@spawn_local
+ /// [`mpsc`]: mod@crate::sync::mpsc
pub struct LocalSet {
/// Current scheduler tick
tick: Cell<u8>,
@@ -283,6 +384,7 @@ impl LocalSet {
let future = crate::util::trace::task(future, "local");
let (task, handle) = unsafe { task::joinable_local(future) };
self.context.tasks.borrow_mut().queue.push_back(task);
+ self.context.shared.waker.wake();
handle
}
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index a060852..d846fb4 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -1,5 +1,6 @@
use crate::runtime;
use crate::task::JoinHandle;
+use crate::util::error::CONTEXT_MISSING_ERROR;
use std::future::Future;
@@ -129,7 +130,7 @@ cfg_rt! {
T::Output: Send + 'static,
{
let spawn_handle = runtime::context::spawn_handle()
- .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
+ .expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(task, "task");
spawn_handle.spawn(task)
}
diff --git a/src/time/clock.rs b/src/time/clock.rs
index a62fbe3..8957800 100644
--- a/src/time/clock.rs
+++ b/src/time/clock.rs
@@ -17,7 +17,7 @@ cfg_not_test_util! {
}
impl Clock {
- pub(crate) fn new(_enable_pausing: bool) -> Clock {
+ pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock {
Clock {}
}
@@ -78,7 +78,8 @@ cfg_test_util! {
/// that depend on time.
///
/// Pausing time requires the `current_thread` Tokio runtime. This is the
- /// default runtime used by `#[tokio::test]`
+ /// default runtime used by `#[tokio::test]`. The runtime can be initialized
+ /// with time in a paused state using the `Builder::start_paused` method.
///
/// # Panics
///
@@ -149,16 +150,22 @@ cfg_test_util! {
impl Clock {
/// Return a new `Clock` instance that uses the current execution context's
/// source of time.
- pub(crate) fn new(enable_pausing: bool) -> Clock {
+ pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
let now = std::time::Instant::now();
- Clock {
+ let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
})),
+ };
+
+ if start_paused {
+ clock.pause();
}
+
+ clock
}
pub(crate) fn pause(&self) {
diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs
index bcad988..11366d2 100644
--- a/src/time/driver/entry.rs
+++ b/src/time/driver/entry.rs
@@ -53,6 +53,7 @@
//! refuse to mark the timer as pending.
use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering;
use crate::sync::AtomicWaker;
@@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value();
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
-/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
-/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
-/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
-///
-/// Note: We use "x86 or 64-bit pointers" as the condition here because
-/// target_has_atomic is not stable.
-#[cfg(all(
- not(tokio_force_time_entry_locked),
- any(target_arch = "x86", target_pointer_width = "64")
-))]
-type AtomicU64 = crate::loom::sync::atomic::AtomicU64;
-
-#[cfg(not(all(
- not(tokio_force_time_entry_locked),
- any(target_arch = "x86", target_pointer_width = "64")
-)))]
-#[derive(Debug)]
-struct AtomicU64 {
- inner: crate::loom::sync::Mutex<u64>,
-}
-
-#[cfg(not(all(
- not(tokio_force_time_entry_locked),
- any(target_arch = "x86", target_pointer_width = "64")
-)))]
-impl AtomicU64 {
- fn new(v: u64) -> Self {
- Self {
- inner: crate::loom::sync::Mutex::new(v),
- }
- }
-
- fn load(&self, _order: Ordering) -> u64 {
- debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
- *self.inner.lock()
- }
-
- fn store(&self, v: u64, _order: Ordering) {
- debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
- *self.inner.lock() = v;
- }
-
- fn compare_exchange(
- &self,
- current: u64,
- new: u64,
- _success: Ordering,
- _failure: Ordering,
- ) -> Result<u64, u64> {
- debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
- debug_assert_ne!(_failure, Ordering::SeqCst);
-
- let mut lock = self.inner.lock();
-
- if *lock == current {
- *lock = new;
- Ok(current)
- } else {
- Err(*lock)
- }
- }
-
- fn compare_exchange_weak(
- &self,
- current: u64,
- new: u64,
- success: Ordering,
- failure: Ordering,
- ) -> Result<u64, u64> {
- self.compare_exchange(current, new, success, failure)
- }
-}
-
/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
@@ -300,7 +228,7 @@ impl StateCell {
/// expiration time.
///
/// While this function is memory-safe, it should only be called from a
- /// context holding both `&mut TimerEntry` and the driver lock.
+ /// context holding both `&mut TimerEntry` and the driver lock.
fn set_expiration(&self, timestamp: u64) {
debug_assert!(timestamp < STATE_MIN_VALUE);
diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs
index bfc49fb..e934b56 100644
--- a/src/time/driver/handle.rs
+++ b/src/time/driver/handle.rs
@@ -47,7 +47,7 @@ cfg_rt! {
/// panicking.
pub(crate) fn current() -> Self {
crate::runtime::context::time_handle()
- .expect("there is no timer running, must be called from the context of Tokio runtime")
+ .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
}
@@ -71,8 +71,7 @@ cfg_not_rt! {
/// lazy, and so outside executed inside the runtime successfuly without
/// panicking.
pub(crate) fn current() -> Self {
- panic!("there is no timer running, must be called from the context of Tokio runtime or \
- `rt` is not enabled")
+ panic!(crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs
index 9fbc0b3..615307e 100644
--- a/src/time/driver/mod.rs
+++ b/src/time/driver/mod.rs
@@ -102,8 +102,8 @@ pub(self) struct ClockTime {
impl ClockTime {
pub(self) fn new(clock: Clock) -> Self {
Self {
+ start_time: clock.now(),
clock,
- start_time: super::clock::now(),
}
}
diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs
index 69a6e6d..2438f14 100644
--- a/src/time/driver/sleep.rs
+++ b/src/time/driver/sleep.rs
@@ -58,8 +58,93 @@ pub fn sleep(duration: Duration) -> Sleep {
}
pin_project! {
- /// Future returned by [`sleep`](sleep) and
- /// [`sleep_until`](sleep_until).
+ /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
+ ///
+ /// This type does not implement the `Unpin` trait, which means that if you
+ /// use it with [`select!`] or by calling `poll`, you have to pin it first.
+ /// If you use it with `.await`, this does not apply.
+ ///
+ /// # Examples
+ ///
+ /// Wait 100ms and print "100 ms have elapsed".
+ ///
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// sleep(Duration::from_millis(100)).await;
+ /// println!("100 ms have elapsed");
+ /// }
+ /// ```
+ ///
+ /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
+ /// necessary when the same `Sleep` is selected on multiple times.
+ /// ```no_run
+ /// use tokio::time::{self, Duration, Instant};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let sleep = time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// loop {
+ /// tokio::select! {
+ /// () = &mut sleep => {
+ /// println!("timer elapsed");
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
+ /// },
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
+ /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ ///
+ /// struct HasSleep {
+ /// sleep: Pin<Box<Sleep>>,
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.sleep.as_mut().poll(cx)
+ /// }
+ /// }
+ /// ```
+ /// Use in a struct with pin projection. This method avoids the `Box`, but
+ /// the `HasSleep` struct will not be `Unpin` as a consequence.
+ /// ```
+ /// use std::future::Future;
+ /// use std::pin::Pin;
+ /// use std::task::{Context, Poll};
+ /// use tokio::time::Sleep;
+ /// use pin_project_lite::pin_project;
+ ///
+ /// pin_project! {
+ /// struct HasSleep {
+ /// #[pin]
+ /// sleep: Sleep,
+ /// }
+ /// }
+ ///
+ /// impl Future for HasSleep {
+ /// type Output = ();
+ ///
+ /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ /// self.project().sleep.poll(cx)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// [`select!`]: ../macro.select.html
+ /// [`tokio::pin!`]: ../macro.pin.html
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
@@ -98,6 +183,26 @@ impl Sleep {
///
/// This function can be called both before and after the future has
/// completed.
+ ///
+ /// To call this method, you will usually combine the call with
+ /// [`Pin::as_mut`], which lets you call the method with consuming the
+ /// `Sleep` itself.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::time::{Duration, Instant};
+ ///
+ /// # #[tokio::main(flavor = "current_thread")]
+ /// # async fn main() {
+ /// let sleep = tokio::time::sleep(Duration::from_millis(10));
+ /// tokio::pin!(sleep);
+ ///
+ /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
+ /// # }
+ /// ```
+ ///
+ /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
diff --git a/src/time/driver/tests/mod.rs b/src/time/driver/tests/mod.rs
index cfefed3..8ae4a84 100644
--- a/src/time/driver/tests/mod.rs
+++ b/src/time/driver/tests/mod.rs
@@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
#[test]
fn single_timer() {
model(|| {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -72,7 +72,7 @@ fn single_timer() {
#[test]
fn drop_timer() {
model(|| {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -103,7 +103,7 @@ fn drop_timer() {
#[test]
fn change_waker() {
model(|| {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -138,7 +138,7 @@ fn reset_future() {
model(|| {
let finished_early = Arc::new(AtomicBool::new(false));
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
@@ -185,7 +185,7 @@ fn reset_future() {
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
clock.pause();
let time_source = super::ClockTime::new(clock.clone());
@@ -226,7 +226,7 @@ fn poll_process_levels() {
fn poll_process_levels_targeted() {
let mut context = Context::from_waker(noop_waker_ref());
- let clock = crate::time::clock::Clock::new(true);
+ let clock = crate::time::clock::Clock::new(true, false);
clock.pause();
let time_source = super::ClockTime::new(clock.clone());
diff --git a/src/time/driver/wheel/level.rs b/src/time/driver/wheel/level.rs
index 58280b1..81d6b58 100644
--- a/src/time/driver/wheel/level.rs
+++ b/src/time/driver/wheel/level.rs
@@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}
-/*
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
#[test]
fn test_slot_for() {
- for pos in 1..64 {
+ for pos in 0..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}
@@ -274,4 +273,3 @@ mod test {
}
}
}
-*/
diff --git a/src/time/driver/wheel/mod.rs b/src/time/driver/wheel/mod.rs
index 164cac4..24bf517 100644
--- a/src/time/driver/wheel/mod.rs
+++ b/src/time/driver/wheel/mod.rs
@@ -122,6 +122,13 @@ impl Wheel {
if when == u64::max_value() {
self.pending.remove(item);
} else {
+ debug_assert!(
+ self.elapsed <= when,
+ "elapsed={}; when={}",
+ self.elapsed,
+ when
+ );
+
let level = self.level_for(when);
self.levels[level].remove_entry(item);
@@ -281,15 +288,17 @@ impl Wheel {
}
fn level_for(elapsed: u64, when: u64) -> usize {
- let mut masked = elapsed ^ when;
+ const SLOT_MASK: u64 = (1 << 6) - 1;
+
+ // Mask in the trailing bits ignored by the level calculation in order to cap
+ // the possible leading zeros
+ let mut masked = elapsed ^ when | SLOT_MASK;
if masked >= MAX_DURATION {
// Fudge the timer into the top level
masked = MAX_DURATION - 1;
}
- assert!(masked != 0, "elapsed={}; when={}", elapsed, when);
-
let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;
@@ -302,7 +311,7 @@ mod test {
#[test]
fn test_level_for() {
- for pos in 1..64 {
+ for pos in 0..64 {
assert_eq!(
0,
level_for(0, pos),
diff --git a/src/time/interval.rs b/src/time/interval.rs
index be93ba1..20cfcec 100644
--- a/src/time/interval.rs
+++ b/src/time/interval.rs
@@ -106,7 +106,16 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval {
}
}
-/// Stream returned by [`interval`](interval) and [`interval_at`](interval_at).
+/// Interval returned by [`interval`](interval) and [`interval_at`](interval_at).
+///
+/// This type allows you to wait on a sequence of instants with a certain
+/// duration between each instant. Unlike calling [`sleep`](crate::time::sleep)
+/// in a loop, this lets you count the time spent between the calls to `sleep`
+/// as well.
+///
+/// An `Interval` can be turned into a `Stream` with [`IntervalStream`].
+///
+/// [`IntervalStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.IntervalStream.html
#[derive(Debug)]
pub struct Interval {
/// Future that completes the next time the `Interval` yields a value.
diff --git a/src/util/error.rs b/src/util/error.rs
new file mode 100644
index 0000000..518cb2c
--- /dev/null
+++ b/src/util/error.rs
@@ -0,0 +1,3 @@
+/// Error string explaining that the Tokio context hasn't been instantiated.
+pub(crate) const CONTEXT_MISSING_ERROR: &str =
+ "there is no reactor running, must be called from the context of a Tokio 1.x runtime";
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 382bbb9..b267125 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -24,7 +24,7 @@ cfg_rt! {
}
cfg_rt_multi_thread! {
- pub(crate) use rand::FastRand;
+ pub(crate) use self::rand::FastRand;
mod try_lock;
pub(crate) use try_lock::TryLock;
@@ -34,4 +34,13 @@ pub(crate) mod trace;
#[cfg(any(feature = "macros"))]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
-pub use rand::thread_rng_n;
+pub use self::rand::thread_rng_n;
+
+#[cfg(any(
+ feature = "rt",
+ feature = "time",
+ feature = "net",
+ feature = "process",
+ all(unix, feature = "signal")
+))]
+pub(crate) mod error;
diff --git a/tests/io_driver.rs b/tests/io_driver.rs
index 9a40247..6fb566d 100644
--- a/tests/io_driver.rs
+++ b/tests/io_driver.rs
@@ -84,3 +84,16 @@ fn test_drop_on_notify() {
// Force the reactor to turn
rt.block_on(async {});
}
+
+#[test]
+#[should_panic(
+ expected = "A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO."
+)]
+fn panics_when_io_disabled() {
+ let rt = runtime::Builder::new_current_thread().build().unwrap();
+
+ rt.block_on(async {
+ let _ =
+ tokio::net::TcpListener::from_std(std::net::TcpListener::bind("127.0.0.1:0").unwrap());
+ });
+}
diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs
index ee636ba..171e6d6 100644
--- a/tests/io_read_to_end.rs
+++ b/tests/io_read_to_end.rs
@@ -1,7 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::io::AsyncReadExt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_test::assert_ok;
#[tokio::test]
@@ -13,3 +15,64 @@ async fn read_to_end() {
assert_eq!(n, 11);
assert_eq!(buf[..], b"hello world"[..]);
}
+
+#[derive(Copy, Clone, Debug)]
+enum State {
+ Initializing,
+ JustFilling,
+ Done,
+}
+
+struct UninitTest {
+ num_init: usize,
+ state: State,
+}
+
+impl AsyncRead for UninitTest {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ let me = Pin::into_inner(self);
+ let real_num_init = buf.initialized().len() - buf.filled().len();
+ assert_eq!(real_num_init, me.num_init, "{:?}", me.state);
+
+ match me.state {
+ State::Initializing => {
+ buf.initialize_unfilled_to(me.num_init + 2);
+ buf.advance(1);
+ me.num_init += 1;
+
+ if me.num_init == 24 {
+ me.state = State::JustFilling;
+ }
+ }
+ State::JustFilling => {
+ buf.advance(1);
+ me.num_init -= 1;
+
+ if me.num_init == 15 {
+ // The buffer is resized on next call.
+ me.num_init = 0;
+ me.state = State::Done;
+ }
+ }
+ State::Done => { /* .. do nothing .. */ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+#[tokio::test]
+async fn read_to_end_uninit() {
+ let mut buf = Vec::with_capacity(64);
+ let mut test = UninitTest {
+ num_init: 0,
+ state: State::Initializing,
+ };
+
+ test.read_to_end(&mut buf).await.unwrap();
+ assert_eq!(buf.len(), 33);
+}
diff --git a/tests/macros_test.rs b/tests/macros_test.rs
index 8e68b8a..8396398 100644
--- a/tests/macros_test.rs
+++ b/tests/macros_test.rs
@@ -17,3 +17,11 @@ async fn test_macro_is_resilient_to_shadowing() {
.await
.unwrap();
}
+
+// https://github.com/tokio-rs/tokio/issues/3403
+#[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline.
+#[tokio::main]
+async fn unused_braces_main() { println!("hello") }
+#[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline.
+#[tokio::test]
+async fn unused_braces_test() { assert_eq!(1 + 1, 2) }
diff --git a/tests/no_rt.rs b/tests/no_rt.rs
index 962eed7..8437b80 100644
--- a/tests/no_rt.rs
+++ b/tests/no_rt.rs
@@ -7,13 +7,17 @@ use futures::executor::block_on;
use std::net::TcpListener;
#[test]
-#[should_panic(expected = "no timer running")]
-fn panics_when_no_timer() {
+#[should_panic(
+ expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
+)]
+fn timeout_panics_when_no_tokio_context() {
block_on(timeout_value());
}
#[test]
-#[should_panic(expected = "no reactor running")]
+#[should_panic(
+ expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
+)]
fn panics_when_no_reactor() {
let srv = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = srv.local_addr().unwrap();
@@ -25,3 +29,11 @@ async fn timeout_value() {
let dur = Duration::from_millis(20);
let _ = timeout(dur, rx).await;
}
+
+#[test]
+#[should_panic(
+ expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
+)]
+fn io_panics_when_no_tokio_context() {
+ let _ = tokio::net::TcpListener::from_std(std::net::TcpListener::bind("127.0.0.1:0").unwrap());
+}
diff --git a/tests/rt_basic.rs b/tests/rt_basic.rs
index 977a838..4b1bdad 100644
--- a/tests/rt_basic.rs
+++ b/tests/rt_basic.rs
@@ -6,7 +6,7 @@ use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
use std::thread;
-use std::time::Duration;
+use tokio::time::{timeout, Duration};
mod support {
pub(crate) mod mpsc_stream;
@@ -135,6 +135,21 @@ fn acquire_mutex_in_drop() {
drop(rt);
}
+#[test]
+#[should_panic(
+ expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
+)]
+fn timeout_panics_when_no_time_handle() {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+ rt.block_on(async {
+ let (_tx, rx) = oneshot::channel::<()>();
+ let dur = Duration::from_millis(20);
+ let _ = timeout(dur, rx).await;
+ });
+}
+
fn rt() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 66e6f2c..9aef4b9 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -1021,39 +1021,45 @@ rt_test! {
// other tasks.
#[test]
fn ping_pong_saturation() {
+ use std::sync::atomic::{Ordering, AtomicBool};
use tokio::sync::mpsc;
const NUM: usize = 100;
let rt = rt();
+ let running = Arc::new(AtomicBool::new(true));
+
rt.block_on(async {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
+ let mut tasks = vec![];
// Spawn a bunch of tasks that ping ping between each other to
// saturate the runtime.
for _ in 0..NUM {
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx2, mut rx2) = mpsc::unbounded_channel();
let spawned_tx = spawned_tx.clone();
-
- task::spawn(async move {
+ let running = running.clone();
+ tasks.push(task::spawn(async move {
spawned_tx.send(()).unwrap();
- tx1.send(()).unwrap();
- loop {
- rx2.recv().await.unwrap();
+ while running.load(Ordering::Relaxed) {
tx1.send(()).unwrap();
+ rx2.recv().await.unwrap();
}
- });
- task::spawn(async move {
- loop {
- rx1.recv().await.unwrap();
+ // Close the channel and wait for the other task to exit.
+ drop(tx1);
+ assert!(rx2.recv().await.is_none());
+ }));
+
+ tasks.push(task::spawn(async move {
+ while rx1.recv().await.is_some() {
tx2.send(()).unwrap();
}
- });
+ }));
}
for _ in 0..NUM {
@@ -1068,6 +1074,10 @@ rt_test! {
}
});
handle.await.unwrap();
+ running.store(false, Ordering::Relaxed);
+ for t in tasks {
+ t.await.unwrap();
+ }
});
}
}
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs
index b378e6b..cd43ad4 100644
--- a/tests/sync_mpsc.rs
+++ b/tests/sync_mpsc.rs
@@ -328,6 +328,29 @@ async fn try_send_fail() {
}
#[tokio::test]
+async fn try_reserve_fails() {
+ let (tx, mut rx) = mpsc::channel(1);
+
+ let permit = tx.try_reserve().unwrap();
+
+ // This should fail
+ match assert_err!(tx.try_reserve()) {
+ TrySendError::Full(()) => {}
+ _ => panic!(),
+ }
+
+ permit.send("foo");
+
+ assert_eq!(rx.recv().await, Some("foo"));
+
+ // Dropping permit releases the slot.
+ let permit = tx.try_reserve().unwrap();
+ drop(permit);
+
+ let _permit = tx.try_reserve().unwrap();
+}
+
+#[tokio::test]
async fn drop_permit_releases_permit() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
diff --git a/tests/sync_rwlock.rs b/tests/sync_rwlock.rs
index 7676035..872b845 100644
--- a/tests/sync_rwlock.rs
+++ b/tests/sync_rwlock.rs
@@ -235,3 +235,36 @@ async fn multithreaded() {
let g = rwlock.read().await;
assert_eq!(*g, 17_000);
}
+
+#[tokio::test]
+async fn try_write() {
+ let lock = RwLock::new(0);
+ let read_guard = lock.read().await;
+ assert!(lock.try_write().is_err());
+ drop(read_guard);
+ assert!(lock.try_write().is_ok());
+}
+
+#[test]
+fn try_read_try_write() {
+ let lock: RwLock<usize> = RwLock::new(15);
+
+ {
+ let rg1 = lock.try_read().unwrap();
+ assert_eq!(*rg1, 15);
+
+ assert!(lock.try_write().is_err());
+
+ let rg2 = lock.try_read().unwrap();
+ assert_eq!(*rg2, 15)
+ }
+
+ {
+ let mut wg = lock.try_write().unwrap();
+ *wg = 1515;
+
+ assert!(lock.try_read().is_err())
+ }
+
+ assert_eq!(*lock.try_read().unwrap(), 1515);
+}
diff --git a/tests/task_local_set.rs b/tests/task_local_set.rs
index dda4280..8513609 100644
--- a/tests/task_local_set.rs
+++ b/tests/task_local_set.rs
@@ -1,6 +1,11 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use futures::{
+ future::{pending, ready},
+ FutureExt,
+};
+
use tokio::runtime::{self, Runtime};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{self, LocalSet};
@@ -486,6 +491,15 @@ async fn acquire_mutex_in_drop() {
drop(local);
}
+#[tokio::test]
+async fn spawn_wakes_localset() {
+ let local = LocalSet::new();
+ futures::select! {
+ _ = local.run_until(pending::<()>()).fuse() => unreachable!(),
+ ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret
+ }
+}
+
fn rt() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs
index 58b06ee..e34c2bb 100644
--- a/tests/tcp_stream.rs
+++ b/tests/tcp_stream.rs
@@ -234,3 +234,81 @@ fn write_until_pending(stream: &mut TcpStream) {
}
}
}
+
+#[tokio::test]
+async fn try_read_buf() {
+ const DATA: &[u8] = b"this is some data to write to the socket";
+
+ // Create listener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+ let (server, _) = listener.accept().await.unwrap();
+ let mut written = DATA.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await.unwrap();
+ assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(DATA) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = Vec::with_capacity(written.len());
+ let mut i = 0;
+
+ while i < read.capacity() {
+ server.readable().await.unwrap();
+
+ match server.try_read_buf(&mut read) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await.unwrap();
+
+ if ready.is_read_closed() {
+ return;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+}
diff --git a/tests/time_pause.rs b/tests/time_pause.rs
index 49a7677..bc84ac5 100644
--- a/tests/time_pause.rs
+++ b/tests/time_pause.rs
@@ -1,6 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use rand::SeedableRng;
+use rand::{rngs::StdRng, Rng};
+use tokio::time::{self, Duration, Instant};
use tokio_test::assert_err;
#[tokio::test]
@@ -31,3 +34,26 @@ async fn pause_time_in_spawn_threads() {
assert_err!(t.await);
}
+
+#[test]
+fn paused_time_is_deterministic() {
+ let run_1 = paused_time_stress_run();
+ let run_2 = paused_time_stress_run();
+
+ assert_eq!(run_1, run_2);
+}
+
+#[tokio::main(flavor = "current_thread", start_paused = true)]
+async fn paused_time_stress_run() -> Vec<Duration> {
+ let mut rng = StdRng::seed_from_u64(1);
+
+ let mut times = vec![];
+ let start = Instant::now();
+ for _ in 0..10_000 {
+ let sleep = rng.gen_range(Duration::from_secs(0)..Duration::from_secs(1));
+ time::sleep(sleep).await;
+ times.push(start.elapsed());
+ }
+
+ times
+}
diff --git a/tests/udp.rs b/tests/udp.rs
index 7cbba1b..715d8eb 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -353,3 +353,90 @@ async fn try_send_to_recv_from() {
}
}
}
+
+#[tokio::test]
+async fn try_recv_buf() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Connect the two
+ client.connect(server.local_addr().unwrap()).await.unwrap();
+ server.connect(client.local_addr().unwrap()).await.unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send(b"hello world") {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf(&mut buf) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_recv_buf_from() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs
index cdabd7b..10314be 100644
--- a/tests/uds_datagram.rs
+++ b/tests/uds_datagram.rs
@@ -230,3 +230,95 @@ async fn try_send_to_recv_from() -> std::io::Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn try_recv_buf_from() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let server_path = dir.path().join("server.sock");
+ let client_path = dir.path().join("client.sock");
+
+ // Create listener
+ let server = UnixDatagram::bind(&server_path)?;
+
+ // Create socket pair
+ let client = UnixDatagram::bind(&client_path)?;
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await?;
+
+ match client.try_send_to(b"hello world", &server_path) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await?;
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+
+ Ok(())
+}
+
+// Even though we use sync non-blocking io we still need a reactor.
+#[tokio::test]
+async fn try_recv_buf_never_block() -> io::Result<()> {
+ let payload = b"PAYLOAD";
+ let mut count = 0;
+
+ let (dgram1, dgram2) = UnixDatagram::pair()?;
+
+ // Send until we hit the OS `net.unix.max_dgram_qlen`.
+ loop {
+ dgram1.writable().await.unwrap();
+
+ match dgram1.try_send(payload) {
+ Err(err) => match err.kind() {
+ io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
+ _ => unreachable!("unexpected error {:?}", err),
+ },
+ Ok(len) => {
+ assert_eq!(len, payload.len());
+ }
+ }
+ count += 1;
+ }
+
+ // Read every dgram we sent.
+ while count > 0 {
+ let mut recv_buf = Vec::with_capacity(16);
+
+ dgram2.readable().await.unwrap();
+ let len = dgram2.try_recv_buf(&mut recv_buf)?;
+ assert_eq!(len, payload.len());
+ assert_eq!(payload, &recv_buf[..len]);
+ count -= 1;
+ }
+
+ let mut recv_buf = vec![0; 16];
+ let err = dgram2.try_recv_from(&mut recv_buf).unwrap_err();
+ match err.kind() {
+ io::ErrorKind::WouldBlock => (),
+ _ => unreachable!("unexpected error {:?}", err),
+ }
+
+ Ok(())
+}
diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs
index 5160f17..c528620 100644
--- a/tests/uds_stream.rs
+++ b/tests/uds_stream.rs
@@ -252,3 +252,85 @@ fn write_until_pending(stream: &mut UnixStream) {
}
}
}
+
+#[tokio::test]
+async fn try_read_buf() -> std::io::Result<()> {
+ let msg = b"hello world";
+
+ let dir = tempfile::tempdir()?;
+ let bind_path = dir.path().join("bind.sock");
+
+ // Create listener
+ let listener = UnixListener::bind(&bind_path)?;
+
+ // Create socket pair
+ let client = UnixStream::connect(&bind_path).await?;
+
+ let (server, _) = listener.accept().await?;
+ let mut written = msg.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await?;
+ assert_eq!(msg.len(), client.try_write(msg)?);
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(msg) {
+ Ok(n) => written.extend(&msg[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = Vec::with_capacity(written.len());
+ let mut i = 0;
+
+ while i < read.capacity() {
+ server.readable().await?;
+
+ match server.try_read_buf(&mut read) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await?;
+
+ if ready.is_read_closed() {
+ break;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+
+ Ok(())
+}