From b81c80b31414735c1c2056834bf3d5fb678668e9 Mon Sep 17 00:00:00 2001 From: Luke Huang Date: Wed, 26 May 2021 23:24:32 +0800 Subject: Upgrade rust/crates/tokio to 1.6.0 and use cargo2android.json to generate bp file 1. Only generate libtokio by cargo2android.json 2. Put all the test targets to patch, which might let future upgrade easier. 3. Add some tests removed by previous upgrade back. 4. Disable some tests that doesn't work for Android. Test: atest Bug: 189140417 Change-Id: I141d548e667cbf33966e868a6eedbe4b50ab56ed --- .cargo_vcs_info.json | 2 +- Android.bp | 628 ++++++++++++++++-- CHANGELOG.md | 47 ++ Cargo.toml | 10 +- Cargo.toml.orig | 10 +- METADATA | 8 +- TEST_MAPPING | 99 +++ cargo2android.json | 11 + patches/Android.bp.patch | 1324 ++++++++++++++++++++++++++++--------- patches/io_mem_stream.patch | 12 + patches/rt_common.patch | 12 + patches/task_blocking.rs | 12 + src/fs/file.rs | 191 +++++- src/io/driver/mod.rs | 2 +- src/io/driver/registration.rs | 7 +- src/io/poll_evented.rs | 26 +- src/io/util/async_write_ext.rs | 58 +- src/io/util/buf_reader.rs | 124 +++- src/io/util/buf_stream.rs | 4 + src/io/util/buf_writer.rs | 62 +- src/io/util/copy_bidirectional.rs | 1 + src/io/util/lines.rs | 2 +- src/io/util/mem.rs | 26 +- src/io/util/mod.rs | 1 + src/io/util/read_line.rs | 4 +- src/io/util/read_to_string.rs | 2 +- src/io/util/split.rs | 2 +- src/io/util/write_all_buf.rs | 56 ++ src/macros/select.rs | 2 +- src/net/tcp/stream.rs | 140 ++++ src/net/unix/mod.rs | 4 + src/net/unix/stream.rs | 140 ++++ src/process/mod.rs | 13 +- src/process/unix/driver.rs | 94 +-- src/process/unix/mod.rs | 9 +- src/process/unix/orphan.rs | 195 ++++-- src/process/unix/reap.rs | 6 - src/runtime/blocking/pool.rs | 4 +- src/runtime/driver.rs | 8 +- src/runtime/task/core.rs | 2 +- src/signal/unix.rs | 25 +- src/sync/barrier.rs | 18 +- src/sync/mod.rs | 2 +- src/sync/mpsc/bounded.rs | 301 ++++++++- src/sync/mpsc/mod.rs | 2 +- src/sync/mutex.rs | 199 +++++- src/sync/notify.rs | 12 +- src/sync/tests/loom_notify.rs | 33 +- src/sync/watch.rs | 2 +- src/task/blocking.rs | 53 +- src/task/mod.rs | 5 + src/task/unconstrained.rs | 2 + src/time/clock.rs | 15 +- src/time/driver/entry.rs | 2 +- src/time/driver/handle.rs | 2 +- src/time/interval.rs | 5 + src/util/linked_list.rs | 4 +- src/util/rand.rs | 6 +- src/util/slab.rs | 2 +- tests/fs_file.rs | 21 + tests/io_buf_reader.rs | 362 ++++++++++ tests/io_buf_writer.rs | 251 +++++++ tests/io_mem_stream.rs | 30 +- tests/io_write_all_buf.rs | 96 +++ tests/macros_select.rs | 3 - tests/macros_test.rs | 15 +- tests/rt_common.rs | 1 + tests/task_blocking.rs | 1 + tests/tcp_stream.rs | 49 +- tests/time_pause.rs | 169 ++++- tests/time_sleep.rs | 38 +- tests/uds_stream.rs | 49 +- 72 files changed, 4452 insertions(+), 683 deletions(-) create mode 100644 cargo2android.json create mode 100644 patches/io_mem_stream.patch create mode 100644 patches/rt_common.patch create mode 100644 patches/task_blocking.rs create mode 100644 src/io/util/write_all_buf.rs create mode 100644 tests/io_buf_reader.rs create mode 100644 tests/io_buf_writer.rs create mode 100644 tests/io_write_all_buf.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index b75e29b..13071e3 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "a5ee2f0d3d78daa01e2c6c12d22b82474dc5c32a" + "sha1": "580dc9594c8e42b7d2dec60447f35238b8dfcf35" } } diff --git a/Android.bp b/Android.bp index 222916b..d289f14 100644 --- a/Android.bp +++ b/Android.bp @@ -1,4 +1,4 @@ -// This file is generated by cargo2android.py --device --run --features io-util,macros,rt-multi-thread,sync,net,fs,time --tests --patch=patches/Android.bp.patch. +// This file is generated by cargo2android.py --config cargo2android.json. // Do not modify this file as changes will be overridden on upgrade. package { @@ -58,7 +58,7 @@ rust_library { } rust_defaults { - name: "tokio_defaults", + name: "tokio_defaults_tokio", crate_name: "tokio", test_suites: ["general-tests"], auto_gen_config: true, @@ -99,9 +99,24 @@ rust_defaults { proc_macros: ["libtokio_macros"], } +rust_test_host { + name: "tokio_host_test_tests__require_full", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/_require_full.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests__require_full", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/_require_full.rs"], +} + rust_test_host { name: "tokio_host_test_tests_buffered", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/buffered.rs"], test_options: { unit_test: true, @@ -110,13 +125,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_buffered", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/buffered.rs"], } +rust_test_host { + name: "tokio_host_test_tests_io_async_fd", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_async_fd.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_async_fd", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_async_fd.rs"], +} + rust_test_host { name: "tokio_host_test_tests_io_async_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_async_read.rs"], test_options: { unit_test: true, @@ -125,13 +155,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_async_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_async_read.rs"], } +rust_test_host { + name: "tokio_host_test_tests_io_chain", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_chain.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_chain", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_chain.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_copy", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_copy.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_copy", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_copy.rs"], +} + rust_test_host { name: "tokio_host_test_tests_io_copy_bidirectional", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_copy_bidirectional.rs"], test_options: { unit_test: true, @@ -140,13 +200,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_copy_bidirectional", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_copy_bidirectional.rs"], } +rust_test_host { + name: "tokio_host_test_tests_io_driver", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_driver", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_driver_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver_drop.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_driver_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver_drop.rs"], +} + rust_test_host { name: "tokio_host_test_tests_io_lines", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_lines.rs"], test_options: { unit_test: true, @@ -155,13 +245,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_lines", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_lines.rs"], } rust_test_host { name: "tokio_host_test_tests_io_mem_stream", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_mem_stream.rs"], test_options: { unit_test: true, @@ -170,13 +260,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_mem_stream", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_mem_stream.rs"], } rust_test_host { name: "tokio_host_test_tests_io_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read.rs"], test_options: { unit_test: true, @@ -185,13 +275,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read.rs"], } rust_test_host { name: "tokio_host_test_tests_io_read_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_buf.rs"], test_options: { unit_test: true, @@ -200,13 +290,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_read_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_buf.rs"], } +rust_test_host { + name: "tokio_host_test_tests_io_read_exact", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_exact.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_exact", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_exact.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_read_line", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_line.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_line", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_line.rs"], +} + rust_test_host { name: "tokio_host_test_tests_io_read_to_end", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_to_end.rs"], test_options: { unit_test: true, @@ -215,13 +335,58 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_read_to_end", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_to_end.rs"], } +rust_test_host { + name: "tokio_host_test_tests_io_read_to_string", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_to_string.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_to_string", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_to_string.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_read_until", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_until.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_until", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_until.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_split.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_split.rs"], +} + rust_test_host { name: "tokio_host_test_tests_io_take", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_take.rs"], test_options: { unit_test: true, @@ -230,13 +395,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_take", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_take.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write.rs"], test_options: { unit_test: true, @@ -245,13 +410,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write_all", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_all.rs"], test_options: { unit_test: true, @@ -260,13 +425,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write_all", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_all.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_buf.rs"], test_options: { unit_test: true, @@ -275,13 +440,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_buf.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write_int", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_int.rs"], test_options: { unit_test: true, @@ -290,13 +455,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write_int", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_int.rs"], } rust_test_host { name: "tokio_host_test_tests_macros_join", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/macros_join.rs"], test_options: { unit_test: true, @@ -305,13 +470,103 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_macros_join", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/macros_join.rs"], } +rust_test_host { + name: "tokio_host_test_tests_macros_pin", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_pin.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_pin", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_pin.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_macros_select", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_select.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_select", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_select.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_macros_test", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_test.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_test", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_test.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_macros_try_join", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_try_join.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_try_join", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_try_join.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_net_bind_resource", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_bind_resource.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_net_bind_resource", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_bind_resource.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_net_lookup_host", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_lookup_host.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_net_lookup_host", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_lookup_host.rs"], +} + rust_test_host { name: "tokio_host_test_tests_no_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/no_rt.rs"], test_options: { unit_test: true, @@ -320,13 +575,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_no_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/no_rt.rs"], } +rust_test_host { + name: "tokio_host_test_tests_process_kill_on_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/process_kill_on_drop.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_process_kill_on_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/process_kill_on_drop.rs"], +} + rust_test_host { name: "tokio_host_test_tests_rt_basic", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_basic.rs"], test_options: { unit_test: true, @@ -335,13 +605,29 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_rt_basic", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_basic.rs"], } +rust_test_host { + name: "tokio_host_test_tests_rt_common", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/rt_common.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_rt_common", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/rt_common.rs"], +} + + rust_test_host { name: "tokio_host_test_tests_rt_threaded", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_threaded.rs"], test_options: { unit_test: true, @@ -350,13 +636,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_rt_threaded", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_threaded.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_barrier", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_barrier.rs"], test_options: { unit_test: true, @@ -365,13 +651,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_barrier", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_barrier.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_broadcast", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_broadcast.rs"], test_options: { unit_test: true, @@ -380,13 +666,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_broadcast", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_broadcast.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_errors", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_errors.rs"], test_options: { unit_test: true, @@ -395,13 +681,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_errors", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_errors.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_mpsc", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mpsc.rs"], test_options: { unit_test: true, @@ -410,13 +696,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_mpsc", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mpsc.rs"], } +rust_test_host { + name: "tokio_host_test_tests_sync_mutex", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_mutex.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_mutex", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_mutex.rs"], +} + rust_test_host { name: "tokio_host_test_tests_sync_mutex_owned", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mutex_owned.rs"], test_options: { unit_test: true, @@ -425,13 +726,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_mutex_owned", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mutex_owned.rs"], } +rust_test_host { + name: "tokio_host_test_tests_sync_notify", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_notify.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_notify", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_notify.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_sync_oneshot", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_oneshot.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_oneshot", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_oneshot.rs"], +} + rust_test_host { name: "tokio_host_test_tests_sync_rwlock", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_rwlock.rs"], test_options: { unit_test: true, @@ -440,13 +771,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_rwlock", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_rwlock.rs"], } +rust_test_host { + name: "tokio_host_test_tests_sync_semaphore", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_semaphore", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_sync_semaphore_owned", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore_owned.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_semaphore_owned", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore_owned.rs"], +} + rust_test_host { name: "tokio_host_test_tests_sync_watch", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_watch.rs"], test_options: { unit_test: true, @@ -455,13 +816,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_watch", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_watch.rs"], } +rust_test_host { + name: "tokio_host_test_tests_task_abort", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_abort.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_task_abort", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_abort.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_task_blocking", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_blocking.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_task_blocking", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_blocking.rs"], +} + rust_test_host { name: "tokio_host_test_tests_task_local", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local.rs"], test_options: { unit_test: true, @@ -470,13 +861,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_task_local", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local.rs"], } rust_test_host { name: "tokio_host_test_tests_task_local_set", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local_set.rs"], test_options: { unit_test: true, @@ -485,13 +876,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_task_local_set", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local_set.rs"], } rust_test_host { name: "tokio_host_test_tests_tcp_accept", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_accept.rs"], test_options: { unit_test: true, @@ -500,13 +891,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_accept", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_accept.rs"], } +rust_test_host { + name: "tokio_host_test_tests_tcp_connect", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_connect.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_connect", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_connect.rs"], +} + rust_test_host { name: "tokio_host_test_tests_tcp_echo", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_echo.rs"], test_options: { unit_test: true, @@ -515,13 +921,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_echo", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_echo.rs"], } +rust_test_host { + name: "tokio_host_test_tests_tcp_into_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_into_split.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_into_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_into_split.rs"], +} + rust_test_host { name: "tokio_host_test_tests_tcp_into_std", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_into_std.rs"], test_options: { unit_test: true, @@ -530,13 +951,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_into_std", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_into_std.rs"], } +rust_test_host { + name: "tokio_host_test_tests_tcp_peek", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_peek.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_peek", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_peek.rs"], +} + rust_test_host { name: "tokio_host_test_tests_tcp_shutdown", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_shutdown.rs"], test_options: { unit_test: true, @@ -545,13 +981,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_shutdown", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_shutdown.rs"], } +rust_test_host { + name: "tokio_host_test_tests_tcp_socket", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_socket.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_socket", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_socket.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_tcp_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_split.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_split.rs"], +} + rust_test_host { name: "tokio_host_test_tests_time_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/time_rt.rs"], test_options: { unit_test: true, @@ -560,13 +1026,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_time_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/time_rt.rs"], } +rust_test_host { + name: "tokio_host_test_tests_udp", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/udp.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_udp", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/udp.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_uds_cred", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/uds_cred.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_uds_cred", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/uds_cred.rs"], +} + rust_test_host { name: "tokio_host_test_tests_uds_split", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/uds_split.rs"], test_options: { unit_test: true, @@ -575,6 +1071,6 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_uds_split", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/uds_split.rs"], } diff --git a/CHANGELOG.md b/CHANGELOG.md index 0808920..2349b0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,49 @@ +# 1.6.0 (May 14, 2021) + +### Added + +- fs: try doing a non-blocking read before punting to the threadpool ([#3518]) +- io: add `write_all_buf` to `AsyncWriteExt` ([#3737]) +- io: implement `AsyncSeek` for `BufReader`, `BufWriter`, and `BufStream` ([#3491]) +- net: support non-blocking vectored I/O ([#3761]) +- sync: add `mpsc::Sender::{reserve_owned, try_reserve_owned}` ([#3704]) +- sync: add a `MutexGuard::map` method that returns a `MappedMutexGuard` ([#2472]) +- time: add getter for Interval's period ([#3705]) + +### Fixed + +- io: wake pending writers on `DuplexStream` close ([#3756]) +- process: avoid redundant effort to reap orphan processes ([#3743]) +- signal: use `std::os::raw::c_int` instead of `libc::c_int` on public API ([#3774]) +- sync: preserve permit state in `notify_waiters` ([#3660]) +- task: update `JoinHandle` panic message ([#3727]) +- time: prevent `time::advance` from going too far ([#3712]) + +### Documented + +- net: hide `net::unix::datagram` module from docs ([#3775]) +- process: updated example ([#3748]) +- sync: `Barrier` doc should use task, not thread ([#3780]) +- task: update documentation on `block_in_place` ([#3753]) + +[#2472]: https://github.com/tokio-rs/tokio/pull/2472 +[#3491]: https://github.com/tokio-rs/tokio/pull/3491 +[#3518]: https://github.com/tokio-rs/tokio/pull/3518 +[#3660]: https://github.com/tokio-rs/tokio/pull/3660 +[#3704]: https://github.com/tokio-rs/tokio/pull/3704 +[#3705]: https://github.com/tokio-rs/tokio/pull/3705 +[#3712]: https://github.com/tokio-rs/tokio/pull/3712 +[#3727]: https://github.com/tokio-rs/tokio/pull/3727 +[#3737]: https://github.com/tokio-rs/tokio/pull/3737 +[#3743]: https://github.com/tokio-rs/tokio/pull/3743 +[#3748]: https://github.com/tokio-rs/tokio/pull/3748 +[#3753]: https://github.com/tokio-rs/tokio/pull/3753 +[#3756]: https://github.com/tokio-rs/tokio/pull/3756 +[#3761]: https://github.com/tokio-rs/tokio/pull/3761 +[#3774]: https://github.com/tokio-rs/tokio/pull/3774 +[#3775]: https://github.com/tokio-rs/tokio/pull/3775 +[#3780]: https://github.com/tokio-rs/tokio/pull/3780 + # 1.5.0 (April 12, 2021) ### Added @@ -19,6 +65,7 @@ - rt: fix panic in `JoinHandle::abort()` when called from other threads ([#3672]) - sync: don't panic in `oneshot::try_recv` ([#3674]) - sync: fix notifications getting dropped on receiver drop ([#3652]) +- sync: fix `Semaphore` permit overflow calculation ([#3644]) ### Documented diff --git a/Cargo.toml b/Cargo.toml index 05ab658..85a245a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio" -version = "1.5.0" +version = "1.6.0" authors = ["Tokio Contributors "] description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio/1.5.0/tokio/" +documentation = "https://docs.rs/tokio/1.6.0/tokio/" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] categories = ["asynchronous", "network-programming"] @@ -85,7 +85,7 @@ version = "1" [features] default = [] -fs = [] +fs = ["libc"] full = ["fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"] io-std = [] io-util = ["memchr", "bytes"] @@ -107,14 +107,14 @@ features = ["std"] optional = true default-features = false [target."cfg(unix)".dependencies.libc] -version = "0.2.42" +version = "0.2.87" optional = true [target."cfg(unix)".dependencies.signal-hook-registry] version = "1.1.1" optional = true [target."cfg(unix)".dev-dependencies.libc] -version = "0.2.42" +version = "0.2.87" [target."cfg(unix)".dev-dependencies.nix] version = "0.19.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 5e53c3f..005767e 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -7,12 +7,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.5.0" +version = "1.6.0" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.5.0/tokio/" +documentation = "https://docs.rs/tokio/1.6.0/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ @@ -42,7 +42,7 @@ full = [ "time", ] -fs = [] +fs = ["libc"] io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true } tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] -libc = { version = "0.2.42", optional = true } +libc = { version = "0.2.87", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] -libc = { version = "0.2.42" } +libc = { version = "0.2.87" } nix = { version = "0.19.0" } [target.'cfg(windows)'.dependencies.winapi] diff --git a/METADATA b/METADATA index 0a2c7af..0dcfe0b 100644 --- a/METADATA +++ b/METADATA @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio/tokio-1.5.0.crate" + value: "https://static.crates.io/crates/tokio/tokio-1.6.0.crate" } - version: "1.5.0" + version: "1.6.0" license_type: NOTICE last_upgrade_date { year: 2021 - month: 4 - day: 21 + month: 5 + day: 26 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index 686fbda..6c266ef 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -19,15 +19,33 @@ { "name": "tokio-test_device_test_tests_macros" }, + { + "name": "tokio_device_test_tests__require_full" + }, { "name": "tokio_device_test_tests_buffered" }, + { + "name": "tokio_device_test_tests_io_async_fd" + }, { "name": "tokio_device_test_tests_io_async_read" }, + { + "name": "tokio_device_test_tests_io_chain" + }, + { + "name": "tokio_device_test_tests_io_copy" + }, { "name": "tokio_device_test_tests_io_copy_bidirectional" }, + { + "name": "tokio_device_test_tests_io_driver" + }, + { + "name": "tokio_device_test_tests_io_driver_drop" + }, { "name": "tokio_device_test_tests_io_lines" }, @@ -40,9 +58,24 @@ { "name": "tokio_device_test_tests_io_read_buf" }, + { + "name": "tokio_device_test_tests_io_read_exact" + }, + { + "name": "tokio_device_test_tests_io_read_line" + }, { "name": "tokio_device_test_tests_io_read_to_end" }, + { + "name": "tokio_device_test_tests_io_read_to_string" + }, + { + "name": "tokio_device_test_tests_io_read_until" + }, + { + "name": "tokio_device_test_tests_io_split" + }, { "name": "tokio_device_test_tests_io_take" }, @@ -61,12 +94,36 @@ { "name": "tokio_device_test_tests_macros_join" }, + { + "name": "tokio_device_test_tests_macros_pin" + }, + { + "name": "tokio_device_test_tests_macros_select" + }, + { + "name": "tokio_device_test_tests_macros_test" + }, + { + "name": "tokio_device_test_tests_macros_try_join" + }, + { + "name": "tokio_device_test_tests_net_bind_resource" + }, + { + "name": "tokio_device_test_tests_net_lookup_host" + }, { "name": "tokio_device_test_tests_no_rt" }, + { + "name": "tokio_device_test_tests_process_kill_on_drop" + }, { "name": "tokio_device_test_tests_rt_basic" }, + { + "name": "tokio_device_test_tests_rt_common" + }, { "name": "tokio_device_test_tests_rt_threaded" }, @@ -82,15 +139,36 @@ { "name": "tokio_device_test_tests_sync_mpsc" }, + { + "name": "tokio_device_test_tests_sync_mutex" + }, { "name": "tokio_device_test_tests_sync_mutex_owned" }, + { + "name": "tokio_device_test_tests_sync_notify" + }, + { + "name": "tokio_device_test_tests_sync_oneshot" + }, { "name": "tokio_device_test_tests_sync_rwlock" }, + { + "name": "tokio_device_test_tests_sync_semaphore" + }, + { + "name": "tokio_device_test_tests_sync_semaphore_owned" + }, { "name": "tokio_device_test_tests_sync_watch" }, + { + "name": "tokio_device_test_tests_task_abort" + }, + { + "name": "tokio_device_test_tests_task_blocking" + }, { "name": "tokio_device_test_tests_task_local" }, @@ -100,18 +178,39 @@ { "name": "tokio_device_test_tests_tcp_accept" }, + { + "name": "tokio_device_test_tests_tcp_connect" + }, { "name": "tokio_device_test_tests_tcp_echo" }, + { + "name": "tokio_device_test_tests_tcp_into_split" + }, { "name": "tokio_device_test_tests_tcp_into_std" }, + { + "name": "tokio_device_test_tests_tcp_peek" + }, { "name": "tokio_device_test_tests_tcp_shutdown" }, + { + "name": "tokio_device_test_tests_tcp_socket" + }, + { + "name": "tokio_device_test_tests_tcp_split" + }, { "name": "tokio_device_test_tests_time_rt" }, + { + "name": "tokio_device_test_tests_udp" + }, + { + "name": "tokio_device_test_tests_uds_cred" + }, { "name": "tokio_device_test_tests_uds_split" } diff --git a/cargo2android.json b/cargo2android.json new file mode 100644 index 0000000..a4c19a2 --- /dev/null +++ b/cargo2android.json @@ -0,0 +1,11 @@ +{ + "apex-available": [ + "//apex_available:platform", + "com.android.resolv" + ], + "min_sdk_version": "29", + "features": "io-util,macros,rt-multi-thread,sync,net,fs,time", + "device": true, + "run": true, + "patch": "patches/Android.bp.patch" +} \ No newline at end of file diff --git a/patches/Android.bp.patch b/patches/Android.bp.patch index 759d95f..645b3c3 100644 --- a/patches/Android.bp.patch +++ b/patches/Android.bp.patch @@ -1,310 +1,1026 @@ diff --git a/Android.bp b/Android.bp -index 6b8ca5b..222916b 100644 +index e2a61ba..d289f14 100644 --- a/Android.bp +++ b/Android.bp -@@ -50,6 +50,11 @@ rust_library { - "libpin_project_lite", +@@ -56,3 +56,1021 @@ rust_library { ], - proc_macros: ["libtokio_macros"], -+ apex_available: [ -+ "//apex_available:platform", -+ "com.android.resolv", -+ ], -+ min_sdk_version: "29", + min_sdk_version: "29", } - - rust_defaults { -@@ -61,6 +66,7 @@ rust_defaults { - features: [ - "bytes", - "fs", ++ ++rust_defaults { ++ name: "tokio_defaults_tokio", ++ crate_name: "tokio", ++ test_suites: ["general-tests"], ++ auto_gen_config: true, ++ edition: "2018", ++ features: [ ++ "bytes", ++ "fs", + "full", - "io-util", - "libc", - "macros", -@@ -108,36 +114,6 @@ rust_test { - srcs: ["tests/buffered.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_fs_file", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_file.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_fs_file", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_file.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_fs_link", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_link.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_fs_link", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_link.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_io_async_read", - defaults: ["tokio_defaults"], -@@ -348,51 +324,6 @@ rust_test { - srcs: ["tests/no_rt.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_process_issue_2174", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_2174.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_process_issue_2174", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_2174.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_process_issue_42", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_42.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_process_issue_42", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_42.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_process_smoke", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_smoke.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_process_smoke", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_smoke.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_rt_basic", - defaults: ["tokio_defaults"], -@@ -423,111 +354,6 @@ rust_test { - srcs: ["tests/rt_threaded.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_signal_ctrl_c", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_ctrl_c.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_ctrl_c", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_ctrl_c.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_drop_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_rt.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_drop_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_rt.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_drop_signal", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_signal.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_drop_signal", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_signal.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_multi_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_multi_rt.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_multi_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_multi_rt.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_no_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_no_rt.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_no_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_no_rt.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_notify_both", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_notify_both.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_notify_both", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_notify_both.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_twice", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_twice.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_twice", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_twice.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_sync_barrier", - defaults: ["tokio_defaults"], -@@ -603,21 +429,6 @@ rust_test { - srcs: ["tests/sync_mutex_owned.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_sync_once_cell", -- defaults: ["tokio_defaults"], -- srcs: ["tests/sync_once_cell.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_sync_once_cell", -- defaults: ["tokio_defaults"], -- srcs: ["tests/sync_once_cell.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_sync_rwlock", - defaults: ["tokio_defaults"], -@@ -738,21 +549,6 @@ rust_test { - srcs: ["tests/tcp_shutdown.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_time_interval", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_interval.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_time_interval", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_interval.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_time_rt", - defaults: ["tokio_defaults"], -@@ -768,21 +564,6 @@ rust_test { - srcs: ["tests/time_rt.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_time_timeout", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_timeout.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_time_timeout", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_timeout.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_uds_split", - defaults: ["tokio_defaults"], -@@ -797,18 +578,3 @@ rust_test { - defaults: ["tokio_defaults"], - srcs: ["tests/uds_split.rs"], - } -- --rust_test_host { -- name: "tokio_host_test_tests_uds_stream", -- defaults: ["tokio_defaults"], -- srcs: ["tests/uds_stream.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_uds_stream", -- defaults: ["tokio_defaults"], -- srcs: ["tests/uds_stream.rs"], --} ++ "io-util", ++ "libc", ++ "macros", ++ "memchr", ++ "mio", ++ "net", ++ "num_cpus", ++ "rt", ++ "rt-multi-thread", ++ "sync", ++ "time", ++ "tokio-macros", ++ ], ++ cfgs: ["tokio_track_caller"], ++ rustlibs: [ ++ "libasync_stream", ++ "libbytes", ++ "libfutures", ++ "liblibc", ++ "libmemchr", ++ "libmio", ++ "libnix", ++ "libnum_cpus", ++ "libpin_project_lite", ++ "librand", ++ "libtokio", ++ "libtokio_stream", ++ "libtokio_test", ++ ], ++ proc_macros: ["libtokio_macros"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests__require_full", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/_require_full.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests__require_full", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/_require_full.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_buffered", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/buffered.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_buffered", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/buffered.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_async_fd", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_fd.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_async_fd", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_fd.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_async_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_read.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_async_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_read.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_chain", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_chain.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_chain", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_chain.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_copy", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_copy", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_copy_bidirectional", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy_bidirectional.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_copy_bidirectional", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy_bidirectional.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_driver", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_driver", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_driver_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver_drop.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_driver_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver_drop.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_lines", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_lines.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_lines", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_lines.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_mem_stream", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_mem_stream.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_mem_stream", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_mem_stream.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_buf.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_buf.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_exact", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_exact.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_exact", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_exact.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_line", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_line.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_line", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_line.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_to_end", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_end.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_to_end", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_end.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_to_string", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_string.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_to_string", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_string.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_until", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_until.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_until", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_until.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_split.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_take", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_take.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_take", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_take.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write_all", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_all.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write_all", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_all.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_buf.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_buf.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write_int", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_int.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write_int", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_int.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_join.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_join.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_pin", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_pin.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_pin", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_pin.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_select", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_select.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_select", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_select.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_test", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_test.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_test", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_test.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_try_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_try_join.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_try_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_try_join.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_net_bind_resource", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_bind_resource.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_net_bind_resource", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_bind_resource.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_net_lookup_host", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_lookup_host.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_net_lookup_host", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_lookup_host.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_no_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/no_rt.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_no_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/no_rt.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_process_kill_on_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/process_kill_on_drop.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_process_kill_on_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/process_kill_on_drop.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_rt_basic", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_basic.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_rt_basic", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_basic.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_rt_common", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_common.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_rt_common", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_common.rs"], ++} ++ ++ ++rust_test_host { ++ name: "tokio_host_test_tests_rt_threaded", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_threaded.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_rt_threaded", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_threaded.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_barrier", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_barrier.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_barrier", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_barrier.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_broadcast", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_broadcast.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_broadcast", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_broadcast.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_errors", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_errors.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_errors", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_errors.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_mpsc", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mpsc.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_mpsc", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mpsc.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_mutex", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_mutex", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_mutex_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex_owned.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_mutex_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex_owned.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_notify", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_notify.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_notify", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_notify.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_oneshot", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_oneshot.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_oneshot", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_oneshot.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_rwlock", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_rwlock.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_rwlock", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_rwlock.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_semaphore", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_semaphore", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_semaphore_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore_owned.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_semaphore_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore_owned.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_watch", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_watch.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_watch", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_watch.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_abort", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_abort.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_abort", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_abort.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_blocking", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_blocking.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_blocking", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_blocking.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_local", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_local", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_local_set", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local_set.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_local_set", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local_set.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_accept", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_accept.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_accept", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_accept.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_connect", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_connect.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_connect", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_connect.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_echo", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_echo.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_echo", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_echo.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_into_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_into_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_split.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_into_std", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_std.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_into_std", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_std.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_peek", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_peek.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_peek", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_peek.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_shutdown", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_shutdown.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_shutdown", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_shutdown.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_socket", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_socket.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_socket", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_socket.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_split.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_time_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/time_rt.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_time_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/time_rt.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_udp", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/udp.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_udp", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/udp.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_uds_cred", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_cred.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_uds_cred", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_cred.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_uds_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_uds_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_split.rs"], ++} diff --git a/patches/io_mem_stream.patch b/patches/io_mem_stream.patch new file mode 100644 index 0000000..c21ce18 --- /dev/null +++ b/patches/io_mem_stream.patch @@ -0,0 +1,12 @@ +diff --git a/tests/io_mem_stream.rs b/tests/io_mem_stream.rs +index 01baa53..520391a 100644 +--- a/tests/io_mem_stream.rs ++++ b/tests/io_mem_stream.rs +@@ -63,6 +63,7 @@ async fn disconnect() { + } + + #[tokio::test] ++#[cfg(not(target_os = "android"))] + async fn disconnect_reader() { + let (a, mut b) = duplex(2); + diff --git a/patches/rt_common.patch b/patches/rt_common.patch new file mode 100644 index 0000000..1444cfe --- /dev/null +++ b/patches/rt_common.patch @@ -0,0 +1,12 @@ +diff --git a/tests/rt_common.rs b/tests/rt_common.rs +index cb1d0f6..e5fc7a9 100644 +--- a/tests/rt_common.rs ++++ b/tests/rt_common.rs +@@ -647,6 +647,7 @@ rt_test! { + } + + #[test] ++ #[cfg(not(target_os = "android"))] + fn panic_in_task() { + let rt = rt(); + let (tx, rx) = oneshot::channel(); diff --git a/patches/task_blocking.rs b/patches/task_blocking.rs new file mode 100644 index 0000000..7f4f7d4 --- /dev/null +++ b/patches/task_blocking.rs @@ -0,0 +1,12 @@ +diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs +index 82bef8a..d9514d2 100644 +--- a/tests/task_blocking.rs ++++ b/tests/task_blocking.rs +@@ -114,6 +114,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { + } + + #[test] ++#[cfg(not(target_os = "android"))] + fn useful_panic_message_when_dropping_rt_in_rt() { + use std::panic::{catch_unwind, AssertUnwindSafe}; + diff --git a/src/fs/file.rs b/src/fs/file.rs index 5c06e73..abd6e8c 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -491,14 +491,18 @@ impl AsyncRead for File { loop { match inner.state { Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); + let buf = buf_cell.as_mut().unwrap(); if !buf.is_empty() { buf.copy_to(dst); - *buf_cell = Some(buf); return Ready(Ok(())); } + if let Some(x) = try_nonblocking_read(me.std.as_ref(), dst) { + return Ready(x); + } + + let mut buf = buf_cell.take().unwrap(); buf.ensure_capacity_for(dst); let std = me.std.clone(); @@ -756,3 +760,186 @@ impl Inner { } } } + +#[cfg(all(target_os = "linux", not(test)))] +pub(crate) fn try_nonblocking_read( + file: &crate::fs::sys::File, + dst: &mut ReadBuf<'_>, +) -> Option> { + use std::sync::atomic::{AtomicBool, Ordering}; + + static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true); + if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) { + return None; + } + let out = preadv2::preadv2_safe(file, dst, -1, preadv2::RWF_NOWAIT); + if let Err(err) = &out { + match err.raw_os_error() { + Some(libc::ENOSYS) => { + NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed); + return None; + } + Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None, + _ => {} + } + } + Some(out) +} + +#[cfg(any(not(target_os = "linux"), test))] +pub(crate) fn try_nonblocking_read( + _file: &crate::fs::sys::File, + _dst: &mut ReadBuf<'_>, +) -> Option> { + None +} + +#[cfg(target_os = "linux")] +mod preadv2 { + use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t}; + use std::os::unix::prelude::AsRawFd; + + use crate::io::ReadBuf; + + pub(crate) fn preadv2_safe( + file: &std::fs::File, + dst: &mut ReadBuf<'_>, + offset: off_t, + flags: c_int, + ) -> std::io::Result<()> { + unsafe { + /* We have to defend against buffer overflows manually here. The slice API makes + * this fairly straightforward. */ + let unfilled = dst.unfilled_mut(); + let mut iov = iovec { + iov_base: unfilled.as_mut_ptr() as *mut c_void, + iov_len: unfilled.len(), + }; + /* We take a File object rather than an fd as reading from a sensitive fd may confuse + * other unsafe code that assumes that only they have access to that fd. */ + let bytes_read = preadv2( + file.as_raw_fd(), + &mut iov as *mut iovec as *const iovec, + 1, + offset, + flags, + ); + if bytes_read < 0 { + Err(std::io::Error::last_os_error()) + } else { + /* preadv2 returns the number of bytes read, e.g. the number of bytes that have + * written into `unfilled`. So it's safe to assume that the data is now + * initialised */ + dst.assume_init(bytes_read as usize); + dst.advance(bytes_read as usize); + Ok(()) + } + } + } + + #[cfg(test)] + mod test { + use super::*; + + #[test] + fn test_preadv2_safe() { + use std::io::{Seek, Write}; + use std::mem::MaybeUninit; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let filename = tmp.path().join("file"); + const MESSAGE: &[u8] = b"Hello this is a test"; + { + let mut f = std::fs::File::create(&filename).unwrap(); + f.write_all(MESSAGE).unwrap(); + } + let f = std::fs::File::open(&filename).unwrap(); + + let mut buf = [MaybeUninit::::new(0); 50]; + let mut br = ReadBuf::uninit(&mut buf); + + // Basic use: + preadv2_safe(&f, &mut br, 0, 0).unwrap(); + assert_eq!(br.initialized().len(), MESSAGE.len()); + assert_eq!(br.filled(), MESSAGE); + + // Here we check that offset works, but also that appending to a non-empty buffer + // behaves correctly WRT initialisation. + preadv2_safe(&f, &mut br, 5, 0).unwrap(); + assert_eq!(br.initialized().len(), MESSAGE.len() * 2 - 5); + assert_eq!(br.filled(), b"Hello this is a test this is a test".as_ref()); + + // offset of -1 means use the current cursor. This has not been advanced by the + // previous reads because we specified an offset there. + preadv2_safe(&f, &mut br, -1, 0).unwrap(); + assert_eq!(br.remaining(), 0); + assert_eq!( + br.filled(), + b"Hello this is a test this is a testHello this is a".as_ref() + ); + + // but the offset should have been advanced by that read + br.clear(); + preadv2_safe(&f, &mut br, -1, 0).unwrap(); + assert_eq!(br.filled(), b" test"); + + // This should be in cache, so RWF_NOWAIT should work, but it not being in cache + // (EAGAIN) or not supported by the underlying filesystem (ENOTSUP) is fine too. + br.clear(); + match preadv2_safe(&f, &mut br, 0, RWF_NOWAIT) { + Ok(()) => assert_eq!(br.filled(), MESSAGE), + Err(e) => assert!(matches!( + e.raw_os_error(), + Some(libc::ENOTSUP) | Some(libc::EAGAIN) + )), + } + + // Test handling large offsets + { + // I hope the underlying filesystem supports sparse files + let mut w = std::fs::OpenOptions::new() + .write(true) + .open(&filename) + .unwrap(); + w.set_len(0x1_0000_0000).unwrap(); + w.seek(std::io::SeekFrom::Start(0x1_0000_0000)).unwrap(); + w.write_all(b"This is a Large File").unwrap(); + } + + br.clear(); + preadv2_safe(&f, &mut br, 0x1_0000_0008, 0).unwrap(); + assert_eq!(br.filled(), b"a Large File"); + } + } + + fn pos_to_lohi(offset: off_t) -> (c_long, c_long) { + // 64-bit offset is split over high and low 32-bits on 32-bit architectures. + // 64-bit architectures still have high and low arguments, but only the low + // one is inspected. See pos_from_hilo in linux/fs/read_write.c. + const HALF_LONG_BITS: usize = core::mem::size_of::() * 8 / 2; + ( + offset as c_long, + // We want to shift this off_t value by size_of::(). We can't do + // it in one shift because if they're both 64-bits we'd be doing u64 >> 64 + // which is implementation defined. Instead do it in two halves: + ((offset >> HALF_LONG_BITS) >> HALF_LONG_BITS) as c_long, + ) + } + + pub(crate) const RWF_NOWAIT: c_int = 0x00000008; + unsafe fn preadv2( + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + flags: c_int, + ) -> ssize_t { + // Call via libc::syscall rather than libc::preadv2. preadv2 is only supported by glibc + // and only since v2.26. By using syscall we don't need to worry about compatiblity with + // old glibc versions and it will work on Android and musl too. The downside is that you + // can't use `LD_PRELOAD` tricks any more to intercept these calls. + let (lo, hi) = pos_to_lohi(offset); + libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, lo, hi, flags) as ssize_t + } +} diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs index fa2d420..52451c6 100644 --- a/src/io/driver/mod.rs +++ b/src/io/driver/mod.rs @@ -273,7 +273,7 @@ cfg_not_rt! { /// This function panics if there is no current reactor set, or if the `rt` /// feature flag is not enabled. pub(super) fn current() -> Self { - panic!(crate::util::error::CONTEXT_MISSING_ERROR) + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs index 8251fe6..7350be6 100644 --- a/src/io/driver/registration.rs +++ b/src/io/driver/registration.rs @@ -14,8 +14,9 @@ cfg_io_driver! { /// that it will receive task notifications on readiness. This is the lowest /// level API for integrating with a reactor. /// - /// The association between an I/O resource is made by calling [`new`]. Once - /// the association is established, it remains established until the + /// The association between an I/O resource is made by calling + /// [`new_with_interest_and_handle`]. + /// Once the association is established, it remains established until the /// registration instance is dropped. /// /// A registration instance represents two separate readiness streams. One @@ -36,7 +37,7 @@ cfg_io_driver! { /// stream. The write readiness event stream is only for `Ready::writable()` /// events. /// - /// [`new`]: method@Self::new + /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs index 47ae558..a31e6db 100644 --- a/src/io/poll_evented.rs +++ b/src/io/poll_evented.rs @@ -10,10 +10,10 @@ cfg_io_driver! { /// [`std::io::Write`] traits with the reactor that drives it. /// /// `PollEvented` uses [`Registration`] internally to take a type that - /// implements [`mio::Evented`] as well as [`std::io::Read`] and or + /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or /// [`std::io::Write`] and associate it with a reactor that will drive it. /// - /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be + /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be /// used from within the future's execution model. As such, the /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] /// implementations using the underlying I/O resource as well as readiness @@ -40,13 +40,13 @@ cfg_io_driver! { /// [`poll_read_ready`] again will also indicate read readiness. /// /// When the operation is attempted and is unable to succeed due to the I/O - /// resource not being ready, the caller must call [`clear_read_ready`] or - /// [`clear_write_ready`]. This clears the readiness state until a new + /// resource not being ready, the caller must call `clear_read_ready` or + /// `clear_write_ready`. This clears the readiness state until a new /// readiness event is received. /// /// This allows the caller to implement additional functions. For example, /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and - /// [`clear_read_ready`]. + /// `clear_read_ready`. /// /// ## Platform-specific events /// @@ -54,17 +54,11 @@ cfg_io_driver! { /// These events are included as part of the read readiness event stream. The /// write readiness event stream is only for `Ready::writable()` events. /// - /// [`std::io::Read`]: trait@std::io::Read - /// [`std::io::Write`]: trait@std::io::Write - /// [`AsyncRead`]: trait@AsyncRead - /// [`AsyncWrite`]: trait@AsyncWrite - /// [`mio::Evented`]: trait@mio::Evented - /// [`Registration`]: struct@Registration - /// [`TcpListener`]: struct@crate::net::TcpListener - /// [`clear_read_ready`]: method@Self::clear_read_ready - /// [`clear_write_ready`]: method@Self::clear_write_ready - /// [`poll_read_ready`]: method@Self::poll_read_ready - /// [`poll_write_ready`]: method@Self::poll_write_ready + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`AsyncWrite`]: crate::io::AsyncWrite + /// [`TcpListener`]: crate::net::TcpListener + /// [`poll_read_ready`]: Registration::poll_read_ready + /// [`poll_write_ready`]: Registration::poll_write_ready pub(crate) struct PollEvented { io: Option, registration: Registration, diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs index d011d82..2510ccd 100644 --- a/src/io/util/async_write_ext.rs +++ b/src/io/util/async_write_ext.rs @@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush}; use crate::io::util::shutdown::{shutdown, Shutdown}; use crate::io::util::write::{write, Write}; use crate::io::util::write_all::{write_all, WriteAll}; +use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf}; use crate::io::util::write_buf::{write_buf, WriteBuf}; use crate::io::util::write_int::{ WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le, @@ -159,7 +160,6 @@ cfg_io_util! { write_vectored(self, bufs) } - /// Writes a buffer into this writer, advancing the buffer's internal /// cursor. /// @@ -197,10 +197,11 @@ cfg_io_util! { /// /// # Examples /// - /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]: + /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]: /// /// [`File`]: crate::fs::File /// [`Buf`]: bytes::Buf + /// [`Cursor`]: std::io::Cursor /// /// ```no_run /// use tokio::io::{self, AsyncWriteExt}; @@ -233,6 +234,59 @@ cfg_io_util! { write_buf(self, src) } + /// Attempts to write an entire buffer into this writer + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> { + /// while buf.has_remaining() { + /// self.write_buf(&mut buf).await?; + /// } + /// } + /// ``` + /// + /// This method will continuously call [`write`] until + /// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not + /// return until the entire buffer has been successfully written or an error occurs. The + /// first error generated will be returned. + /// + /// The buffer is advanced after each chunk is successfully written. After failure, + /// `src.chunk()` will return the chunk that failed to write. + /// + /// # Examples + /// + /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]: + /// + /// [`File`]: crate::fs::File + /// [`Buf`]: bytes::Buf + /// [`Cursor`]: std::io::Cursor + /// + /// ```no_run + /// use tokio::io::{self, AsyncWriteExt}; + /// use tokio::fs::File; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// let mut buffer = Cursor::new(b"data to write"); + /// + /// file.write_all_buf(&mut buffer).await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`write`]: AsyncWriteExt::write + fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B> + where + Self: Sized + Unpin, + B: Buf, + { + write_all_buf(self, src) + } + /// Attempts to write an entire buffer into this writer. /// /// Equivalent to: diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs index 271f61b..cc65ef2 100644 --- a/src/io/util/buf_reader.rs +++ b/src/io/util/buf_reader.rs @@ -1,11 +1,11 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; -use std::io; +use std::io::{self, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{cmp, fmt}; +use std::{cmp, fmt, mem}; pin_project! { /// The `BufReader` struct adds buffering to any reader. @@ -30,6 +30,7 @@ pin_project! { pub(super) buf: Box<[u8]>, pub(super) pos: usize, pub(super) cap: usize, + pub(super) seek_state: SeekState, } } @@ -48,6 +49,7 @@ impl BufReader { buf: buffer.into_boxed_slice(), pos: 0, cap: 0, + seek_state: SeekState::Init, } } @@ -141,6 +143,122 @@ impl AsyncBufRead for BufReader { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch. + PendingOverflowed(i64), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seek to an offset, in bytes, in the underlying reader. +/// +/// The position used for seeking with `SeekFrom::Current(_)` is the +/// position the underlying reader would be at if the `BufReader` had no +/// internal buffer. +/// +/// Seeking always discards the internal buffer, even if the seek position +/// would otherwise fall within it. This guarantees that calling +/// `.into_inner()` immediately after a seek yields the underlying reader +/// at the same position. +/// +/// See [`AsyncSeek`] for more details. +/// +/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` +/// where `n` minus the internal buffer length overflows an `i64`, two +/// seeks will be performed instead of one. If the second seek returns +/// `Err`, the underlying reader will be left at the same position it would +/// have if you called `seek` with `SeekFrom::Current(0)`. +impl AsyncSeek for BufReader { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We needs to call seek operation multiple times. + // And we should always call both start_seek and poll_complete, + // as start_seek alone cannot guarantee that the operation will be completed. + // poll_complete receives a Context and returns a Poll, so it cannot be called + // inside start_seek. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) { + SeekState::Init => { + // 1.x AsyncSeek recommends calling poll_complete before start_seek. + // We don't have to guarantee that the value returned by + // poll_complete called without start_seek is correct, + // so we'll return 0. + return Poll::Ready(Ok(0)); + } + SeekState::Start(SeekFrom::Current(n)) => { + let remainder = (self.cap - self.pos) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::min_value() so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(offset))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } else { + // seek backwards by our remainder, and then by the offset + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(-remainder))?; + if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { + *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n); + return Poll::Pending; + } + + // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676 + self.as_mut().discard_buffer(); + + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(n))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + } + SeekState::PendingOverflowed(n) => { + if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { + *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n); + return Poll::Pending; + } + + // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676 + self.as_mut().discard_buffer(); + + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(n))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + SeekState::Start(pos) => { + // Seeking with Start/End doesn't care about our buffer length. + self.as_mut().get_pin_mut().start_seek(pos)?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?, + }; + + match res { + Poll::Ready(res) => { + self.discard_buffer(); + Poll::Ready(Ok(res)) + } + Poll::Pending => { + *self.as_mut().project().seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl AsyncWrite for BufReader { fn poll_write( self: Pin<&mut Self>, diff --git a/src/io/util/buf_stream.rs b/src/io/util/buf_stream.rs index cc857e2..9238665 100644 --- a/src/io/util/buf_stream.rs +++ b/src/io/util/buf_stream.rs @@ -94,9 +94,11 @@ impl From>> for BufStream { buf: rbuf, pos, cap, + seek_state: rseek_state, }, buf: wbuf, written, + seek_state: wseek_state, } = b; BufStream { @@ -105,10 +107,12 @@ impl From>> for BufStream { inner, buf: wbuf, written, + seek_state: wseek_state, }, buf: rbuf, pos, cap, + seek_state: rseek_state, }, } } diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs index 5e3d4b7..4e8e493 100644 --- a/src/io/util/buf_writer.rs +++ b/src/io/util/buf_writer.rs @@ -1,9 +1,9 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, Write}; +use std::io::{self, SeekFrom, Write}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -34,6 +34,7 @@ pin_project! { pub(super) inner: W, pub(super) buf: Vec, pub(super) written: usize, + pub(super) seek_state: SeekState, } } @@ -50,6 +51,7 @@ impl BufWriter { inner, buf: Vec::with_capacity(cap), written: 0, + seek_state: SeekState::Init, } } @@ -142,6 +144,62 @@ impl AsyncWrite for BufWriter { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seek to the offset, in bytes, in the underlying writer. +/// +/// Seeking always writes out the internal buffer before seeking. +impl AsyncSeek for BufWriter { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We need to flush the internal buffer before seeking. + // It receives a `Context` and returns a `Poll`, so it cannot be called + // inside `start_seek`. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let pos = match self.seek_state { + SeekState::Init => { + return self.project().inner.poll_complete(cx); + } + SeekState::Start(pos) => Some(pos), + SeekState::Pending => None, + }; + + // Flush the internal buffer before seeking. + ready!(self.as_mut().flush_buf(cx))?; + + let mut me = self.project(); + if let Some(pos) = pos { + // Ensure previous seeks have finished before starting a new one + ready!(me.inner.as_mut().poll_complete(cx))?; + if let Err(e) = me.inner.as_mut().start_seek(pos) { + *me.seek_state = SeekState::Init; + return Poll::Ready(Err(e)); + } + } + match me.inner.poll_complete(cx) { + Poll::Ready(res) => { + *me.seek_state = SeekState::Init; + Poll::Ready(res) + } + Poll::Pending => { + *me.seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl AsyncRead for BufWriter { fn poll_read( self: Pin<&mut Self>, diff --git a/src/io/util/copy_bidirectional.rs b/src/io/util/copy_bidirectional.rs index cc43f0f..c93060b 100644 --- a/src/io/util/copy_bidirectional.rs +++ b/src/io/util/copy_bidirectional.rs @@ -104,6 +104,7 @@ where /// # Return value /// /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub async fn copy_bidirectional(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs index ed6a944..d02a453 100644 --- a/src/io/util/lines.rs +++ b/src/io/util/lines.rs @@ -128,7 +128,7 @@ where } } - Poll::Ready(Ok(Some(mem::replace(me.buf, String::new())))) + Poll::Ready(Ok(Some(mem::take(me.buf)))) } } diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs index e91a932..4eefe7b 100644 --- a/src/io/util/mem.rs +++ b/src/io/util/mem.rs @@ -16,6 +16,14 @@ use std::{ /// that can be used as in-memory IO types. Writing to one of the pairs will /// allow that data to be read from the other, and vice versa. /// +/// # Closing a `DuplexStream` +/// +/// If one end of the `DuplexStream` channel is dropped, any pending reads on +/// the other side will continue to read data until the buffer is drained, then +/// they will signal EOF by returning 0 bytes. Any writes to the other side, +/// including pending ones (that are waiting for free space in the buffer) will +/// return `Err(BrokenPipe)` immediately. +/// /// # Example /// /// ``` @@ -37,6 +45,7 @@ use std::{ /// # } /// ``` #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct DuplexStream { read: Arc>, write: Arc>, @@ -72,6 +81,7 @@ struct Pipe { /// /// The `max_buf_size` argument is the maximum amount of bytes that can be /// written to a side before the write returns `Poll::Pending`. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { let one = Arc::new(Mutex::new(Pipe::new(max_buf_size))); let two = Arc::new(Mutex::new(Pipe::new(max_buf_size))); @@ -134,7 +144,8 @@ impl AsyncWrite for DuplexStream { impl Drop for DuplexStream { fn drop(&mut self) { // notify the other side of the closure - self.write.lock().close(); + self.write.lock().close_write(); + self.read.lock().close_read(); } } @@ -151,12 +162,21 @@ impl Pipe { } } - fn close(&mut self) { + fn close_write(&mut self) { self.is_closed = true; + // needs to notify any readers that no more data will come if let Some(waker) = self.read_waker.take() { waker.wake(); } } + + fn close_read(&mut self) { + self.is_closed = true; + // needs to notify any writers that they have to abort + if let Some(waker) = self.write_waker.take() { + waker.wake(); + } + } } impl AsyncRead for Pipe { @@ -217,7 +237,7 @@ impl AsyncWrite for Pipe { mut self: Pin<&mut Self>, _: &mut task::Context<'_>, ) -> Poll> { - self.close(); + self.close_write(); Poll::Ready(Ok(())) } } diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs index ab38664..fd3dd0d 100644 --- a/src/io/util/mod.rs +++ b/src/io/util/mod.rs @@ -77,6 +77,7 @@ cfg_io_util! { mod write_vectored; mod write_all; mod write_buf; + mod write_all_buf; mod write_int; diff --git a/src/io/util/read_line.rs b/src/io/util/read_line.rs index d38ffaf..e641f51 100644 --- a/src/io/util/read_line.rs +++ b/src/io/util/read_line.rs @@ -36,7 +36,7 @@ where { ReadLine { reader, - buf: mem::replace(string, String::new()).into_bytes(), + buf: mem::take(string).into_bytes(), output: string, read: 0, _pin: PhantomPinned, @@ -99,7 +99,7 @@ pub(super) fn read_line_internal( read: &mut usize, ) -> Poll> { let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read)); - let utf8_res = String::from_utf8(mem::replace(buf, Vec::new())); + let utf8_res = String::from_utf8(mem::take(buf)); // At this point both buf and output are empty. The allocation is in utf8_res. diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs index 2c17383..b3d82a2 100644 --- a/src/io/util/read_to_string.rs +++ b/src/io/util/read_to_string.rs @@ -37,7 +37,7 @@ pub(crate) fn read_to_string<'a, R>( where R: AsyncRead + ?Sized + Unpin, { - let buf = mem::replace(string, String::new()).into_bytes(); + let buf = mem::take(string).into_bytes(); ReadToString { reader, buf: VecWithInitialized::new(buf), diff --git a/src/io/util/split.rs b/src/io/util/split.rs index 4f3ce4e..9c2bb05 100644 --- a/src/io/util/split.rs +++ b/src/io/util/split.rs @@ -106,7 +106,7 @@ where me.buf.pop(); } - Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new())))) + Poll::Ready(Ok(Some(mem::take(me.buf)))) } } diff --git a/src/io/util/write_all_buf.rs b/src/io/util/write_all_buf.rs new file mode 100644 index 0000000..05af7fe --- /dev/null +++ b/src/io/util/write_all_buf.rs @@ -0,0 +1,56 @@ +use crate::io::AsyncWrite; + +use bytes::Buf; +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// A future to write some of the buffer to an `AsyncWrite`. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct WriteAllBuf<'a, W, B> { + writer: &'a mut W, + buf: &'a mut B, + #[pin] + _pin: PhantomPinned, + } +} + +/// Tries to write some bytes from the given `buf` to the writer in an +/// asynchronous manner, returning a future. +pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B> +where + W: AsyncWrite + Unpin, + B: Buf, +{ + WriteAllBuf { + writer, + buf, + _pin: PhantomPinned, + } +} + +impl Future for WriteAllBuf<'_, W, B> +where + W: AsyncWrite + Unpin, + B: Buf, +{ + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); + while me.buf.has_remaining() { + let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?); + me.buf.advance(n); + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + } + + Poll::Ready(Ok(())) + } +} diff --git a/src/macros/select.rs b/src/macros/select.rs index 3ba16b6..f98ebff 100644 --- a/src/macros/select.rs +++ b/src/macros/select.rs @@ -154,7 +154,7 @@ /// `select!` panics if all branches are disabled **and** there is no provided /// `else` branch. A branch is disabled when the provided `if` precondition /// returns `false` **or** when the pattern does not match the result of `. +/// expression>`. /// /// # Examples /// diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index e231e5d..2a367ef 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -563,6 +563,84 @@ impl TcpStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: TcpStream::try_read() + /// [`readable()`]: TcpStream::readable() + /// [`ready()`]: TcpStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + use std::io::Read; + + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + cfg_io_util! { /// Try to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. @@ -775,6 +853,68 @@ impl TcpStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } + /// Try to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: TcpStream::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result { + use std::io::Write; + + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs index 19ee34a..c3046f1 100644 --- a/src/net/unix/mod.rs +++ b/src/net/unix/mod.rs @@ -1,5 +1,9 @@ //! Unix domain socket utility types +// This module does not currently provide any public API, but it was +// unintentionally defined as a public module. Hide it from the documentation +// instead of changing it to a private module to avoid breakage. +#[doc(hidden)] pub mod datagram; pub(crate) mod listener; diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index d797aae..917844b 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -271,6 +271,84 @@ impl UnixStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: UnixStream::try_read() + /// [`readable()`]: UnixStream::readable() + /// [`ready()`]: UnixStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + cfg_io_util! { /// Try to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. @@ -487,6 +565,68 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } + /// Try to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: UnixStream::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } + /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. /// /// This function is intended to be used to wrap a UnixStream from the diff --git a/src/process/mod.rs b/src/process/mod.rs index 00e39b0..96ceb6d 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -994,13 +994,22 @@ impl Child { /// If the caller wishes to explicitly control when the child's stdin /// handle is closed, they may `.take()` it before calling `.wait()`: /// - /// ```no_run + /// ``` + /// # #[cfg(not(unix))]fn main(){} + /// # #[cfg(unix)] /// use tokio::io::AsyncWriteExt; + /// # #[cfg(unix)] /// use tokio::process::Command; + /// # #[cfg(unix)] + /// use std::process::Stdio; /// + /// # #[cfg(unix)] /// #[tokio::main] /// async fn main() { - /// let mut child = Command::new("cat").spawn().unwrap(); + /// let mut child = Command::new("cat") + /// .stdin(Stdio::piped()) + /// .spawn() + /// .unwrap(); /// /// let mut stdin = child.stdin.take().unwrap(); /// tokio::spawn(async move { diff --git a/src/process/unix/driver.rs b/src/process/unix/driver.rs index 110b484..43b2efa 100644 --- a/src/process/unix/driver.rs +++ b/src/process/unix/driver.rs @@ -3,11 +3,8 @@ //! Process driver use crate::park::Park; -use crate::process::unix::orphan::ReapOrphanQueue; use crate::process::unix::GlobalOrphanQueue; -use crate::signal::unix::driver::Driver as SignalDriver; -use crate::signal::unix::{signal_with_handle, SignalKind}; -use crate::sync::watch; +use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; use std::io; use std::time::Duration; @@ -16,51 +13,20 @@ use std::time::Duration; #[derive(Debug)] pub(crate) struct Driver { park: SignalDriver, - inner: CoreDriver, GlobalOrphanQueue>, -} - -#[derive(Debug)] -struct CoreDriver { - sigchild: S, - orphan_queue: Q, -} - -trait HasChanged { - fn has_changed(&mut self) -> bool; -} - -impl HasChanged for watch::Receiver { - fn has_changed(&mut self) -> bool { - self.try_has_changed().and_then(Result::ok).is_some() - } -} - -// ===== impl CoreDriver ===== - -impl CoreDriver -where - S: HasChanged, - Q: ReapOrphanQueue, -{ - fn process(&mut self) { - if self.sigchild.has_changed() { - self.orphan_queue.reap_orphans(); - } - } + signal_handle: SignalHandle, } // ===== impl Driver ===== impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. - pub(crate) fn new(park: SignalDriver) -> io::Result { - let sigchild = signal_with_handle(SignalKind::child(), park.handle())?; - let inner = CoreDriver { - sigchild, - orphan_queue: GlobalOrphanQueue, - }; + pub(crate) fn new(park: SignalDriver) -> Self { + let signal_handle = park.handle(); - Ok(Self { park, inner }) + Self { + park, + signal_handle, + } } } @@ -76,13 +42,13 @@ impl Park for Driver { fn park(&mut self) -> Result<(), Self::Error> { self.park.park()?; - self.inner.process(); + GlobalOrphanQueue::reap_orphans(&self.signal_handle); Ok(()) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { self.park.park_timeout(duration)?; - self.inner.process(); + GlobalOrphanQueue::reap_orphans(&self.signal_handle); Ok(()) } @@ -90,43 +56,3 @@ impl Park for Driver { self.park.shutdown() } } - -#[cfg(test)] -mod test { - use super::*; - use crate::process::unix::orphan::test::MockQueue; - - struct MockStream { - total_try_recv: usize, - values: Vec>, - } - - impl MockStream { - fn new(values: Vec>) -> Self { - Self { - total_try_recv: 0, - values, - } - } - } - - impl HasChanged for MockStream { - fn has_changed(&mut self) -> bool { - self.total_try_recv += 1; - self.values.remove(0).is_some() - } - } - - #[test] - fn no_reap_if_no_signal() { - let mut driver = CoreDriver { - sigchild: MockStream::new(vec![None]), - orphan_queue: MockQueue::<()>::new(), - }; - - driver.process(); - - assert_eq!(1, driver.sigchild.total_try_recv); - assert_eq!(0, driver.orphan_queue.total_reaps.get()); - } -} diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index 852a191..fab63dd 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -24,7 +24,7 @@ pub(crate) mod driver; pub(crate) mod orphan; -use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait}; +use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; mod reap; use reap::Reaper; @@ -32,6 +32,7 @@ use reap::Reaper; use crate::io::PollEvented; use crate::process::kill::Kill; use crate::process::SpawnedChild; +use crate::signal::unix::driver::Handle as SignalHandle; use crate::signal::unix::{signal, Signal, SignalKind}; use mio::event::Source; @@ -73,9 +74,9 @@ impl fmt::Debug for GlobalOrphanQueue { } } -impl ReapOrphanQueue for GlobalOrphanQueue { - fn reap_orphans(&self) { - ORPHAN_QUEUE.reap_orphans() +impl GlobalOrphanQueue { + fn reap_orphans(handle: &SignalHandle) { + ORPHAN_QUEUE.reap_orphans(handle) } } diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs index 8a1e127..07f0dcf 100644 --- a/src/process/unix/orphan.rs +++ b/src/process/unix/orphan.rs @@ -1,6 +1,9 @@ +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal_with_handle, SignalKind}; +use crate::sync::watch; use std::io; use std::process::ExitStatus; -use std::sync::Mutex; /// An interface for waiting on a process to exit. pub(crate) trait Wait { @@ -20,21 +23,8 @@ impl Wait for &mut T { } } -/// An interface for reaping a set of orphaned processes. -pub(crate) trait ReapOrphanQueue { - /// Attempts to reap every process in the queue, ignoring any errors and - /// enqueueing any orphans which have not yet exited. - fn reap_orphans(&self); -} - -impl ReapOrphanQueue for &T { - fn reap_orphans(&self) { - (**self).reap_orphans() - } -} - /// An interface for queueing up an orphaned process so that it can be reaped. -pub(crate) trait OrphanQueue: ReapOrphanQueue { +pub(crate) trait OrphanQueue { /// Adds an orphan to the queue. fn push_orphan(&self, orphan: T); } @@ -48,50 +38,91 @@ impl> OrphanQueue for &O { /// An implementation of `OrphanQueue`. #[derive(Debug)] pub(crate) struct OrphanQueueImpl { + sigchild: Mutex>>, queue: Mutex>, } impl OrphanQueueImpl { pub(crate) fn new() -> Self { Self { + sigchild: Mutex::new(None), queue: Mutex::new(Vec::new()), } } #[cfg(test)] fn len(&self) -> usize { - self.queue.lock().unwrap().len() + self.queue.lock().len() } -} -impl OrphanQueue for OrphanQueueImpl { - fn push_orphan(&self, orphan: T) { - self.queue.lock().unwrap().push(orphan) + pub(crate) fn push_orphan(&self, orphan: T) + where + T: Wait, + { + self.queue.lock().push(orphan) } -} -impl ReapOrphanQueue for OrphanQueueImpl { - fn reap_orphans(&self) { - let mut queue = self.queue.lock().unwrap(); - let queue = &mut *queue; - - for i in (0..queue.len()).rev() { - match queue[i].try_wait() { - Ok(None) => {} - Ok(Some(_)) | Err(_) => { - // The stdlib handles interruption errors (EINTR) when polling a child process. - // All other errors represent invalid inputs or pids that have already been - // reaped, so we can drop the orphan in case an error is raised. - queue.swap_remove(i); + /// Attempts to reap every process in the queue, ignoring any errors and + /// enqueueing any orphans which have not yet exited. + pub(crate) fn reap_orphans(&self, handle: &SignalHandle) + where + T: Wait, + { + // If someone else is holding the lock, they will be responsible for draining + // the queue as necessary, so we can safely bail if that happens + if let Some(mut sigchild_guard) = self.sigchild.try_lock() { + match &mut *sigchild_guard { + Some(sigchild) => { + if sigchild.try_has_changed().and_then(Result::ok).is_some() { + drain_orphan_queue(self.queue.lock()); + } + } + None => { + let queue = self.queue.lock(); + + // Be lazy and only initialize the SIGCHLD listener if there + // are any orphaned processes in the queue. + if !queue.is_empty() { + // An errors shouldn't really happen here, but if it does it + // means that the signal driver isn't running, in + // which case there isn't anything we can + // register/initialize here, so we can try again later + if let Ok(sigchild) = signal_with_handle(SignalKind::child(), &handle) { + *sigchild_guard = Some(sigchild); + drain_orphan_queue(queue); + } + } } } } } } +fn drain_orphan_queue(mut queue: MutexGuard<'_, Vec>) +where + T: Wait, +{ + for i in (0..queue.len()).rev() { + match queue[i].try_wait() { + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); + } + } + } + + drop(queue); +} + #[cfg(all(test, not(loom)))] pub(crate) mod test { use super::*; + use crate::io::driver::Driver as IoDriver; + use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + use crate::sync::watch; use std::cell::{Cell, RefCell}; use std::io; use std::os::unix::process::ExitStatusExt; @@ -100,14 +131,12 @@ pub(crate) mod test { pub(crate) struct MockQueue { pub(crate) all_enqueued: RefCell>, - pub(crate) total_reaps: Cell, } impl MockQueue { pub(crate) fn new() -> Self { Self { all_enqueued: RefCell::new(Vec::new()), - total_reaps: Cell::new(0), } } } @@ -118,12 +147,6 @@ pub(crate) mod test { } } - impl ReapOrphanQueue for MockQueue { - fn reap_orphans(&self) { - self.total_reaps.set(self.total_reaps.get() + 1); - } - } - struct MockWait { total_waits: Rc>, num_wait_until_status: usize, @@ -191,27 +214,107 @@ pub(crate) mod test { assert_eq!(orphanage.len(), 4); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 2); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 1); assert_eq!(third_waits.get(), 1); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 1); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 2); assert_eq!(third_waits.get(), 2); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 0); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 2); assert_eq!(third_waits.get(), 3); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); // Safe to reap when empty + // Safe to reap when empty + drain_orphan_queue(orphanage.queue.lock()); + } + + #[test] + fn no_reap_if_no_signal_received() { + let (tx, rx) = watch::channel(()); + + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + *orphanage.sigchild.lock() = Some(rx); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + tx.send(()).unwrap(); + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 1); + } + + #[test] + fn no_reap_if_signal_lock_held() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + let signal_guard = orphanage.sigchild.lock(); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + drop(signal_guard); + } + + #[test] + fn does_not_register_signal_if_queue_empty() { + let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); + let handle = signal_driver.handle(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); // Sanity + + // No register when queue empty + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_some()); + assert_eq!(waits.get(), 1); // Eager reap when registering listener + } + + #[test] + fn does_nothing_if_signal_could_not_be_registered() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + // Signal handler has "gone away", nothing to register or reap + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + assert_eq!(waits.get(), 0); } } diff --git a/src/process/unix/reap.rs b/src/process/unix/reap.rs index 5dc95e5..f7f4d3c 100644 --- a/src/process/unix/reap.rs +++ b/src/process/unix/reap.rs @@ -224,7 +224,6 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.total_waits); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Not yet exited, couldn't register interest the first time @@ -232,7 +231,6 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.total_waits); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Exited @@ -245,7 +243,6 @@ mod test { } assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.total_waits); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } @@ -260,7 +257,6 @@ mod test { grim.kill().unwrap(); assert_eq!(1, grim.total_kills); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } @@ -276,7 +272,6 @@ mod test { drop(grim); - assert_eq!(0, queue.total_reaps.get()); assert!(queue.all_enqueued.borrow().is_empty()); } @@ -294,7 +289,6 @@ mod test { let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); drop(grim); - assert_eq!(0, queue.total_reaps.get()); assert_eq!(1, queue.all_enqueued.borrow().len()); } diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 791e405..5c9b8ed 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -61,7 +61,7 @@ struct Shared { /// Prior to shutdown, we clean up JoinHandles by having each timed-out /// thread join on the previous timed-out thread. This is not strictly /// necessary but helps avoid Valgrind false positives, see - /// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666 + /// /// for more information. last_exiting_thread: Option>, /// This holds the JoinHandles for all running threads; on shutdown, the thread @@ -151,7 +151,7 @@ impl BlockingPool { self.spawner.inner.condvar.notify_all(); let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread); - let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new()); + let workers = std::mem::take(&mut shared.worker_threads); drop(shared); diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs index a0e8e23..7e45977 100644 --- a/src/runtime/driver.rs +++ b/src/runtime/driver.rs @@ -23,7 +23,7 @@ cfg_io_driver! { let io_handle = io_driver.handle(); let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; - let process_driver = create_process_driver(signal_driver)?; + let process_driver = create_process_driver(signal_driver); (Either::A(process_driver), Some(io_handle), signal_handle) } else { @@ -80,7 +80,7 @@ cfg_not_signal_internal! { cfg_process_driver! { type ProcessDriver = crate::process::unix::driver::Driver; - fn create_process_driver(signal_driver: SignalDriver) -> io::Result { + fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { crate::process::unix::driver::Driver::new(signal_driver) } } @@ -89,8 +89,8 @@ cfg_not_process_driver! { cfg_io_driver! { type ProcessDriver = SignalDriver; - fn create_process_driver(signal_driver: SignalDriver) -> io::Result { - Ok(signal_driver) + fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { + signal_driver } } } diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs index 9f7ff55..fb6dafd 100644 --- a/src/runtime/task/core.rs +++ b/src/runtime/task/core.rs @@ -279,7 +279,7 @@ impl CoreStage { // Safety:: the caller ensures mutal exclusion to the field. match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { Stage::Finished(output) => output, - _ => panic!("unexpected task state"), + _ => panic!("JoinHandle polled after completion"), } }) } diff --git a/src/signal/unix.rs b/src/signal/unix.rs index cb1d1cc..f96b2f4 100644 --- a/src/signal/unix.rs +++ b/src/signal/unix.rs @@ -9,7 +9,6 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storag use crate::signal::RxFuture; use crate::sync::watch; -use libc::c_int; use mio::net::UnixStream; use std::io::{self, Error, ErrorKind, Write}; use std::pin::Pin; @@ -61,7 +60,7 @@ impl Init for OsExtraData { /// Represents the specific kind of signal to listen for. #[derive(Debug, Clone, Copy)] -pub struct SignalKind(c_int); +pub struct SignalKind(libc::c_int); impl SignalKind { /// Allows for listening to any valid OS signal. @@ -74,8 +73,14 @@ impl SignalKind { /// // let signum = libc::OS_SPECIFIC_SIGNAL; /// let kind = SignalKind::from_raw(signum); /// ``` - pub fn from_raw(signum: c_int) -> Self { - Self(signum) + // Use `std::os::raw::c_int` on public API to prevent leaking a non-stable + // type alias from libc. + // `libc::c_int` and `std::os::raw::c_int` are currently the same type, and are + // unlikely to change to other types, but technically libc can change this + // in the future minor version. + // See https://github.com/tokio-rs/tokio/issues/3767 for more. + pub fn from_raw(signum: std::os::raw::c_int) -> Self { + Self(signum as libc::c_int) } /// Represents the SIGALRM signal. @@ -208,7 +213,7 @@ impl Default for SignalInfo { /// 2. Wake up the driver by writing a byte to a pipe /// /// Those two operations should both be async-signal safe. -fn action(globals: Pin<&'static Globals>, signal: c_int) { +fn action(globals: Pin<&'static Globals>, signal: libc::c_int) { globals.record_event(signal as EventId); // Send a wakeup, ignore any errors (anything reasonably possible is @@ -222,7 +227,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) { /// /// This will register the signal handler if it hasn't already been registered, /// returning any error along the way if that fails. -fn signal_enable(signal: SignalKind, handle: Handle) -> io::Result<()> { +fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> { let signal = signal.0; if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { return Err(Error::new( @@ -352,7 +357,7 @@ pub struct Signal { /// * If the signal is one of /// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) pub fn signal(kind: SignalKind) -> io::Result { - let rx = signal_with_handle(kind, Handle::current())?; + let rx = signal_with_handle(kind, &Handle::current())?; Ok(Signal { inner: RxFuture::new(rx), @@ -361,7 +366,7 @@ pub fn signal(kind: SignalKind) -> io::Result { pub(crate) fn signal_with_handle( kind: SignalKind, - handle: Handle, + handle: &Handle, ) -> io::Result> { // Turn the signal delivery on once we are ready for it signal_enable(kind, handle)?; @@ -457,14 +462,14 @@ mod tests { #[test] fn signal_enable_error_on_invalid_input() { - signal_enable(SignalKind::from_raw(-1), Handle::default()).unwrap_err(); + signal_enable(SignalKind::from_raw(-1), &Handle::default()).unwrap_err(); } #[test] fn signal_enable_error_on_forbidden_input() { signal_enable( SignalKind::from_raw(signal_hook_registry::FORBIDDEN[0]), - Handle::default(), + &Handle::default(), ) .unwrap_err(); } diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs index a8b291f..e3c95f6 100644 --- a/src/sync/barrier.rs +++ b/src/sync/barrier.rs @@ -2,7 +2,7 @@ use crate::sync::watch; use std::sync::Mutex; -/// A barrier enables multiple threads to synchronize the beginning of some computation. +/// A barrier enables multiple tasks to synchronize the beginning of some computation. /// /// ``` /// # #[tokio::main] @@ -52,10 +52,10 @@ struct BarrierState { } impl Barrier { - /// Creates a new barrier that can block a given number of threads. + /// Creates a new barrier that can block a given number of tasks. /// - /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all - /// threads at once when the `n`th thread calls `wait`. + /// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all + /// tasks at once when the `n`th task calls `wait`. pub fn new(mut n: usize) -> Barrier { let (waker, wait) = crate::sync::watch::channel(0); @@ -79,11 +79,11 @@ impl Barrier { /// Does not resolve until all tasks have rendezvoused here. /// - /// Barriers are re-usable after all threads have rendezvoused once, and can + /// Barriers are re-usable after all tasks have rendezvoused once, and can /// be used continuously. /// /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from - /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads + /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks /// will receive a result that will return `false` from `is_leader`. pub async fn wait(&self) -> BarrierWaitResult { // NOTE: we are taking a _synchronous_ lock here. @@ -129,14 +129,14 @@ impl Barrier { } } -/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused. +/// A `BarrierWaitResult` is returned by `wait` when all tasks in the `Barrier` have rendezvoused. #[derive(Debug, Clone)] pub struct BarrierWaitResult(bool); impl BarrierWaitResult { - /// Returns `true` if this thread from wait is the "leader thread". + /// Returns `true` if this task from wait is the "leader task". /// - /// Only one thread will have `true` returned from their result, all other threads will have + /// Only one task will have `true` returned from their result, all other tasks will have /// `false` returned. pub fn is_leader(&self) -> bool { self.0 diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d89a9dd..5f97c1a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -436,7 +436,7 @@ cfg_sync! { pub mod mpsc; mod mutex; - pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard}; + pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard}; pub(crate) mod notify; pub use notify::Notify; diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index 1f670bf..ce857d7 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -33,6 +33,22 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx, } +/// Owned permit to send one value into the channel. +/// +/// This is identical to the [`Permit`] type, except that it moves the sender +/// rather than borrowing it. +/// +/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and +/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity +/// before generating a message to send. +/// +/// [`Permit`]: Permit +/// [`Sender::reserve_owned()`]: Sender::reserve_owned +/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned +pub struct OwnedPermit { + chan: Option>, +} + /// Receive values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. @@ -229,10 +245,11 @@ impl Receiver { /// /// To guarantee that no messages are dropped, after calling `close()`, /// `recv()` must be called until `None` is returned. If there are - /// outstanding [`Permit`] values, the `recv` method will not return `None` - /// until those are released. + /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will + /// not return `None` until those are released. /// /// [`Permit`]: Permit + /// [`OwnedPermit`]: OwnedPermit /// /// # Examples /// @@ -624,12 +641,96 @@ impl Sender { /// } /// ``` pub async fn reserve(&self) -> Result, SendError<()>> { + self.reserve_inner().await?; + Ok(Permit { chan: &self.chan }) + } + + /// Wait for channel capacity, moving the `Sender` and returning an owned + /// permit. Once capacity to send one message is available, it is reserved + /// for the caller. + /// + /// This moves the sender _by value_, and returns an owned permit that can + /// be used to send a message into the channel. Unlike [`Sender::reserve`], + /// this method may be used in cases where the permit must be valid for the + /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is + /// essentially a reference count increment, comparable to [`Arc::clone`]), + /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be + /// moved, it can be cloned prior to calling `reserve_owned`. + /// + /// If the channel is full, the function waits for the number of unreceived + /// messages to become less than the channel capacity. Capacity to send one + /// message is reserved for the caller. An [`OwnedPermit`] is returned to + /// track the reserved capacity. The [`send`] function on [`OwnedPermit`] + /// consumes the reserved capacity. + /// + /// Dropping the [`OwnedPermit`] without sending a message releases the + /// capacity back to the channel. + /// + /// # Examples + /// Sending a message using an [`OwnedPermit`]: + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity, moving the sender. + /// let permit = tx.reserve_owned().await.unwrap(); + /// + /// // Send a message, consuming the permit and returning + /// // the moved sender. + /// let tx = permit.send(123); + /// + /// // The value sent on the permit is received. + /// assert_eq!(rx.recv().await.unwrap(), 123); + /// + /// // The sender can now be used again. + /// tx.send(456).await.unwrap(); + /// } + /// ``` + /// + /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved + /// by value, it can be inexpensively cloned before calling `reserve_owned`: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Clone the sender and reserve capacity. + /// let permit = tx.clone().reserve_owned().await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Sending on the permit succeeds. + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// } + /// ``` + /// + /// [`Sender::reserve`]: Sender::reserve + /// [`OwnedPermit`]: OwnedPermit + /// [`send`]: OwnedPermit::send + /// [`Arc::clone`]: std::sync::Arc::clone + pub async fn reserve_owned(self) -> Result, SendError<()>> { + self.reserve_inner().await?; + Ok(OwnedPermit { + chan: Some(self.chan), + }) + } + + async fn reserve_inner(&self) -> Result<(), SendError<()>> { match self.chan.semaphore().0.acquire(1).await { - Ok(_) => {} - Err(_) => return Err(SendError(())), + Ok(_) => Ok(()), + Err(_) => Err(SendError(())), } - - Ok(Permit { chan: &self.chan }) } /// Try to acquire a slot in the channel without waiting for the slot to become @@ -684,6 +785,72 @@ impl Sender { Ok(Permit { chan: &self.chan }) } + /// Try to acquire a slot in the channel without waiting for the slot to become + /// available, returning an owned permit. + /// + /// This moves the sender _by value_, and returns an owned permit that can + /// be used to send a message into the channel. Unlike [`Sender::try_reserve`], + /// this method may be used in cases where the permit must be valid for the + /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is + /// essentially a reference count increment, comparable to [`Arc::clone`]), + /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be + /// moved, it can be cloned prior to calling `try_reserve_owned`. + /// + /// If the channel is full this function will return a [`TrySendError`]. + /// Since the sender is taken by value, the `TrySendError` returned in this + /// case contains the sender, so that it may be used again. Otherwise, if + /// there is a slot available, this method will return an [`OwnedPermit`] + /// that can then be used to [`send`] on the channel with a guaranteed slot. + /// This function is similar to [`reserve_owned`] except it does not await + /// for the slot to become available. + /// + /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`OwnedPermit`]: OwnedPermit + /// [`send`]: OwnedPermit::send + /// [`reserve_owned`]: Sender::reserve_owned + /// [`Arc::clone`]: std::sync::Arc::clone + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.clone().try_reserve_owned().unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Trying to reserve an additional slot on the `tx` will + /// // fail because there is no capacity. + /// assert!(tx.try_reserve().is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// } + /// ``` + pub fn try_reserve_owned(self) -> Result, TrySendError> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(_) => return Err(TrySendError::Full(self)), + } + + Ok(OwnedPermit { + chan: Some(self.chan), + }) + } + /// Returns `true` if senders belong to the same channel. /// /// # Examples @@ -804,6 +971,8 @@ impl Drop for Permit<'_, T> { // Add the permit back to the semaphore semaphore.add_permit(); + // If this is the last sender for this channel, wake the receiver so + // that it can be notified that the channel is closed. if semaphore.is_closed() && semaphore.is_idle() { self.chan.wake_rx(); } @@ -817,3 +986,123 @@ impl fmt::Debug for Permit<'_, T> { .finish() } } + +// ===== impl Permit ===== + +impl OwnedPermit { + /// Sends a value using the reserved capacity. + /// + /// Capacity for the message has already been reserved. The message is sent + /// to the receiver and the permit is consumed. The operation will succeed + /// even if the receiver half has been closed. See [`Receiver::close`] for + /// more details on performing a clean shutdown. + /// + /// Unlike [`Permit::send`], this method returns the [`Sender`] from which + /// the `OwnedPermit` was reserved. + /// + /// [`Receiver::close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.reserve_owned().await.unwrap(); + /// + /// // Send a message on the permit, returning the sender. + /// let tx = permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// // We may now reuse `tx` to send another message. + /// tx.send(789).await.unwrap(); + /// } + /// ``` + pub fn send(mut self, value: T) -> Sender { + let chan = self.chan.take().unwrap_or_else(|| { + unreachable!("OwnedPermit channel is only taken when the permit is moved") + }); + chan.send(value); + + Sender { chan } + } + + /// Release the reserved capacity *without* sending a message, returning the + /// [`Sender`]. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel(1); + /// + /// // Clone the sender and reserve capacity + /// let permit = tx.clone().reserve_owned().await.unwrap(); + /// + /// // Trying to send on the original `tx` will fail, since the `permit` + /// // has reserved all the available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Release the permit without sending a message, returning the clone + /// // of the sender. + /// let tx2 = permit.release(); + /// + /// // We may now reuse `tx` to send another message. + /// tx.send(789).await.unwrap(); + /// # drop(rx); drop(tx2); + /// } + /// ``` + /// + /// [`Sender`]: Sender + pub fn release(mut self) -> Sender { + use chan::Semaphore; + + let chan = self.chan.take().unwrap_or_else(|| { + unreachable!("OwnedPermit channel is only taken when the permit is moved") + }); + + // Add the permit back to the semaphore + chan.semaphore().add_permit(); + Sender { chan } + } +} + +impl Drop for OwnedPermit { + fn drop(&mut self) { + use chan::Semaphore; + + // Are we still holding onto the sender? + if let Some(chan) = self.chan.take() { + let semaphore = chan.semaphore(); + + // Add the permit back to the semaphore + semaphore.add_permit(); + + // If this `OwnedPermit` is holding the last sender for this + // channel, wake the receiver so that it can be notified that the + // channel is closed. + if semaphore.is_closed() && semaphore.is_idle() { + chan.wake_rx(); + } + } + + // Otherwise, do nothing. + } +} + +impl fmt::Debug for OwnedPermit { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("OwnedPermit") + .field("chan", &self.chan) + .finish() + } +} diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index e7033f6..879e3dc 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -73,7 +73,7 @@ pub(super) mod block; mod bounded; -pub use self::bounded::{channel, Permit, Receiver, Sender}; +pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender}; mod chan; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 0a118e7..9fd7c91 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -4,9 +4,9 @@ use crate::sync::batch_semaphore as semaphore; use std::cell::UnsafeCell; use std::error::Error; -use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use std::{fmt, marker, mem}; /// An asynchronous `Mutex`-like type. /// @@ -160,6 +160,19 @@ pub struct OwnedMutexGuard { lock: Arc>, } +/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`]. +/// +/// This can be used to hold a subfield of the protected data. +/// +/// [`MutexGuard::map`]: method@MutexGuard::map +#[must_use = "if unused the Mutex will immediately unlock"] +pub struct MappedMutexGuard<'a, T: ?Sized> { + s: &'a semaphore::Semaphore, + data: *mut T, + // Needed to tell the borrow checker that we are holding a `&mut T` + marker: marker::PhantomData<&'a mut T>, +} + // As long as T: Send, it's fine to send and share Mutex between threads. // If T was not Send, sending and sharing a Mutex would be bad, since you can // access T through Mutex. @@ -167,6 +180,8 @@ unsafe impl Send for Mutex where T: ?Sized + Send {} unsafe impl Sync for Mutex where T: ?Sized + Send {} unsafe impl Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {} unsafe impl Sync for OwnedMutexGuard where T: ?Sized + Send + Sync {} +unsafe impl<'a, T> Sync for MappedMutexGuard<'a, T> where T: ?Sized + Sync + 'a {} +unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a {} /// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and /// [`RwLock::try_write`] functions. @@ -451,6 +466,103 @@ where // === impl MutexGuard === +impl<'a, T: ?Sized> MutexGuard<'a, T> { + /// Makes a new [`MappedMutexGuard`] for a component of the locked data. + /// + /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MutexGuard::map(...)`. A method + /// would interfere with methods of the same name on the contents of the locked data. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Mutex, MutexGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let foo = Mutex::new(Foo(1)); + /// + /// { + /// let mut mapped = MutexGuard::map(foo.lock().await, |f| &mut f.0); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *foo.lock().await); + /// # } + /// ``` + /// + /// [`MutexGuard`]: struct@MutexGuard + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn map(mut this: Self, f: F) -> MappedMutexGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = &this.lock.s; + mem::forget(this); + MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The + /// original guard is returned if the closure returns `None`. + /// + /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MutexGuard::try_map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Mutex, MutexGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let foo = Mutex::new(Foo(1)); + /// + /// { + /// let mut mapped = MutexGuard::try_map(foo.lock().await, |f| Some(&mut f.0)) + /// .expect("should not fail"); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *foo.lock().await); + /// # } + /// ``` + /// + /// [`MutexGuard`]: struct@MutexGuard + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn try_map(mut this: Self, f: F) -> Result, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = &this.lock.s; + mem::forget(this); + Ok(MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { self.lock.s.release(1) @@ -514,3 +626,88 @@ impl fmt::Display for OwnedMutexGuard { fmt::Display::fmt(&**self, f) } } + +// === impl MappedMutexGuard === + +impl<'a, T: ?Sized> MappedMutexGuard<'a, T> { + /// Makes a new [`MappedMutexGuard`] for a component of the locked data. + /// + /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MappedMutexGuard::map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn map(mut this: Self, f: F) -> MappedMutexGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = this.s; + mem::forget(this); + MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The + /// original guard is returned if the closure returns `None`. + /// + /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MappedMutexGuard::try_map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn try_map(mut this: Self, f: F) -> Result, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = this.s; + mem::forget(this); + Ok(MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + +impl<'a, T: ?Sized> Drop for MappedMutexGuard<'a, T> { + fn drop(&mut self) { + self.s.release(1) + } +} + +impl<'a, T: ?Sized> Deref for MappedMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.data } + } +} + +impl<'a, T: ?Sized> DerefMut for MappedMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for MappedMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} diff --git a/src/sync/notify.rs b/src/sync/notify.rs index 2d30da9..5d2132f 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -192,6 +192,10 @@ fn inc_num_notify_waiters_calls(data: usize) -> usize { data + (1 << NOTIFY_WAITERS_SHIFT) } +fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) { + data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst); +} + impl Notify { /// Create a new `Notify`, initialized without a permit. /// @@ -394,11 +398,9 @@ impl Notify { let curr = self.state.load(SeqCst); if let EMPTY | NOTIFIED = get_state(curr) { - // There are no waiting tasks. In this case, no synchronization is - // established between `notify` and `notified().await`. - // All we need to do is increment the number of times this - // method was called. - self.state.store(inc_num_notify_waiters_calls(curr), SeqCst); + // There are no waiting tasks. All we need to do is increment the + // number of times this method was called. + atomic_inc_num_notify_waiters_calls(&self.state); return; } diff --git a/src/sync/tests/loom_notify.rs b/src/sync/tests/loom_notify.rs index 4be949a..d484a75 100644 --- a/src/sync/tests/loom_notify.rs +++ b/src/sync/tests/loom_notify.rs @@ -33,12 +33,41 @@ fn notify_waiters() { tx.notify_waiters(); }); - th.join().unwrap(); - block_on(async { notified1.await; notified2.await; }); + + th.join().unwrap(); + }); +} + +#[test] +fn notify_waiters_and_one() { + loom::model(|| { + let notify = Arc::new(Notify::new()); + let tx1 = notify.clone(); + let tx2 = notify.clone(); + + let th1 = thread::spawn(move || { + tx1.notify_waiters(); + }); + + let th2 = thread::spawn(move || { + tx2.notify_one(); + }); + + let th3 = thread::spawn(move || { + let notified = notify.notified(); + + block_on(async { + notified.await; + }); + }); + + th1.join().unwrap(); + th2.join().unwrap(); + th3.join().unwrap(); }); } diff --git a/src/sync/watch.rs b/src/sync/watch.rs index bf6f0ac..db65e5a 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -205,7 +205,7 @@ impl Receiver { // not memory access. shared.ref_count_rx.fetch_add(1, Relaxed); - Self { version, shared } + Self { shared, version } } /// Returns a reference to the most recently sent value diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 28bbcdb..e4fe254 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -5,19 +5,24 @@ cfg_rt_multi_thread! { /// blocking the executor. /// /// In general, issuing a blocking call or performing a lot of compute in a - /// future without yielding is not okay, as it may prevent the executor from - /// driving other futures forward. This function runs the closure on the - /// current thread by having the thread temporarily cease from being a core - /// thread, and turns it into a blocking thread. See the [CPU-bound tasks - /// and blocking code][blocking] section for more information. - /// - /// Although this function avoids starving other independently spawned - /// tasks, any other code running concurrently in the same task will be - /// suspended during the call to `block_in_place`. This can happen e.g. when - /// using the [`join!`] macro. To avoid this issue, use [`spawn_blocking`] - /// instead. - /// - /// Note that this function can only be used when using the `multi_thread` runtime. + /// future without yielding is problematic, as it may prevent the executor + /// from driving other tasks forward. Calling this function informs the + /// executor that the currently executing task is about to block the thread, + /// so the executor is able to hand off any other tasks it has to a new + /// worker thread before that happens. See the [CPU-bound tasks and blocking + /// code][blocking] section for more information. + /// + /// Be aware that although this function avoids starving other independently + /// spawned tasks, any other code running concurrently in the same task will + /// be suspended during the call to `block_in_place`. This can happen e.g. + /// when using the [`join!`] macro. To avoid this issue, use + /// [`spawn_blocking`] instead of `block_in_place`. + /// + /// Note that this function cannot be used within a [`current_thread`] runtime + /// because in this case there are no other worker threads to hand off tasks + /// to. On the other hand, calling the function outside a runtime is + /// allowed. In this case, `block_in_place` just calls the provided closure + /// normally. /// /// Code running behind `block_in_place` cannot be cancelled. When you shut /// down the executor, it will wait indefinitely for all blocking operations @@ -43,6 +48,28 @@ cfg_rt_multi_thread! { /// }); /// # } /// ``` + /// + /// Code running inside `block_in_place` may use `block_on` to reenter the + /// async context. + /// + /// ``` + /// use tokio::task; + /// use tokio::runtime::Handle; + /// + /// # async fn docs() { + /// task::block_in_place(move || { + /// Handle::current().block_on(async move { + /// // do something async + /// }); + /// }); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if called from a [`current_thread`] runtime. + /// + /// [`current_thread`]: fn@crate::runtime::Builder::new_current_thread pub fn block_in_place(f: F) -> R where F: FnOnce() -> R, diff --git a/src/task/mod.rs b/src/task/mod.rs index abae818..7255535 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -122,6 +122,11 @@ //! Instead, Tokio provides two APIs for running blocking operations in an //! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`]. //! +//! Be aware that if you call a non-async method from async code, that non-async +//! method is still inside the asynchronous context, so you should also avoid +//! blocking operations there. This includes destructors of objects destroyed in +//! async code. +//! //! #### spawn_blocking //! //! The `task::spawn_blocking` function is similar to the `task::spawn` function diff --git a/src/task/unconstrained.rs b/src/task/unconstrained.rs index 4a62f81..31c732b 100644 --- a/src/task/unconstrained.rs +++ b/src/task/unconstrained.rs @@ -5,6 +5,7 @@ use std::task::{Context, Poll}; pin_project! { /// Future for the [`unconstrained`](unconstrained) method. + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] #[must_use = "Unconstrained does nothing unless polled"] pub struct Unconstrained { #[pin] @@ -38,6 +39,7 @@ where /// otherwise. /// /// See also the usage example in the [task module](index.html#unconstrained). +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub fn unconstrained(inner: F) -> Unconstrained { Unconstrained { inner } } diff --git a/src/time/clock.rs b/src/time/clock.rs index 8957800..c5ef86b 100644 --- a/src/time/clock.rs +++ b/src/time/clock.rs @@ -120,22 +120,11 @@ cfg_test_util! { /// Panics if time is not frozen or if called from outside of the Tokio /// runtime. pub async fn advance(duration: Duration) { - use crate::future::poll_fn; - use std::task::Poll; - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + let until = clock.now() + duration; clock.advance(duration); - let mut yielded = false; - poll_fn(|cx| { - if yielded { - Poll::Ready(()) - } else { - yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending - } - }).await; + crate::time::sleep_until(until).await; } /// Return the current instant, factoring in frozen time. diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs index e630fa8..08edab3 100644 --- a/src/time/driver/entry.rs +++ b/src/time/driver/entry.rs @@ -85,7 +85,7 @@ const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; /// requires only the driver lock. pub(super) struct StateCell { /// Holds either the scheduled expiration time for this timer, or (if the - /// timer has been fired and is unregistered), [`u64::max_value()`]. + /// timer has been fired and is unregistered), `u64::max_value()`. state: AtomicU64, /// If the timer is fired (an Acquire order read on state shows /// `u64::max_value()`), holds the result that should be returned from diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs index 9a05a54..77b4358 100644 --- a/src/time/driver/handle.rs +++ b/src/time/driver/handle.rs @@ -76,7 +76,7 @@ cfg_not_rt! { /// lazy, and so outside executed inside the runtime successfully without /// panicking. pub(crate) fn current() -> Self { - panic!(crate::util::error::CONTEXT_MISSING_ERROR) + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/time/interval.rs b/src/time/interval.rs index 20cfcec..4b1c6f6 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -176,4 +176,9 @@ impl Interval { // Return the current instant Poll::Ready(now) } + + /// Returns the period of the interval. + pub fn period(&self) -> Duration { + self.period + } } diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs index dd00e14..480ea09 100644 --- a/src/util/linked_list.rs +++ b/src/util/linked_list.rs @@ -77,7 +77,7 @@ pub(crate) struct Pointers { /// #[repr(C)]. /// /// See this link for more information: -/// https://github.com/rust-lang/rust/pull/82834 +/// #[repr(C)] struct PointersInner { /// The previous node in the list. null if there is no previous node. @@ -93,7 +93,7 @@ struct PointersInner { next: Option>, /// This type is !Unpin due to the heuristic from: - /// https://github.com/rust-lang/rust/pull/82834 + /// _pin: PhantomPinned, } diff --git a/src/util/rand.rs b/src/util/rand.rs index 5660103..17b3ec1 100644 --- a/src/util/rand.rs +++ b/src/util/rand.rs @@ -3,10 +3,10 @@ use std::cell::Cell; /// Fast random number generate /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. -/// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's -/// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf +/// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's +/// Xorshift paper: /// This generator passes the SmallCrush suite, part of TestU01 framework: -/// http://simul.iro.umontreal.ca/testu01/tu01.html +/// #[derive(Debug)] pub(crate) struct FastRand { one: Cell, diff --git a/src/util/slab.rs b/src/util/slab.rs index efc72e1..2ddaa6c 100644 --- a/src/util/slab.rs +++ b/src/util/slab.rs @@ -296,7 +296,7 @@ impl Slab { // Remove the slots vector from the page. This is done so that the // freeing process is done outside of the lock's critical section. - let vec = mem::replace(&mut slots.slots, vec![]); + let vec = mem::take(&mut slots.slots); slots.head = 0; // Drop the lock so we can drop the vector outside the lock below. diff --git a/tests/fs_file.rs b/tests/fs_file.rs index bf2f1d7..1f4760e 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -22,6 +22,27 @@ async fn basic_read() { assert_eq!(n, HELLO.len()); assert_eq!(&buf[..n], HELLO); + + // Drop the data from the cache to stimulate uncached codepath on Linux (see preadv2 in + // file.rs) + #[cfg(target_os = "linux")] + { + use std::os::unix::io::AsRawFd; + nix::unistd::fsync(tempfile.as_raw_fd()).unwrap(); + nix::fcntl::posix_fadvise( + tempfile.as_raw_fd(), + 0, + 0, + nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED, + ) + .unwrap(); + } + + let mut file = File::open(tempfile.path()).await.unwrap(); + let n = file.read(&mut buf).await.unwrap(); + + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); } #[tokio::test] diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs new file mode 100644 index 0000000..ac5f11c --- /dev/null +++ b/tests/io_buf_reader.rs @@ -0,0 +1,362 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_reader.rs + +use futures::task::{noop_waker_ref, Context, Poll}; +use std::cmp; +use std::io::{self, Cursor}; +use std::pin::Pin; +use tokio::io::{ + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, + ReadBuf, SeekFrom, +}; + +macro_rules! run_fill_buf { + ($reader:expr) => {{ + let mut cx = Context::from_waker(noop_waker_ref()); + loop { + if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + break x; + } + } + }}; +} + +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} + +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { + inner, + ready_read: false, + ready_fill_buf: false, + } + } +} + +impl AsyncRead for MaybePending<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { + return Poll::Ready(Ok(&[])); + } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; + } +} + +#[tokio::test] +async fn test_buffered_reader() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); +} + +#[tokio::test] +async fn test_buffered_reader_seek() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); + + assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert!(reader + .seek(SeekFrom::Current(i64::min_value())) + .await + .is_err()); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]); + Pin::new(&mut reader).consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); +} + +#[tokio::test] +async fn test_buffered_reader_seek_underflow() { + // gimmick reader that yields its position modulo 256 for each byte + struct PositionReader { + pos: u64, + } + impl AsyncRead for PositionReader { + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let b = buf.initialize_unfilled(); + let len = b.len(); + for x in b { + *x = self.pos as u8; + self.pos = self.pos.wrapping_add(1); + } + buf.advance(len); + Poll::Ready(Ok(())) + } + } + impl AsyncSeek for PositionReader { + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + match pos { + SeekFrom::Start(n) => { + self.pos = n; + } + SeekFrom::Current(n) => { + self.pos = self.pos.wrapping_add(n as u64); + } + SeekFrom::End(n) => { + self.pos = u64::max_value().wrapping_add(n as u64); + } + } + Ok(()) + } + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(self.pos)) + } + } + + let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 }); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1, 2, 3, 4][..]); + assert_eq!( + reader.seek(SeekFrom::End(-5)).await.unwrap(), + u64::max_value() - 5 + ); + assert_eq!(run_fill_buf!(reader).unwrap().len(), 5); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!( + reader + .seek(SeekFrom::Current(i64::min_value())) + .await + .unwrap(), + expected + ); + assert_eq!(run_fill_buf!(reader).unwrap().len(), 5); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); + assert_eq!(reader.get_ref().pos, expected); +} + +#[tokio::test] +async fn test_short_reads() { + /// A dummy reader intended at testing short-reads propagation. + struct ShortReader { + lengths: Vec, + } + + impl AsyncRead for ShortReader { + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if !self.lengths.is_empty() { + buf.advance(self.lengths.remove(0)); + } + Poll::Ready(Ok(())) + } + } + + let inner = ShortReader { + lengths: vec![0, 1, 2, 0, 1, 0], + }; + let mut reader = BufReader::new(inner); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 2); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); +} + +#[tokio::test] +async fn maybe_pending() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); +} + +#[tokio::test] +async fn maybe_pending_buf_read() { + let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); + let mut reader = BufReader::with_capacity(2, inner); + let mut v = Vec::new(); + reader.read_until(3, &mut v).await.unwrap(); + assert_eq!(v, [0, 1, 2, 3]); + v.clear(); + reader.read_until(1, &mut v).await.unwrap(); + assert_eq!(v, [1]); + v.clear(); + reader.read_until(8, &mut v).await.unwrap(); + assert_eq!(v, [0]); + v.clear(); + reader.read_until(9, &mut v).await.unwrap(); + assert_eq!(v, []); +} + +// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 +#[tokio::test] +async fn maybe_pending_seek() { + struct MaybePendingSeek<'a> { + inner: Cursor<&'a [u8]>, + ready: bool, + seek_res: Option>, + } + + impl<'a> MaybePendingSeek<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { + inner: Cursor::new(inner), + ready: true, + seek_res: None, + } + } + } + + impl AsyncRead for MaybePendingSeek<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } + } + + impl AsyncBufRead for MaybePendingSeek<'_> { + fn poll_fill_buf( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this: *mut Self = &mut *self as *mut _; + Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.inner).consume(amt) + } + } + + impl AsyncSeek for MaybePendingSeek<'_> { + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos)); + Ok(()) + } + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.ready { + self.ready = false; + self.seek_res.take().unwrap_or(Ok(()))?; + Pin::new(&mut self.inner).poll_complete(cx) + } else { + self.ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + + assert_eq!(reader.seek(SeekFrom::Current(3)).await.unwrap(), 3); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert!(reader + .seek(SeekFrom::Current(i64::min_value())) + .await + .is_err()); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]); + Pin::new(&mut reader).consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); +} diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs new file mode 100644 index 0000000..6f4f10a --- /dev/null +++ b/tests/io_buf_writer.rs @@ -0,0 +1,251 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_writer.rs + +use futures::task::{Context, Poll}; +use std::io::{self, Cursor}; +use std::pin::Pin; +use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom}; + +struct MaybePending { + inner: Vec, + ready: bool, +} + +impl MaybePending { + fn new(inner: Vec) -> Self { + Self { + inner, + ready: false, + } + } +} + +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +#[tokio::test] +async fn buf_writer() { + let mut writer = BufWriter::with_capacity(2, Vec::new()); + + writer.write(&[0, 1]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[2]).await.unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[3]).await.unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[4]).await.unwrap(); + writer.write(&[5]).await.unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[6]).await.unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + writer.write(&[7, 8]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + writer.write(&[9, 10, 11]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[tokio::test] +async fn buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, Vec::new()); + w.write(&[0, 1]).await.unwrap(); + assert_eq!(*w.get_ref(), []); + w.flush().await.unwrap(); + let w = w.into_inner(); + assert_eq!(w, [0, 1]); +} + +#[tokio::test] +async fn buf_writer_seek() { + let mut w = BufWriter::with_capacity(3, Cursor::new(Vec::new())); + w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); + w.write_all(&[6, 7]).await.unwrap(); + assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2); + w.write_all(&[8, 9]).await.unwrap(); + w.flush().await.unwrap(); + assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} + +#[tokio::test] +async fn maybe_pending_buf_writer() { + let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); + + writer.write(&[0, 1]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + writer.write(&[2]).await.unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + writer.write(&[3]).await.unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + writer.write(&[4]).await.unwrap(); + writer.write(&[5]).await.unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + writer.write(&[6]).await.unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); + + writer.write(&[7, 8]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); + + writer.write(&[9, 10, 11]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!( + writer.get_ref().inner, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + ); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!( + &writer.get_ref().inner, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + ); +} + +#[tokio::test] +async fn maybe_pending_buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); + w.write(&[0, 1]).await.unwrap(); + assert_eq!(&w.get_ref().inner, &[]); + w.flush().await.unwrap(); + let w = w.into_inner().inner; + assert_eq!(w, [0, 1]); +} + +#[tokio::test] +async fn maybe_pending_buf_writer_seek() { + struct MaybePendingSeek { + inner: Cursor>, + ready_write: bool, + ready_seek: bool, + seek_res: Option>, + } + + impl MaybePendingSeek { + fn new(inner: Vec) -> Self { + Self { + inner: Cursor::new(inner), + ready_write: false, + ready_seek: false, + seek_res: None, + } + } + } + + impl AsyncWrite for MaybePendingSeek { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready_write { + self.ready_write = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready_write = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + } + + impl AsyncSeek for MaybePendingSeek { + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos)); + Ok(()) + } + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.ready_seek { + self.ready_seek = false; + self.seek_res.take().unwrap_or(Ok(()))?; + Pin::new(&mut self.inner).poll_complete(cx) + } else { + self.ready_seek = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(Vec::new())); + w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); + w.write_all(&[6, 7]).await.unwrap(); + assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8); + assert_eq!( + &w.get_ref().inner.get_ref()[..], + &[0, 1, 2, 3, 4, 5, 6, 7][..] + ); + assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2); + w.write_all(&[8, 9]).await.unwrap(); + w.flush().await.unwrap(); + assert_eq!( + &w.into_inner().inner.into_inner()[..], + &[0, 1, 8, 9, 4, 5, 6, 7] + ); +} diff --git a/tests/io_mem_stream.rs b/tests/io_mem_stream.rs index 3335214..520391a 100644 --- a/tests/io_mem_stream.rs +++ b/tests/io_mem_stream.rs @@ -62,6 +62,26 @@ async fn disconnect() { t2.await.unwrap(); } +#[tokio::test] +#[cfg(not(target_os = "android"))] +async fn disconnect_reader() { + let (a, mut b) = duplex(2); + + let t1 = tokio::spawn(async move { + // this will block, as not all data fits into duplex + b.write_all(b"ping").await.unwrap_err(); + }); + + let t2 = tokio::spawn(async move { + // here we drop the reader side, and we expect the writer in the other + // task to exit with an error + drop(a); + }); + + t2.await.unwrap(); + t1.await.unwrap(); +} + #[tokio::test] async fn max_write_size() { let (mut a, mut b) = duplex(32); @@ -73,11 +93,11 @@ async fn max_write_size() { assert_eq!(n, 4); }); - let t2 = tokio::spawn(async move { - let mut buf = [0u8; 4]; - b.read_exact(&mut buf).await.unwrap(); - }); + let mut buf = [0u8; 4]; + b.read_exact(&mut buf).await.unwrap(); t1.await.unwrap(); - t2.await.unwrap(); + + // drop b only after task t1 finishes writing + drop(b); } diff --git a/tests/io_write_all_buf.rs b/tests/io_write_all_buf.rs new file mode 100644 index 0000000..b49a58e --- /dev/null +++ b/tests/io_write_all_buf.rs @@ -0,0 +1,96 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio_test::{assert_err, assert_ok}; + +use bytes::{Buf, Bytes, BytesMut}; +use std::cmp; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn write_all_buf() { + struct Wr { + buf: BytesMut, + cnt: usize, + } + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let n = cmp::min(4, buf.len()); + dbg!(buf); + let buf = &buf[0..n]; + + self.cnt += 1; + self.buf.extend(buf); + Ok(buf.len()).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + } + + let mut wr = Wr { + buf: BytesMut::with_capacity(64), + cnt: 0, + }; + + let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world")); + + assert_ok!(wr.write_all_buf(&mut buf).await); + assert_eq!(wr.buf, b"helloworld"[..]); + // expect 4 writes, [hell],[o],[worl],[d] + assert_eq!(wr.cnt, 4); + assert_eq!(buf.has_remaining(), false); +} + +#[tokio::test] +async fn write_buf_err() { + /// Error out after writing the first 4 bytes + struct Wr { + cnt: usize, + } + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + self.cnt += 1; + if self.cnt == 2 { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "whoops"))); + } + Poll::Ready(Ok(4)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + } + + let mut wr = Wr { cnt: 0 }; + + let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world")); + + assert_err!(wr.write_all_buf(&mut buf).await); + assert_eq!( + buf.copy_to_bytes(buf.remaining()), + Bytes::from_static(b"oworld") + ); +} diff --git a/tests/macros_select.rs b/tests/macros_select.rs index ea06d51..07c2b81 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -359,9 +359,6 @@ async fn join_with_select() { async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; - let sleep = time::sleep(Duration::from_millis(50)); - tokio::pin!(sleep); - tokio::select! { _ = time::sleep(Duration::from_millis(50)), if false => { panic!("if condition ignored") diff --git a/tests/macros_test.rs b/tests/macros_test.rs index 8396398..f5bc5a0 100644 --- a/tests/macros_test.rs +++ b/tests/macros_test.rs @@ -21,7 +21,20 @@ async fn test_macro_is_resilient_to_shadowing() { // https://github.com/tokio-rs/tokio/issues/3403 #[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline. #[tokio::main] -async fn unused_braces_main() { println!("hello") } +pub async fn unused_braces_main() { println!("hello") } #[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline. #[tokio::test] async fn unused_braces_test() { assert_eq!(1 + 1, 2) } + +// https://github.com/tokio-rs/tokio/pull/3766#issuecomment-835508651 +#[std::prelude::v1::test] +fn trait_method() { + trait A { + fn f(self); + } + impl A for () { + #[tokio::main] + async fn f(self) {} + } + ().f() +} diff --git a/tests/rt_common.rs b/tests/rt_common.rs index cb1d0f6..e5fc7a9 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -647,6 +647,7 @@ rt_test! { } #[test] + #[cfg(not(target_os = "android"))] fn panic_in_task() { let rt = rt(); let (tx, rx) = oneshot::channel(); diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index 82bef8a..d9514d2 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -114,6 +114,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { } #[test] +#[cfg(not(target_os = "android"))] fn useful_panic_message_when_dropping_rt_in_rt() { use std::panic::{catch_unwind, AssertUnwindSafe}; diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index e34c2bb..0b5d12a 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -55,7 +55,7 @@ async fn try_read_write() { tokio::task::yield_now().await; } - // Fill the write buffer + // Fill the write buffer using non-vectored I/O loop { // Still ready let mut writable = task::spawn(client.writable()); @@ -75,7 +75,7 @@ async fn try_read_write() { let mut writable = task::spawn(client.writable()); assert_pending!(writable.poll()); - // Drain the socket from the server end + // Drain the socket from the server end using non-vectored I/O let mut read = vec![0; written.len()]; let mut i = 0; @@ -92,6 +92,51 @@ async fn try_read_write() { assert_eq!(read, written); } + written.clear(); + client.writable().await.unwrap(); + + // Fill the write buffer using vectored I/O + let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect(); + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write_vectored(&data_bufs) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end using vectored I/O + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await.unwrap(); + + let mut bufs: Vec<_> = read[i..] + .chunks_mut(0x10000) + .map(io::IoSliceMut::new) + .collect(); + match server.try_read_vectored(&mut bufs) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + // Now, we listen for shutdown drop(client); diff --git a/tests/time_pause.rs b/tests/time_pause.rs index bc84ac5..d1834af 100644 --- a/tests/time_pause.rs +++ b/tests/time_pause.rs @@ -3,8 +3,14 @@ use rand::SeedableRng; use rand::{rngs::StdRng, Rng}; -use tokio::time::{self, Duration, Instant}; -use tokio_test::assert_err; +use tokio::time::{self, Duration, Instant, Sleep}; +use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task}; + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; #[tokio::test] async fn pause_time_in_main() { @@ -57,3 +63,162 @@ async fn paused_time_stress_run() -> Vec { times } + +#[tokio::test(start_paused = true)] +async fn advance_after_poll() { + time::sleep(ms(1)).await; + + let start = Instant::now(); + + let mut sleep = task::spawn(time::sleep_until(start + ms(300))); + + assert_pending!(sleep.poll()); + + let before = Instant::now(); + time::advance(ms(100)).await; + assert_elapsed!(before, ms(100)); + + assert_pending!(sleep.poll()); +} + +#[tokio::test(start_paused = true)] +async fn sleep_no_poll() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let mut sleep = task::spawn(time::sleep_until(start + ms(300))); + + let before = Instant::now(); + time::advance(ms(100)).await; + assert_elapsed!(before, ms(100)); + + assert_pending!(sleep.poll()); +} + +enum State { + Begin, + AwaitingAdvance(Pin>>), + AfterAdvance, +} + +struct Tester { + sleep: Pin>, + state: State, + before: Option, + poll: bool, +} + +impl Future for Tester { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match &mut self.state { + State::Begin => { + if self.poll { + assert_pending!(self.sleep.as_mut().poll(cx)); + } + self.before = Some(Instant::now()); + let advance_fut = Box::pin(time::advance(ms(100))); + self.state = State::AwaitingAdvance(advance_fut); + self.poll(cx) + } + State::AwaitingAdvance(ref mut advance_fut) => match advance_fut.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + self.state = State::AfterAdvance; + self.poll(cx) + } + }, + State::AfterAdvance => { + assert_elapsed!(self.before.unwrap(), ms(100)); + + assert_pending!(self.sleep.as_mut().poll(cx)); + + Poll::Ready(()) + } + } + } +} + +#[tokio::test(start_paused = true)] +async fn sleep_same_task() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let sleep = Box::pin(time::sleep_until(start + ms(300))); + + Tester { + sleep, + state: State::Begin, + before: None, + poll: true, + } + .await; +} + +#[tokio::test(start_paused = true)] +async fn sleep_same_task_no_poll() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let sleep = Box::pin(time::sleep_until(start + ms(300))); + + Tester { + sleep, + state: State::Begin, + before: None, + poll: false, + } + .await; +} + +#[tokio::test(start_paused = true)] +async fn interval() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let mut i = task::spawn(time::interval_at(start, ms(300))); + + assert_ready_eq!(poll_next(&mut i), start); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(100)).await; + assert_elapsed!(before, ms(100)); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(200)).await; + assert_elapsed!(before, ms(200)); + assert_ready_eq!(poll_next(&mut i), start + ms(300)); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(400)).await; + assert_elapsed!(before, ms(400)); + assert_ready_eq!(poll_next(&mut i), start + ms(600)); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(500)).await; + assert_elapsed!(before, ms(500)); + assert_ready_eq!(poll_next(&mut i), start + ms(900)); + assert_ready_eq!(poll_next(&mut i), start + ms(1200)); + assert_pending!(poll_next(&mut i)); +} + +fn poll_next(interval: &mut task::Spawn) -> Poll { + interval.enter(|cx, mut interval| interval.poll_tick(cx)) +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs index 2736258..9c04d22 100644 --- a/tests/time_sleep.rs +++ b/tests/time_sleep.rs @@ -7,22 +7,7 @@ use std::task::Context; use futures::task::noop_waker_ref; use tokio::time::{self, Duration, Instant}; -use tokio_test::{assert_pending, assert_ready, task}; - -macro_rules! assert_elapsed { - ($now:expr, $ms:expr) => {{ - let elapsed = $now.elapsed(); - let lower = ms($ms); - - // Handles ms rounding - assert!( - elapsed >= lower && elapsed <= lower + ms(1), - "actual = {:?}, expected = {:?}", - elapsed, - lower - ); - }}; -} +use tokio_test::{assert_elapsed, assert_pending, assert_ready, task}; #[tokio::test] async fn immediate_sleep() { @@ -32,7 +17,7 @@ async fn immediate_sleep() { // Ready! time::sleep_until(now).await; - assert_elapsed!(now, 0); + assert_elapsed!(now, ms(1)); } #[tokio::test] @@ -60,10 +45,11 @@ async fn delayed_sleep_level_0() { for &i in &[1, 10, 60] { let now = Instant::now(); + let dur = ms(i); - time::sleep_until(now + ms(i)).await; + time::sleep_until(now + dur).await; - assert_elapsed!(now, i); + assert_elapsed!(now, dur); } } @@ -77,7 +63,7 @@ async fn sub_ms_delayed_sleep() { time::sleep_until(deadline).await; - assert_elapsed!(now, 1); + assert_elapsed!(now, ms(1)); } } @@ -90,7 +76,7 @@ async fn delayed_sleep_wrapping_level_0() { let now = Instant::now(); time::sleep_until(now + ms(60)).await; - assert_elapsed!(now, 60); + assert_elapsed!(now, ms(60)); } #[tokio::test] @@ -107,7 +93,7 @@ async fn reset_future_sleep_before_fire() { sleep.as_mut().reset(Instant::now() + ms(200)); sleep.await; - assert_elapsed!(now, 200); + assert_elapsed!(now, ms(200)); } #[tokio::test] @@ -124,7 +110,7 @@ async fn reset_past_sleep_before_turn() { sleep.as_mut().reset(now + ms(80)); sleep.await; - assert_elapsed!(now, 80); + assert_elapsed!(now, ms(80)); } #[tokio::test] @@ -143,7 +129,7 @@ async fn reset_past_sleep_before_fire() { sleep.as_mut().reset(now + ms(80)); sleep.await; - assert_elapsed!(now, 80); + assert_elapsed!(now, ms(80)); } #[tokio::test] @@ -154,11 +140,11 @@ async fn reset_future_sleep_after_fire() { let mut sleep = Box::pin(time::sleep_until(now + ms(100))); sleep.as_mut().await; - assert_elapsed!(now, 100); + assert_elapsed!(now, ms(100)); sleep.as_mut().reset(now + ms(110)); sleep.await; - assert_elapsed!(now, 110); + assert_elapsed!(now, ms(110)); } #[tokio::test] diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs index c528620..2754e84 100644 --- a/tests/uds_stream.rs +++ b/tests/uds_stream.rs @@ -90,7 +90,7 @@ async fn try_read_write() -> std::io::Result<()> { tokio::task::yield_now().await; } - // Fill the write buffer + // Fill the write buffer using non-vectored I/O loop { // Still ready let mut writable = task::spawn(client.writable()); @@ -110,7 +110,7 @@ async fn try_read_write() -> std::io::Result<()> { let mut writable = task::spawn(client.writable()); assert_pending!(writable.poll()); - // Drain the socket from the server end + // Drain the socket from the server end using non-vectored I/O let mut read = vec![0; written.len()]; let mut i = 0; @@ -127,6 +127,51 @@ async fn try_read_write() -> std::io::Result<()> { assert_eq!(read, written); } + written.clear(); + client.writable().await.unwrap(); + + // Fill the write buffer using vectored I/O + let msg_bufs: Vec<_> = msg.chunks(3).map(io::IoSlice::new).collect(); + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write_vectored(&msg_bufs) { + Ok(n) => written.extend(&msg[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end using vectored I/O + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await?; + + let mut bufs: Vec<_> = read[i..] + .chunks_mut(0x10000) + .map(io::IoSliceMut::new) + .collect(); + match server.try_read_vectored(&mut bufs) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + // Now, we listen for shutdown drop(client); -- cgit v1.2.3