diff options
author | Haibo Huang <hhb@google.com> | 2021-02-09 18:18:56 -0800 |
---|---|---|
committer | Stephen Hines <srhines@google.com> | 2021-03-03 21:34:52 +0000 |
commit | e3d8d80d2d8744ccdcd175323e0864c8f30fcedc (patch) | |
tree | 16d053e70d21e456d52f4a7762ee41441342b7a2 | |
parent | 925d648e545e70d6a4faae3d7efe5e0de885f922 (diff) | |
download | tokio-e3d8d80d2d8744ccdcd175323e0864c8f30fcedc.tar.gz |
Upgrade rust/crates/tokio to 1.2.0
Test: make
Change-Id: Ib0f6a5201b51e9d122b6e867388a3856e16f803a
87 files changed, 3431 insertions, 1075 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 0ccdcd3..9e33e70 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "5d35c907f693e25ba20c3cfb47e0cb1957679019" + "sha1": "572a897d43d5e4942f26b7a67bed862d642679e4" } } @@ -56,15 +56,15 @@ rust_library { // dependent_library ["feature_list"] // autocfg-1.0.1 // bytes-1.0.1 "default,std" -// cfg-if-0.1.10 -// libc-0.2.82 "align,default,extra_traits,std" -// log-0.4.13 +// cfg-if-1.0.0 +// libc-0.2.86 "align,default,extra_traits,std" +// log-0.4.14 // memchr-2.3.4 "default,std" // mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds" // num_cpus-1.13.0 // pin-project-lite-0.2.4 // proc-macro2-1.0.24 "default,proc-macro" // quote-1.0.8 "default,proc-macro" -// syn-1.0.58 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit,visit-mut" -// tokio-macros-1.0.0 +// syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut" +// tokio-macros-1.1.0 // unicode-xid-0.2.1 "default" diff --git a/CHANGELOG.md b/CHANGELOG.md index a36212d..cc1a305 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,91 @@ -# 1.0.2 (January 14, 2020) +# 1.2.0 (February 5, 2021) + +### Added + +- signal: make `Signal::poll_recv` method public ([#3383]) + +### Fixed + +- time: make `test-util` paused time fully deterministic ([#3492]) + +### Documented + +- sync: link to new broadcast and watch wrappers ([#3504]) + +[#3383]: https://github.com/tokio-rs/tokio/pull/3383 +[#3492]: https://github.com/tokio-rs/tokio/pull/3492 +[#3504]: https://github.com/tokio-rs/tokio/pull/3504 + +# 1.1.1 (January 29, 2021) + +Forward ports 1.0.3 fix. + +### Fixed +- io: memory leak during shutdown ([#3477]). + +# 1.1.0 (January 22, 2021) + +### Added + +- net: add `try_read_buf` and `try_recv_buf` ([#3351]) +- mpsc: Add `Sender::try_reserve` function ([#3418]) +- sync: add `RwLock` `try_read` and `try_write` methods ([#3400]) +- io: add `ReadBuf::inner_mut` ([#3443]) + +### Changed + +- macros: improve `select!` error message ([#3352]) +- io: keep track of initialized bytes in `read_to_end` ([#3426]) +- runtime: consolidate errors for context missing ([#3441]) + +### Fixed + +- task: wake `LocalSet` on `spawn_local` ([#3369]) +- sync: fix panic in broadcast::Receiver drop ([#3434]) + +### Documented +- stream: link to new `Stream` wrappers in `tokio-stream` ([#3343]) +- docs: mention that `test-util` feature is not enabled with full ([#3397]) +- process: add documentation to process::Child fields ([#3437]) +- io: clarify `AsyncFd` docs about changes of the inner fd ([#3430]) +- net: update datagram docs on splitting ([#3448]) +- time: document that `Sleep` is not `Unpin` ([#3457]) +- sync: add link to `PollSemaphore` ([#3456]) +- task: add `LocalSet` example ([#3438]) +- sync: improve bounded `mpsc` documentation ([#3458]) + +[#3343]: https://github.com/tokio-rs/tokio/pull/3343 +[#3351]: https://github.com/tokio-rs/tokio/pull/3351 +[#3352]: https://github.com/tokio-rs/tokio/pull/3352 +[#3369]: https://github.com/tokio-rs/tokio/pull/3369 +[#3397]: https://github.com/tokio-rs/tokio/pull/3397 +[#3400]: https://github.com/tokio-rs/tokio/pull/3400 +[#3418]: https://github.com/tokio-rs/tokio/pull/3418 +[#3426]: https://github.com/tokio-rs/tokio/pull/3426 +[#3430]: https://github.com/tokio-rs/tokio/pull/3430 +[#3434]: https://github.com/tokio-rs/tokio/pull/3434 +[#3437]: https://github.com/tokio-rs/tokio/pull/3437 +[#3438]: https://github.com/tokio-rs/tokio/pull/3438 +[#3441]: https://github.com/tokio-rs/tokio/pull/3441 +[#3443]: https://github.com/tokio-rs/tokio/pull/3443 +[#3448]: https://github.com/tokio-rs/tokio/pull/3448 +[#3456]: https://github.com/tokio-rs/tokio/pull/3456 +[#3457]: https://github.com/tokio-rs/tokio/pull/3457 +[#3458]: https://github.com/tokio-rs/tokio/pull/3458 + +# 1.0.3 (January 28, 2021) ### Fixed -- io: soundness in `read_to_end` (#3428). +- io: memory leak during shutdown ([#3477]). + +[#3477]: https://github.com/tokio-rs/tokio/pull/3477 + +# 1.0.2 (January 14, 2021) + +### Fixed +- io: soundness in `read_to_end` ([#3428]). + +[#3428]: https://github.com/tokio-rs/tokio/pull/3428 # 1.0.1 (December 25, 2020) @@ -17,91 +101,153 @@ Due to the soundness hole, we have also yanked Tokio version 1.0.0. ### Removed - - sync: remove `RwLockWriteGuard::map` and `RwLockWriteGuard::try_map` (#3345) +- sync: remove `RwLockWriteGuard::map` and `RwLockWriteGuard::try_map` ([#3345]) ### Fixed - - docs: remove stream feature from docs (#3335) +- docs: remove stream feature from docs ([#3335]) [semver]: https://github.com/rust-lang/rfcs/blob/master/text/1122-language-semver.md#soundness-changes +[#3335]: https://github.com/tokio-rs/tokio/pull/3335 +[#3345]: https://github.com/tokio-rs/tokio/pull/3345 # 1.0.0 (December 23, 2020) Commit to the API and long-term support. ### Fixed -- sync: spurious wakeup in `watch` (#3234). + +- sync: spurious wakeup in `watch` ([#3234]). ### Changed -- io: rename `AsyncFd::with_io()` to `try_io()` (#3306) -- fs: avoid OS specific `*Ext` traits in favor of conditionally defining the fn (#3264). -- fs: `Sleep` is `!Unpin` (#3278). -- net: pass `SocketAddr` by value (#3125). -- net: `TcpStream::poll_peek` takes `ReadBuf` (#3259). -- rt: rename `runtime::Builder::max_threads()` to `max_blocking_threads()` (#3287). -- time: require `current_thread` runtime when calling `time::pause()` (#3289). + +- io: rename `AsyncFd::with_io()` to `try_io()` ([#3306]) +- fs: avoid OS specific `*Ext` traits in favor of conditionally defining the fn ([#3264]). +- fs: `Sleep` is `!Unpin` ([#3278]). +- net: pass `SocketAddr` by value ([#3125]). +- net: `TcpStream::poll_peek` takes `ReadBuf` ([#3259]). +- rt: rename `runtime::Builder::max_threads()` to `max_blocking_threads()` ([#3287]). +- time: require `current_thread` runtime when calling `time::pause()` ([#3289]). ### Removed -- remove `tokio::prelude` (#3299). -- io: remove `AsyncFd::with_poll()` (#3306). -- net: remove `{Tcp,Unix}Stream::shutdown()` in favor of `AsyncWrite::shutdown()` (#3298). + +- remove `tokio::prelude` ([#3299]). +- io: remove `AsyncFd::with_poll()` ([#3306]). +- net: remove `{Tcp,Unix}Stream::shutdown()` in favor of `AsyncWrite::shutdown()` ([#3298]). - stream: move all stream utilities to `tokio-stream` until `Stream` is added to - `std` (#3277). -- sync: mpsc `try_recv()` due to unexpected behavior (#3263). -- tracing: make unstable as `tracing-core` is not 1.0 yet (#3266). + `std` ([#3277]). +- sync: mpsc `try_recv()` due to unexpected behavior ([#3263]). +- tracing: make unstable as `tracing-core` is not 1.0 yet ([#3266]). ### Added -- fs: `poll_*` fns to `DirEntry` (#3308). -- io: `poll_*` fns to `io::Lines`, `io::Split` (#3308). -- io: `_mut` method variants to `AsyncFd` (#3304). -- net: `poll_*` fns to `UnixDatagram` (#3223). -- net: `UnixStream` readiness and non-blocking ops (#3246). -- sync: `UnboundedReceiver::blocking_recv()` (#3262). -- sync: `watch::Sender::borrow()` (#3269). -- sync: `Semaphore::close()` (#3065). -- sync: `poll_recv` fns to `mpsc::Receiver`, `mpsc::UnboundedReceiver` (#3308). -- time: `poll_tick` fn to `time::Interval` (#3316). + +- fs: `poll_*` fns to `DirEntry` ([#3308]). +- io: `poll_*` fns to `io::Lines`, `io::Split` ([#3308]). +- io: `_mut` method variants to `AsyncFd` ([#3304]). +- net: `poll_*` fns to `UnixDatagram` ([#3223]). +- net: `UnixStream` readiness and non-blocking ops ([#3246]). +- sync: `UnboundedReceiver::blocking_recv()` ([#3262]). +- sync: `watch::Sender::borrow()` ([#3269]). +- sync: `Semaphore::close()` ([#3065]). +- sync: `poll_recv` fns to `mpsc::Receiver`, `mpsc::UnboundedReceiver` ([#3308]). +- time: `poll_tick` fn to `time::Interval` ([#3316]). + +[#3065]: https://github.com/tokio-rs/tokio/pull/3065 +[#3125]: https://github.com/tokio-rs/tokio/pull/3125 +[#3223]: https://github.com/tokio-rs/tokio/pull/3223 +[#3234]: https://github.com/tokio-rs/tokio/pull/3234 +[#3246]: https://github.com/tokio-rs/tokio/pull/3246 +[#3259]: https://github.com/tokio-rs/tokio/pull/3259 +[#3262]: https://github.com/tokio-rs/tokio/pull/3262 +[#3263]: https://github.com/tokio-rs/tokio/pull/3263 +[#3264]: https://github.com/tokio-rs/tokio/pull/3264 +[#3266]: https://github.com/tokio-rs/tokio/pull/3266 +[#3269]: https://github.com/tokio-rs/tokio/pull/3269 +[#3277]: https://github.com/tokio-rs/tokio/pull/3277 +[#3278]: https://github.com/tokio-rs/tokio/pull/3278 +[#3287]: https://github.com/tokio-rs/tokio/pull/3287 +[#3289]: https://github.com/tokio-rs/tokio/pull/3289 +[#3298]: https://github.com/tokio-rs/tokio/pull/3298 +[#3299]: https://github.com/tokio-rs/tokio/pull/3299 +[#3304]: https://github.com/tokio-rs/tokio/pull/3304 +[#3306]: https://github.com/tokio-rs/tokio/pull/3306 +[#3308]: https://github.com/tokio-rs/tokio/pull/3308 +[#3316]: https://github.com/tokio-rs/tokio/pull/3316 # 0.3.6 (December 14, 2020) ### Fixed -- rt: fix deadlock in shutdown (#3228) -- rt: fix panic in task abort when off rt (#3159) -- sync: make `add_permits` panic with usize::MAX >> 3 permits (#3188) -- time: Fix race condition in timer drop (#3229) -- watch: fix spurious wakeup (#3244) + +- rt: fix deadlock in shutdown ([#3228]) +- rt: fix panic in task abort when off rt ([#3159]) +- sync: make `add_permits` panic with usize::MAX >> 3 permits ([#3188]) +- time: Fix race condition in timer drop ([#3229]) +- watch: fix spurious wakeup ([#3244]) ### Added -- example: add back udp-codec example (#3205) -- net: add `TcpStream::into_std` (#3189) + +- example: add back udp-codec example ([#3205]) +- net: add `TcpStream::into_std` ([#3189]) + +[#3159]: https://github.com/tokio-rs/tokio/pull/3159 +[#3188]: https://github.com/tokio-rs/tokio/pull/3188 +[#3189]: https://github.com/tokio-rs/tokio/pull/3189 +[#3205]: https://github.com/tokio-rs/tokio/pull/3205 +[#3228]: https://github.com/tokio-rs/tokio/pull/3228 +[#3229]: https://github.com/tokio-rs/tokio/pull/3229 +[#3244]: https://github.com/tokio-rs/tokio/pull/3244 # 0.3.5 (November 30, 2020) ### Fixed -- rt: fix `shutdown_timeout(0)` (#3196). -- time: fixed race condition with small sleeps (#3069). + +- rt: fix `shutdown_timeout(0)` ([#3196]). +- time: fixed race condition with small sleeps ([#3069]). ### Added -- io: `AsyncFd::with_interest()` (#3167). -- signal: `CtrlC` stream on windows (#3186). + +- io: `AsyncFd::with_interest()` ([#3167]). +- signal: `CtrlC` stream on windows ([#3186]). + +[#3069]: https://github.com/tokio-rs/tokio/pull/3069 +[#3167]: https://github.com/tokio-rs/tokio/pull/3167 +[#3186]: https://github.com/tokio-rs/tokio/pull/3186 +[#3196]: https://github.com/tokio-rs/tokio/pull/3196 # 0.3.4 (November 18, 2020) ### Fixed -- stream: `StreamMap` `Default` impl bound (#3093). -- io: `AsyncFd::into_inner()` should deregister the FD (#3104). + +- stream: `StreamMap` `Default` impl bound ([#3093]). +- io: `AsyncFd::into_inner()` should deregister the FD ([#3104]). ### Changed -- meta: `parking_lot` feature enabled with `full` (#3119). + +- meta: `parking_lot` feature enabled with `full` ([#3119]). ### Added -- io: `AsyncWrite` vectored writes (#3149). -- net: TCP/UDP readiness and non-blocking ops (#3130, #2743, #3138). -- net: TCP socket option (linger, send/recv buf size) (#3145, #3143). -- net: PID field in `UCred` with solaris/illumos (#3085). -- rt: `runtime::Handle` allows spawning onto a runtime (#3079). -- sync: `Notify::notify_waiters()` (#3098). -- sync: `acquire_many()`, `try_acquire_many()` to `Semaphore` (#3067). + +- io: `AsyncWrite` vectored writes ([#3149]). +- net: TCP/UDP readiness and non-blocking ops ([#3130], [#2743], [#3138]). +- net: TCP socket option (linger, send/recv buf size) ([#3145], [#3143]). +- net: PID field in `UCred` with solaris/illumos ([#3085]). +- rt: `runtime::Handle` allows spawning onto a runtime ([#3079]). +- sync: `Notify::notify_waiters()` ([#3098]). +- sync: `acquire_many()`, `try_acquire_many()` to `Semaphore` ([#3067]). + +[#2743]: https://github.com/tokio-rs/tokio/pull/2743 +[#3067]: https://github.com/tokio-rs/tokio/pull/3067 +[#3079]: https://github.com/tokio-rs/tokio/pull/3079 +[#3085]: https://github.com/tokio-rs/tokio/pull/3085 +[#3093]: https://github.com/tokio-rs/tokio/pull/3093 +[#3098]: https://github.com/tokio-rs/tokio/pull/3098 +[#3104]: https://github.com/tokio-rs/tokio/pull/3104 +[#3119]: https://github.com/tokio-rs/tokio/pull/3119 +[#3130]: https://github.com/tokio-rs/tokio/pull/3130 +[#3138]: https://github.com/tokio-rs/tokio/pull/3138 +[#3143]: https://github.com/tokio-rs/tokio/pull/3143 +[#3145]: https://github.com/tokio-rs/tokio/pull/3145 +[#3149]: https://github.com/tokio-rs/tokio/pull/3149 # 0.3.3 (November 2, 2020) @@ -109,28 +255,44 @@ Fixes a soundness hole by adding a missing `Send` bound to `Runtime::spawn_blocking()`. ### Fixed -- rt: include missing `Send`, fixing soundness hole (#3089). -- tracing: avoid huge trace span names (#3074). + +- rt: include missing `Send`, fixing soundness hole ([#3089]). +- tracing: avoid huge trace span names ([#3074]). ### Added -- net: `TcpSocket::reuseport()`, `TcpSocket::set_reuseport()` (#3083). -- net: `TcpSocket::reuseaddr()` (#3093). -- net: `TcpSocket::local_addr()` (#3093). -- net: add pid to `UCred` (#2633). + +- net: `TcpSocket::reuseport()`, `TcpSocket::set_reuseport()` ([#3083]). +- net: `TcpSocket::reuseaddr()` ([#3093]). +- net: `TcpSocket::local_addr()` ([#3093]). +- net: add pid to `UCred` ([#2633]). + +[#2633]: https://github.com/tokio-rs/tokio/pull/2633 +[#3074]: https://github.com/tokio-rs/tokio/pull/3074 +[#3083]: https://github.com/tokio-rs/tokio/pull/3083 +[#3089]: https://github.com/tokio-rs/tokio/pull/3089 +[#3093]: https://github.com/tokio-rs/tokio/pull/3093 # 0.3.2 (October 27, 2020) Adds `AsyncFd` as a replacement for v0.2's `PollEvented`. ### Fixed -- io: fix a potential deadlock when shutting down the I/O driver (#2903). -- sync: `RwLockWriteGuard::downgrade()` bug (#2957). + +- io: fix a potential deadlock when shutting down the I/O driver ([#2903]). +- sync: `RwLockWriteGuard::downgrade()` bug ([#2957]). ### Added -- io: `AsyncFd` for receiving readiness events on raw FDs (#2903). -- net: `poll_*` function on `UdpSocket` (#2981). -- net: `UdpSocket::take_error()` (#3051). -- sync: `oneshot::Sender::poll_closed()` (#3032). + +- io: `AsyncFd` for receiving readiness events on raw FDs ([#2903]). +- net: `poll_*` function on `UdpSocket` ([#2981]). +- net: `UdpSocket::take_error()` ([#3051]). +- sync: `oneshot::Sender::poll_closed()` ([#3032]). + +[#2903]: https://github.com/tokio-rs/tokio/pull/2903 +[#2957]: https://github.com/tokio-rs/tokio/pull/2957 +[#2981]: https://github.com/tokio-rs/tokio/pull/2981 +[#3032]: https://github.com/tokio-rs/tokio/pull/3032 +[#3051]: https://github.com/tokio-rs/tokio/pull/3051 # 0.3.1 (October 21, 2020) @@ -139,15 +301,24 @@ and `write_buf` methods have been added back to the IO traits, as the bytes crat is now on track to reach version 1.0 together with Tokio. ### Fixed -- net: fix use-after-free (#3019). -- fs: ensure buffered data is written on shutdown (#3009). + +- net: fix use-after-free ([#3019]). +- fs: ensure buffered data is written on shutdown ([#3009]). ### Added -- io: `copy_buf()` (#2884). + +- io: `copy_buf()` ([#2884]). - io: `AsyncReadExt::read_buf()`, `AsyncReadExt::write_buf()` for working with - `Buf`/`BufMut` (#3003). -- rt: `Runtime::spawn_blocking()` (#2980). -- sync: `watch::Sender::is_closed()` (#2991). + `Buf`/`BufMut` ([#3003]). +- rt: `Runtime::spawn_blocking()` ([#2980]). +- sync: `watch::Sender::is_closed()` ([#2991]). + +[#2884]: https://github.com/tokio-rs/tokio/pull/2884 +[#2980]: https://github.com/tokio-rs/tokio/pull/2980 +[#2991]: https://github.com/tokio-rs/tokio/pull/2991 +[#3003]: https://github.com/tokio-rs/tokio/pull/3003 +[#3009]: https://github.com/tokio-rs/tokio/pull/3009 +[#3019]: https://github.com/tokio-rs/tokio/pull/3019 # 0.3.0 (October 15, 2020) @@ -167,136 +338,264 @@ Biggest changes are: - `parking_lot` is included with `full` ### Changes + - meta: Minimum supported Rust version is now 1.45. - io: `AsyncRead` trait now takes `ReadBuf` in order to safely handle reading - into uninitialized memory (#2758). -- io: Internal I/O driver storage is now able to compact (#2757). -- rt: `Runtime::block_on` now takes `&self` (#2782). + into uninitialized memory ([#2758]). +- io: Internal I/O driver storage is now able to compact ([#2757]). +- rt: `Runtime::block_on` now takes `&self` ([#2782]). - sync: `watch` reworked to decouple receiving a change notification from - receiving the value (#2814, #2806). -- sync: `Notify::notify` is renamed to `notify_one` (#2822). -- process: `Child::kill` is now an `async fn` that cleans zombies (#2823). -- sync: use `const fn` constructors as possible (#2833, #2790) -- signal: reduce cross-thread notification (#2835). -- net: tcp,udp,uds types support operations with `&self` (#2828, #2919, #2934). -- sync: blocking `mpsc` channel supports `send` with `&self` (#2861). -- time: rename `delay_for` and `delay_until` to `sleep` and `sleep_until` (#2826). -- io: upgrade to `mio` 0.7 (#2893). -- io: `AsyncSeek` trait is tweaked (#2885). -- fs: `File` operations take `&self` (#2930). -- rt: runtime API, and `#[tokio::main]` macro polish (#2876) -- rt: `Runtime::enter` uses an RAII guard instead of a closure (#2954). -- net: the `from_std` function on all sockets no longer sets socket into non-blocking mode (#2893) + receiving the value ([#2814], [#2806]). +- sync: `Notify::notify` is renamed to `notify_one` ([#2822]). +- process: `Child::kill` is now an `async fn` that cleans zombies ([#2823]). +- sync: use `const fn` constructors as possible ([#2833], [#2790]) +- signal: reduce cross-thread notification ([#2835]). +- net: tcp,udp,uds types support operations with `&self` ([#2828], [#2919], [#2934]). +- sync: blocking `mpsc` channel supports `send` with `&self` ([#2861]). +- time: rename `delay_for` and `delay_until` to `sleep` and `sleep_until` ([#2826]). +- io: upgrade to `mio` 0.7 ([#2893]). +- io: `AsyncSeek` trait is tweaked ([#2885]). +- fs: `File` operations take `&self` ([#2930]). +- rt: runtime API, and `#[tokio::main]` macro polish ([#2876]) +- rt: `Runtime::enter` uses an RAII guard instead of a closure ([#2954]). +- net: the `from_std` function on all sockets no longer sets socket into non-blocking mode ([#2893]) ### Added -- sync: `map` function to lock guards (#2445). -- sync: `blocking_recv` and `blocking_send` fns to `mpsc` for use outside of Tokio (#2685). -- rt: `Builder::thread_name_fn` for configuring thread names (#1921). -- fs: impl `FromRawFd` and `FromRawHandle` for `File` (#2792). -- process: `Child::wait` and `Child::try_wait` (#2796). -- rt: support configuring thread keep-alive duration (#2809). -- rt: `task::JoinHandle::abort` forcibly cancels a spawned task (#2474). -- sync: `RwLock` write guard to read guard downgrading (#2733). -- net: add `poll_*` functions that take `&self` to all net types (#2845) -- sync: `get_mut()` for `Mutex`, `RwLock` (#2856). -- sync: `mpsc::Sender::closed()` waits for `Receiver` half to close (#2840). -- sync: `mpsc::Sender::is_closed()` returns true if `Receiver` half is closed (#2726). -- stream: `iter` and `iter_mut` to `StreamMap` (#2890). -- net: implement `AsRawSocket` on windows (#2911). -- net: `TcpSocket` creates a socket without binding or listening (#2920). + +- sync: `map` function to lock guards ([#2445]). +- sync: `blocking_recv` and `blocking_send` fns to `mpsc` for use outside of Tokio ([#2685]). +- rt: `Builder::thread_name_fn` for configuring thread names ([#1921]). +- fs: impl `FromRawFd` and `FromRawHandle` for `File` ([#2792]). +- process: `Child::wait` and `Child::try_wait` ([#2796]). +- rt: support configuring thread keep-alive duration ([#2809]). +- rt: `task::JoinHandle::abort` forcibly cancels a spawned task ([#2474]). +- sync: `RwLock` write guard to read guard downgrading ([#2733]). +- net: add `poll_*` functions that take `&self` to all net types ([#2845]) +- sync: `get_mut()` for `Mutex`, `RwLock` ([#2856]). +- sync: `mpsc::Sender::closed()` waits for `Receiver` half to close ([#2840]). +- sync: `mpsc::Sender::is_closed()` returns true if `Receiver` half is closed ([#2726]). +- stream: `iter` and `iter_mut` to `StreamMap` ([#2890]). +- net: implement `AsRawSocket` on windows ([#2911]). +- net: `TcpSocket` creates a socket without binding or listening ([#2920]). ### Removed -- io: vectored ops are removed from `AsyncRead`, `AsyncWrite` traits (#2882). + +- io: vectored ops are removed from `AsyncRead`, `AsyncWrite` traits ([#2882]). - io: `mio` is removed from the public API. `PollEvented` and` Registration` are - removed (#2893). + removed ([#2893]). - io: remove `bytes` from public API. `Buf` and `BufMut` implementation are - removed (#2908). -- time: `DelayQueue` is moved to `tokio-util` (#2897). + removed ([#2908]). +- time: `DelayQueue` is moved to `tokio-util` ([#2897]). ### Fixed -- io: `stdout` and `stderr` buffering on windows (#2734). + +- io: `stdout` and `stderr` buffering on windows ([#2734]). + +[#1921]: https://github.com/tokio-rs/tokio/pull/1921 +[#2445]: https://github.com/tokio-rs/tokio/pull/2445 +[#2474]: https://github.com/tokio-rs/tokio/pull/2474 +[#2685]: https://github.com/tokio-rs/tokio/pull/2685 +[#2726]: https://github.com/tokio-rs/tokio/pull/2726 +[#2733]: https://github.com/tokio-rs/tokio/pull/2733 +[#2734]: https://github.com/tokio-rs/tokio/pull/2734 +[#2757]: https://github.com/tokio-rs/tokio/pull/2757 +[#2758]: https://github.com/tokio-rs/tokio/pull/2758 +[#2782]: https://github.com/tokio-rs/tokio/pull/2782 +[#2790]: https://github.com/tokio-rs/tokio/pull/2790 +[#2792]: https://github.com/tokio-rs/tokio/pull/2792 +[#2796]: https://github.com/tokio-rs/tokio/pull/2796 +[#2806]: https://github.com/tokio-rs/tokio/pull/2806 +[#2809]: https://github.com/tokio-rs/tokio/pull/2809 +[#2814]: https://github.com/tokio-rs/tokio/pull/2814 +[#2822]: https://github.com/tokio-rs/tokio/pull/2822 +[#2823]: https://github.com/tokio-rs/tokio/pull/2823 +[#2826]: https://github.com/tokio-rs/tokio/pull/2826 +[#2828]: https://github.com/tokio-rs/tokio/pull/2828 +[#2833]: https://github.com/tokio-rs/tokio/pull/2833 +[#2835]: https://github.com/tokio-rs/tokio/pull/2835 +[#2840]: https://github.com/tokio-rs/tokio/pull/2840 +[#2845]: https://github.com/tokio-rs/tokio/pull/2845 +[#2856]: https://github.com/tokio-rs/tokio/pull/2856 +[#2861]: https://github.com/tokio-rs/tokio/pull/2861 +[#2876]: https://github.com/tokio-rs/tokio/pull/2876 +[#2882]: https://github.com/tokio-rs/tokio/pull/2882 +[#2885]: https://github.com/tokio-rs/tokio/pull/2885 +[#2890]: https://github.com/tokio-rs/tokio/pull/2890 +[#2893]: https://github.com/tokio-rs/tokio/pull/2893 +[#2897]: https://github.com/tokio-rs/tokio/pull/2897 +[#2908]: https://github.com/tokio-rs/tokio/pull/2908 +[#2911]: https://github.com/tokio-rs/tokio/pull/2911 +[#2919]: https://github.com/tokio-rs/tokio/pull/2919 +[#2920]: https://github.com/tokio-rs/tokio/pull/2920 +[#2930]: https://github.com/tokio-rs/tokio/pull/2930 +[#2934]: https://github.com/tokio-rs/tokio/pull/2934 +[#2954]: https://github.com/tokio-rs/tokio/pull/2954 # 0.2.22 (July 21, 2020) ### Fixes -- docs: misc improvements (#2572, #2658, #2663, #2656, #2647, #2630, #2487, #2621, - #2624, #2600, #2623, #2622, #2577, #2569, #2589, #2575, #2540, #2564, #2567, - #2520, #2521, #2493) + +- docs: misc improvements ([#2572], [#2658], [#2663], [#2656], [#2647], [#2630], [#2487], [#2621], + [#2624], [#2600], [#2623], [#2622], [#2577], [#2569], [#2589], [#2575], [#2540], [#2564], [#2567], + [#2520], [#2521], [#2493]) - rt: allow calls to `block_on` inside calls to `block_in_place` that are - themselves inside `block_on` (#2645) -- net: fix non-portable behavior when dropping `TcpStream` `OwnedWriteHalf` (#2597) + themselves inside `block_on` ([#2645]) +- net: fix non-portable behavior when dropping `TcpStream` `OwnedWriteHalf` ([#2597]) - io: improve stack usage by allocating large buffers on directly on the heap - (#2634) + ([#2634]) - io: fix unsound pin projection in `AsyncReadExt::read_buf` and - `AsyncWriteExt::write_buf` (#2612) -- io: fix unnecessary zeroing for `AsyncRead` implementors (#2525) -- io: Fix `BufReader` not correctly forwarding `poll_write_buf` (#2654) -- io: fix panic in `AsyncReadExt::read_line` (#2541) + `AsyncWriteExt::write_buf` ([#2612]) +- io: fix unnecessary zeroing for `AsyncRead` implementors ([#2525]) +- io: Fix `BufReader` not correctly forwarding `poll_write_buf` ([#2654]) +- io: fix panic in `AsyncReadExt::read_line` ([#2541]) ### Changes -- coop: returning `Poll::Pending` no longer decrements the task budget (#2549) + +- coop: returning `Poll::Pending` no longer decrements the task budget ([#2549]) ### Added + - io: little-endian variants of `AsyncReadExt` and `AsyncWriteExt` methods - (#1915) -- task: add [`tracing`] instrumentation to spawned tasks (#2655) + ([#1915]) +- task: add [`tracing`] instrumentation to spawned tasks ([#2655]) - sync: allow unsized types in `Mutex` and `RwLock` (via `default` constructors) - (#2615) -- net: add `ToSocketAddrs` implementation for `&[SocketAddr]` (#2604) -- fs: add `OpenOptionsExt` for `OpenOptions` (#2515) -- fs: add `DirBuilder` (#2524) + ([#2615]) +- net: add `ToSocketAddrs` implementation for `&[SocketAddr]` ([#2604]) +- fs: add `OpenOptionsExt` for `OpenOptions` ([#2515]) +- fs: add `DirBuilder` ([#2524]) [`tracing`]: https://crates.io/crates/tracing +[#1915]: https://github.com/tokio-rs/tokio/pull/1915 +[#2487]: https://github.com/tokio-rs/tokio/pull/2487 +[#2493]: https://github.com/tokio-rs/tokio/pull/2493 +[#2515]: https://github.com/tokio-rs/tokio/pull/2515 +[#2520]: https://github.com/tokio-rs/tokio/pull/2520 +[#2521]: https://github.com/tokio-rs/tokio/pull/2521 +[#2524]: https://github.com/tokio-rs/tokio/pull/2524 +[#2525]: https://github.com/tokio-rs/tokio/pull/2525 +[#2540]: https://github.com/tokio-rs/tokio/pull/2540 +[#2541]: https://github.com/tokio-rs/tokio/pull/2541 +[#2549]: https://github.com/tokio-rs/tokio/pull/2549 +[#2564]: https://github.com/tokio-rs/tokio/pull/2564 +[#2567]: https://github.com/tokio-rs/tokio/pull/2567 +[#2569]: https://github.com/tokio-rs/tokio/pull/2569 +[#2572]: https://github.com/tokio-rs/tokio/pull/2572 +[#2575]: https://github.com/tokio-rs/tokio/pull/2575 +[#2577]: https://github.com/tokio-rs/tokio/pull/2577 +[#2589]: https://github.com/tokio-rs/tokio/pull/2589 +[#2597]: https://github.com/tokio-rs/tokio/pull/2597 +[#2600]: https://github.com/tokio-rs/tokio/pull/2600 +[#2604]: https://github.com/tokio-rs/tokio/pull/2604 +[#2612]: https://github.com/tokio-rs/tokio/pull/2612 +[#2615]: https://github.com/tokio-rs/tokio/pull/2615 +[#2621]: https://github.com/tokio-rs/tokio/pull/2621 +[#2622]: https://github.com/tokio-rs/tokio/pull/2622 +[#2623]: https://github.com/tokio-rs/tokio/pull/2623 +[#2624]: https://github.com/tokio-rs/tokio/pull/2624 +[#2630]: https://github.com/tokio-rs/tokio/pull/2630 +[#2634]: https://github.com/tokio-rs/tokio/pull/2634 +[#2645]: https://github.com/tokio-rs/tokio/pull/2645 +[#2647]: https://github.com/tokio-rs/tokio/pull/2647 +[#2654]: https://github.com/tokio-rs/tokio/pull/2654 +[#2655]: https://github.com/tokio-rs/tokio/pull/2655 +[#2656]: https://github.com/tokio-rs/tokio/pull/2656 +[#2658]: https://github.com/tokio-rs/tokio/pull/2658 +[#2663]: https://github.com/tokio-rs/tokio/pull/2663 # 0.2.21 (May 13, 2020) ### Fixes -- macros: disambiguate built-in `#[test]` attribute in macro expansion (#2503) -- rt: `LocalSet` and task budgeting (#2462). -- rt: task budgeting with `block_in_place` (#2502). -- sync: release `broadcast` channel memory without sending a value (#2509). -- time: notify when resetting a `Delay` to a time in the past (#2290) +- macros: disambiguate built-in `#[test]` attribute in macro expansion ([#2503]) +- rt: `LocalSet` and task budgeting ([#2462]). +- rt: task budgeting with `block_in_place` ([#2502]). +- sync: release `broadcast` channel memory without sending a value ([#2509]). +- time: notify when resetting a `Delay` to a time in the past ([#2290]) ### Added -- io: `get_mut`, `get_ref`, and `into_inner` to `Lines` (#2450). -- io: `mio::Ready` argument to `PollEvented` (#2419). -- os: illumos support (#2486). -- rt: `Handle::spawn_blocking` (#2501). -- sync: `OwnedMutexGuard` for `Arc<Mutex<T>>` (#2455). + +- io: `get_mut`, `get_ref`, and `into_inner` to `Lines` ([#2450]). +- io: `mio::Ready` argument to `PollEvented` ([#2419]). +- os: illumos support ([#2486]). +- rt: `Handle::spawn_blocking` ([#2501]). +- sync: `OwnedMutexGuard` for `Arc<Mutex<T>>` ([#2455]). + +[#2290]: https://github.com/tokio-rs/tokio/pull/2290 +[#2419]: https://github.com/tokio-rs/tokio/pull/2419 +[#2450]: https://github.com/tokio-rs/tokio/pull/2450 +[#2455]: https://github.com/tokio-rs/tokio/pull/2455 +[#2462]: https://github.com/tokio-rs/tokio/pull/2462 +[#2486]: https://github.com/tokio-rs/tokio/pull/2486 +[#2501]: https://github.com/tokio-rs/tokio/pull/2501 +[#2502]: https://github.com/tokio-rs/tokio/pull/2502 +[#2503]: https://github.com/tokio-rs/tokio/pull/2503 +[#2509]: https://github.com/tokio-rs/tokio/pull/2509 # 0.2.20 (April 28, 2020) ### Fixes -- sync: `broadcast` closing the channel no longer requires capacity (#2448). -- rt: regression when configuring runtime with `max_threads` less than number of CPUs (#2457). + +- sync: `broadcast` closing the channel no longer requires capacity ([#2448]). +- rt: regression when configuring runtime with `max_threads` less than number of CPUs ([#2457]). + +[#2448]: https://github.com/tokio-rs/tokio/pull/2448 +[#2457]: https://github.com/tokio-rs/tokio/pull/2457 # 0.2.19 (April 24, 2020) ### Fixes -- docs: misc improvements (#2400, #2405, #2414, #2420, #2423, #2426, #2427, #2434, #2436, #2440). -- rt: support `block_in_place` in more contexts (#2409, #2410). -- stream: no panic in `merge()` and `chain()` when using `size_hint()` (#2430). -- task: include visibility modifier when defining a task-local (#2416). + +- docs: misc improvements ([#2400], [#2405], [#2414], [#2420], [#2423], [#2426], [#2427], [#2434], [#2436], [#2440]). +- rt: support `block_in_place` in more contexts ([#2409], [#2410]). +- stream: no panic in `merge()` and `chain()` when using `size_hint()` ([#2430]). +- task: include visibility modifier when defining a task-local ([#2416]). ### Added -- rt: `runtime::Handle::block_on` (#2437). -- sync: owned `Semaphore` permit (#2421). -- tcp: owned split (#2270). + +- rt: `runtime::Handle::block_on` ([#2437]). +- sync: owned `Semaphore` permit ([#2421]). +- tcp: owned split ([#2270]). + +[#2270]: https://github.com/tokio-rs/tokio/pull/2270 +[#2400]: https://github.com/tokio-rs/tokio/pull/2400 +[#2405]: https://github.com/tokio-rs/tokio/pull/2405 +[#2409]: https://github.com/tokio-rs/tokio/pull/2409 +[#2410]: https://github.com/tokio-rs/tokio/pull/2410 +[#2414]: https://github.com/tokio-rs/tokio/pull/2414 +[#2416]: https://github.com/tokio-rs/tokio/pull/2416 +[#2420]: https://github.com/tokio-rs/tokio/pull/2420 +[#2421]: https://github.com/tokio-rs/tokio/pull/2421 +[#2423]: https://github.com/tokio-rs/tokio/pull/2423 +[#2426]: https://github.com/tokio-rs/tokio/pull/2426 +[#2427]: https://github.com/tokio-rs/tokio/pull/2427 +[#2430]: https://github.com/tokio-rs/tokio/pull/2430 +[#2434]: https://github.com/tokio-rs/tokio/pull/2434 +[#2436]: https://github.com/tokio-rs/tokio/pull/2436 +[#2437]: https://github.com/tokio-rs/tokio/pull/2437 +[#2440]: https://github.com/tokio-rs/tokio/pull/2440 # 0.2.18 (April 12, 2020) ### Fixes -- task: `LocalSet` was incorrectly marked as `Send` (#2398) -- io: correctly report `WriteZero` failure in `write_int` (#2334) + +- task: `LocalSet` was incorrectly marked as `Send` ([#2398]) +- io: correctly report `WriteZero` failure in `write_int` ([#2334]) + +[#2334]: https://github.com/tokio-rs/tokio/pull/2334 +[#2398]: https://github.com/tokio-rs/tokio/pull/2398 # 0.2.17 (April 9, 2020) ### Fixes -- rt: bug in work-stealing queue (#2387) + +- rt: bug in work-stealing queue ([#2387]) ### Changes -- rt: threadpool uses logical CPU count instead of physical by default (#2391) + +- rt: threadpool uses logical CPU count instead of physical by default ([#2391]) + +[#2387]: https://github.com/tokio-rs/tokio/pull/2387 +[#2391]: https://github.com/tokio-rs/tokio/pull/2391 # 0.2.16 (April 3, 2020) @@ -311,6 +610,11 @@ Biggest changes are: - time: added `deadline` method to `delay_queue::Expired` ([#2300]) - io: added `StreamReader` ([#2052]) +[#2052]: https://github.com/tokio-rs/tokio/pull/2052 +[#2300]: https://github.com/tokio-rs/tokio/pull/2300 +[#2354]: https://github.com/tokio-rs/tokio/pull/2354 +[#2375]: https://github.com/tokio-rs/tokio/pull/2375 + # 0.2.15 (April 2, 2020) ### Fixes @@ -321,30 +625,51 @@ Biggest changes are: - sync: Add disarm to `mpsc::Sender` ([#2358]). +[#2358]: https://github.com/tokio-rs/tokio/pull/2358 +[#2362]: https://github.com/tokio-rs/tokio/pull/2362 + # 0.2.14 (April 1, 2020) ### Fixes + - rt: concurrency bug in scheduler ([#2273]). - rt: concurrency bug with shell runtime ([#2333]). - test-util: correct pause/resume of time ([#2253]). - time: `DelayQueue` correct wakeup after `insert` ([#2285]). ### Added + - io: impl `RawFd`, `AsRawHandle` for std io types ([#2335]). -- rt: automatic cooperative task yielding (#2160, #2343, #2349). +- rt: automatic cooperative task yielding ([#2160], [#2343], [#2349]). - sync: `RwLock::into_inner` ([#2321]). ### Changed + - sync: semaphore, mutex internals rewritten to avoid allocations ([#2325]). +[#2160]: https://github.com/tokio-rs/tokio/pull/2160 +[#2253]: https://github.com/tokio-rs/tokio/pull/2253 +[#2273]: https://github.com/tokio-rs/tokio/pull/2273 +[#2285]: https://github.com/tokio-rs/tokio/pull/2285 +[#2321]: https://github.com/tokio-rs/tokio/pull/2321 +[#2325]: https://github.com/tokio-rs/tokio/pull/2325 +[#2333]: https://github.com/tokio-rs/tokio/pull/2333 +[#2335]: https://github.com/tokio-rs/tokio/pull/2335 +[#2343]: https://github.com/tokio-rs/tokio/pull/2343 +[#2349]: https://github.com/tokio-rs/tokio/pull/2349 + # 0.2.13 (February 28, 2020) ### Fixes + - macros: unresolved import in `pin!` ([#2281]). +[#2281]: https://github.com/tokio-rs/tokio/pull/2281 + # 0.2.12 (February 27, 2020) ### Fixes + - net: `UnixStream::poll_shutdown` should call `shutdown(Write)` ([#2245]). - process: Wake up read and write on `EPOLLERR` ([#2218]). - rt: potential deadlock when using `block_in_place` and shutting down the @@ -355,6 +680,7 @@ Biggest changes are: - time: avoid having to poll `DelayQueue` after inserting new delay ([#2217]). ### Added + - macros: `pin!` variant that assigns to identifier and pins ([#2274]). - net: impl `Stream` for `Listener` types ([#2275]). - rt: `Runtime::shutdown_timeout` waits for runtime to shutdown for specified @@ -369,15 +695,35 @@ Biggest changes are: for channel capacity ([#2227]). - time: impl `Ord` and `Hash` for `Instant` ([#2239]). +[#2119]: https://github.com/tokio-rs/tokio/pull/2119 +[#2184]: https://github.com/tokio-rs/tokio/pull/2184 +[#2185]: https://github.com/tokio-rs/tokio/pull/2185 +[#2186]: https://github.com/tokio-rs/tokio/pull/2186 +[#2191]: https://github.com/tokio-rs/tokio/pull/2191 +[#2204]: https://github.com/tokio-rs/tokio/pull/2204 +[#2205]: https://github.com/tokio-rs/tokio/pull/2205 +[#2210]: https://github.com/tokio-rs/tokio/pull/2210 +[#2217]: https://github.com/tokio-rs/tokio/pull/2217 +[#2218]: https://github.com/tokio-rs/tokio/pull/2218 +[#2227]: https://github.com/tokio-rs/tokio/pull/2227 +[#2238]: https://github.com/tokio-rs/tokio/pull/2238 +[#2239]: https://github.com/tokio-rs/tokio/pull/2239 +[#2245]: https://github.com/tokio-rs/tokio/pull/2245 +[#2250]: https://github.com/tokio-rs/tokio/pull/2250 +[#2274]: https://github.com/tokio-rs/tokio/pull/2274 +[#2275]: https://github.com/tokio-rs/tokio/pull/2275 + # 0.2.11 (January 27, 2020) ### Fixes -- docs: misc fixes and tweaks (#2155, #2103, #2027, #2167, #2175). + +- docs: misc fixes and tweaks ([#2155], [#2103], [#2027], [#2167], [#2175]). - macros: handle generics in `#[tokio::main]` method ([#2177]). - sync: `broadcast` potential lost notifications ([#2135]). - rt: improve "no runtime" panic messages ([#2145]). ### Added + - optional support for using `parking_lot` internally ([#2164]). - fs: `fs::copy`, an async version of `std::fs::copy` ([#2079]). - macros: `select!` waits for the first branch to complete ([#2152]). @@ -390,18 +736,40 @@ Biggest changes are: - sync: impl `Eq`, `PartialEq` for `oneshot::RecvError` ([#2168]). - task: methods for inspecting the `JoinError` cause ([#2051]). +[#2027]: https://github.com/tokio-rs/tokio/pull/2027 +[#2051]: https://github.com/tokio-rs/tokio/pull/2051 +[#2079]: https://github.com/tokio-rs/tokio/pull/2079 +[#2103]: https://github.com/tokio-rs/tokio/pull/2103 +[#2122]: https://github.com/tokio-rs/tokio/pull/2122 +[#2135]: https://github.com/tokio-rs/tokio/pull/2135 +[#2145]: https://github.com/tokio-rs/tokio/pull/2145 +[#2149]: https://github.com/tokio-rs/tokio/pull/2149 +[#2151]: https://github.com/tokio-rs/tokio/pull/2151 +[#2152]: https://github.com/tokio-rs/tokio/pull/2152 +[#2155]: https://github.com/tokio-rs/tokio/pull/2155 +[#2158]: https://github.com/tokio-rs/tokio/pull/2158 +[#2163]: https://github.com/tokio-rs/tokio/pull/2163 +[#2164]: https://github.com/tokio-rs/tokio/pull/2164 +[#2167]: https://github.com/tokio-rs/tokio/pull/2167 +[#2168]: https://github.com/tokio-rs/tokio/pull/2168 +[#2169]: https://github.com/tokio-rs/tokio/pull/2169 +[#2175]: https://github.com/tokio-rs/tokio/pull/2175 +[#2177]: https://github.com/tokio-rs/tokio/pull/2177 + # 0.2.10 (January 21, 2020) ### Fixes + - `#[tokio::main]` when `rt-core` feature flag is not enabled ([#2139]). - remove `AsyncBufRead` from `BufStream` impl block ([#2108]). - potential undefined behavior when implementing `AsyncRead` incorrectly ([#2030]). ### Added + - `BufStream::with_capacity` ([#2125]). - impl `From` and `Default` for `RwLock` ([#2089]). - `io::ReadHalf::is_pair_of` checks if provided `WriteHalf` is for the same - underlying object (#1762, #2144). + underlying object ([#1762], [#2144]). - `runtime::Handle::try_current()` returns a handle to the current runtime ([#2118]). - `stream::empty()` returns an immediately ready empty stream ([#2092]). - `stream::once(val)` returns a stream that yields a single value: `val` ([#2094]). @@ -412,22 +780,46 @@ Biggest changes are: - `StreamExt::merge` combines two streams, yielding values as they become ready ([#2091]). - Task-local storage ([#2126]). +[#1762]: https://github.com/tokio-rs/tokio/pull/1762 +[#2030]: https://github.com/tokio-rs/tokio/pull/2030 +[#2085]: https://github.com/tokio-rs/tokio/pull/2085 +[#2089]: https://github.com/tokio-rs/tokio/pull/2089 +[#2091]: https://github.com/tokio-rs/tokio/pull/2091 +[#2092]: https://github.com/tokio-rs/tokio/pull/2092 +[#2093]: https://github.com/tokio-rs/tokio/pull/2093 +[#2094]: https://github.com/tokio-rs/tokio/pull/2094 +[#2108]: https://github.com/tokio-rs/tokio/pull/2108 +[#2109]: https://github.com/tokio-rs/tokio/pull/2109 +[#2118]: https://github.com/tokio-rs/tokio/pull/2118 +[#2125]: https://github.com/tokio-rs/tokio/pull/2125 +[#2126]: https://github.com/tokio-rs/tokio/pull/2126 +[#2139]: https://github.com/tokio-rs/tokio/pull/2139 +[#2144]: https://github.com/tokio-rs/tokio/pull/2144 + # 0.2.9 (January 9, 2020) ### Fixes + - `AsyncSeek` impl for `File` ([#1986]). -- rt: shutdown deadlock in `threaded_scheduler` (#2074, #2082). +- rt: shutdown deadlock in `threaded_scheduler` ([#2074], [#2082]). - rt: memory ordering when dropping `JoinHandle` ([#2044]). - docs: misc API documentation fixes and improvements. +[#1986]: https://github.com/tokio-rs/tokio/pull/1986 +[#2044]: https://github.com/tokio-rs/tokio/pull/2044 +[#2074]: https://github.com/tokio-rs/tokio/pull/2074 +[#2082]: https://github.com/tokio-rs/tokio/pull/2082 + # 0.2.8 (January 7, 2020) ### Fixes + - depend on new version of `tokio-macros`. # 0.2.7 (January 7, 2020) ### Fixes + - potential deadlock when dropping `basic_scheduler` Runtime. - calling `spawn_blocking` from within a `spawn_blocking` ([#2006]). - storing a `Runtime` instance in a thread-local ([#2011]). @@ -436,6 +828,7 @@ Biggest changes are: - test-util: `time::advance` runs pending tasks before changing the time ([#2059]). ### Added + - `net::lookup_host` maps a `T: ToSocketAddrs` to a stream of `SocketAddrs` ([#1870]). - `process::Child` fields are made public to match `std` ([#2014]). - impl `Stream` for `sync::broadcast::Receiver` ([#2012]). @@ -452,14 +845,37 @@ Biggest changes are: - `time::DelayQueue::len` returns the number entries in the queue ([#1755]). - expose runtime options from the `#[tokio::main]` and `#[tokio::test]` ([#2022]). +[#1699]: https://github.com/tokio-rs/tokio/pull/1699 +[#1755]: https://github.com/tokio-rs/tokio/pull/1755 +[#1870]: https://github.com/tokio-rs/tokio/pull/1870 +[#1971]: https://github.com/tokio-rs/tokio/pull/1971 +[#2001]: https://github.com/tokio-rs/tokio/pull/2001 +[#2005]: https://github.com/tokio-rs/tokio/pull/2005 +[#2006]: https://github.com/tokio-rs/tokio/pull/2006 +[#2011]: https://github.com/tokio-rs/tokio/pull/2011 +[#2012]: https://github.com/tokio-rs/tokio/pull/2012 +[#2014]: https://github.com/tokio-rs/tokio/pull/2014 +[#2022]: https://github.com/tokio-rs/tokio/pull/2022 +[#2025]: https://github.com/tokio-rs/tokio/pull/2025 +[#2029]: https://github.com/tokio-rs/tokio/pull/2029 +[#2034]: https://github.com/tokio-rs/tokio/pull/2034 +[#2035]: https://github.com/tokio-rs/tokio/pull/2035 +[#2040]: https://github.com/tokio-rs/tokio/pull/2040 +[#2045]: https://github.com/tokio-rs/tokio/pull/2045 +[#2059]: https://github.com/tokio-rs/tokio/pull/2059 + # 0.2.6 (December 19, 2019) ### Fixes + - `fs::File::seek` API regression ([#1991]). +[#1991]: https://github.com/tokio-rs/tokio/pull/1991 + # 0.2.5 (December 18, 2019) ### Added + - `io::AsyncSeek` trait ([#1924]). - `Mutex::try_lock` ([#1939]) - `mpsc::Receiver::try_recv` and `mpsc::UnboundedReceiver::try_recv` ([#1939]). @@ -471,24 +887,47 @@ Biggest changes are: - `stream::StreamExt` provides stream utilities ([#1962]). ### Fixes + - deadlock risk while shutting down the runtime ([#1972]). - panic while shutting down the runtime ([#1978]). - `sync::MutexGuard` debug output ([#1961]). -- misc doc improvements (#1933, #1934, #1940, #1942). +- misc doc improvements ([#1933], [#1934], [#1940], [#1942]). ### Changes + - runtime threads are configured with `runtime::Builder::core_threads` and `runtime::Builder::max_threads`. `runtime::Builder::num_threads` is deprecated ([#1977]). +[#1924]: https://github.com/tokio-rs/tokio/pull/1924 +[#1933]: https://github.com/tokio-rs/tokio/pull/1933 +[#1934]: https://github.com/tokio-rs/tokio/pull/1934 +[#1939]: https://github.com/tokio-rs/tokio/pull/1939 +[#1940]: https://github.com/tokio-rs/tokio/pull/1940 +[#1942]: https://github.com/tokio-rs/tokio/pull/1942 +[#1943]: https://github.com/tokio-rs/tokio/pull/1943 +[#1949]: https://github.com/tokio-rs/tokio/pull/1949 +[#1956]: https://github.com/tokio-rs/tokio/pull/1956 +[#1961]: https://github.com/tokio-rs/tokio/pull/1961 +[#1962]: https://github.com/tokio-rs/tokio/pull/1962 +[#1972]: https://github.com/tokio-rs/tokio/pull/1972 +[#1973]: https://github.com/tokio-rs/tokio/pull/1973 +[#1975]: https://github.com/tokio-rs/tokio/pull/1975 +[#1977]: https://github.com/tokio-rs/tokio/pull/1977 +[#1978]: https://github.com/tokio-rs/tokio/pull/1978 + # 0.2.4 (December 6, 2019) ### Fixes + - `sync::Mutex` deadlock when `lock()` future is dropped early ([#1898]). +[#1898]: https://github.com/tokio-rs/tokio/pull/1898 + # 0.2.3 (December 6, 2019) ### Added + - read / write integers using `AsyncReadExt` and `AsyncWriteExt` ([#1863]). - `read_buf` / `write_buf` for reading / writing `Buf` / `BufMut` ([#1881]). - `TcpStream::poll_peek` - pollable API for performing TCP peek ([#1864]). @@ -500,14 +939,32 @@ Biggest changes are: `std::time::Instant` ([#1904]). ### Fixes + - calling `spawn_blocking` after runtime shutdown ([#1875]). - `LocalSet` drop inifinite loop ([#1892]). - `LocalSet` hang under load ([#1905]). -- improved documentation (#1865, #1866, #1868, #1874, #1876, #1911). +- improved documentation ([#1865], [#1866], [#1868], [#1874], [#1876], [#1911]). + +[#1863]: https://github.com/tokio-rs/tokio/pull/1863 +[#1864]: https://github.com/tokio-rs/tokio/pull/1864 +[#1865]: https://github.com/tokio-rs/tokio/pull/1865 +[#1866]: https://github.com/tokio-rs/tokio/pull/1866 +[#1868]: https://github.com/tokio-rs/tokio/pull/1868 +[#1874]: https://github.com/tokio-rs/tokio/pull/1874 +[#1875]: https://github.com/tokio-rs/tokio/pull/1875 +[#1876]: https://github.com/tokio-rs/tokio/pull/1876 +[#1881]: https://github.com/tokio-rs/tokio/pull/1881 +[#1882]: https://github.com/tokio-rs/tokio/pull/1882 +[#1888]: https://github.com/tokio-rs/tokio/pull/1888 +[#1892]: https://github.com/tokio-rs/tokio/pull/1892 +[#1904]: https://github.com/tokio-rs/tokio/pull/1904 +[#1905]: https://github.com/tokio-rs/tokio/pull/1905 +[#1911]: https://github.com/tokio-rs/tokio/pull/1911 # 0.2.2 (November 29, 2019) ### Fixes + - scheduling with `basic_scheduler` ([#1861]). - update `spawn` panic message to specify that a task scheduler is required ([#1839]). - API docs example for `runtime::Builder` to include a task scheduler ([#1841]). @@ -517,27 +974,47 @@ Biggest changes are: - API docs mention the required Cargo features for `Builder::{basic, threaded}_scheduler` ([#1858]). ### Added + - impl `Stream` for `signal::unix::Signal` ([#1849]). - API docs for platform specific behavior of `signal::ctrl_c` and `signal::unix::Signal` ([#1854]). - API docs for `signal::unix::Signal::{recv, poll_recv}` and `signal::windows::CtrlBreak::{recv, poll_recv}` ([#1854]). - `File::into_std` and `File::try_into_std` methods ([#1856]). +[#1772]: https://github.com/tokio-rs/tokio/pull/1772 +[#1834]: https://github.com/tokio-rs/tokio/pull/1834 +[#1839]: https://github.com/tokio-rs/tokio/pull/1839 +[#1841]: https://github.com/tokio-rs/tokio/pull/1841 +[#1843]: https://github.com/tokio-rs/tokio/pull/1843 +[#1849]: https://github.com/tokio-rs/tokio/pull/1849 +[#1854]: https://github.com/tokio-rs/tokio/pull/1854 +[#1856]: https://github.com/tokio-rs/tokio/pull/1856 +[#1858]: https://github.com/tokio-rs/tokio/pull/1858 +[#1861]: https://github.com/tokio-rs/tokio/pull/1861 + # 0.2.1 (November 26, 2019) ### Fixes + - API docs for `TcpListener::incoming`, `UnixListener::incoming` ([#1831]). ### Added + - `tokio::task::LocalSet` provides a strategy for spawning `!Send` tasks ([#1733]). - export `tokio::time::Elapsed` ([#1826]). - impl `AsRawFd`, `AsRawHandle` for `tokio::fs::File` ([#1827]). +[#1733]: https://github.com/tokio-rs/tokio/pull/1733 +[#1826]: https://github.com/tokio-rs/tokio/pull/1826 +[#1827]: https://github.com/tokio-rs/tokio/pull/1827 +[#1831]: https://github.com/tokio-rs/tokio/pull/1831 + # 0.2.0 (November 26, 2019) A major breaking change. Most implementation and APIs have changed one way or another. This changelog entry contains a highlight ### Changed + - APIs are updated to use `async / await`. - most `tokio-*` crates are collapsed into this crate. - Scheduler is rewritten. @@ -549,6 +1026,7 @@ another. This changelog entry contains a highlight - `tokio::codec` is moved to `tokio-util`. ### Removed + - Standalone `timer` and `net` drivers are removed, use `Runtime` instead - `current_thread` runtime is removed, use `tokio::runtime::Runtime` with `basic_scheduler` instead. @@ -556,311 +1034,221 @@ another. This changelog entry contains a highlight # 0.1.21 (May 30, 2019) ### Changed + - Bump `tokio-trace-core` version to 0.2 ([#1111]). +[#1111]: https://github.com/tokio-rs/tokio/pull/1111 + # 0.1.20 (May 14, 2019) ### Added + - `tokio::runtime::Builder::panic_handler` allows configuring handling panics on the runtime ([#1055]). +[#1055]: https://github.com/tokio-rs/tokio/pull/1055 + # 0.1.19 (April 22, 2019) ### Added + - Re-export `tokio::sync::Mutex` primitive ([#964]). +[#964]: https://github.com/tokio-rs/tokio/pull/964 + # 0.1.18 (March 22, 2019) ### Added + - `TypedExecutor` re-export and implementations ([#993]). +[#993]: https://github.com/tokio-rs/tokio/pull/993 + # 0.1.17 (March 13, 2019) ### Added + - Propagate trace subscriber in the runtime ([#966]). +[#966]: https://github.com/tokio-rs/tokio/pull/966 + # 0.1.16 (March 1, 2019) ### Fixed + - async-await: track latest nightly changes ([#940]). ### Added + - `sync::Watch`, a single value broadcast channel ([#922]). - Async equivalent of read / write file helpers being added to `std` ([#896]). +[#896]: https://github.com/tokio-rs/tokio/pull/896 +[#922]: https://github.com/tokio-rs/tokio/pull/922 +[#940]: https://github.com/tokio-rs/tokio/pull/940 + # 0.1.15 (January 24, 2019) ### Added + - Re-export tokio-sync APIs ([#839]). - Stream enumerate combinator ([#832]). +[#832]: https://github.com/tokio-rs/tokio/pull/832 +[#839]: https://github.com/tokio-rs/tokio/pull/839 + # 0.1.14 (January 6, 2019) -* Use feature flags to break up the crate, allowing users to pick & choose +- Use feature flags to break up the crate, allowing users to pick & choose components ([#808]). -* Export `UnixDatagram` and `UnixDatagramFramed` ([#772]). +- Export `UnixDatagram` and `UnixDatagramFramed` ([#772]). + +[#772]: https://github.com/tokio-rs/tokio/pull/772 +[#808]: https://github.com/tokio-rs/tokio/pull/808 # 0.1.13 (November 21, 2018) -* Fix `Runtime::reactor()` when no tasks are spawned ([#721]). -* `runtime::Builder` no longer uses deprecated methods ([#749]). -* Provide `after_start` and `before_stop` configuration settings for +- Fix `Runtime::reactor()` when no tasks are spawned ([#721]). +- `runtime::Builder` no longer uses deprecated methods ([#749]). +- Provide `after_start` and `before_stop` configuration settings for `Runtime` ([#756]). -* Implement throttle stream combinator ([#736]). +- Implement throttle stream combinator ([#736]). + +[#721]: https://github.com/tokio-rs/tokio/pull/721 +[#736]: https://github.com/tokio-rs/tokio/pull/736 +[#749]: https://github.com/tokio-rs/tokio/pull/749 +[#756]: https://github.com/tokio-rs/tokio/pull/756 # 0.1.12 (October 23, 2018) -* runtime: expose `keep_alive` on runtime builder ([#676]). -* runtime: create a reactor per worker thread ([#660]). -* codec: fix panic in `LengthDelimitedCodec` ([#682]). -* io: re-export `tokio_io::io::read` function ([#689]). -* runtime: check for executor re-entry in more places ([#708]). +- runtime: expose `keep_alive` on runtime builder ([#676]). +- runtime: create a reactor per worker thread ([#660]). +- codec: fix panic in `LengthDelimitedCodec` ([#682]). +- io: re-export `tokio_io::io::read` function ([#689]). +- runtime: check for executor re-entry in more places ([#708]). + +[#660]: https://github.com/tokio-rs/tokio/pull/660 +[#676]: https://github.com/tokio-rs/tokio/pull/676 +[#682]: https://github.com/tokio-rs/tokio/pull/682 +[#689]: https://github.com/tokio-rs/tokio/pull/689 +[#708]: https://github.com/tokio-rs/tokio/pull/708 # 0.1.11 (September 28, 2018) -* Fix `tokio-async-await` dependency ([#675]). +- Fix `tokio-async-await` dependency ([#675]). + +[#675]: https://github.com/tokio-rs/tokio/pull/675 # 0.1.10 (September 27, 2018) -* Fix minimal versions +- Fix minimal versions # 0.1.9 (September 27, 2018) -* Experimental async/await improvements ([#661]). -* Re-export `TaskExecutor` from `tokio-current-thread` ([#652]). -* Improve `Runtime` builder API ([#645]). -* `tokio::run` panics when called from the context of an executor +- Experimental async/await improvements ([#661]). +- Re-export `TaskExecutor` from `tokio-current-thread` ([#652]). +- Improve `Runtime` builder API ([#645]). +- `tokio::run` panics when called from the context of an executor ([#646]). -* Introduce `StreamExt` with a `timeout` helper ([#573]). -* Move `length_delimited` into `tokio` ([#575]). -* Re-organize `tokio::net` module ([#548]). -* Re-export `tokio-current-thread::spawn` in current_thread runtime +- Introduce `StreamExt` with a `timeout` helper ([#573]). +- Move `length_delimited` into `tokio` ([#575]). +- Re-organize `tokio::net` module ([#548]). +- Re-export `tokio-current-thread::spawn` in current_thread runtime ([#579]). +[#548]: https://github.com/tokio-rs/tokio/pull/548 +[#573]: https://github.com/tokio-rs/tokio/pull/573 +[#575]: https://github.com/tokio-rs/tokio/pull/575 +[#579]: https://github.com/tokio-rs/tokio/pull/579 +[#645]: https://github.com/tokio-rs/tokio/pull/645 +[#646]: https://github.com/tokio-rs/tokio/pull/646 +[#652]: https://github.com/tokio-rs/tokio/pull/652 +[#661]: https://github.com/tokio-rs/tokio/pull/661 + # 0.1.8 (August 23, 2018) -* Extract tokio::executor::current_thread to a sub crate ([#370]) -* Add `Runtime::block_on` ([#398]) -* Add `runtime::current_thread::block_on_all` ([#477]) -* Misc documentation improvements ([#450]) -* Implement `std::error::Error` for error types ([#501]) +- Extract tokio::executor::current_thread to a sub crate ([#370]) +- Add `Runtime::block_on` ([#398]) +- Add `runtime::current_thread::block_on_all` ([#477]) +- Misc documentation improvements ([#450]) +- Implement `std::error::Error` for error types ([#501]) + +[#370]: https://github.com/tokio-rs/tokio/pull/370 +[#398]: https://github.com/tokio-rs/tokio/pull/398 +[#450]: https://github.com/tokio-rs/tokio/pull/450 +[#477]: https://github.com/tokio-rs/tokio/pull/477 +[#501]: https://github.com/tokio-rs/tokio/pull/501 # 0.1.7 (June 6, 2018) -* Add `Runtime::block_on` for concurrent runtime ([#391]). -* Provide handle to `current_thread::Runtime` that allows spawning tasks from +- Add `Runtime::block_on` for concurrent runtime ([#391]). +- Provide handle to `current_thread::Runtime` that allows spawning tasks from other threads ([#340]). -* Provide `clock::now()`, a configurable source of time ([#381]). +- Provide `clock::now()`, a configurable source of time ([#381]). + +[#340]: https://github.com/tokio-rs/tokio/pull/340 +[#381]: https://github.com/tokio-rs/tokio/pull/381 +[#391]: https://github.com/tokio-rs/tokio/pull/391 # 0.1.6 (May 2, 2018) -* Add asynchronous filesystem APIs ([#323]). -* Add "current thread" runtime variant ([#308]). -* `CurrentThread`: Expose inner `Park` instance. -* Improve fairness of `CurrentThread` executor ([#313]). +- Add asynchronous filesystem APIs ([#323]). +- Add "current thread" runtime variant ([#308]). +- `CurrentThread`: Expose inner `Park` instance. +- Improve fairness of `CurrentThread` executor ([#313]). + +[#308]: https://github.com/tokio-rs/tokio/pull/308 +[#313]: https://github.com/tokio-rs/tokio/pull/313 +[#323]: https://github.com/tokio-rs/tokio/pull/323 # 0.1.5 (March 30, 2018) -* Provide timer API ([#266]) +- Provide timer API ([#266]) + +[#266]: https://github.com/tokio-rs/tokio/pull/266 # 0.1.4 (March 22, 2018) -* Fix build on FreeBSD ([#218]) -* Shutdown the Runtime when the handle is dropped ([#214]) -* Set Runtime thread name prefix for worker threads ([#232]) -* Add builder for Runtime ([#234]) -* Extract TCP and UDP types into separate crates ([#224]) -* Optionally support futures 0.2. +- Fix build on FreeBSD ([#218]) +- Shutdown the Runtime when the handle is dropped ([#214]) +- Set Runtime thread name prefix for worker threads ([#232]) +- Add builder for Runtime ([#234]) +- Extract TCP and UDP types into separate crates ([#224]) +- Optionally support futures 0.2. + +[#214]: https://github.com/tokio-rs/tokio/pull/214 +[#218]: https://github.com/tokio-rs/tokio/pull/218 +[#224]: https://github.com/tokio-rs/tokio/pull/224 +[#232]: https://github.com/tokio-rs/tokio/pull/232 +[#234]: https://github.com/tokio-rs/tokio/pull/234 # 0.1.3 (March 09, 2018) -* Fix `CurrentThread::turn` to block on idle ([#212]). +- Fix `CurrentThread::turn` to block on idle ([#212]). + +[#212]: https://github.com/tokio-rs/tokio/pull/212 # 0.1.2 (March 09, 2018) -* Introduce Tokio Runtime ([#141]) -* Provide `CurrentThread` for more flexible usage of current thread executor ([#141]). -* Add Lio for platforms that support it ([#142]). -* I/O resources now lazily bind to the reactor ([#160]). -* Extract Reactor to dedicated crate ([#169]) -* Add facade to sub crates and add prelude ([#166]). -* Switch TCP/UDP fns to poll_ -> Poll<...> style ([#175]) +- Introduce Tokio Runtime ([#141]) +- Provide `CurrentThread` for more flexible usage of current thread executor ([#141]). +- Add Lio for platforms that support it ([#142]). +- I/O resources now lazily bind to the reactor ([#160]). +- Extract Reactor to dedicated crate ([#169]) +- Add facade to sub crates and add prelude ([#166]). +- Switch TCP/UDP fns to poll\_ -> Poll<...> style ([#175]) + +[#141]: https://github.com/tokio-rs/tokio/pull/141 +[#142]: https://github.com/tokio-rs/tokio/pull/142 +[#160]: https://github.com/tokio-rs/tokio/pull/160 +[#166]: https://github.com/tokio-rs/tokio/pull/166 +[#169]: https://github.com/tokio-rs/tokio/pull/169 +[#175]: https://github.com/tokio-rs/tokio/pull/175 # 0.1.1 (February 09, 2018) -* Doc fixes +- Doc fixes # 0.1.0 (February 07, 2018) -* Initial crate released based on [RFC](https://github.com/tokio-rs/tokio-rfcs/pull/3). - -[#2375]: https://github.com/tokio-rs/tokio/pull/2375 -[#2362]: https://github.com/tokio-rs/tokio/pull/2362 -[#2358]: https://github.com/tokio-rs/tokio/pull/2358 -[#2354]: https://github.com/tokio-rs/tokio/pull/2354 -[#2335]: https://github.com/tokio-rs/tokio/pull/2335 -[#2333]: https://github.com/tokio-rs/tokio/pull/2333 -[#2325]: https://github.com/tokio-rs/tokio/pull/2325 -[#2321]: https://github.com/tokio-rs/tokio/pull/2321 -[#2300]: https://github.com/tokio-rs/tokio/pull/2300 -[#2285]: https://github.com/tokio-rs/tokio/pull/2285 -[#2281]: https://github.com/tokio-rs/tokio/pull/2281 -[#2275]: https://github.com/tokio-rs/tokio/pull/2275 -[#2274]: https://github.com/tokio-rs/tokio/pull/2274 -[#2273]: https://github.com/tokio-rs/tokio/pull/2273 -[#2253]: https://github.com/tokio-rs/tokio/pull/2253 -[#2250]: https://github.com/tokio-rs/tokio/pull/2250 -[#2245]: https://github.com/tokio-rs/tokio/pull/2245 -[#2239]: https://github.com/tokio-rs/tokio/pull/2239 -[#2238]: https://github.com/tokio-rs/tokio/pull/2238 -[#2227]: https://github.com/tokio-rs/tokio/pull/2227 -[#2218]: https://github.com/tokio-rs/tokio/pull/2218 -[#2217]: https://github.com/tokio-rs/tokio/pull/2217 -[#2210]: https://github.com/tokio-rs/tokio/pull/2210 -[#2205]: https://github.com/tokio-rs/tokio/pull/2205 -[#2204]: https://github.com/tokio-rs/tokio/pull/2204 -[#2191]: https://github.com/tokio-rs/tokio/pull/2191 -[#2186]: https://github.com/tokio-rs/tokio/pull/2186 -[#2185]: https://github.com/tokio-rs/tokio/pull/2185 -[#2184]: https://github.com/tokio-rs/tokio/pull/2184 -[#2177]: https://github.com/tokio-rs/tokio/pull/2177 -[#2169]: https://github.com/tokio-rs/tokio/pull/2169 -[#2168]: https://github.com/tokio-rs/tokio/pull/2168 -[#2164]: https://github.com/tokio-rs/tokio/pull/2164 -[#2163]: https://github.com/tokio-rs/tokio/pull/2163 -[#2158]: https://github.com/tokio-rs/tokio/pull/2158 -[#2152]: https://github.com/tokio-rs/tokio/pull/2152 -[#2151]: https://github.com/tokio-rs/tokio/pull/2151 -[#2149]: https://github.com/tokio-rs/tokio/pull/2149 -[#2145]: https://github.com/tokio-rs/tokio/pull/2145 -[#2139]: https://github.com/tokio-rs/tokio/pull/2139 -[#2135]: https://github.com/tokio-rs/tokio/pull/2135 -[#2126]: https://github.com/tokio-rs/tokio/pull/2126 -[#2125]: https://github.com/tokio-rs/tokio/pull/2125 -[#2122]: https://github.com/tokio-rs/tokio/pull/2122 -[#2119]: https://github.com/tokio-rs/tokio/pull/2119 -[#2118]: https://github.com/tokio-rs/tokio/pull/2118 -[#2109]: https://github.com/tokio-rs/tokio/pull/2109 -[#2108]: https://github.com/tokio-rs/tokio/pull/2108 -[#2094]: https://github.com/tokio-rs/tokio/pull/2094 -[#2093]: https://github.com/tokio-rs/tokio/pull/2093 -[#2092]: https://github.com/tokio-rs/tokio/pull/2092 -[#2091]: https://github.com/tokio-rs/tokio/pull/2091 -[#2089]: https://github.com/tokio-rs/tokio/pull/2089 -[#2085]: https://github.com/tokio-rs/tokio/pull/2085 -[#2079]: https://github.com/tokio-rs/tokio/pull/2079 -[#2059]: https://github.com/tokio-rs/tokio/pull/2059 -[#2052]: https://github.com/tokio-rs/tokio/pull/2052 -[#2051]: https://github.com/tokio-rs/tokio/pull/2051 -[#2045]: https://github.com/tokio-rs/tokio/pull/2045 -[#2044]: https://github.com/tokio-rs/tokio/pull/2044 -[#2040]: https://github.com/tokio-rs/tokio/pull/2040 -[#2035]: https://github.com/tokio-rs/tokio/pull/2035 -[#2034]: https://github.com/tokio-rs/tokio/pull/2034 -[#2030]: https://github.com/tokio-rs/tokio/pull/2030 -[#2029]: https://github.com/tokio-rs/tokio/pull/2029 -[#2025]: https://github.com/tokio-rs/tokio/pull/2025 -[#2022]: https://github.com/tokio-rs/tokio/pull/2022 -[#2014]: https://github.com/tokio-rs/tokio/pull/2014 -[#2012]: https://github.com/tokio-rs/tokio/pull/2012 -[#2011]: https://github.com/tokio-rs/tokio/pull/2011 -[#2006]: https://github.com/tokio-rs/tokio/pull/2006 -[#2005]: https://github.com/tokio-rs/tokio/pull/2005 -[#2001]: https://github.com/tokio-rs/tokio/pull/2001 -[#1991]: https://github.com/tokio-rs/tokio/pull/1991 -[#1986]: https://github.com/tokio-rs/tokio/pull/1986 -[#1978]: https://github.com/tokio-rs/tokio/pull/1978 -[#1977]: https://github.com/tokio-rs/tokio/pull/1977 -[#1975]: https://github.com/tokio-rs/tokio/pull/1975 -[#1973]: https://github.com/tokio-rs/tokio/pull/1973 -[#1972]: https://github.com/tokio-rs/tokio/pull/1972 -[#1971]: https://github.com/tokio-rs/tokio/pull/1971 -[#1962]: https://github.com/tokio-rs/tokio/pull/1962 -[#1961]: https://github.com/tokio-rs/tokio/pull/1961 -[#1956]: https://github.com/tokio-rs/tokio/pull/1956 -[#1949]: https://github.com/tokio-rs/tokio/pull/1949 -[#1943]: https://github.com/tokio-rs/tokio/pull/1943 -[#1939]: https://github.com/tokio-rs/tokio/pull/1939 -[#1924]: https://github.com/tokio-rs/tokio/pull/1924 -[#1905]: https://github.com/tokio-rs/tokio/pull/1905 -[#1904]: https://github.com/tokio-rs/tokio/pull/1904 -[#1898]: https://github.com/tokio-rs/tokio/pull/1898 -[#1892]: https://github.com/tokio-rs/tokio/pull/1892 -[#1888]: https://github.com/tokio-rs/tokio/pull/1888 -[#1882]: https://github.com/tokio-rs/tokio/pull/1882 -[#1881]: https://github.com/tokio-rs/tokio/pull/1881 -[#1875]: https://github.com/tokio-rs/tokio/pull/1875 -[#1874]: https://github.com/tokio-rs/tokio/pull/1874 -[#1870]: https://github.com/tokio-rs/tokio/pull/1870 -[#1864]: https://github.com/tokio-rs/tokio/pull/1864 -[#1863]: https://github.com/tokio-rs/tokio/pull/1863 -[#1861]: https://github.com/tokio-rs/tokio/pull/1861 -[#1858]: https://github.com/tokio-rs/tokio/pull/1858 -[#1856]: https://github.com/tokio-rs/tokio/pull/1856 -[#1854]: https://github.com/tokio-rs/tokio/pull/1854 -[#1849]: https://github.com/tokio-rs/tokio/pull/1849 -[#1843]: https://github.com/tokio-rs/tokio/pull/1843 -[#1841]: https://github.com/tokio-rs/tokio/pull/1841 -[#1839]: https://github.com/tokio-rs/tokio/pull/1839 -[#1834]: https://github.com/tokio-rs/tokio/pull/1834 -[#1831]: https://github.com/tokio-rs/tokio/pull/1831 -[#1827]: https://github.com/tokio-rs/tokio/pull/1827 -[#1826]: https://github.com/tokio-rs/tokio/pull/1826 -[#1772]: https://github.com/tokio-rs/tokio/pull/1772 -[#1755]: https://github.com/tokio-rs/tokio/pull/1755 -[#1733]: https://github.com/tokio-rs/tokio/pull/1733 -[#1699]: https://github.com/tokio-rs/tokio/pull/1699 -[#1111]: https://github.com/tokio-rs/tokio/pull/1111 -[#1055]: https://github.com/tokio-rs/tokio/pull/1055 -[#993]: https://github.com/tokio-rs/tokio/pull/993 -[#966]: https://github.com/tokio-rs/tokio/pull/966 -[#964]: https://github.com/tokio-rs/tokio/pull/964 -[#940]: https://github.com/tokio-rs/tokio/pull/940 -[#922]: https://github.com/tokio-rs/tokio/pull/922 -[#896]: https://github.com/tokio-rs/tokio/pull/896 -[#839]: https://github.com/tokio-rs/tokio/pull/839 -[#832]: https://github.com/tokio-rs/tokio/pull/832 -[#808]: https://github.com/tokio-rs/tokio/pull/808 -[#772]: https://github.com/tokio-rs/tokio/pull/772 -[#756]: https://github.com/tokio-rs/tokio/pull/756 -[#749]: https://github.com/tokio-rs/tokio/pull/749 -[#736]: https://github.com/tokio-rs/tokio/pull/736 -[#721]: https://github.com/tokio-rs/tokio/pull/721 -[#708]: https://github.com/tokio-rs/tokio/pull/708 -[#689]: https://github.com/tokio-rs/tokio/pull/689 -[#682]: https://github.com/tokio-rs/tokio/pull/682 -[#676]: https://github.com/tokio-rs/tokio/pull/676 -[#675]: https://github.com/tokio-rs/tokio/pull/675 -[#661]: https://github.com/tokio-rs/tokio/pull/661 -[#660]: https://github.com/tokio-rs/tokio/pull/660 -[#652]: https://github.com/tokio-rs/tokio/pull/652 -[#646]: https://github.com/tokio-rs/tokio/pull/646 -[#645]: https://github.com/tokio-rs/tokio/pull/645 -[#579]: https://github.com/tokio-rs/tokio/pull/579 -[#575]: https://github.com/tokio-rs/tokio/pull/575 -[#573]: https://github.com/tokio-rs/tokio/pull/573 -[#548]: https://github.com/tokio-rs/tokio/pull/548 -[#501]: https://github.com/tokio-rs/tokio/pull/501 -[#477]: https://github.com/tokio-rs/tokio/pull/477 -[#450]: https://github.com/tokio-rs/tokio/pull/450 -[#398]: https://github.com/tokio-rs/tokio/pull/398 -[#391]: https://github.com/tokio-rs/tokio/pull/391 -[#381]: https://github.com/tokio-rs/tokio/pull/381 -[#370]: https://github.com/tokio-rs/tokio/pull/370 -[#340]: https://github.com/tokio-rs/tokio/pull/340 -[#323]: https://github.com/tokio-rs/tokio/pull/323 -[#313]: https://github.com/tokio-rs/tokio/pull/313 -[#308]: https://github.com/tokio-rs/tokio/pull/308 -[#266]: https://github.com/tokio-rs/tokio/pull/266 -[#234]: https://github.com/tokio-rs/tokio/pull/234 -[#232]: https://github.com/tokio-rs/tokio/pull/232 -[#224]: https://github.com/tokio-rs/tokio/pull/224 -[#218]: https://github.com/tokio-rs/tokio/pull/218 -[#214]: https://github.com/tokio-rs/tokio/pull/214 -[#212]: https://github.com/tokio-rs/tokio/pull/212 -[#175]: https://github.com/tokio-rs/tokio/pull/175 -[#169]: https://github.com/tokio-rs/tokio/pull/169 -[#166]: https://github.com/tokio-rs/tokio/pull/166 -[#160]: https://github.com/tokio-rs/tokio/pull/160 -[#142]: https://github.com/tokio-rs/tokio/pull/142 -[#141]: https://github.com/tokio-rs/tokio/pull/141 +- Initial crate released based on [RFC](https://github.com/tokio-rs/tokio-rfcs/pull/3). @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio" -version = "1.0.2" +version = "1.2.0" authors = ["Tokio Contributors <team@tokio.rs>"] description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio/1.0.2/tokio/" +documentation = "https://docs.rs/tokio/1.2.0/tokio/" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] categories = ["asynchronous", "network-programming"] @@ -57,7 +57,7 @@ optional = true version = "0.2.0" [dependencies.tokio-macros] -version = "1.0.0" +version = "1.1.0" optional = true [dev-dependencies.async-stream] version = "0.3" @@ -69,6 +69,9 @@ features = ["async-await"] [dev-dependencies.proptest] version = "0.10.0" +[dev-dependencies.rand] +version = "0.8.0" + [dev-dependencies.tempfile] version = "3.1.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index f950a28..33c371c 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -8,12 +8,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.0.2" +version = "1.2.0" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.0.2/tokio/" +documentation = "https://docs.rs/tokio/1.2.0/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ @@ -86,7 +86,7 @@ test-util = [] time = [] [dependencies] -tokio-macros = { version = "1.0.0", path = "../tokio-macros", optional = true } +tokio-macros = { version = "1.1.0", path = "../tokio-macros", optional = true } pin-project-lite = "0.2.0" @@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" } tokio-stream = { version = "0.1", path = "../tokio-stream" } futures = { version = "0.3.0", features = ["async-await"] } proptest = "0.10.0" +rand = "0.8.0" tempfile = "3.1.0" async-stream = "0.3" @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio/tokio-1.0.2.crate" + value: "https://static.crates.io/crates/tokio/tokio-1.2.0.crate" } - version: "1.0.2" + version: "1.2.0" license_type: NOTICE last_upgrade_date { year: 2021 - month: 1 - day: 14 + month: 2 + day: 9 } } diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 7b21c9c..aedaf7b 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -20,12 +20,15 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> { Ok(ReadDir(State::Idle(Some(std)))) } -/// Stream of the entries in a directory. +/// Read the the entries in a directory. /// -/// This stream is returned from the [`read_dir`] function of this module and -/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] -/// information like the entry's path and possibly other metadata can be -/// learned. +/// This struct is returned from the [`read_dir`] function of this module and +/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information +/// like the entry's path and possibly other metadata can be learned. +/// +/// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`]. +/// +/// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html /// /// # Errors /// diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs index 08d7b91..13f4f2d 100644 --- a/src/io/async_fd.rs +++ b/src/io/async_fd.rs @@ -23,9 +23,11 @@ use std::{task::Context, task::Poll}; /// to retake control from the tokio IO reactor. /// /// The inner object is required to implement [`AsRawFd`]. This file descriptor -/// must not change while [`AsyncFd`] owns the inner object. Changing the file -/// descriptor results in unspecified behavior in the IO driver, which may -/// include breaking notifications for other sockets/etc. +/// must not change while [`AsyncFd`] owns the inner object, i.e. the +/// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same +/// file descriptor when called multiple times. Failure to uphold this results +/// in unspecified behavior in the IO driver, which may include breaking +/// notifications for other sockets/etc. /// /// Polling for readiness is done by calling the async functions [`readable`] /// and [`writable`]. These functions complete when the associated readiness diff --git a/src/io/blocking.rs b/src/io/blocking.rs index 430801e..94a3484 100644 --- a/src/io/blocking.rs +++ b/src/io/blocking.rs @@ -175,7 +175,7 @@ where } } -/// Repeates operations that are interrupted +/// Repeats operations that are interrupted macro_rules! uninterruptibly { ($e:expr) => {{ loop { diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs index a1784df..fa2d420 100644 --- a/src/io/driver/mod.rs +++ b/src/io/driver/mod.rs @@ -259,8 +259,7 @@ cfg_rt! { /// This function panics if there is no current reactor set and `rt` feature /// flag is not enabled. pub(super) fn current() -> Self { - crate::runtime::context::io_handle() - .expect("there is no reactor running, must be called from the context of Tokio runtime") + crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") } } } @@ -274,7 +273,7 @@ cfg_not_rt! { /// This function panics if there is no current reactor set, or if the `rt` /// feature flag is not enabled. pub(super) fn current() -> Self { - panic!("there is no reactor running, must be called from the context of Tokio runtime with `rt` enabled.") + panic!(crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs index 9312581..1451224 100644 --- a/src/io/driver/registration.rs +++ b/src/io/driver/registration.rs @@ -205,6 +205,19 @@ impl Registration { } } +impl Drop for Registration { + fn drop(&mut self) { + // It is possible for a cycle to be created between wakers stored in + // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this + // cycle, wakers are cleared. This is an imperfect solution as it is + // possible to store a `Registration` in a waker. In this case, the + // cycle would remain. + // + // See tokio-rs/tokio#3481 for more details. + self.shared.clear_wakers(); + } +} + fn gone() -> io::Error { io::Error::new(io::ErrorKind::Other, "IO driver has terminated") } diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs index 75d5623..71864b3 100644 --- a/src/io/driver/scheduled_io.rs +++ b/src/io/driver/scheduled_io.rs @@ -355,6 +355,12 @@ impl ScheduledIo { // result isn't important let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); } + + pub(crate) fn clear_wakers(&self) { + let mut waiters = self.waiters.lock(); + waiters.reader.take(); + waiters.writer.take(); + } } impl Drop for ScheduledIo { diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs index 0ecdb18..27a4cb7 100644 --- a/src/io/poll_evented.rs +++ b/src/io/poll_evented.rs @@ -126,7 +126,7 @@ impl<E: Source> PollEvented<E> { } /// Deregister the inner io from the registration and returns a Result containing the inner io - #[cfg(feature = "net")] + #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn into_inner(mut self) -> io::Result<E> { let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. self.registration.deregister(&mut inner)?; diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs index 486b69b..38e857d 100644 --- a/src/io/read_buf.rs +++ b/src/io/read_buf.rs @@ -113,6 +113,25 @@ impl<'a> ReadBuf<'a> { unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) } } + /// Returns a mutable reference to the entire buffer, without ensuring that it has been fully + /// initialized. + /// + /// The elements between 0 and `self.filled().len()` are filled, and those between 0 and + /// `self.initialized().len()` are initialized (and so can be transmuted to a `&mut [u8]`). + /// + /// The caller of this method must ensure that these invariants are upheld. For example, if the + /// caller initializes some of the uninitialized section of the buffer, it must call + /// [`assume_init`](Self::assume_init) with the number of bytes initialized. + /// + /// # Safety + /// + /// The caller must not de-initialize portions of the buffer that have already been initialized. + /// This includes any bytes in the region marked as uninitialized by `ReadBuf`. + #[inline] + pub unsafe fn inner_mut(&mut self) -> &mut [MaybeUninit<u8>] { + self.buf + } + /// Returns a mutable reference to the unfilled part of the buffer without ensuring that it has been fully /// initialized. /// diff --git a/src/io/split.rs b/src/io/split.rs index fd3273e..732eb3b 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> { impl<T> Inner<T> { fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> { - if !self.locked.compare_and_swap(false, true, Acquire) { + if self + .locked + .compare_exchange(false, true, Acquire, Acquire) + .is_ok() + { Poll::Ready(Guard { inner: self }) } else { // Spin... but investigate a better strategy diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs index 25df78e..ed6a944 100644 --- a/src/io/util/lines.rs +++ b/src/io/util/lines.rs @@ -8,7 +8,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method. + /// Read lines from an [`AsyncBufRead`]. + /// + /// A `Lines` can be turned into a `Stream` with [`LinesStream`]. + /// + /// This type is usually created using the [`lines`] method. + /// + /// [`AsyncBufRead`]: crate::io::AsyncBufRead + /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html + /// [`lines`]: crate::io::AsyncBufReadExt::lines #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs index e75ea03..e06e7e2 100644 --- a/src/io/util/mod.rs +++ b/src/io/util/mod.rs @@ -48,6 +48,7 @@ cfg_io_util! { mod read_line; mod read_to_end; + mod vec_with_initialized; cfg_process! { pub(crate) use read_to_end::read_to_end; } @@ -82,6 +83,7 @@ cfg_io_util! { cfg_not_io_util! { cfg_process! { + mod vec_with_initialized; mod read_to_end; // Used by process pub(crate) use read_to_end::read_to_end; diff --git a/src/io/util/read_to_end.rs b/src/io/util/read_to_end.rs index 1aee681..f4a564d 100644 --- a/src/io/util/read_to_end.rs +++ b/src/io/util/read_to_end.rs @@ -1,10 +1,11 @@ -use crate::io::{AsyncRead, ReadBuf}; +use crate::io::util::vec_with_initialized::{into_read_buf_parts, VecU8, VecWithInitialized}; +use crate::io::AsyncRead; use pin_project_lite::pin_project; use std::future::Future; use std::io; use std::marker::PhantomPinned; -use std::mem::{self, MaybeUninit}; +use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; @@ -13,7 +14,7 @@ pin_project! { #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadToEnd<'a, R: ?Sized> { reader: &'a mut R, - buf: &'a mut Vec<u8>, + buf: VecWithInitialized<&'a mut Vec<u8>>, // The number of bytes appended to buf. This can be less than buf.len() if // the buffer was not empty when the operation was started. read: usize, @@ -29,20 +30,19 @@ where { ReadToEnd { reader, - buf: buffer, + buf: VecWithInitialized::new(buffer), read: 0, _pin: PhantomPinned, } } -pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( - buf: &mut Vec<u8>, +pub(super) fn read_to_end_internal<V: VecU8, R: AsyncRead + ?Sized>( + buf: &mut VecWithInitialized<V>, mut reader: Pin<&mut R>, num_read: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>> { loop { - // safety: The caller promised to prepare the buffer. let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx)); match ret { Err(err) => return Poll::Ready(Err(err)), @@ -57,8 +57,8 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( /// Tries to read from the provided AsyncRead. /// /// The length of the buffer is increased by the number of bytes read. -fn poll_read_to_end<R: AsyncRead + ?Sized>( - buf: &mut Vec<u8>, +fn poll_read_to_end<V: VecU8, R: AsyncRead + ?Sized>( + buf: &mut VecWithInitialized<V>, read: Pin<&mut R>, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>> { @@ -68,37 +68,34 @@ fn poll_read_to_end<R: AsyncRead + ?Sized>( // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every // time is 4,500 times (!) slower than this if the reader has a very small // amount of data to return. - reserve(buf, 32); + buf.reserve(32); - let mut unused_capacity = ReadBuf::uninit(get_unused_capacity(buf)); + // Get a ReadBuf into the vector. + let mut read_buf = buf.get_read_buf(); - let ptr = unused_capacity.filled().as_ptr(); - ready!(read.poll_read(cx, &mut unused_capacity))?; - assert_eq!(ptr, unused_capacity.filled().as_ptr()); + let filled_before = read_buf.filled().len(); + let poll_result = read.poll_read(cx, &mut read_buf); + let filled_after = read_buf.filled().len(); + let n = filled_after - filled_before; - let n = unused_capacity.filled().len(); - let new_len = buf.len() + n; + // Update the length of the vector using the result of poll_read. + let read_buf_parts = into_read_buf_parts(read_buf); + buf.apply_read_buf(read_buf_parts); - assert!(new_len <= buf.capacity()); - unsafe { - buf.set_len(new_len); - } - Poll::Ready(Ok(n)) -} - -/// Allocates more memory and ensures that the unused capacity is prepared for use -/// with the `AsyncRead`. -fn reserve(buf: &mut Vec<u8>, bytes: usize) { - if buf.capacity() - buf.len() >= bytes { - return; + match poll_result { + Poll::Pending => { + // In this case, nothing should have been read. However we still + // update the vector in case the poll_read call initialized parts of + // the vector's unused capacity. + debug_assert_eq!(filled_before, filled_after); + Poll::Pending + } + Poll::Ready(Err(err)) => { + debug_assert_eq!(filled_before, filled_after); + Poll::Ready(Err(err)) + } + Poll::Ready(Ok(())) => Poll::Ready(Ok(n)), } - buf.reserve(bytes); -} - -/// Returns the unused capacity of the provided vector. -fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] { - let uninit = bytes::BufMut::chunk_mut(buf); - unsafe { &mut *(uninit as *mut _ as *mut [MaybeUninit<u8>]) } } impl<A> Future for ReadToEnd<'_, A> diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs index e463203..2c17383 100644 --- a/src/io/util/read_to_string.rs +++ b/src/io/util/read_to_string.rs @@ -1,5 +1,6 @@ use crate::io::util::read_line::finish_string_read; use crate::io::util::read_to_end::read_to_end_internal; +use crate::io::util::vec_with_initialized::VecWithInitialized; use crate::io::AsyncRead; use pin_project_lite::pin_project; @@ -19,7 +20,7 @@ pin_project! { // while reading to postpone utf-8 handling until after reading. output: &'a mut String, // The actual allocation of the string is moved into this vector instead. - buf: Vec<u8>, + buf: VecWithInitialized<Vec<u8>>, // The number of bytes appended to buf. This can be less than buf.len() if // the buffer was not empty when the operation was started. read: usize, @@ -39,27 +40,22 @@ where let buf = mem::replace(string, String::new()).into_bytes(); ReadToString { reader, - buf, + buf: VecWithInitialized::new(buf), output: string, read: 0, _pin: PhantomPinned, } } -/// # Safety -/// -/// Before first calling this method, the unused capacity must have been -/// prepared for use with the provided AsyncRead. This can be done using the -/// `prepare_buffer` function in `read_to_end.rs`. -unsafe fn read_to_string_internal<R: AsyncRead + ?Sized>( +fn read_to_string_internal<R: AsyncRead + ?Sized>( reader: Pin<&mut R>, output: &mut String, - buf: &mut Vec<u8>, + buf: &mut VecWithInitialized<Vec<u8>>, read: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>> { let io_res = ready!(read_to_end_internal(buf, reader, read, cx)); - let utf8_res = String::from_utf8(mem::replace(buf, Vec::new())); + let utf8_res = String::from_utf8(buf.take()); // At this point both buf and output are empty. The allocation is in utf8_res. @@ -77,7 +73,6 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - // safety: The constructor of ReadToString called `prepare_buffer`. - unsafe { read_to_string_internal(Pin::new(*me.reader), me.output, me.buf, me.read, cx) } + read_to_string_internal(Pin::new(*me.reader), me.output, me.buf, me.read, cx) } } diff --git a/src/io/util/split.rs b/src/io/util/split.rs index eb82865..4f3ce4e 100644 --- a/src/io/util/split.rs +++ b/src/io/util/split.rs @@ -8,7 +8,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method. + /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method. + /// + /// A `Split` can be turned into a `Stream` with [`SplitStream`]. + /// + /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/src/io/util/vec_with_initialized.rs b/src/io/util/vec_with_initialized.rs new file mode 100644 index 0000000..208cc93 --- /dev/null +++ b/src/io/util/vec_with_initialized.rs @@ -0,0 +1,132 @@ +use crate::io::ReadBuf; +use std::mem::MaybeUninit; + +mod private { + pub trait Sealed {} + + impl Sealed for Vec<u8> {} + impl Sealed for &mut Vec<u8> {} +} + +/// A sealed trait that constrains the generic type parameter in `VecWithInitialized<V>`. That struct's safety relies +/// on certain invariants upheld by `Vec<u8>`. +pub(crate) trait VecU8: AsMut<Vec<u8>> + private::Sealed {} + +impl VecU8 for Vec<u8> {} +impl VecU8 for &mut Vec<u8> {} +/// This struct wraps a `Vec<u8>` or `&mut Vec<u8>`, combining it with a +/// `num_initialized`, which keeps track of the number of initialized bytes +/// in the unused capacity. +/// +/// The purpose of this struct is to remember how many bytes were initialized +/// through a `ReadBuf` from call to call. +/// +/// This struct has the safety invariant that the first `num_initialized` of the +/// vector's allocation must be initialized at any time. +#[derive(Debug)] +pub(crate) struct VecWithInitialized<V> { + vec: V, + // The number of initialized bytes in the vector. + // Always between `vec.len()` and `vec.capacity()`. + num_initialized: usize, +} + +impl VecWithInitialized<Vec<u8>> { + #[cfg(feature = "io-util")] + pub(crate) fn take(&mut self) -> Vec<u8> { + self.num_initialized = 0; + std::mem::take(&mut self.vec) + } +} + +impl<V> VecWithInitialized<V> +where + V: VecU8, +{ + pub(crate) fn new(mut vec: V) -> Self { + // SAFETY: The safety invariants of vector guarantee that the bytes up + // to its length are initialized. + Self { + num_initialized: vec.as_mut().len(), + vec, + } + } + + pub(crate) fn reserve(&mut self, num_bytes: usize) { + let vec = self.vec.as_mut(); + if vec.capacity() - vec.len() >= num_bytes { + return; + } + // SAFETY: Setting num_initialized to `vec.len()` is correct as + // `reserve` does not change the length of the vector. + self.num_initialized = vec.len(); + vec.reserve(num_bytes); + } + + #[cfg(feature = "io-util")] + pub(crate) fn is_empty(&mut self) -> bool { + self.vec.as_mut().is_empty() + } + + pub(crate) fn get_read_buf<'a>(&'a mut self) -> ReadBuf<'a> { + let num_initialized = self.num_initialized; + + // SAFETY: Creating the slice is safe because of the safety invariants + // on Vec<u8>. The safety invariants of `ReadBuf` will further guarantee + // that no bytes in the slice are de-initialized. + let vec = self.vec.as_mut(); + let len = vec.len(); + let cap = vec.capacity(); + let ptr = vec.as_mut_ptr().cast::<MaybeUninit<u8>>(); + let slice = unsafe { std::slice::from_raw_parts_mut::<'a, MaybeUninit<u8>>(ptr, cap) }; + + // SAFETY: This is safe because the safety invariants of + // VecWithInitialized say that the first num_initialized bytes must be + // initialized. + let mut read_buf = ReadBuf::uninit(slice); + unsafe { + read_buf.assume_init(num_initialized); + } + read_buf.set_filled(len); + + read_buf + } + + pub(crate) fn apply_read_buf(&mut self, parts: ReadBufParts) { + let vec = self.vec.as_mut(); + assert_eq!(vec.as_ptr(), parts.ptr); + + // SAFETY: + // The ReadBufParts really does point inside `self.vec` due to the above + // check, and the safety invariants of `ReadBuf` guarantee that the + // first `parts.initialized` bytes of `self.vec` really have been + // initialized. Additionally, `ReadBuf` guarantees that `parts.len` is + // at most `parts.initialized`, so the first `parts.len` bytes are also + // initialized. + // + // Note that this relies on the fact that `V` is either `Vec<u8>` or + // `&mut Vec<u8>`, so the vector returned by `self.vec.as_mut()` cannot + // change from call to call. + unsafe { + self.num_initialized = parts.initialized; + vec.set_len(parts.len); + } + } +} + +pub(crate) struct ReadBufParts { + // Pointer is only used to check that the ReadBuf actually came from the + // right VecWithInitialized. + ptr: *const u8, + len: usize, + initialized: usize, +} + +// This is needed to release the borrow on `VecWithInitialized<V>`. +pub(crate) fn into_read_buf_parts(rb: ReadBuf<'_>) -> ReadBufParts { + ReadBufParts { + ptr: rb.filled().as_ptr(), + len: rb.filled().len(), + initialized: rb.initialized().len(), + } +} @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tokio/1.0.2")] +#![doc(html_root_url = "https://docs.rs/tokio/1.2.0")] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, @@ -301,7 +301,7 @@ //! Beware though that this will pull in many extra dependencies that you may not //! need. //! -//! - `full`: Enables all Tokio public API features listed below. +//! - `full`: Enables all Tokio public API features listed below except `test-util`. //! - `rt`: Enables `tokio::spawn`, the basic (current thread) scheduler, //! and non-scheduler utilities. //! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler. diff --git a/src/loom/std/atomic_u64.rs b/src/loom/std/atomic_u64.rs index 206954f..a86a195 100644 --- a/src/loom/std/atomic_u64.rs +++ b/src/loom/std/atomic_u64.rs @@ -24,8 +24,8 @@ mod imp { } impl AtomicU64 { - pub(crate) fn new(val: u64) -> AtomicU64 { - AtomicU64 { + pub(crate) fn new(val: u64) -> Self { + Self { inner: Mutex::new(val), } } @@ -45,16 +45,31 @@ mod imp { prev } - pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 { + pub(crate) fn compare_exchange( + &self, + current: u64, + new: u64, + _success: Ordering, + _failure: Ordering, + ) -> Result<u64, u64> { let mut lock = self.inner.lock().unwrap(); - let prev = *lock; - if prev != old { - return prev; + if *lock == current { + *lock = new; + Ok(current) + } else { + Err(*lock) } + } - *lock = new; - prev + pub(crate) fn compare_exchange_weak( + &self, + current: u64, + new: u64, + success: Ordering, + failure: Ordering, + ) -> Result<u64, u64> { + self.compare_exchange(current, new, success, failure) } } } diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs index c3f74ef..b29cbee 100644 --- a/src/loom/std/mod.rs +++ b/src/loom/std/mod.rs @@ -74,7 +74,10 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::atomic_u8::AtomicU8; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicBool, Ordering}; + pub(crate) use std::sync::atomic::{fence, AtomicBool, Ordering}; + // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead. + #[allow(deprecated)] + pub(crate) use std::sync::atomic::spin_loop_hint; } } diff --git a/src/macros/select.rs b/src/macros/select.rs index ca4f963..aa29105 100644 --- a/src/macros/select.rs +++ b/src/macros/select.rs @@ -446,7 +446,7 @@ macro_rules! select { (@ { $($t:tt)* } ) => { // No `else` branch - $crate::select!(@{ $($t)*; unreachable!() }) + $crate::select!(@{ $($t)*; panic!("all branches are disabled and there is no else branch") }) }; (@ { $($t:tt)* } else => $else:expr $(,)?) => { $crate::select!(@{ $($t)*; $else }) diff --git a/src/net/addr.rs b/src/net/addr.rs index 7cbe531..ec4fa19 100644 --- a/src/net/addr.rs +++ b/src/net/addr.rs @@ -8,8 +8,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV /// # DNS /// /// Implementations of `ToSocketAddrs` for string types require a DNS lookup. -/// These implementations are only provided when Tokio is used with the -/// **`net`** feature flag. /// /// # Calling /// diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index a2a8637..1ff0949 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -14,6 +14,10 @@ cfg_net! { /// You can accept a new connection by using the [`accept`](`TcpListener::accept`) /// method. /// + /// A `TcpListener` can be turned into a `Stream` with [`TcpListenerStream`]. + /// + /// [`TcpListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.TcpListenerStream.html + /// /// # Errors /// /// Note that accepting a connection can lead to various errors and not all @@ -60,9 +64,6 @@ impl TcpListener { /// method. /// /// The address type can be any implementor of the [`ToSocketAddrs`] trait. - /// Note that strings only implement this trait when the **`net`** feature - /// is enabled, as strings may contain domain names that need to be resolved. - /// /// If `addr` yields multiple addresses, bind will be attempted with each of /// the addresses until one succeeds and returns the listener. If none of /// the addresses succeed in creating a listener, the error returned from @@ -70,7 +71,11 @@ impl TcpListener { /// /// This function sets the `SO_REUSEADDR` option on the socket. /// + /// To configure the socket before binding, you can use the [`TcpSocket`] + /// type. + /// /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs + /// [`TcpSocket`]: struct@crate::net::TcpSocket /// /// # Examples /// diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index d4bfba4..91e357f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -17,11 +17,16 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +cfg_io_util! { + use bytes::BufMut; +} + cfg_net! { /// A TCP stream between a local and a remote socket. /// /// A TCP stream can either be created by connecting to an endpoint, via the - /// [`connect`] method, or by [accepting] a connection from a [listener]. + /// [`connect`] method, or by [accepting] a connection from a [listener]. A + /// TCP stream can also be created via the [`TcpSocket`] type. /// /// Reading and writing to a `TcpStream` is usually done using the /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] @@ -30,6 +35,7 @@ cfg_net! { /// [`connect`]: method@TcpStream::connect /// [accepting]: method@crate::net::TcpListener::accept /// [listener]: struct@crate::net::TcpListener + /// [`TcpSocket`]: struct@crate::net::TcpSocket /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt /// @@ -72,16 +78,17 @@ impl TcpStream { /// Opens a TCP connection to a remote host. /// /// `addr` is an address of the remote host. Anything which implements the - /// [`ToSocketAddrs`] trait can be supplied as the address. Note that - /// strings only implement this trait when the **`net`** feature is enabled, - /// as strings may contain domain names that need to be resolved. + /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr` + /// yields multiple addresses, connect will be attempted with each of the + /// addresses until a connection is successful. If none of the addresses + /// result in a successful connection, the error returned from the last + /// connection attempt (the last address) is returned. /// - /// If `addr` yields multiple addresses, connect will be attempted with each - /// of the addresses until a connection is successful. If none of the - /// addresses result in a successful connection, the error returned from the - /// last connection attempt (the last address) is returned. + /// To configure the socket before connecting, you can use the [`TcpSocket`] + /// type. /// /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs + /// [`TcpSocket`]: struct@crate::net::TcpSocket /// /// # Examples /// @@ -559,6 +566,85 @@ impl TcpStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + cfg_io_util! { + /// Try to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: TcpStream::readable() + /// [`ready()`]: TcpStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// let mut buf = Vec::with_capacity(4096); + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_buf(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().try_io(Interest::READABLE, || { + use std::io::Read; + + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).read(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } + /// Wait for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually diff --git a/src/net/udp.rs b/src/net/udp.rs index 23abe98..86b4fe9 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -7,6 +7,10 @@ use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + cfg_net! { /// A UDP socket /// @@ -18,8 +22,13 @@ cfg_net! { /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`) /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address /// - /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks, - /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task. + /// This type does not provide a `split` method, because this functionality + /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do + /// not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>` is + /// enough. This is because all of the methods take `&self` instead of `&mut + /// self`. + /// + /// [`Arc`]: std::sync::Arc /// /// # Streams /// @@ -74,11 +83,12 @@ cfg_net! { /// } /// ``` /// - /// # Example: Sending/Receiving concurrently + /// # Example: Splitting with `Arc` /// - /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>` - /// and share the references to multiple tasks, in order to send/receive concurrently. Here is - /// a similar "echo" example but that supports concurrent sending/receiving: + /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright + /// to use an `Arc<UdpSocket>` and share the references to multiple tasks. + /// Here is a similar "echo" example that supports concurrent + /// sending/receiving: /// /// ```no_run /// use tokio::{net::UdpSocket, sync::mpsc}; @@ -683,6 +693,137 @@ impl UdpSocket { .try_io(Interest::READABLE, || self.io.recv(buf)) } + cfg_io_util! { + /// Try to receive data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// The function must be called with valid byte array buf of sufficient size + /// to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// let mut buf = Vec::with_capacity(1024); + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_buf(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().try_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).recv(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + + /// Try to receive a single datagram message on the socket. On success, + /// returns the number of bytes read and the origin. + /// + /// The function must be called with valid byte array buf of sufficient size + /// to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// let mut buf = Vec::with_capacity(1024); + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_buf_from(&mut buf) { + /// Ok((n, _addr)) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { + self.io.registration().try_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the + // buffer. + let (n, addr) = (&*self.io).recv_from(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok((n, addr)) + }) + } + } + /// Sends data on the socket to the given address. On success, returns the /// number of bytes written. /// @@ -904,7 +1045,6 @@ impl UdpSocket { /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be readable diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs index fb5f602..126a243 100644 --- a/src/net/unix/datagram/socket.rs +++ b/src/net/unix/datagram/socket.rs @@ -10,17 +10,29 @@ use std::os::unix::net; use std::path::Path; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + cfg_net_unix! { /// An I/O object representing a Unix datagram socket. /// /// A socket can be either named (associated with a filesystem path) or /// unnamed. /// + /// This type does not provide a `split` method, because this functionality + /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do + /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>` + /// is enough. This is because all of the methods take `&self` instead of + /// `&mut self`. + /// /// **Note:** named sockets are persisted even after the object is dropped /// and the program has exited, and cannot be reconnected. It is advised /// that you either check for and unlink the existing socket if it exists, /// or use a temporary file that is guaranteed to not already exist. /// + /// [`Arc`]: std::sync::Arc + /// /// # Examples /// Using named sockets, associated with a filesystem path: /// ``` @@ -652,6 +664,130 @@ impl UnixDatagram { .try_io(Interest::READABLE, || self.io.recv(buf)) } + cfg_io_util! { + /// Try to receive data from the socket without waiting. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// let mut buf = Vec::with_capacity(1024); + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_buf_from(&mut buf) { + /// Ok((n, _addr)) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { + let (n, addr) = self.io.registration().try_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the + // buffer. + let (n, addr) = (&*self.io).recv_from(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok((n, addr)) + })?; + + Ok((n, SocketAddr(addr))) + } + + /// Try to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// let mut buf = Vec::with_capacity(1024); + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_buf(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().try_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).recv(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } + /// Sends data on the socket to the specified address. /// /// # Examples diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index 9ed4ce1..d1c063e 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -14,6 +14,10 @@ cfg_net_unix! { /// /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. /// + /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`]. + /// + /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html + /// /// # Errors /// /// Note that accepting a connection can lead to various errors and not all diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index dc929dc..a3e3487 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -15,6 +15,10 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + cfg_net_unix! { /// A structure representing a connected Unix socket. /// @@ -267,6 +271,87 @@ impl UnixStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + cfg_io_util! { + /// Try to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: UnixStream::readable() + /// [`ready()`]: UnixStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// let mut buf = Vec::with_capacity(4096); + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_buf(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().try_io(Interest::READABLE, || { + use std::io::Read; + + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).read(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } + /// Wait for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually diff --git a/src/process/mod.rs b/src/process/mod.rs index bd23e1f..7180d51 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -97,6 +97,51 @@ //! } //! ``` //! +//! With some coordination, we can also pipe the output of one command into +//! another. +//! +//! ```no_run +//! use tokio::join; +//! use tokio::process::Command; +//! use std::convert::TryInto; +//! use std::process::Stdio; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let mut echo = Command::new("echo") +//! .arg("hello world!") +//! .stdout(Stdio::piped()) +//! .spawn() +//! .expect("failed to spawn echo"); +//! +//! let tr_stdin: Stdio = echo +//! .stdout +//! .take() +//! .unwrap() +//! .try_into() +//! .expect("failed to convert to Stdio"); +//! +//! let tr = Command::new("tr") +//! .arg("a-z") +//! .arg("A-Z") +//! .stdin(tr_stdin) +//! .stdout(Stdio::piped()) +//! .spawn() +//! .expect("failed to spawn tr"); +//! +//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output()); +//! +//! assert!(echo_result.unwrap().success()); +//! +//! let tr_output = tr_output.expect("failed to await tr"); +//! assert!(tr_output.status.success()); +//! +//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n"); +//! +//! Ok(()) +//! } +//! ``` +//! //! # Caveats //! //! ## Dropping/Cancellation @@ -147,6 +192,7 @@ mod kill; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::process::kill::Kill; +use std::convert::TryInto; use std::ffi::OsStr; use std::future::Future; use std::io; @@ -839,15 +885,38 @@ pub struct Child { child: FusedChild, /// The handle for writing to the child's standard input (stdin), if it has - /// been captured. + /// been captured. To avoid partially moving the `child` and thus blocking + /// yourself from calling functions on `child` while using `stdin`, you might + /// find it helpful to do: + /// + /// ```no_run + /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap(); + /// let stdin = child.stdin.take().unwrap(); + /// ``` pub stdin: Option<ChildStdin>, /// The handle for reading from the child's standard output (stdout), if it - /// has been captured. + /// has been captured. You might find it helpful to do + /// + /// ```no_run + /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap(); + /// let stdout = child.stdout.take().unwrap(); + /// ``` + /// + /// to avoid partially moving the `child` and thus blocking yourself from calling + /// functions on `child` while using `stdout`. pub stdout: Option<ChildStdout>, /// The handle for reading from the child's standard error (stderr), if it - /// has been captured. + /// has been captured. You might find it helpful to do + /// + /// ```no_run + /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap(); + /// let stderr = child.stderr.take().unwrap(); + /// ``` + /// + /// to avoid partially moving the `child` and thus blocking yourself from calling + /// functions on `child` while using `stderr`. pub stderr: Option<ChildStderr>, } @@ -921,7 +990,36 @@ impl Child { /// before waiting. This helps avoid deadlock: it ensures that the /// child does not block waiting for input from the parent, while /// the parent waits for the child to exit. + /// + /// If the caller wishes to explicitly control when the child's stdin + /// handle is closed, they may `.take()` it before calling `.wait()`: + /// + /// ```no_run + /// use tokio::io::AsyncWriteExt; + /// use tokio::process::Command; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut child = Command::new("cat").spawn().unwrap(); + /// + /// let mut stdin = child.stdin.take().unwrap(); + /// tokio::spawn(async move { + /// // do something with stdin here... + /// stdin.write_all(b"hello world\n").await.unwrap(); + /// + /// // then drop when finished + /// drop(stdin); + /// }); + /// + /// // wait for the process to complete + /// let _ = child.wait().await; + /// } + /// ``` pub async fn wait(&mut self) -> io::Result<ExitStatus> { + // Ensure stdin is closed so the child isn't stuck waiting on + // input while the parent is waiting for it to exit. + drop(self.stdin.take()); + match &mut self.child { FusedChild::Done(exit) => Ok(*exit), FusedChild::Child(child) => { @@ -995,7 +1093,6 @@ impl Child { Ok(vec) } - drop(self.stdin.take()); let stdout_fut = read_to_end(self.stdout.take()); let stderr_fut = read_to_end(self.stderr.take()); @@ -1076,6 +1173,30 @@ impl AsyncRead for ChildStderr { } } +impl TryInto<Stdio> for ChildStdin { + type Error = io::Error; + + fn try_into(self) -> Result<Stdio, Self::Error> { + imp::convert_to_stdio(self.inner) + } +} + +impl TryInto<Stdio> for ChildStdout { + type Error = io::Error; + + fn try_into(self) -> Result<Stdio, Self::Error> { + imp::convert_to_stdio(self.inner) + } +} + +impl TryInto<Stdio> for ChildStderr { + type Error = io::Error; + + fn try_into(self) -> Result<Stdio, Self::Error> { + imp::convert_to_stdio(self.inner) + } +} + #[cfg(unix)] mod sys { use std::os::unix::io::{AsRawFd, RawFd}; diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index 3608b9f..852a191 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -43,7 +43,7 @@ use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::pin::Pin; -use std::process::{Child as StdChild, ExitStatus}; +use std::process::{Child as StdChild, ExitStatus, Stdio}; use std::task::Context; use std::task::Poll; @@ -176,6 +176,18 @@ impl AsRawFd for Pipe { } } +pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> { + let mut fd = io.into_inner()?.fd; + + // Ensure that the fd to be inherited is set to *blocking* mode, as this + // is the default that virtually all programs expect to have. Those + // programs that know how to work with nonblocking stdio will know how to + // change it to nonblocking mode. + set_nonblocking(&mut fd, false)?; + + Ok(Stdio::from(fd)) +} + impl Source for Pipe { fn register( &mut self, @@ -204,6 +216,29 @@ pub(crate) type ChildStdin = PollEvented<Pipe>; pub(crate) type ChildStdout = PollEvented<Pipe>; pub(crate) type ChildStderr = PollEvented<Pipe>; +fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { + unsafe { + let fd = fd.as_raw_fd(); + let previous = libc::fcntl(fd, libc::F_GETFL); + if previous == -1 { + return Err(io::Error::last_os_error()); + } + + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + + let r = libc::fcntl(fd, libc::F_SETFL, new); + if r == -1 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} + fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Pipe>>> where T: IntoRawFd, @@ -214,20 +249,8 @@ where }; // Set the fd to nonblocking before we pass it to the event loop - let pipe = unsafe { - let pipe = Pipe::from(io); - let fd = pipe.as_raw_fd(); - let r = libc::fcntl(fd, libc::F_GETFL); - if r == -1 { - return Err(io::Error::last_os_error()); - } - let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK); - if r == -1 { - return Err(io::Error::last_os_error()); - } - - pipe - }; + let mut pipe = Pipe::from(io); + set_nonblocking(&mut pipe, true)?; Ok(Some(PollEvented::new(pipe)?)) } diff --git a/src/process/windows.rs b/src/process/windows.rs index 1aa6c89..7237525 100644 --- a/src/process/windows.rs +++ b/src/process/windows.rs @@ -26,14 +26,19 @@ use std::future::Future; use std::io; use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle}; use std::pin::Pin; +use std::process::Stdio; use std::process::{Child as StdChild, Command as StdCommand, ExitStatus}; use std::ptr; use std::task::Context; use std::task::Poll; -use winapi::um::handleapi::INVALID_HANDLE_VALUE; +use winapi::shared::minwindef::{DWORD, FALSE}; +use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE}; +use winapi::um::processthreadsapi::GetCurrentProcess; use winapi::um::threadpoollegacyapiset::UnregisterWaitEx; use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE}; -use winapi::um::winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE}; +use winapi::um::winnt::{ + BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, +}; #[must_use = "futures do nothing unless polled"] pub(crate) struct Child { @@ -171,3 +176,30 @@ where let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) }; PollEvented::new(pipe).ok() } + +pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> { + let named_pipe = io.into_inner()?; + + // Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually + // duplicate the handle here... + unsafe { + let mut dup_handle = INVALID_HANDLE_VALUE; + let cur_proc = GetCurrentProcess(); + + let status = DuplicateHandle( + cur_proc, + named_pipe.as_raw_handle(), + cur_proc, + &mut dup_handle, + 0 as DWORD, + FALSE, + DUPLICATE_SAME_ACCESS, + ); + + if status == 0 { + return Err(io::Error::last_os_error()); + } + + Ok(Stdio::from_raw_handle(dup_handle)) + } +} diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs index 860eab4..aeb0150 100644 --- a/src/runtime/basic_scheduler.rs +++ b/src/runtime/basic_scheduler.rs @@ -129,7 +129,7 @@ impl<P: Park> BasicScheduler<P> { pin!(future); // Attempt to steal the dedicated parker and block_on the future if we can there, - // othwerwise, lets select on a notification that the parker is available + // otherwise, lets select on a notification that the parker is available // or the future is complete. loop { if let Some(inner) = &mut self.take_inner() { diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 9f98d89..791e405 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -9,6 +9,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use crate::util::error::CONTEXT_MISSING_ERROR; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -81,7 +82,7 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let rt = context::current().expect("not currently running on the Tokio runtime."); + let rt = context::current().expect(CONTEXT_MISSING_ERROR); rt.spawn_blocking(func) } @@ -91,7 +92,7 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let rt = context::current().expect("not currently running on the Tokio runtime."); + let rt = context::current().expect(CONTEXT_MISSING_ERROR); let (task, _handle) = task::joinable(BlockingTask::new(func)); rt.blocking_spawner.spawn(task, &rt) diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index 1f8892e..e845192 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -47,6 +47,9 @@ pub struct Builder { /// Whether or not to enable the time driver enable_time: bool, + /// Whether or not the clock should start paused. + start_paused: bool, + /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. @@ -110,6 +113,9 @@ impl Builder { // Time defaults to "off" enable_time: false, + // The clock starts not-paused + start_paused: false, + // Default to lazy auto-detection (one thread per CPU core) worker_threads: None, @@ -386,6 +392,7 @@ impl Builder { }, enable_io: self.enable_io, enable_time: self.enable_time, + start_paused: self.start_paused, } } @@ -489,6 +496,31 @@ cfg_time! { } } +cfg_test_util! { + impl Builder { + /// Controls if the runtime's clock starts paused or advancing. + /// + /// Pausing time requires the current-thread runtime; construction of + /// the runtime will panic otherwise. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_current_thread() + /// .enable_time() + /// .start_paused(true) + /// .build() + /// .unwrap(); + /// ``` + pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { + self.start_paused = start_paused; + self + } + } +} + cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { diff --git a/src/runtime/context.rs b/src/runtime/context.rs index 0817019..6e4a016 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -13,9 +13,9 @@ pub(crate) fn current() -> Option<Handle> { cfg_io_driver! { pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { - CONTEXT.with(|ctx| match *ctx.borrow() { - Some(ref ctx) => ctx.io_handle.clone(), - None => Default::default(), + CONTEXT.with(|ctx| { + let ctx = ctx.borrow(); + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone() }) } } @@ -23,18 +23,18 @@ cfg_io_driver! { cfg_signal_internal! { #[cfg(unix)] pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { - CONTEXT.with(|ctx| match *ctx.borrow() { - Some(ref ctx) => ctx.signal_handle.clone(), - None => Default::default(), + CONTEXT.with(|ctx| { + let ctx = ctx.borrow(); + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone() }) } } cfg_time! { pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { - CONTEXT.with(|ctx| match *ctx.borrow() { - Some(ref ctx) => ctx.time_handle.clone(), - None => Default::default(), + CONTEXT.with(|ctx| { + let ctx = ctx.borrow(); + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone() }) } diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs index b89fa4f..a0e8e23 100644 --- a/src/runtime/driver.rs +++ b/src/runtime/driver.rs @@ -103,8 +103,8 @@ cfg_time! { pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option<crate::time::driver::Handle>; - fn create_clock(enable_pausing: bool) -> Clock { - crate::time::Clock::new(enable_pausing) + fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock { + crate::time::Clock::new(enable_pausing, start_paused) } fn create_time_driver( @@ -131,7 +131,7 @@ cfg_not_time! { pub(crate) type Clock = (); pub(crate) type TimeHandle = (); - fn create_clock(_enable_pausing: bool) -> Clock { + fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock { () } @@ -162,13 +162,15 @@ pub(crate) struct Cfg { pub(crate) enable_io: bool, pub(crate) enable_time: bool, pub(crate) enable_pause_time: bool, + pub(crate) start_paused: bool, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; - let clock = create_clock(cfg.enable_pause_time); + let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); + let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, clock.clone()); diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index 6ff3c39..76b28f2 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -1,6 +1,7 @@ use crate::runtime::blocking::task::BlockingTask; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{blocking, context, driver, Spawner}; +use crate::util::error::CONTEXT_MISSING_ERROR; use std::future::Future; use std::{error, fmt}; @@ -97,7 +98,7 @@ impl Handle { /// # } /// ``` pub fn current() -> Self { - context::current().expect("not currently running on the Tokio runtime.") + context::current().expect(CONTEXT_MISSING_ERROR) } /// Returns a Handle view over the currently running Runtime @@ -213,7 +214,7 @@ impl fmt::Debug for TryCurrentError { impl fmt::Display for TryCurrentError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("no tokio Runtime has been initialized") + f.write_str(CONTEXT_MISSING_ERROR) } } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 2c90acb..b138a66 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,7 +10,7 @@ //! //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing //! them to be started, shut down, and configured together. However, often it is -//! not required to configure a [`Runtime`] manually, and user may just use the +//! not required to configure a [`Runtime`] manually, and a user may just use the //! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood. //! //! # Usage @@ -114,7 +114,7 @@ //! //! The multi-thread scheduler executes futures on a _thread pool_, using a //! work-stealing strategy. By default, it will start a worker thread for each -//! CPU core available on the system. This tends to be the ideal configurations +//! CPU core available on the system. This tends to be the ideal configuration //! for most applications. The multi-thread scheduler requires the `rt-multi-thread` //! feature flag, and is selected by default: //! ``` diff --git a/src/runtime/queue.rs b/src/runtime/queue.rs index cdf4009..1c7bb23 100644 --- a/src/runtime/queue.rs +++ b/src/runtime/queue.rs @@ -8,7 +8,7 @@ use crate::runtime::task; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; /// Producer handle. May only be used from a single thread. pub(super) struct Local<T: 'static> { @@ -194,13 +194,17 @@ impl<T> Local<T> { // work. This is because all tasks are pushed into the queue from the // current thread (or memory has been acquired if the local queue handle // moved). - let actual = self.inner.head.compare_and_swap( - prev, - pack(head.wrapping_add(n), head.wrapping_add(n)), - Release, - ); - - if actual != prev { + if self + .inner + .head + .compare_exchange( + prev, + pack(head.wrapping_add(n), head.wrapping_add(n)), + Release, + Relaxed, + ) + .is_err() + { // We failed to claim the tasks, losing the race. Return out of // this function and try the full `push` routine again. The queue // may not be full anymore. @@ -231,7 +235,7 @@ impl<T> Local<T> { // tasks and we are the only producer. self.inner.buffer[i_idx].with_mut(|ptr| unsafe { let ptr = (*ptr).as_ptr(); - (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next)); + (*ptr).header().set_next(Some(next)) }); } @@ -606,7 +610,7 @@ fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> { fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) { unsafe { - header.as_ref().queue_next.with_mut(|ptr| *ptr = val); + header.as_ref().set_next(val); } } diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs index dfa8764..9f7ff55 100644 --- a/src/runtime/task/core.rs +++ b/src/runtime/task/core.rs @@ -12,7 +12,6 @@ use crate::loom::cell::UnsafeCell; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; -use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{Notified, Schedule, Task}; use crate::util::linked_list; @@ -37,15 +36,23 @@ pub(super) struct Cell<T: Future, S> { pub(super) trailer: Trailer, } +pub(super) struct Scheduler<S> { + scheduler: UnsafeCell<Option<S>>, +} + +pub(super) struct CoreStage<T: Future> { + stage: UnsafeCell<Stage<T>>, +} + /// The core of the task. /// /// Holds the future or output, depending on the stage of execution. pub(super) struct Core<T: Future, S> { /// Scheduler used to drive this future - pub(super) scheduler: UnsafeCell<Option<S>>, + pub(super) scheduler: Scheduler<S>, /// Either the future or the output - pub(super) stage: UnsafeCell<Stage<T>>, + pub(super) stage: CoreStage<T>, } /// Crate public as this is also needed by the pool. @@ -95,8 +102,12 @@ impl<T: Future, S: Schedule> Cell<T, S> { vtable: raw::vtable::<T, S>(), }, core: Core { - scheduler: UnsafeCell::new(None), - stage: UnsafeCell::new(Stage::Running(future)), + scheduler: Scheduler { + scheduler: UnsafeCell::new(None), + }, + stage: CoreStage { + stage: UnsafeCell::new(Stage::Running(future)), + }, }, trailer: Trailer { waker: UnsafeCell::new(None), @@ -105,7 +116,11 @@ impl<T: Future, S: Schedule> Cell<T, S> { } } -impl<T: Future, S: Schedule> Core<T, S> { +impl<S: Schedule> Scheduler<S> { + pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R { + self.scheduler.with_mut(f) + } + /// Bind a scheduler to the task. /// /// This only happens on the first poll and must be preceeded by a call to @@ -140,6 +155,58 @@ impl<T: Future, S: Schedule> Core<T, S> { self.scheduler.with(|ptr| unsafe { (*ptr).is_some() }) } + /// Schedule the future for execution + pub(super) fn schedule(&self, task: Notified<S>) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.schedule(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Schedule the future for execution in the near future, yielding the + /// thread to other tasks. + pub(super) fn yield_now(&self, task: Notified<S>) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.yield_now(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Release the task + /// + /// If the `Scheduler` implementation is able to, it returns the `Task` + /// handle immediately. The caller of this function will batch a ref-dec + /// with a state change. + pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> { + use std::mem::ManuallyDrop; + + let task = ManuallyDrop::new(task); + + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.release(&*task), + // Task was never polled + None => None, + } + }) + } +} + +impl<T: Future> CoreStage<T> { + pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R { + self.stage.with_mut(f) + } + /// Poll the future /// /// # Safety @@ -153,7 +220,7 @@ impl<T: Future, S: Schedule> Core<T, S> { /// /// `self` must also be pinned. This is handled by storing the task on the /// heap. - pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> { + pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> { let res = { self.stage.with_mut(|ptr| { // Safety: The caller ensures mutual exclusion to the field. @@ -165,11 +232,6 @@ impl<T: Future, S: Schedule> Core<T, S> { // Safety: The caller ensures the future is pinned. let future = unsafe { Pin::new_unchecked(future) }; - // The waker passed into the `poll` function does not require a ref - // count increment. - let waker_ref = waker_ref::<T, S>(header); - let mut cx = Context::from_waker(&*waker_ref); - future.poll(&mut cx) }) }; @@ -187,10 +249,10 @@ impl<T: Future, S: Schedule> Core<T, S> { /// /// The caller must ensure it is safe to mutate the `stage` field. pub(super) fn drop_future_or_output(&self) { - self.stage.with_mut(|ptr| { - // Safety: The caller ensures mutal exclusion to the field. - unsafe { *ptr = Stage::Consumed }; - }); + // Safety: the caller ensures mutual exclusion to the field. + unsafe { + self.set_stage(Stage::Consumed); + } } /// Store the task output @@ -199,10 +261,10 @@ impl<T: Future, S: Schedule> Core<T, S> { /// /// The caller must ensure it is safe to mutate the `stage` field. pub(super) fn store_output(&self, output: super::Result<T::Output>) { - self.stage.with_mut(|ptr| { - // Safety: the caller ensures mutual exclusion to the field. - unsafe { *ptr = Stage::Finished(output) }; - }); + // Safety: the caller ensures mutual exclusion to the field. + unsafe { + self.set_stage(Stage::Finished(output)); + } } /// Take the task output @@ -222,50 +284,8 @@ impl<T: Future, S: Schedule> Core<T, S> { }) } - /// Schedule the future for execution - pub(super) fn schedule(&self, task: Notified<S>) { - self.scheduler.with(|ptr| { - // Safety: Can only be called after initial `poll`, which is the - // only time the field is mutated. - match unsafe { &*ptr } { - Some(scheduler) => scheduler.schedule(task), - None => panic!("no scheduler set"), - } - }); - } - - /// Schedule the future for execution in the near future, yielding the - /// thread to other tasks. - pub(super) fn yield_now(&self, task: Notified<S>) { - self.scheduler.with(|ptr| { - // Safety: Can only be called after initial `poll`, which is the - // only time the field is mutated. - match unsafe { &*ptr } { - Some(scheduler) => scheduler.yield_now(task), - None => panic!("no scheduler set"), - } - }); - } - - /// Release the task - /// - /// If the `Scheduler` implementation is able to, it returns the `Task` - /// handle immediately. The caller of this function will batch a ref-dec - /// with a state change. - pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> { - use std::mem::ManuallyDrop; - - let task = ManuallyDrop::new(task); - - self.scheduler.with(|ptr| { - // Safety: Can only be called after initial `poll`, which is the - // only time the field is mutated. - match unsafe { &*ptr } { - Some(scheduler) => scheduler.release(&*task), - // Task was never polled - None => None, - } - }) + unsafe fn set_stage(&self, stage: Stage<T>) { + self.stage.with_mut(|ptr| *ptr = stage) } } @@ -277,6 +297,30 @@ cfg_rt_multi_thread! { let task = unsafe { RawTask::from_raw(self.into()) }; task.shutdown(); } + + pub(crate) unsafe fn set_next(&self, next: Option<NonNull<Header>>) { + self.queue_next.with_mut(|ptr| *ptr = next); + } + } +} + +impl Trailer { + pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) { + self.waker.with_mut(|ptr| { + *ptr = waker; + }); + } + + pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool { + self.waker + .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker)) + } + + pub(crate) fn wake_join(&self) { + self.waker.with(|ptr| match unsafe { &*ptr } { + Some(waker) => waker.wake_by_ref(), + None => panic!("waker missing"), + }); } } diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs index 208d48c..7d596e3 100644 --- a/src/runtime/task/harness.rs +++ b/src/runtime/task/harness.rs @@ -1,12 +1,13 @@ -use crate::runtime::task::core::{Cell, Core, Header, Trailer}; +use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer}; use crate::runtime::task::state::Snapshot; +use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{JoinError, Notified, Schedule, Task}; use std::future::Future; use std::mem; use std::panic; use std::ptr::NonNull; -use std::task::{Poll, Waker}; +use std::task::{Context, Poll, Waker}; /// Typed raw task handle pub(super) struct Harness<T: Future, S: 'static> { @@ -35,6 +36,13 @@ where fn core(&self) -> &Core<T, S> { unsafe { &self.cell.as_ref().core } } + + fn scheduler_view(&self) -> SchedulerView<'_, S> { + SchedulerView { + header: self.header(), + scheduler: &self.core().scheduler, + } + } } impl<T, S> Harness<T, S> @@ -48,102 +56,46 @@ where /// /// Panics raised while polling the future are handled. pub(super) fn poll(self) { - // If this is the first time the task is polled, the task will be bound - // to the scheduler, in which case the task ref count must be - // incremented. - let is_not_bound = !self.core().is_bound(); - - // Transition the task to the running state. - // - // A failure to transition here indicates the task has been cancelled - // while in the run queue pending execution. - let snapshot = match self.header().state.transition_to_running(is_not_bound) { - Ok(snapshot) => snapshot, - Err(_) => { - // The task was shutdown while in the run queue. At this point, - // we just hold a ref counted reference. Drop it here. + match self.poll_inner() { + PollFuture::Notified => { + // Signal yield + self.core().scheduler.yield_now(Notified(self.to_task())); + // The ref-count was incremented as part of + // `transition_to_idle`. self.drop_reference(); - return; } - }; - - if is_not_bound { - // Ensure the task is bound to a scheduler instance. Since this is - // the first time polling the task, a scheduler instance is pulled - // from the local context and assigned to the task. - // - // The scheduler maintains ownership of the task and responds to - // `wake` calls. - // - // The task reference count has been incremented. - // - // Safety: Since we have unique access to the task so that we can - // safely call `bind_scheduler`. - self.core().bind_scheduler(self.to_task()); + PollFuture::DropReference => { + self.drop_reference(); + } + PollFuture::Complete(out, is_join_interested) => { + self.complete(out, is_join_interested); + } + PollFuture::None => (), } + } + + fn poll_inner(&self) -> PollFuture<T::Output> { + let snapshot = match self.scheduler_view().transition_to_running() { + TransitionToRunning::Ok(snapshot) => snapshot, + TransitionToRunning::DropReference => return PollFuture::DropReference, + }; // The transition to `Running` done above ensures that a lock on the // future has been obtained. This also ensures the `*mut T` pointer // contains the future (as opposed to the output) and is initialized. - let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { - struct Guard<'a, T: Future, S: Schedule> { - core: &'a Core<T, S>, - } - - impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> { - fn drop(&mut self) { - self.core.drop_future_or_output(); - } - } - - let guard = Guard { core: self.core() }; - - // If the task is cancelled, avoid polling it, instead signalling it - // is complete. - if snapshot.is_cancelled() { - Poll::Ready(Err(JoinError::cancelled())) - } else { - let res = guard.core.poll(self.header()); - - // prevent the guard from dropping the future - mem::forget(guard); - - res.map(Ok) - } - })); - - match res { - Ok(Poll::Ready(out)) => { - self.complete(out, snapshot.is_join_interested()); - } - Ok(Poll::Pending) => { - match self.header().state.transition_to_idle() { - Ok(snapshot) => { - if snapshot.is_notified() { - // Signal yield - self.core().yield_now(Notified(self.to_task())); - // The ref-count was incremented as part of - // `transition_to_idle`. - self.drop_reference(); - } - } - Err(_) => self.cancel_task(), - } - } - Err(err) => { - self.complete(Err(JoinError::panic(err)), snapshot.is_join_interested()); - } - } + let waker_ref = waker_ref::<T, S>(self.header()); + let cx = Context::from_waker(&*waker_ref); + poll_future(self.header(), &self.core().stage, snapshot, cx) } pub(super) fn dealloc(self) { // Release the join waker, if there is one. - self.trailer().waker.with_mut(|_| ()); + self.trailer().waker.with_mut(drop); // Check causality - self.core().stage.with_mut(|_| {}); - self.core().scheduler.with_mut(|_| {}); + self.core().stage.with_mut(drop); + self.core().scheduler.with_mut(drop); unsafe { drop(Box::from_raw(self.cell.as_ptr())); @@ -154,83 +106,9 @@ where /// Read the task output into `dst`. pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) { - // Load a snapshot of the current task state - let snapshot = self.header().state.load(); - - debug_assert!(snapshot.is_join_interested()); - - if !snapshot.is_complete() { - // The waker must be stored in the task struct. - let res = if snapshot.has_join_waker() { - // There already is a waker stored in the struct. If it matches - // the provided waker, then there is no further work to do. - // Otherwise, the waker must be swapped. - let will_wake = unsafe { - // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE` - // may mutate the `waker` field. - self.trailer() - .waker - .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker)) - }; - - if will_wake { - // The task is not complete **and** the waker is up to date, - // there is nothing further that needs to be done. - return; - } - - // Unset the `JOIN_WAKER` to gain mutable access to the `waker` - // field then update the field with the new join worker. - // - // This requires two atomic operations, unsetting the bit and - // then resetting it. If the task transitions to complete - // concurrently to either one of those operations, then setting - // the join waker fails and we proceed to reading the task - // output. - self.header() - .state - .unset_waker() - .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot)) - } else { - self.set_join_waker(waker.clone(), snapshot) - }; - - match res { - Ok(_) => return, - Err(snapshot) => { - assert!(snapshot.is_complete()); - } - } + if can_read_output(self.header(), self.trailer(), waker) { + *dst = Poll::Ready(self.core().stage.take_output()); } - - *dst = Poll::Ready(self.core().take_output()); - } - - fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> { - assert!(snapshot.is_join_interested()); - assert!(!snapshot.has_join_waker()); - - // Safety: Only the `JoinHandle` may set the `waker` field. When - // `JOIN_INTEREST` is **not** set, nothing else will touch the field. - unsafe { - self.trailer().waker.with_mut(|ptr| { - *ptr = Some(waker); - }); - } - - // Update the `JoinWaker` state accordingly - let res = self.header().state.set_join_waker(); - - // If the state could not be updated, then clear the join waker - if res.is_err() { - unsafe { - self.trailer().waker.with_mut(|ptr| { - *ptr = None; - }); - } - } - - res } pub(super) fn drop_join_handle_slow(self) { @@ -242,7 +120,7 @@ where // the scheduler or `JoinHandle`. i.e. if the output remains in the // task structure until the task is deallocated, it may be dropped // by a Waker on any arbitrary thread. - self.core().drop_future_or_output(); + self.core().stage.drop_future_or_output(); } // Drop the `JoinHandle` reference, possibly deallocating the task @@ -258,7 +136,7 @@ where pub(super) fn wake_by_ref(&self) { if self.header().state.transition_to_notified() { - self.core().schedule(Notified(self.to_task())); + self.core().scheduler.schedule(Notified(self.to_task())); } } @@ -282,44 +160,65 @@ where // By transitioning the lifcycle to `Running`, we have permission to // drop the future. - self.cancel_task(); + let err = cancel_task(&self.core().stage); + self.complete(Err(err), true) } // ====== internal ====== - fn cancel_task(self) { - // Drop the future from a panic guard. - let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { - self.core().drop_future_or_output(); - })); - - if let Err(err) = res { - // Dropping the future panicked, complete the join - // handle with the panic to avoid dropping the panic - // on the ground. - self.complete(Err(JoinError::panic(err)), true); - } else { - self.complete(Err(JoinError::cancelled()), true); - } - } - - fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) { + fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) { if is_join_interested { // Store the output. The future has already been dropped // // Safety: Mutual exclusion is obtained by having transitioned the task // state -> Running - self.core().store_output(output); + let stage = &self.core().stage; + stage.store_output(output); // Transition to `Complete`, notifying the `JoinHandle` if necessary. - self.transition_to_complete(); + transition_to_complete(self.header(), stage, &self.trailer()); } // The task has completed execution and will no longer be scheduled. // // Attempts to batch a ref-dec with the state transition below. - let ref_dec = if self.core().is_bound() { - if let Some(task) = self.core().release(self.to_task()) { + + if self + .scheduler_view() + .transition_to_terminal(is_join_interested) + { + self.dealloc() + } + } + + fn to_task(&self) -> Task<S> { + self.scheduler_view().to_task() + } +} + +enum TransitionToRunning { + Ok(Snapshot), + DropReference, +} + +struct SchedulerView<'a, S> { + header: &'a Header, + scheduler: &'a Scheduler<S>, +} + +impl<'a, S> SchedulerView<'a, S> +where + S: Schedule, +{ + fn to_task(&self) -> Task<S> { + // SAFETY The header is from the same struct containing the scheduler `S` so the cast is safe + unsafe { Task::from_raw(self.header.into()) } + } + + /// Returns true if the task should be deallocated. + fn transition_to_terminal(&self, is_join_interested: bool) -> bool { + let ref_dec = if self.scheduler.is_bound() { + if let Some(task) = self.scheduler.release(self.to_task()) { mem::forget(task); true } else { @@ -331,41 +230,217 @@ where // This might deallocate let snapshot = self - .header() + .header .state .transition_to_terminal(!is_join_interested, ref_dec); - if snapshot.ref_count() == 0 { - self.dealloc() + snapshot.ref_count() == 0 + } + + fn transition_to_running(&self) -> TransitionToRunning { + // If this is the first time the task is polled, the task will be bound + // to the scheduler, in which case the task ref count must be + // incremented. + let is_not_bound = !self.scheduler.is_bound(); + + // Transition the task to the running state. + // + // A failure to transition here indicates the task has been cancelled + // while in the run queue pending execution. + let snapshot = match self.header.state.transition_to_running(is_not_bound) { + Ok(snapshot) => snapshot, + Err(_) => { + // The task was shutdown while in the run queue. At this point, + // we just hold a ref counted reference. Since we do not have access to it here + // return `DropReference` so the caller drops it. + return TransitionToRunning::DropReference; + } + }; + + if is_not_bound { + // Ensure the task is bound to a scheduler instance. Since this is + // the first time polling the task, a scheduler instance is pulled + // from the local context and assigned to the task. + // + // The scheduler maintains ownership of the task and responds to + // `wake` calls. + // + // The task reference count has been incremented. + // + // Safety: Since we have unique access to the task so that we can + // safely call `bind_scheduler`. + self.scheduler.bind_scheduler(self.to_task()); } + TransitionToRunning::Ok(snapshot) } +} - /// Transitions the task's lifecycle to `Complete`. Notifies the - /// `JoinHandle` if it still has interest in the completion. - fn transition_to_complete(&mut self) { - // Transition the task's lifecycle to `Complete` and get a snapshot of - // the task's sate. - let snapshot = self.header().state.transition_to_complete(); - - if !snapshot.is_join_interested() { - // The `JoinHandle` is not interested in the output of this task. It - // is our responsibility to drop the output. - self.core().drop_future_or_output(); - } else if snapshot.has_join_waker() { - // Notify the join handle. The previous transition obtains the - // lock on the waker cell. - self.wake_join(); +/// Transitions the task's lifecycle to `Complete`. Notifies the +/// `JoinHandle` if it still has interest in the completion. +fn transition_to_complete<T>(header: &Header, stage: &CoreStage<T>, trailer: &Trailer) +where + T: Future, +{ + // Transition the task's lifecycle to `Complete` and get a snapshot of + // the task's sate. + let snapshot = header.state.transition_to_complete(); + + if !snapshot.is_join_interested() { + // The `JoinHandle` is not interested in the output of this task. It + // is our responsibility to drop the output. + stage.drop_future_or_output(); + } else if snapshot.has_join_waker() { + // Notify the join handle. The previous transition obtains the + // lock on the waker cell. + trailer.wake_join(); + } +} + +fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool { + // Load a snapshot of the current task state + let snapshot = header.state.load(); + + debug_assert!(snapshot.is_join_interested()); + + if !snapshot.is_complete() { + // The waker must be stored in the task struct. + let res = if snapshot.has_join_waker() { + // There already is a waker stored in the struct. If it matches + // the provided waker, then there is no further work to do. + // Otherwise, the waker must be swapped. + let will_wake = unsafe { + // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE` + // may mutate the `waker` field. + trailer.will_wake(waker) + }; + + if will_wake { + // The task is not complete **and** the waker is up to date, + // there is nothing further that needs to be done. + return false; + } + + // Unset the `JOIN_WAKER` to gain mutable access to the `waker` + // field then update the field with the new join worker. + // + // This requires two atomic operations, unsetting the bit and + // then resetting it. If the task transitions to complete + // concurrently to either one of those operations, then setting + // the join waker fails and we proceed to reading the task + // output. + header + .state + .unset_waker() + .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot)) + } else { + set_join_waker(header, trailer, waker.clone(), snapshot) + }; + + match res { + Ok(_) => return false, + Err(snapshot) => { + assert!(snapshot.is_complete()); + } } } + true +} - fn wake_join(&self) { - self.trailer().waker.with(|ptr| match unsafe { &*ptr } { - Some(waker) => waker.wake_by_ref(), - None => panic!("waker missing"), - }); +fn set_join_waker( + header: &Header, + trailer: &Trailer, + waker: Waker, + snapshot: Snapshot, +) -> Result<Snapshot, Snapshot> { + assert!(snapshot.is_join_interested()); + assert!(!snapshot.has_join_waker()); + + // Safety: Only the `JoinHandle` may set the `waker` field. When + // `JOIN_INTEREST` is **not** set, nothing else will touch the field. + unsafe { + trailer.set_waker(Some(waker)); } - fn to_task(&self) -> Task<S> { - unsafe { Task::from_raw(self.header().into()) } + // Update the `JoinWaker` state accordingly + let res = header.state.set_join_waker(); + + // If the state could not be updated, then clear the join waker + if res.is_err() { + unsafe { + trailer.set_waker(None); + } + } + + res +} + +enum PollFuture<T> { + Complete(Result<T, JoinError>, bool), + DropReference, + Notified, + None, +} + +fn cancel_task<T: Future>(stage: &CoreStage<T>) -> JoinError { + // Drop the future from a panic guard. + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + stage.drop_future_or_output(); + })); + + if let Err(err) = res { + // Dropping the future panicked, complete the join + // handle with the panic to avoid dropping the panic + // on the ground. + JoinError::panic(err) + } else { + JoinError::cancelled() + } +} + +fn poll_future<T: Future>( + header: &Header, + core: &CoreStage<T>, + snapshot: Snapshot, + cx: Context<'_>, +) -> PollFuture<T::Output> { + if snapshot.is_cancelled() { + PollFuture::Complete(Err(JoinError::cancelled()), snapshot.is_join_interested()) + } else { + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + struct Guard<'a, T: Future> { + core: &'a CoreStage<T>, + } + + impl<T: Future> Drop for Guard<'_, T> { + fn drop(&mut self) { + self.core.drop_future_or_output(); + } + } + + let guard = Guard { core }; + + let res = guard.core.poll(cx); + + // prevent the guard from dropping the future + mem::forget(guard); + + res + })); + match res { + Ok(Poll::Pending) => match header.state.transition_to_idle() { + Ok(snapshot) => { + if snapshot.is_notified() { + PollFuture::Notified + } else { + PollFuture::None + } + } + Err(_) => PollFuture::Complete(Err(cancel_task(core)), true), + }, + Ok(Poll::Ready(ok)) => PollFuture::Complete(Ok(ok), snapshot.is_join_interested()), + Err(err) => { + PollFuture::Complete(Err(JoinError::panic(err)), snapshot.is_join_interested()) + } + } } } diff --git a/src/runtime/thread_pool/idle.rs b/src/runtime/thread_pool/idle.rs index 6e692fd..b77cce5 100644 --- a/src/runtime/thread_pool/idle.rs +++ b/src/runtime/thread_pool/idle.rs @@ -118,7 +118,7 @@ impl Idle { if sleepers[index] == worker_id { sleepers.swap_remove(index); - // Update the state accordingly whle the lock is held. + // Update the state accordingly while the lock is held. State::unpark_one(&self.state); return; diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs index 31712e4..dbd1aff 100644 --- a/src/runtime/thread_pool/worker.rs +++ b/src/runtime/thread_pool/worker.rs @@ -337,7 +337,7 @@ impl Context { } fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult { - // Make sure thew orker is not in the **searching** state. This enables + // Make sure the worker is not in the **searching** state. This enables // another idle worker to try to steal work. core.transition_from_searching(&self.worker); diff --git a/src/signal/unix.rs b/src/signal/unix.rs index fc0f16d..0de875a 100644 --- a/src/signal/unix.rs +++ b/src/signal/unix.rs @@ -397,7 +397,41 @@ impl Signal { poll_fn(|cx| self.poll_recv(cx)).await } - pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no signals are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(()))` if a signal is available. + /// * `Poll::Ready(None)` if the channel has been closed and all signals + /// sent before it was closed have been received. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::unix::Signal; + /// + /// struct MyFuture { + /// signal: Signal, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.signal.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { self.rx.poll_recv(cx) } diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs index 1b94600..3ef8f84 100644 --- a/src/sync/broadcast.rs +++ b/src/sync/broadcast.rs @@ -163,6 +163,11 @@ pub struct Sender<T> { /// Must not be used concurrently. Messages may be retrieved using /// [`recv`][Receiver::recv]. /// +/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`] +/// wrapper. +/// +/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html +/// /// # Examples /// /// ``` @@ -361,38 +366,16 @@ struct RecvGuard<'a, T> { } /// Receive a value future -struct Recv<R, T> -where - R: AsMut<Receiver<T>>, -{ +struct Recv<'a, T> { /// Receiver being waited on - receiver: R, + receiver: &'a mut Receiver<T>, /// Entry in the waiter `LinkedList` waiter: UnsafeCell<Waiter>, - - _p: std::marker::PhantomData<T>, -} - -/// `AsMut<T>` is not implemented for `T` (coherence). Explicitly implementing -/// `AsMut` for `Receiver` would be included in the public API of the receiver -/// type. Instead, `Borrow` is used internally to bridge the gap. -struct Borrow<T>(T); - -impl<T> AsMut<Receiver<T>> for Borrow<Receiver<T>> { - fn as_mut(&mut self) -> &mut Receiver<T> { - &mut self.0 - } -} - -impl<'a, T> AsMut<Receiver<T>> for Borrow<&'a mut Receiver<T>> { - fn as_mut(&mut self) -> &mut Receiver<T> { - &mut *self.0 - } } -unsafe impl<R: AsMut<Receiver<T>> + Send, T: Send> Send for Recv<R, T> {} -unsafe impl<R: AsMut<Receiver<T>> + Sync, T: Send> Sync for Recv<R, T> {} +unsafe impl<'a, T: Send> Send for Recv<'a, T> {} +unsafe impl<'a, T: Send> Sync for Recv<'a, T> {} /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 2; @@ -892,7 +875,7 @@ impl<T: Clone> Receiver<T> { /// } /// ``` pub async fn recv(&mut self) -> Result<T, RecvError> { - let fut = Recv::<_, T>::new(Borrow(self)); + let fut = Recv::new(self); fut.await } @@ -951,7 +934,7 @@ impl<T> Drop for Receiver<T> { drop(tail); - while self.next != until { + while self.next < until { match self.recv_ref(None) { Ok(_) => {} // The channel is closed @@ -965,11 +948,8 @@ impl<T> Drop for Receiver<T> { } } -impl<R, T> Recv<R, T> -where - R: AsMut<Receiver<T>>, -{ - fn new(receiver: R) -> Recv<R, T> { +impl<'a, T> Recv<'a, T> { + fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> { Recv { receiver, waiter: UnsafeCell::new(Waiter { @@ -978,7 +958,6 @@ where pointers: linked_list::Pointers::new(), _p: PhantomPinned, }), - _p: std::marker::PhantomData, } } @@ -990,14 +969,13 @@ where is_unpin::<&mut Receiver<T>>(); let me = self.get_unchecked_mut(); - (me.receiver.as_mut(), &me.waiter) + (me.receiver, &me.waiter) } } } -impl<R, T> Future for Recv<R, T> +impl<'a, T> Future for Recv<'a, T> where - R: AsMut<Receiver<T>>, T: Clone, { type Output = Result<T, RecvError>; @@ -1016,14 +994,11 @@ where } } -impl<R, T> Drop for Recv<R, T> -where - R: AsMut<Receiver<T>>, -{ +impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { // Acquire the tail lock. This is required for safety before accessing // the waiter node. - let mut tail = self.receiver.as_mut().shared.tail.lock(); + let mut tail = self.receiver.shared.tail.lock(); // safety: tail lock is held let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs index e062f2b..6bef794 100644 --- a/src/sync/mpsc/block.rs +++ b/src/sync/mpsc/block.rs @@ -258,13 +258,15 @@ impl<T> Block<T> { pub(crate) unsafe fn try_push( &self, block: &mut NonNull<Block<T>>, - ordering: Ordering, + success: Ordering, + failure: Ordering, ) -> Result<(), NonNull<Block<T>>> { block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP); let next_ptr = self .next - .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering); + .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure) + .unwrap_or_else(|x| x); match NonNull::new(next_ptr) { Some(next_ptr) => Err(next_ptr), @@ -306,11 +308,11 @@ impl<T> Block<T> { // // `Release` ensures that the newly allocated block is available to // other threads acquiring the next pointer. - let next = NonNull::new(self.next.compare_and_swap( - ptr::null_mut(), - new_block.as_ptr(), - AcqRel, - )); + let next = NonNull::new( + self.next + .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire) + .unwrap_or_else(|x| x), + ); let next = match next { Some(next) => next, @@ -333,7 +335,7 @@ impl<T> Block<T> { // TODO: Should this iteration be capped? loop { - let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) }; + let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) }; curr = match actual { Ok(_) => { diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index 2dae7e2..dfe4a74 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -22,10 +22,11 @@ pub struct Sender<T> { /// Permit to send one value into the channel. /// -/// `Permit` values are returned by [`Sender::reserve()`] and are used to -/// guarantee channel capacity before generating a message to send. +/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`] +/// and are used to guarantee channel capacity before generating a message to send. /// /// [`Sender::reserve()`]: Sender::reserve +/// [`Sender::try_reserve()`]: Sender::try_reserve pub struct Permit<'a, T> { chan: &'a chan::Tx<T, Semaphore>, } @@ -33,6 +34,10 @@ pub struct Permit<'a, T> { /// Receive values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. +/// +/// This receiver can be turned into a `Stream` using [`ReceiverStream`]. +/// +/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html pub struct Receiver<T> { /// The channel receiver chan: chan::Rx<T, Semaphore>, @@ -104,8 +109,21 @@ impl<T> Receiver<T> { /// Receives the next value for this receiver. /// - /// `None` is returned when all `Sender` halves have dropped, indicating - /// that no further values can be sent on the channel. + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. This indicates that no + /// further values can ever be received from this `Receiver`. The channel is + /// closed when all senders have been dropped, or when [`close`] is called. + /// + /// If there are no messages in the channel's buffer, but the channel has + /// not yet been closed, this method will sleep until a message is sent or + /// the channel is closed. + /// + /// Note that if [`close`] is called, but there are still outstanding + /// [`Permits`] from before it was closed, the channel is not considered + /// closed by `recv` until the permits are released. + /// + /// [`close`]: Self::close + /// [`Permits`]: struct@crate::sync::mpsc::Permit /// /// # Examples /// @@ -148,6 +166,27 @@ impl<T> Receiver<T> { /// Blocking receive to call outside of asynchronous contexts. /// + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. This indicates that no + /// further values can ever be received from this `Receiver`. The channel is + /// closed when all senders have been dropped, or when [`close`] is called. + /// + /// If there are no messages in the channel's buffer, but the channel has + /// not yet been closed, this method will block until a message is sent or + /// the channel is closed. + /// + /// This method is intended for use cases where you are sending from + /// asynchronous code to synchronous code, and will work even if the sender + /// is not using [`blocking_send`] to send the message. + /// + /// Note that if [`close`] is called, but there are still outstanding + /// [`Permits`] from before it was closed, the channel is not considered + /// closed by `blocking_recv` until the permits are released. + /// + /// [`close`]: Self::close + /// [`Permits`]: struct@crate::sync::mpsc::Permit + /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send + /// /// # Panics /// /// This function panics if called within an asynchronous execution @@ -197,14 +236,16 @@ impl<T> Receiver<T> { self.chan.try_recv() } - /// Closes the receiving half of a channel, without dropping it. + /// Closes the receiving half of a channel without dropping it. /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. Any /// outstanding [`Permit`] values will still be able to send messages. /// - /// In order to guarantee no messages are dropped, after calling `close()`, - /// `recv()` must be called until `None` is returned. + /// To guarantee that no messages are dropped, after calling `close()`, + /// `recv()` must be called until `None` is returned. If there are + /// outstanding [`Permit`] values, the `recv` method will not return `None` + /// until those are released. /// /// [`Permit`]: Permit /// @@ -356,7 +397,7 @@ impl<T> Sender<T> { /// tx4.closed(), /// tx5.closed() /// ); - //// println!("Receiver dropped"); + /// println!("Receiver dropped"); /// } /// ``` pub async fn closed(&self) { @@ -501,6 +542,12 @@ impl<T> Sender<T> { /// Blocking send to call outside of asynchronous contexts. /// + /// This method is intended for use cases where you are sending from + /// synchronous code to asynchronous code, and will work even if the + /// receiver is not using [`blocking_recv`] to receive the message. + /// + /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv + /// /// # Panics /// /// This function panics if called within an asynchronous execution @@ -599,6 +646,58 @@ impl<T> Sender<T> { Ok(Permit { chan: &self.chan }) } + + /// Try to acquire a slot in the channel without waiting for the slot to become + /// available. + /// + /// If the channel is full this function will return [`TrySendError`], otherwise + /// if there is a slot available it will return a [`Permit`] that will then allow you + /// to [`send`] on the channel with a guaranteed slot. This function is similar to + /// [`reserve`] except it does not await for the slot to become available. + /// + /// Dropping [`Permit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`Permit`]: Permit + /// [`send`]: Permit::send + /// [`reserve`]: Sender::reserve + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.try_reserve().unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Trying to reserve an additional slot on the `tx` will + /// // fail because there is no capacity. + /// assert!(tx.try_reserve().is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// } + /// ``` + pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(_) => return Err(TrySendError::Full(())), + } + + Ok(Permit { chan: &self.chan }) + } } impl<T> Clone for Sender<T> { diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs index 2f4c532..5dad2ba 100644 --- a/src/sync/mpsc/list.rs +++ b/src/sync/mpsc/list.rs @@ -140,11 +140,11 @@ impl<T> Tx<T> { // // Acquire is not needed as any "actual" value is not accessed. // At this point, the linked list is walked to acquire blocks. - let actual = - self.block_tail - .compare_and_swap(block_ptr, next_block.as_ptr(), Release); - - if actual == block_ptr { + if self + .block_tail + .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed) + .is_ok() + { // Synchronize with any senders let tail_position = self.tail_position.fetch_add(0, Release); @@ -191,7 +191,7 @@ impl<T> Tx<T> { // TODO: Unify this logic with Block::grow for _ in 0..3 { - match curr.as_ref().try_push(&mut block, AcqRel) { + match curr.as_ref().try_push(&mut block, AcqRel, Acquire) { Ok(_) => { reused = true; break; diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs index 38953b8..29a0a29 100644 --- a/src/sync/mpsc/unbounded.rs +++ b/src/sync/mpsc/unbounded.rs @@ -33,6 +33,10 @@ impl<T> fmt::Debug for UnboundedSender<T> { /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. +/// +/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. +/// +/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html pub struct UnboundedReceiver<T> { /// The channel receiver chan: chan::Rx<T, Semaphore>, diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 21e44ca..0283336 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -142,7 +142,7 @@ pub struct MutexGuard<'a, T: ?Sized> { /// unlike `MutexGuard`, it will have the `'static` lifetime. /// /// As long as you have this guard, you have exclusive access to the underlying -/// `T`. The guard internally keeps a reference-couned pointer to the original +/// `T`. The guard internally keeps a reference-counted pointer to the original /// `Mutex`, so even if the lock goes away, the guard remains valid. /// /// The lock is automatically released whenever the guard is dropped, at which @@ -161,13 +161,22 @@ unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {} unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {} unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {} -/// Error returned from the [`Mutex::try_lock`] function. +/// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and +/// [`RwLock::try_write`] functions. /// -/// A `try_lock` operation can only fail if the mutex is already locked. +/// `Mutex::try_lock` operation will only fail if the mutex is already locked. +/// +/// `RwLock::try_read` operation will only fail if the lock is currently held +/// by an exclusive writer. +/// +/// `RwLock::try_write` operation will if lock is held by any reader or by an +/// exclusive writer. /// /// [`Mutex::try_lock`]: Mutex::try_lock +/// [`RwLock::try_read`]: fn@super::RwLock::try_read +/// [`RwLock::try_write`]: fn@super::RwLock::try_write #[derive(Debug)] -pub struct TryLockError(()); +pub struct TryLockError(pub(super) ()); impl fmt::Display for TryLockError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs index ece9aba..20d39dc 100644 --- a/src/sync/oneshot.rs +++ b/src/sync/oneshot.rs @@ -84,10 +84,42 @@ struct Inner<T> { value: UnsafeCell<Option<T>>, /// The task to notify when the receiver drops without consuming the value. - tx_task: UnsafeCell<MaybeUninit<Waker>>, + tx_task: Task, /// The task to notify when the value is sent. - rx_task: UnsafeCell<MaybeUninit<Waker>>, + rx_task: Task, +} + +struct Task(UnsafeCell<MaybeUninit<Waker>>); + +impl Task { + unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool { + self.with_task(|w| w.will_wake(cx.waker())) + } + + unsafe fn with_task<F, R>(&self, f: F) -> R + where + F: FnOnce(&Waker) -> R, + { + self.0.with(|ptr| { + let waker: *const Waker = (&*ptr).as_ptr(); + f(&*waker) + }) + } + + unsafe fn drop_task(&self) { + self.0.with_mut(|ptr| { + let ptr: *mut Waker = (&mut *ptr).as_mut_ptr(); + ptr.drop_in_place(); + }); + } + + unsafe fn set_task(&self, cx: &mut Context<'_>) { + self.0.with_mut(|ptr| { + let ptr: *mut Waker = (&mut *ptr).as_mut_ptr(); + ptr.write(cx.waker().clone()); + }); + } } #[derive(Clone, Copy)] @@ -127,8 +159,8 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Arc::new(Inner { state: AtomicUsize::new(State::new().as_usize()), value: UnsafeCell::new(None), - tx_task: UnsafeCell::new(MaybeUninit::uninit()), - rx_task: UnsafeCell::new(MaybeUninit::uninit()), + tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), + rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), }); let tx = Sender { @@ -188,9 +220,9 @@ impl<T> Sender<T> { }); if !inner.complete() { - return Err(inner - .value - .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap())); + unsafe { + return Err(inner.consume_value().unwrap()); + } } Ok(()) @@ -357,7 +389,7 @@ impl<T> Sender<T> { } if state.is_tx_task_set() { - let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) }; + let will_notify = unsafe { inner.tx_task.will_wake(cx) }; if !will_notify { state = State::unset_tx_task(&inner.state); @@ -368,7 +400,7 @@ impl<T> Sender<T> { coop.made_progress(); return Ready(()); } else { - unsafe { inner.drop_tx_task() }; + unsafe { inner.tx_task.drop_task() }; } } } @@ -376,7 +408,7 @@ impl<T> Sender<T> { if !state.is_tx_task_set() { // Attempt to set the task unsafe { - inner.set_tx_task(cx); + inner.tx_task.set_task(cx); } // Update the state @@ -584,7 +616,7 @@ impl<T> Inner<T> { if prev.is_rx_task_set() { // TODO: Consume waker? unsafe { - self.with_rx_task(Waker::wake_by_ref); + self.rx_task.with_task(Waker::wake_by_ref); } } @@ -609,7 +641,7 @@ impl<T> Inner<T> { Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { - let will_notify = unsafe { self.with_rx_task(|w| w.will_wake(cx.waker())) }; + let will_notify = unsafe { self.rx_task.will_wake(cx) }; // Check if the task is still the same if !will_notify { @@ -625,7 +657,7 @@ impl<T> Inner<T> { None => Ready(Err(RecvError(()))), }; } else { - unsafe { self.drop_rx_task() }; + unsafe { self.rx_task.drop_task() }; } } } @@ -633,7 +665,7 @@ impl<T> Inner<T> { if !state.is_rx_task_set() { // Attempt to set the task unsafe { - self.set_rx_task(cx); + self.rx_task.set_task(cx); } // Update the state @@ -660,7 +692,7 @@ impl<T> Inner<T> { if prev.is_tx_task_set() && !prev.is_complete() { unsafe { - self.with_tx_task(Waker::wake_by_ref); + self.tx_task.with_task(Waker::wake_by_ref); } } } @@ -669,72 +701,28 @@ impl<T> Inner<T> { unsafe fn consume_value(&self) -> Option<T> { self.value.with_mut(|ptr| (*ptr).take()) } - - unsafe fn with_rx_task<F, R>(&self, f: F) -> R - where - F: FnOnce(&Waker) -> R, - { - self.rx_task.with(|ptr| { - let waker: *const Waker = (&*ptr).as_ptr(); - f(&*waker) - }) - } - - unsafe fn with_tx_task<F, R>(&self, f: F) -> R - where - F: FnOnce(&Waker) -> R, - { - self.tx_task.with(|ptr| { - let waker: *const Waker = (&*ptr).as_ptr(); - f(&*waker) - }) - } - - unsafe fn drop_rx_task(&self) { - self.rx_task.with_mut(|ptr| { - let ptr: *mut Waker = (&mut *ptr).as_mut_ptr(); - ptr.drop_in_place(); - }); - } - - unsafe fn drop_tx_task(&self) { - self.tx_task.with_mut(|ptr| { - let ptr: *mut Waker = (&mut *ptr).as_mut_ptr(); - ptr.drop_in_place(); - }); - } - - unsafe fn set_rx_task(&self, cx: &mut Context<'_>) { - self.rx_task.with_mut(|ptr| { - let ptr: *mut Waker = (&mut *ptr).as_mut_ptr(); - ptr.write(cx.waker().clone()); - }); - } - - unsafe fn set_tx_task(&self, cx: &mut Context<'_>) { - self.tx_task.with_mut(|ptr| { - let ptr: *mut Waker = (&mut *ptr).as_mut_ptr(); - ptr.write(cx.waker().clone()); - }); - } } unsafe impl<T: Send> Send for Inner<T> {} unsafe impl<T: Send> Sync for Inner<T> {} +fn mut_load(this: &mut AtomicUsize) -> usize { + this.with_mut(|v| *v) +} + impl<T> Drop for Inner<T> { fn drop(&mut self) { - let state = State(self.state.with_mut(|v| *v)); + let state = State(mut_load(&mut self.state)); if state.is_rx_task_set() { unsafe { - self.drop_rx_task(); + self.rx_task.drop_task(); } } if state.is_tx_task_set() { unsafe { - self.drop_tx_task(); + self.tx_task.drop_task(); } } } diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index 2e72cf7..b0777b2 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -1,4 +1,5 @@ -use crate::sync::batch_semaphore::Semaphore; +use crate::sync::batch_semaphore::{Semaphore, TryAcquireError}; +use crate::sync::mutex::TryLockError; use std::cell::UnsafeCell; use std::fmt; use std::marker; @@ -244,7 +245,7 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { /// locks, since [`RwLock`] is fair and it is possible that a writer is next /// in line. /// - /// Returns an RAII guard which will drop the read access of this rwlock + /// Returns an RAII guard which will drop this read access of the `RwLock` /// when dropped. /// /// # Examples @@ -397,12 +398,20 @@ impl<T: ?Sized> RwLock<T> { } } - /// Locks this rwlock with shared read access, causing the current task + /// Locks this `RwLock` with shared read access, causing the current task /// to yield until the lock has been acquired. /// - /// The calling task will yield until there are no more writers which - /// hold the lock. There may be other readers currently inside the lock when - /// this method returns. + /// The calling task will yield until there are no writers which hold the + /// lock. There may be other readers inside the lock when the task resumes. + /// + /// Note that under the priority policy of [`RwLock`], read locks are not + /// granted until prior write locks, to prevent starvation. Therefore + /// deadlock may occur if a read lock is held by the current task, a write + /// lock attempt is made, and then a subsequent read lock attempt is made + /// by the current task. + /// + /// Returns an RAII guard which will drop this read access of the `RwLock` + /// when dropped. /// /// # Examples /// @@ -422,12 +431,13 @@ impl<T: ?Sized> RwLock<T> { /// // While main has an active read lock, we acquire one too. /// let r = c_lock.read().await; /// assert_eq!(*r, 1); - /// }).await.expect("The spawned task has paniced"); + /// }).await.expect("The spawned task has panicked"); /// /// // Drop the guard after the spawned task finishes. /// drop(n); ///} /// ``` + /// pub async fn read(&self) -> RwLockReadGuard<'_, T> { self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a @@ -441,13 +451,59 @@ impl<T: ?Sized> RwLock<T> { } } - /// Locks this rwlock with exclusive write access, causing the current task - /// to yield until the lock has been acquired. + /// Attempts to acquire this `RwLock` with shared read access. + /// + /// If the access couldn't be acquired immediately, returns [`TryLockError`]. + /// Otherwise, an RAII guard is returned which will release read access + /// when dropped. + /// + /// [`TryLockError`]: TryLockError + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::RwLock; + /// + /// #[tokio::main] + /// async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let v = lock.try_read().unwrap(); + /// assert_eq!(*v, 1); + /// + /// tokio::spawn(async move { + /// // While main has an active read lock, we acquire one too. + /// let n = c_lock.read().await; + /// assert_eq!(*n, 1); + /// }).await.expect("The spawned task has panicked"); + /// + /// // Drop the guard when spawned task finishes. + /// drop(v); + /// } + /// ``` + pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> { + match self.s.try_acquire(1) { + Ok(permit) => permit, + Err(TryAcquireError::NoPermits) => return Err(TryLockError(())), + Err(TryAcquireError::Closed) => unreachable!(), + } + + Ok(RwLockReadGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + }) + } + + /// Locks this `RwLock` with exclusive write access, causing the current + /// task to yield until the lock has been acquired. /// - /// This function will not return while other writers or other readers + /// The calling task will yield while other writers or readers /// currently have access to the lock. /// - /// Returns an RAII guard which will drop the write access of this rwlock + /// Returns an RAII guard which will drop the write access of this `RwLock` /// when dropped. /// /// # Examples @@ -476,6 +532,43 @@ impl<T: ?Sized> RwLock<T> { } } + /// Attempts to acquire this `RwLock` with exclusive write access. + /// + /// If the access couldn't be acquired immediately, returns [`TryLockError`]. + /// Otherwise, an RAII guard is returned which will release write access + /// when dropped. + /// + /// [`TryLockError`]: TryLockError + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::RwLock; + /// + /// #[tokio::main] + /// async fn main() { + /// let rw = RwLock::new(1); + /// + /// let v = rw.read().await; + /// assert_eq!(*v, 1); + /// + /// assert!(rw.try_write().is_err()); + /// } + /// ``` + pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> { + match self.s.try_acquire(MAX_READS as u32) { + Ok(permit) => permit, + Err(TryAcquireError::NoPermits) => return Err(TryLockError(())), + Err(TryAcquireError::Closed) => unreachable!(), + } + + Ok(RwLockWriteGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + }) + } + /// Returns a mutable reference to the underlying data. /// /// Since this call borrows the `RwLock` mutably, no actual locking needs to diff --git a/src/sync/semaphore.rs b/src/sync/semaphore.rs index 5555bdf..bff5de9 100644 --- a/src/sync/semaphore.rs +++ b/src/sync/semaphore.rs @@ -13,6 +13,11 @@ use std::sync::Arc; /// function immediately returns a permit. However, if no remaining permits are /// available, `acquire` (asynchronously) waits until an outstanding permit is /// dropped. At this point, the freed permit is assigned to the caller. +/// +/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`] +/// utility. +/// +/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html #[derive(Debug)] pub struct Semaphore { /// The low level semaphore diff --git a/src/sync/task/atomic_waker.rs b/src/sync/task/atomic_waker.rs index ae4cac7..5917204 100644 --- a/src/sync/task/atomic_waker.rs +++ b/src/sync/task/atomic_waker.rs @@ -171,7 +171,11 @@ impl AtomicWaker { where W: WakerRef, { - match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { + match self + .state + .compare_exchange(WAITING, REGISTERING, Acquire, Acquire) + .unwrap_or_else(|x| x) + { WAITING => { unsafe { // Locked acquired, update the waker cell @@ -219,6 +223,8 @@ impl AtomicWaker { waker.wake(); // This is equivalent to a spin lock, so use a spin hint. + // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead. + #[allow(deprecated)] atomic::spin_loop_hint(); } state => { diff --git a/src/sync/tests/loom_broadcast.rs b/src/sync/tests/loom_broadcast.rs index 4b1f034..039b01b 100644 --- a/src/sync/tests/loom_broadcast.rs +++ b/src/sync/tests/loom_broadcast.rs @@ -178,3 +178,30 @@ fn drop_rx() { assert_ok!(th2.join()); }); } + +#[test] +fn drop_multiple_rx_with_overflow() { + loom::model(move || { + // It is essential to have multiple senders and receivers in this test case. + let (tx, mut rx) = broadcast::channel(1); + let _rx2 = tx.subscribe(); + + let _ = tx.send(()); + let tx2 = tx.clone(); + let th1 = thread::spawn(move || { + block_on(async { + for _ in 0..100 { + let _ = tx2.send(()); + } + }); + }); + let _ = tx.send(()); + + let th2 = thread::spawn(move || { + block_on(async { while let Ok(_) = rx.recv().await {} }); + }); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} diff --git a/src/sync/watch.rs b/src/sync/watch.rs index 6732d38..5590a75 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -61,6 +61,11 @@ use std::ops; /// Receives values from the associated [`Sender`](struct@Sender). /// /// Instances are created by the [`channel`](fn@channel) function. +/// +/// To turn this receiver into a `Stream`, you can use the [`WatchStream`] +/// wrapper. +/// +/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html #[derive(Debug)] pub struct Receiver<T> { /// Pointer to the shared state diff --git a/src/task/local.rs b/src/task/local.rs index 566b2f2..ee11511 100644 --- a/src/task/local.rs +++ b/src/task/local.rs @@ -43,10 +43,13 @@ cfg_rt! { /// }).await.unwrap(); /// } /// ``` - /// In order to spawn `!Send` futures, we can use a local task set to - /// schedule them on the thread calling [`Runtime::block_on`]. When running - /// inside of the local task set, we can use [`task::spawn_local`], which can - /// spawn `!Send` futures. For example: + /// + /// # Use with `run_until` + /// + /// To spawn `!Send` futures, we can use a local task set to schedule them + /// on the thread calling [`Runtime::block_on`]. When running inside of the + /// local task set, we can use [`task::spawn_local`], which can spawn + /// `!Send` futures. For example: /// /// ```rust /// use std::rc::Rc; @@ -71,6 +74,9 @@ cfg_rt! { /// }).await; /// } /// ``` + /// **Note:** The `run_until` method can only be used in `#[tokio::main]`, + /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It + /// cannot be used inside a task spawned with `tokio::spawn`. /// /// ## Awaiting a `LocalSet` /// @@ -104,11 +110,106 @@ cfg_rt! { /// local.await; /// } /// ``` + /// **Note:** Awaiting a `LocalSet` can only be done inside + /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to + /// [`Runtime::block_on`]. It cannot be used inside a task spawned with + /// `tokio::spawn`. + /// + /// ## Use inside `tokio::spawn` + /// + /// The two methods mentioned above cannot be used inside `tokio::spawn`, so + /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do + /// something else. The solution is to create the `LocalSet` somewhere else, + /// and communicate with it using an [`mpsc`] channel. + /// + /// The following example puts the `LocalSet` inside a new thread. + /// ``` + /// use tokio::runtime::Builder; + /// use tokio::sync::{mpsc, oneshot}; + /// use tokio::task::LocalSet; + /// + /// // This struct describes the task you want to spawn. Here we include + /// // some simple examples. The oneshot channel allows sending a response + /// // to the spawner. + /// #[derive(Debug)] + /// enum Task { + /// PrintNumber(u32), + /// AddOne(u32, oneshot::Sender<u32>), + /// } + /// + /// #[derive(Clone)] + /// struct LocalSpawner { + /// send: mpsc::UnboundedSender<Task>, + /// } + /// + /// impl LocalSpawner { + /// pub fn new() -> Self { + /// let (send, mut recv) = mpsc::unbounded_channel(); + /// + /// let rt = Builder::new_current_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// std::thread::spawn(move || { + /// let local = LocalSet::new(); + /// + /// local.spawn_local(async move { + /// while let Some(new_task) = recv.recv().await { + /// tokio::task::spawn_local(run_task(new_task)); + /// } + /// // If the while loop returns, then all the LocalSpawner + /// // objects have have been dropped. + /// }); + /// + /// // This will return once all senders are dropped and all + /// // spawned tasks have returned. + /// rt.block_on(local); + /// }); + /// + /// Self { + /// send, + /// } + /// } + /// + /// pub fn spawn(&self, task: Task) { + /// self.send.send(task).expect("Thread with LocalSet has shut down."); + /// } + /// } + /// + /// // This task may do !Send stuff. We use printing a number as an example, + /// // but it could be anything. + /// // + /// // The Task struct is an enum to support spawning many different kinds + /// // of operations. + /// async fn run_task(task: Task) { + /// match task { + /// Task::PrintNumber(n) => { + /// println!("{}", n); + /// }, + /// Task::AddOne(n, response) => { + /// // We ignore failures to send the response. + /// let _ = response.send(n + 1); + /// }, + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let spawner = LocalSpawner::new(); + /// + /// let (send, response) = oneshot::channel(); + /// spawner.spawn(Task::AddOne(10, send)); + /// let eleven = response.await.unwrap(); + /// assert_eq!(eleven, 11); + /// } + /// ``` /// /// [`Send`]: trait@std::marker::Send /// [local task set]: struct@LocalSet /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on /// [`task::spawn_local`]: fn@spawn_local + /// [`mpsc`]: mod@crate::sync::mpsc pub struct LocalSet { /// Current scheduler tick tick: Cell<u8>, @@ -283,6 +384,7 @@ impl LocalSet { let future = crate::util::trace::task(future, "local"); let (task, handle) = unsafe { task::joinable_local(future) }; self.context.tasks.borrow_mut().queue.push_back(task); + self.context.shared.waker.wake(); handle } diff --git a/src/task/spawn.rs b/src/task/spawn.rs index a060852..d846fb4 100644 --- a/src/task/spawn.rs +++ b/src/task/spawn.rs @@ -1,5 +1,6 @@ use crate::runtime; use crate::task::JoinHandle; +use crate::util::error::CONTEXT_MISSING_ERROR; use std::future::Future; @@ -129,7 +130,7 @@ cfg_rt! { T::Output: Send + 'static, { let spawn_handle = runtime::context::spawn_handle() - .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); + .expect(CONTEXT_MISSING_ERROR); let task = crate::util::trace::task(task, "task"); spawn_handle.spawn(task) } diff --git a/src/time/clock.rs b/src/time/clock.rs index a62fbe3..8957800 100644 --- a/src/time/clock.rs +++ b/src/time/clock.rs @@ -17,7 +17,7 @@ cfg_not_test_util! { } impl Clock { - pub(crate) fn new(_enable_pausing: bool) -> Clock { + pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock { Clock {} } @@ -78,7 +78,8 @@ cfg_test_util! { /// that depend on time. /// /// Pausing time requires the `current_thread` Tokio runtime. This is the - /// default runtime used by `#[tokio::test]` + /// default runtime used by `#[tokio::test]`. The runtime can be initialized + /// with time in a paused state using the `Builder::start_paused` method. /// /// # Panics /// @@ -149,16 +150,22 @@ cfg_test_util! { impl Clock { /// Return a new `Clock` instance that uses the current execution context's /// source of time. - pub(crate) fn new(enable_pausing: bool) -> Clock { + pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { let now = std::time::Instant::now(); - Clock { + let clock = Clock { inner: Arc::new(Mutex::new(Inner { enable_pausing, base: now, unfrozen: Some(now), })), + }; + + if start_paused { + clock.pause(); } + + clock } pub(crate) fn pause(&self) { diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs index bcad988..11366d2 100644 --- a/src/time/driver/entry.rs +++ b/src/time/driver/entry.rs @@ -53,6 +53,7 @@ //! refuse to mark the timer as pending. use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::Ordering; use crate::sync::AtomicWaker; @@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value(); const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; -/// Not all platforms support 64-bit compare-and-swap. This hack replaces the -/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow, -/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now. -/// -/// Note: We use "x86 or 64-bit pointers" as the condition here because -/// target_has_atomic is not stable. -#[cfg(all( - not(tokio_force_time_entry_locked), - any(target_arch = "x86", target_pointer_width = "64") -))] -type AtomicU64 = crate::loom::sync::atomic::AtomicU64; - -#[cfg(not(all( - not(tokio_force_time_entry_locked), - any(target_arch = "x86", target_pointer_width = "64") -)))] -#[derive(Debug)] -struct AtomicU64 { - inner: crate::loom::sync::Mutex<u64>, -} - -#[cfg(not(all( - not(tokio_force_time_entry_locked), - any(target_arch = "x86", target_pointer_width = "64") -)))] -impl AtomicU64 { - fn new(v: u64) -> Self { - Self { - inner: crate::loom::sync::Mutex::new(v), - } - } - - fn load(&self, _order: Ordering) -> u64 { - debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock - *self.inner.lock() - } - - fn store(&self, v: u64, _order: Ordering) { - debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock - *self.inner.lock() = v; - } - - fn compare_exchange( - &self, - current: u64, - new: u64, - _success: Ordering, - _failure: Ordering, - ) -> Result<u64, u64> { - debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock - debug_assert_ne!(_failure, Ordering::SeqCst); - - let mut lock = self.inner.lock(); - - if *lock == current { - *lock = new; - Ok(current) - } else { - Err(*lock) - } - } - - fn compare_exchange_weak( - &self, - current: u64, - new: u64, - success: Ordering, - failure: Ordering, - ) -> Result<u64, u64> { - self.compare_exchange(current, new, success, failure) - } -} - /// This structure holds the current shared state of the timer - its scheduled /// time (if registered), or otherwise the result of the timer completing, as /// well as the registered waker. @@ -300,7 +228,7 @@ impl StateCell { /// expiration time. /// /// While this function is memory-safe, it should only be called from a - /// context holding both `&mut TimerEntry` and the driver lock. + /// context holding both `&mut TimerEntry` and the driver lock. fn set_expiration(&self, timestamp: u64) { debug_assert!(timestamp < STATE_MIN_VALUE); diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs index bfc49fb..e934b56 100644 --- a/src/time/driver/handle.rs +++ b/src/time/driver/handle.rs @@ -47,7 +47,7 @@ cfg_rt! { /// panicking. pub(crate) fn current() -> Self { crate::runtime::context::time_handle() - .expect("there is no timer running, must be called from the context of Tokio runtime") + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") } } } @@ -71,8 +71,7 @@ cfg_not_rt! { /// lazy, and so outside executed inside the runtime successfuly without /// panicking. pub(crate) fn current() -> Self { - panic!("there is no timer running, must be called from the context of Tokio runtime or \ - `rt` is not enabled") + panic!(crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index 9fbc0b3..615307e 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -102,8 +102,8 @@ pub(self) struct ClockTime { impl ClockTime { pub(self) fn new(clock: Clock) -> Self { Self { + start_time: clock.now(), clock, - start_time: super::clock::now(), } } diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs index 69a6e6d..2438f14 100644 --- a/src/time/driver/sleep.rs +++ b/src/time/driver/sleep.rs @@ -58,8 +58,93 @@ pub fn sleep(duration: Duration) -> Sleep { } pin_project! { - /// Future returned by [`sleep`](sleep) and - /// [`sleep_until`](sleep_until). + /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). + /// + /// This type does not implement the `Unpin` trait, which means that if you + /// use it with [`select!`] or by calling `poll`, you have to pin it first. + /// If you use it with `.await`, this does not apply. + /// + /// # Examples + /// + /// Wait 100ms and print "100 ms have elapsed". + /// + /// ``` + /// use tokio::time::{sleep, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// sleep(Duration::from_millis(100)).await; + /// println!("100 ms have elapsed"); + /// } + /// ``` + /// + /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is + /// necessary when the same `Sleep` is selected on multiple times. + /// ```no_run + /// use tokio::time::{self, Duration, Instant}; + /// + /// #[tokio::main] + /// async fn main() { + /// let sleep = time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// loop { + /// tokio::select! { + /// () = &mut sleep => { + /// println!("timer elapsed"); + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(50)); + /// }, + /// } + /// } + /// } + /// ``` + /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the + /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// + /// struct HasSleep { + /// sleep: Pin<Box<Sleep>>, + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.sleep.as_mut().poll(cx) + /// } + /// } + /// ``` + /// Use in a struct with pin projection. This method avoids the `Box`, but + /// the `HasSleep` struct will not be `Unpin` as a consequence. + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// use tokio::time::Sleep; + /// use pin_project_lite::pin_project; + /// + /// pin_project! { + /// struct HasSleep { + /// #[pin] + /// sleep: Sleep, + /// } + /// } + /// + /// impl Future for HasSleep { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// self.project().sleep.poll(cx) + /// } + /// } + /// ``` + /// + /// [`select!`]: ../macro.select.html + /// [`tokio::pin!`]: ../macro.pin.html #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Sleep { @@ -98,6 +183,26 @@ impl Sleep { /// /// This function can be called both before and after the future has /// completed. + /// + /// To call this method, you will usually combine the call with + /// [`Pin::as_mut`], which lets you call the method with consuming the + /// `Sleep` itself. + /// + /// # Example + /// + /// ``` + /// use tokio::time::{Duration, Instant}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let sleep = tokio::time::sleep(Duration::from_millis(10)); + /// tokio::pin!(sleep); + /// + /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20)); + /// # } + /// ``` + /// + /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut pub fn reset(self: Pin<&mut Self>, deadline: Instant) { let me = self.project(); me.entry.reset(deadline); diff --git a/src/time/driver/tests/mod.rs b/src/time/driver/tests/mod.rs index cfefed3..8ae4a84 100644 --- a/src/time/driver/tests/mod.rs +++ b/src/time/driver/tests/mod.rs @@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) { #[test] fn single_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -72,7 +72,7 @@ fn single_timer() { #[test] fn drop_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -103,7 +103,7 @@ fn drop_timer() { #[test] fn change_waker() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -138,7 +138,7 @@ fn reset_future() { model(|| { let finished_early = Arc::new(AtomicBool::new(false)); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -185,7 +185,7 @@ fn reset_future() { #[test] #[cfg(not(loom))] fn poll_process_levels() { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); @@ -226,7 +226,7 @@ fn poll_process_levels() { fn poll_process_levels_targeted() { let mut context = Context::from_waker(noop_waker_ref()); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); diff --git a/src/time/driver/wheel/level.rs b/src/time/driver/wheel/level.rs index 58280b1..81d6b58 100644 --- a/src/time/driver/wheel/level.rs +++ b/src/time/driver/wheel/level.rs @@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize { ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize } -/* #[cfg(all(test, not(loom)))] mod test { use super::*; #[test] fn test_slot_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!(pos as usize, slot_for(pos, 0)); } @@ -274,4 +273,3 @@ mod test { } } } -*/ diff --git a/src/time/driver/wheel/mod.rs b/src/time/driver/wheel/mod.rs index 164cac4..24bf517 100644 --- a/src/time/driver/wheel/mod.rs +++ b/src/time/driver/wheel/mod.rs @@ -122,6 +122,13 @@ impl Wheel { if when == u64::max_value() { self.pending.remove(item); } else { + debug_assert!( + self.elapsed <= when, + "elapsed={}; when={}", + self.elapsed, + when + ); + let level = self.level_for(when); self.levels[level].remove_entry(item); @@ -281,15 +288,17 @@ impl Wheel { } fn level_for(elapsed: u64, when: u64) -> usize { - let mut masked = elapsed ^ when; + const SLOT_MASK: u64 = (1 << 6) - 1; + + // Mask in the trailing bits ignored by the level calculation in order to cap + // the possible leading zeros + let mut masked = elapsed ^ when | SLOT_MASK; if masked >= MAX_DURATION { // Fudge the timer into the top level masked = MAX_DURATION - 1; } - assert!(masked != 0, "elapsed={}; when={}", elapsed, when); - let leading_zeros = masked.leading_zeros() as usize; let significant = 63 - leading_zeros; @@ -302,7 +311,7 @@ mod test { #[test] fn test_level_for() { - for pos in 1..64 { + for pos in 0..64 { assert_eq!( 0, level_for(0, pos), diff --git a/src/time/interval.rs b/src/time/interval.rs index be93ba1..20cfcec 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -106,7 +106,16 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { } } -/// Stream returned by [`interval`](interval) and [`interval_at`](interval_at). +/// Interval returned by [`interval`](interval) and [`interval_at`](interval_at). +/// +/// This type allows you to wait on a sequence of instants with a certain +/// duration between each instant. Unlike calling [`sleep`](crate::time::sleep) +/// in a loop, this lets you count the time spent between the calls to `sleep` +/// as well. +/// +/// An `Interval` can be turned into a `Stream` with [`IntervalStream`]. +/// +/// [`IntervalStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.IntervalStream.html #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value. diff --git a/src/util/error.rs b/src/util/error.rs new file mode 100644 index 0000000..518cb2c --- /dev/null +++ b/src/util/error.rs @@ -0,0 +1,3 @@ +/// Error string explaining that the Tokio context hasn't been instantiated. +pub(crate) const CONTEXT_MISSING_ERROR: &str = + "there is no reactor running, must be called from the context of a Tokio 1.x runtime"; diff --git a/src/util/mod.rs b/src/util/mod.rs index 382bbb9..b267125 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -24,7 +24,7 @@ cfg_rt! { } cfg_rt_multi_thread! { - pub(crate) use rand::FastRand; + pub(crate) use self::rand::FastRand; mod try_lock; pub(crate) use try_lock::TryLock; @@ -34,4 +34,13 @@ pub(crate) mod trace; #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use rand::thread_rng_n; +pub use self::rand::thread_rng_n; + +#[cfg(any( + feature = "rt", + feature = "time", + feature = "net", + feature = "process", + all(unix, feature = "signal") +))] +pub(crate) mod error; diff --git a/tests/io_driver.rs b/tests/io_driver.rs index 9a40247..6fb566d 100644 --- a/tests/io_driver.rs +++ b/tests/io_driver.rs @@ -84,3 +84,16 @@ fn test_drop_on_notify() { // Force the reactor to turn rt.block_on(async {}); } + +#[test] +#[should_panic( + expected = "A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO." +)] +fn panics_when_io_disabled() { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + + rt.block_on(async { + let _ = + tokio::net::TcpListener::from_std(std::net::TcpListener::bind("127.0.0.1:0").unwrap()); + }); +} diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs index ee636ba..171e6d6 100644 --- a/tests/io_read_to_end.rs +++ b/tests/io_read_to_end.rs @@ -1,7 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::AsyncReadExt; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_test::assert_ok; #[tokio::test] @@ -13,3 +15,64 @@ async fn read_to_end() { assert_eq!(n, 11); assert_eq!(buf[..], b"hello world"[..]); } + +#[derive(Copy, Clone, Debug)] +enum State { + Initializing, + JustFilling, + Done, +} + +struct UninitTest { + num_init: usize, + state: State, +} + +impl AsyncRead for UninitTest { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<std::io::Result<()>> { + let me = Pin::into_inner(self); + let real_num_init = buf.initialized().len() - buf.filled().len(); + assert_eq!(real_num_init, me.num_init, "{:?}", me.state); + + match me.state { + State::Initializing => { + buf.initialize_unfilled_to(me.num_init + 2); + buf.advance(1); + me.num_init += 1; + + if me.num_init == 24 { + me.state = State::JustFilling; + } + } + State::JustFilling => { + buf.advance(1); + me.num_init -= 1; + + if me.num_init == 15 { + // The buffer is resized on next call. + me.num_init = 0; + me.state = State::Done; + } + } + State::Done => { /* .. do nothing .. */ } + } + + Poll::Ready(Ok(())) + } +} + +#[tokio::test] +async fn read_to_end_uninit() { + let mut buf = Vec::with_capacity(64); + let mut test = UninitTest { + num_init: 0, + state: State::Initializing, + }; + + test.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf.len(), 33); +} diff --git a/tests/macros_test.rs b/tests/macros_test.rs index 8e68b8a..8396398 100644 --- a/tests/macros_test.rs +++ b/tests/macros_test.rs @@ -17,3 +17,11 @@ async fn test_macro_is_resilient_to_shadowing() { .await .unwrap(); } + +// https://github.com/tokio-rs/tokio/issues/3403 +#[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline. +#[tokio::main] +async fn unused_braces_main() { println!("hello") } +#[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline. +#[tokio::test] +async fn unused_braces_test() { assert_eq!(1 + 1, 2) } diff --git a/tests/no_rt.rs b/tests/no_rt.rs index 962eed7..8437b80 100644 --- a/tests/no_rt.rs +++ b/tests/no_rt.rs @@ -7,13 +7,17 @@ use futures::executor::block_on; use std::net::TcpListener; #[test] -#[should_panic(expected = "no timer running")] -fn panics_when_no_timer() { +#[should_panic( + expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime" +)] +fn timeout_panics_when_no_tokio_context() { block_on(timeout_value()); } #[test] -#[should_panic(expected = "no reactor running")] +#[should_panic( + expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime" +)] fn panics_when_no_reactor() { let srv = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = srv.local_addr().unwrap(); @@ -25,3 +29,11 @@ async fn timeout_value() { let dur = Duration::from_millis(20); let _ = timeout(dur, rx).await; } + +#[test] +#[should_panic( + expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime" +)] +fn io_panics_when_no_tokio_context() { + let _ = tokio::net::TcpListener::from_std(std::net::TcpListener::bind("127.0.0.1:0").unwrap()); +} diff --git a/tests/rt_basic.rs b/tests/rt_basic.rs index 977a838..4b1bdad 100644 --- a/tests/rt_basic.rs +++ b/tests/rt_basic.rs @@ -6,7 +6,7 @@ use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; use std::thread; -use std::time::Duration; +use tokio::time::{timeout, Duration}; mod support { pub(crate) mod mpsc_stream; @@ -135,6 +135,21 @@ fn acquire_mutex_in_drop() { drop(rt); } +#[test] +#[should_panic( + expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." +)] +fn timeout_panics_when_no_time_handle() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(async { + let (_tx, rx) = oneshot::channel::<()>(); + let dur = Duration::from_millis(20); + let _ = timeout(dur, rx).await; + }); +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tests/rt_common.rs b/tests/rt_common.rs index 66e6f2c..9aef4b9 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -1021,39 +1021,45 @@ rt_test! { // other tasks. #[test] fn ping_pong_saturation() { + use std::sync::atomic::{Ordering, AtomicBool}; use tokio::sync::mpsc; const NUM: usize = 100; let rt = rt(); + let running = Arc::new(AtomicBool::new(true)); + rt.block_on(async { let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); + let mut tasks = vec![]; // Spawn a bunch of tasks that ping ping between each other to // saturate the runtime. for _ in 0..NUM { let (tx1, mut rx1) = mpsc::unbounded_channel(); let (tx2, mut rx2) = mpsc::unbounded_channel(); let spawned_tx = spawned_tx.clone(); - - task::spawn(async move { + let running = running.clone(); + tasks.push(task::spawn(async move { spawned_tx.send(()).unwrap(); - tx1.send(()).unwrap(); - loop { - rx2.recv().await.unwrap(); + while running.load(Ordering::Relaxed) { tx1.send(()).unwrap(); + rx2.recv().await.unwrap(); } - }); - task::spawn(async move { - loop { - rx1.recv().await.unwrap(); + // Close the channel and wait for the other task to exit. + drop(tx1); + assert!(rx2.recv().await.is_none()); + })); + + tasks.push(task::spawn(async move { + while rx1.recv().await.is_some() { tx2.send(()).unwrap(); } - }); + })); } for _ in 0..NUM { @@ -1068,6 +1074,10 @@ rt_test! { } }); handle.await.unwrap(); + running.store(false, Ordering::Relaxed); + for t in tasks { + t.await.unwrap(); + } }); } } diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs index b378e6b..cd43ad4 100644 --- a/tests/sync_mpsc.rs +++ b/tests/sync_mpsc.rs @@ -328,6 +328,29 @@ async fn try_send_fail() { } #[tokio::test] +async fn try_reserve_fails() { + let (tx, mut rx) = mpsc::channel(1); + + let permit = tx.try_reserve().unwrap(); + + // This should fail + match assert_err!(tx.try_reserve()) { + TrySendError::Full(()) => {} + _ => panic!(), + } + + permit.send("foo"); + + assert_eq!(rx.recv().await, Some("foo")); + + // Dropping permit releases the slot. + let permit = tx.try_reserve().unwrap(); + drop(permit); + + let _permit = tx.try_reserve().unwrap(); +} + +#[tokio::test] async fn drop_permit_releases_permit() { // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. diff --git a/tests/sync_rwlock.rs b/tests/sync_rwlock.rs index 7676035..872b845 100644 --- a/tests/sync_rwlock.rs +++ b/tests/sync_rwlock.rs @@ -235,3 +235,36 @@ async fn multithreaded() { let g = rwlock.read().await; assert_eq!(*g, 17_000); } + +#[tokio::test] +async fn try_write() { + let lock = RwLock::new(0); + let read_guard = lock.read().await; + assert!(lock.try_write().is_err()); + drop(read_guard); + assert!(lock.try_write().is_ok()); +} + +#[test] +fn try_read_try_write() { + let lock: RwLock<usize> = RwLock::new(15); + + { + let rg1 = lock.try_read().unwrap(); + assert_eq!(*rg1, 15); + + assert!(lock.try_write().is_err()); + + let rg2 = lock.try_read().unwrap(); + assert_eq!(*rg2, 15) + } + + { + let mut wg = lock.try_write().unwrap(); + *wg = 1515; + + assert!(lock.try_read().is_err()) + } + + assert_eq!(*lock.try_read().unwrap(), 1515); +} diff --git a/tests/task_local_set.rs b/tests/task_local_set.rs index dda4280..8513609 100644 --- a/tests/task_local_set.rs +++ b/tests/task_local_set.rs @@ -1,6 +1,11 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use futures::{ + future::{pending, ready}, + FutureExt, +}; + use tokio::runtime::{self, Runtime}; use tokio::sync::{mpsc, oneshot}; use tokio::task::{self, LocalSet}; @@ -486,6 +491,15 @@ async fn acquire_mutex_in_drop() { drop(local); } +#[tokio::test] +async fn spawn_wakes_localset() { + let local = LocalSet::new(); + futures::select! { + _ = local.run_until(pending::<()>()).fuse() => unreachable!(), + ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 58b06ee..e34c2bb 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -234,3 +234,81 @@ fn write_until_pending(stream: &mut TcpStream) { } } } + +#[tokio::test] +async fn try_read_buf() { + const DATA: &[u8] = b"this is some data to write to the socket"; + + // Create listener + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = TcpStream::connect(listener.local_addr().unwrap()) + .await + .unwrap(); + let (server, _) = listener.accept().await.unwrap(); + let mut written = DATA.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await.unwrap(); + assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(DATA) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = Vec::with_capacity(written.len()); + let mut i = 0; + + while i < read.capacity() { + server.readable().await.unwrap(); + + match server.try_read_buf(&mut read) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await.unwrap(); + + if ready.is_read_closed() { + return; + } else { + tokio::task::yield_now().await; + } + } +} diff --git a/tests/time_pause.rs b/tests/time_pause.rs index 49a7677..bc84ac5 100644 --- a/tests/time_pause.rs +++ b/tests/time_pause.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use rand::SeedableRng; +use rand::{rngs::StdRng, Rng}; +use tokio::time::{self, Duration, Instant}; use tokio_test::assert_err; #[tokio::test] @@ -31,3 +34,26 @@ async fn pause_time_in_spawn_threads() { assert_err!(t.await); } + +#[test] +fn paused_time_is_deterministic() { + let run_1 = paused_time_stress_run(); + let run_2 = paused_time_stress_run(); + + assert_eq!(run_1, run_2); +} + +#[tokio::main(flavor = "current_thread", start_paused = true)] +async fn paused_time_stress_run() -> Vec<Duration> { + let mut rng = StdRng::seed_from_u64(1); + + let mut times = vec![]; + let start = Instant::now(); + for _ in 0..10_000 { + let sleep = rng.gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + time::sleep(sleep).await; + times.push(start.elapsed()); + } + + times +} diff --git a/tests/udp.rs b/tests/udp.rs index 7cbba1b..715d8eb 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -353,3 +353,90 @@ async fn try_send_to_recv_from() { } } } + +#[tokio::test] +async fn try_recv_buf() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + // Connect the two + client.connect(server.local_addr().unwrap()).await.unwrap(); + server.connect(client.local_addr().unwrap()).await.unwrap(); + + for _ in 0..5 { + loop { + client.writable().await.unwrap(); + + match client.try_send(b"hello world") { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await.unwrap(); + + let mut buf = Vec::with_capacity(512); + + match server.try_recv_buf(&mut buf) { + Ok(n) => { + assert_eq!(n, 11); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} + +#[tokio::test] +async fn try_recv_buf_from() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let saddr = server.local_addr().unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let caddr = client.local_addr().unwrap(); + + for _ in 0..5 { + loop { + client.writable().await.unwrap(); + + match client.try_send_to(b"hello world", saddr) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await.unwrap(); + + let mut buf = Vec::with_capacity(512); + + match server.try_recv_buf_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr, caddr); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs index cdabd7b..10314be 100644 --- a/tests/uds_datagram.rs +++ b/tests/uds_datagram.rs @@ -230,3 +230,95 @@ async fn try_send_to_recv_from() -> std::io::Result<()> { Ok(()) } + +#[tokio::test] +async fn try_recv_buf_from() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + // Create listener + let server = UnixDatagram::bind(&server_path)?; + + // Create socket pair + let client = UnixDatagram::bind(&client_path)?; + + for _ in 0..5 { + loop { + client.writable().await?; + + match client.try_send_to(b"hello world", &server_path) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await?; + + let mut buf = Vec::with_capacity(512); + + match server.try_recv_buf_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr.as_pathname(), Some(client_path.as_ref())); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } + + Ok(()) +} + +// Even though we use sync non-blocking io we still need a reactor. +#[tokio::test] +async fn try_recv_buf_never_block() -> io::Result<()> { + let payload = b"PAYLOAD"; + let mut count = 0; + + let (dgram1, dgram2) = UnixDatagram::pair()?; + + // Send until we hit the OS `net.unix.max_dgram_qlen`. + loop { + dgram1.writable().await.unwrap(); + + match dgram1.try_send(payload) { + Err(err) => match err.kind() { + io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, + _ => unreachable!("unexpected error {:?}", err), + }, + Ok(len) => { + assert_eq!(len, payload.len()); + } + } + count += 1; + } + + // Read every dgram we sent. + while count > 0 { + let mut recv_buf = Vec::with_capacity(16); + + dgram2.readable().await.unwrap(); + let len = dgram2.try_recv_buf(&mut recv_buf)?; + assert_eq!(len, payload.len()); + assert_eq!(payload, &recv_buf[..len]); + count -= 1; + } + + let mut recv_buf = vec![0; 16]; + let err = dgram2.try_recv_from(&mut recv_buf).unwrap_err(); + match err.kind() { + io::ErrorKind::WouldBlock => (), + _ => unreachable!("unexpected error {:?}", err), + } + + Ok(()) +} diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs index 5160f17..c528620 100644 --- a/tests/uds_stream.rs +++ b/tests/uds_stream.rs @@ -252,3 +252,85 @@ fn write_until_pending(stream: &mut UnixStream) { } } } + +#[tokio::test] +async fn try_read_buf() -> std::io::Result<()> { + let msg = b"hello world"; + + let dir = tempfile::tempdir()?; + let bind_path = dir.path().join("bind.sock"); + + // Create listener + let listener = UnixListener::bind(&bind_path)?; + + // Create socket pair + let client = UnixStream::connect(&bind_path).await?; + + let (server, _) = listener.accept().await?; + let mut written = msg.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await?; + assert_eq!(msg.len(), client.try_write(msg)?); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(msg) { + Ok(n) => written.extend(&msg[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = Vec::with_capacity(written.len()); + let mut i = 0; + + while i < read.capacity() { + server.readable().await?; + + match server.try_read_buf(&mut read) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await?; + + if ready.is_read_closed() { + break; + } else { + tokio::task::yield_now().await; + } + } + + Ok(()) +} |