diff options
author | Luke Huang <huangluke@google.com> | 2021-05-30 08:40:50 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-05-30 08:40:50 +0000 |
commit | 0d66e18fb85a0d694d52ca0e873d162c070094a0 (patch) | |
tree | 9960fb5406e537c303b85517c25ac6c65d47d634 | |
parent | 55513e0bfafb02059e7c43f7accedaf0e651bbac (diff) | |
parent | d4230f61dfcffdae2dda70d2103abf5699616382 (diff) | |
download | tokio-0d66e18fb85a0d694d52ca0e873d162c070094a0.tar.gz |
Merge "Upgrade rust/crates/tokio to 1.6.0 and use cargo2android.json to generate bp file" am: d4230f61df
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1718994
Change-Id: I6f3bef3d4f45e33471fc70d0e7648912a994705a
72 files changed, 4452 insertions, 683 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index b75e29b..13071e3 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "a5ee2f0d3d78daa01e2c6c12d22b82474dc5c32a" + "sha1": "580dc9594c8e42b7d2dec60447f35238b8dfcf35" } } @@ -1,4 +1,4 @@ -// This file is generated by cargo2android.py --device --run --features io-util,macros,rt-multi-thread,sync,net,fs,time --tests --patch=patches/Android.bp.patch. +// This file is generated by cargo2android.py --config cargo2android.json. // Do not modify this file as changes will be overridden on upgrade. package { @@ -58,7 +58,7 @@ rust_library { } rust_defaults { - name: "tokio_defaults", + name: "tokio_defaults_tokio", crate_name: "tokio", test_suites: ["general-tests"], auto_gen_config: true, @@ -100,8 +100,23 @@ rust_defaults { } rust_test_host { + name: "tokio_host_test_tests__require_full", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/_require_full.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests__require_full", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/_require_full.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_buffered", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/buffered.rs"], test_options: { unit_test: true, @@ -110,13 +125,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_buffered", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/buffered.rs"], } rust_test_host { + name: "tokio_host_test_tests_io_async_fd", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_async_fd.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_async_fd", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_async_fd.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_io_async_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_async_read.rs"], test_options: { unit_test: true, @@ -125,13 +155,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_async_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_async_read.rs"], } rust_test_host { + name: "tokio_host_test_tests_io_chain", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_chain.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_chain", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_chain.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_copy", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_copy.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_copy", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_copy.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_io_copy_bidirectional", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_copy_bidirectional.rs"], test_options: { unit_test: true, @@ -140,13 +200,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_copy_bidirectional", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_copy_bidirectional.rs"], } rust_test_host { + name: "tokio_host_test_tests_io_driver", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_driver", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_driver_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver_drop.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_driver_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_driver_drop.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_io_lines", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_lines.rs"], test_options: { unit_test: true, @@ -155,13 +245,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_lines", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_lines.rs"], } rust_test_host { name: "tokio_host_test_tests_io_mem_stream", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_mem_stream.rs"], test_options: { unit_test: true, @@ -170,13 +260,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_mem_stream", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_mem_stream.rs"], } rust_test_host { name: "tokio_host_test_tests_io_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read.rs"], test_options: { unit_test: true, @@ -185,13 +275,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_read", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read.rs"], } rust_test_host { name: "tokio_host_test_tests_io_read_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_buf.rs"], test_options: { unit_test: true, @@ -200,13 +290,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_read_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_buf.rs"], } rust_test_host { + name: "tokio_host_test_tests_io_read_exact", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_exact.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_exact", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_exact.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_read_line", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_line.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_line", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_line.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_io_read_to_end", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_to_end.rs"], test_options: { unit_test: true, @@ -215,13 +335,58 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_read_to_end", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_read_to_end.rs"], } rust_test_host { + name: "tokio_host_test_tests_io_read_to_string", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_to_string.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_to_string", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_to_string.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_read_until", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_until.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_read_until", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_read_until.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_io_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_split.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_io_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/io_split.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_io_take", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_take.rs"], test_options: { unit_test: true, @@ -230,13 +395,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_take", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_take.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write.rs"], test_options: { unit_test: true, @@ -245,13 +410,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write_all", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_all.rs"], test_options: { unit_test: true, @@ -260,13 +425,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write_all", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_all.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_buf.rs"], test_options: { unit_test: true, @@ -275,13 +440,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write_buf", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_buf.rs"], } rust_test_host { name: "tokio_host_test_tests_io_write_int", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_int.rs"], test_options: { unit_test: true, @@ -290,13 +455,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_io_write_int", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/io_write_int.rs"], } rust_test_host { name: "tokio_host_test_tests_macros_join", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/macros_join.rs"], test_options: { unit_test: true, @@ -305,13 +470,103 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_macros_join", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/macros_join.rs"], } rust_test_host { + name: "tokio_host_test_tests_macros_pin", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_pin.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_pin", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_pin.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_macros_select", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_select.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_select", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_select.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_macros_test", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_test.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_test", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_test.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_macros_try_join", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_try_join.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_macros_try_join", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/macros_try_join.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_net_bind_resource", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_bind_resource.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_net_bind_resource", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_bind_resource.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_net_lookup_host", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_lookup_host.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_net_lookup_host", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/net_lookup_host.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_no_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/no_rt.rs"], test_options: { unit_test: true, @@ -320,13 +575,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_no_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/no_rt.rs"], } rust_test_host { + name: "tokio_host_test_tests_process_kill_on_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/process_kill_on_drop.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_process_kill_on_drop", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/process_kill_on_drop.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_rt_basic", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_basic.rs"], test_options: { unit_test: true, @@ -335,13 +605,29 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_rt_basic", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_basic.rs"], } rust_test_host { + name: "tokio_host_test_tests_rt_common", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/rt_common.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_rt_common", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/rt_common.rs"], +} + + +rust_test_host { name: "tokio_host_test_tests_rt_threaded", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_threaded.rs"], test_options: { unit_test: true, @@ -350,13 +636,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_rt_threaded", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/rt_threaded.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_barrier", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_barrier.rs"], test_options: { unit_test: true, @@ -365,13 +651,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_barrier", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_barrier.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_broadcast", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_broadcast.rs"], test_options: { unit_test: true, @@ -380,13 +666,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_broadcast", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_broadcast.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_errors", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_errors.rs"], test_options: { unit_test: true, @@ -395,13 +681,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_errors", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_errors.rs"], } rust_test_host { name: "tokio_host_test_tests_sync_mpsc", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mpsc.rs"], test_options: { unit_test: true, @@ -410,13 +696,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_mpsc", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mpsc.rs"], } rust_test_host { + name: "tokio_host_test_tests_sync_mutex", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_mutex.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_mutex", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_mutex.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_sync_mutex_owned", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mutex_owned.rs"], test_options: { unit_test: true, @@ -425,13 +726,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_mutex_owned", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_mutex_owned.rs"], } rust_test_host { + name: "tokio_host_test_tests_sync_notify", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_notify.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_notify", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_notify.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_sync_oneshot", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_oneshot.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_oneshot", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_oneshot.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_sync_rwlock", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_rwlock.rs"], test_options: { unit_test: true, @@ -440,13 +771,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_rwlock", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_rwlock.rs"], } rust_test_host { + name: "tokio_host_test_tests_sync_semaphore", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_semaphore", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_sync_semaphore_owned", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore_owned.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_sync_semaphore_owned", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/sync_semaphore_owned.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_sync_watch", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_watch.rs"], test_options: { unit_test: true, @@ -455,13 +816,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_sync_watch", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/sync_watch.rs"], } rust_test_host { + name: "tokio_host_test_tests_task_abort", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_abort.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_task_abort", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_abort.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_task_blocking", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_blocking.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_task_blocking", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/task_blocking.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_task_local", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local.rs"], test_options: { unit_test: true, @@ -470,13 +861,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_task_local", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local.rs"], } rust_test_host { name: "tokio_host_test_tests_task_local_set", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local_set.rs"], test_options: { unit_test: true, @@ -485,13 +876,13 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_task_local_set", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/task_local_set.rs"], } rust_test_host { name: "tokio_host_test_tests_tcp_accept", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_accept.rs"], test_options: { unit_test: true, @@ -500,13 +891,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_accept", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_accept.rs"], } rust_test_host { + name: "tokio_host_test_tests_tcp_connect", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_connect.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_connect", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_connect.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_tcp_echo", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_echo.rs"], test_options: { unit_test: true, @@ -515,13 +921,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_echo", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_echo.rs"], } rust_test_host { + name: "tokio_host_test_tests_tcp_into_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_into_split.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_into_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_into_split.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_tcp_into_std", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_into_std.rs"], test_options: { unit_test: true, @@ -530,13 +951,28 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_into_std", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_into_std.rs"], } rust_test_host { + name: "tokio_host_test_tests_tcp_peek", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_peek.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_peek", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_peek.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_tcp_shutdown", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_shutdown.rs"], test_options: { unit_test: true, @@ -545,13 +981,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_tcp_shutdown", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/tcp_shutdown.rs"], } rust_test_host { + name: "tokio_host_test_tests_tcp_socket", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_socket.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_socket", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_socket.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_tcp_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_split.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_tcp_split", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/tcp_split.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_time_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/time_rt.rs"], test_options: { unit_test: true, @@ -560,13 +1026,43 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_time_rt", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/time_rt.rs"], } rust_test_host { + name: "tokio_host_test_tests_udp", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/udp.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_udp", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/udp.rs"], +} + +rust_test_host { + name: "tokio_host_test_tests_uds_cred", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/uds_cred.rs"], + test_options: { + unit_test: true, + }, +} + +rust_test { + name: "tokio_device_test_tests_uds_cred", + defaults: ["tokio_defaults_tokio"], + srcs: ["tests/uds_cred.rs"], +} + +rust_test_host { name: "tokio_host_test_tests_uds_split", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/uds_split.rs"], test_options: { unit_test: true, @@ -575,6 +1071,6 @@ rust_test_host { rust_test { name: "tokio_device_test_tests_uds_split", - defaults: ["tokio_defaults"], + defaults: ["tokio_defaults_tokio"], srcs: ["tests/uds_split.rs"], } diff --git a/CHANGELOG.md b/CHANGELOG.md index 0808920..2349b0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,49 @@ +# 1.6.0 (May 14, 2021) + +### Added + +- fs: try doing a non-blocking read before punting to the threadpool ([#3518]) +- io: add `write_all_buf` to `AsyncWriteExt` ([#3737]) +- io: implement `AsyncSeek` for `BufReader`, `BufWriter`, and `BufStream` ([#3491]) +- net: support non-blocking vectored I/O ([#3761]) +- sync: add `mpsc::Sender::{reserve_owned, try_reserve_owned}` ([#3704]) +- sync: add a `MutexGuard::map` method that returns a `MappedMutexGuard` ([#2472]) +- time: add getter for Interval's period ([#3705]) + +### Fixed + +- io: wake pending writers on `DuplexStream` close ([#3756]) +- process: avoid redundant effort to reap orphan processes ([#3743]) +- signal: use `std::os::raw::c_int` instead of `libc::c_int` on public API ([#3774]) +- sync: preserve permit state in `notify_waiters` ([#3660]) +- task: update `JoinHandle` panic message ([#3727]) +- time: prevent `time::advance` from going too far ([#3712]) + +### Documented + +- net: hide `net::unix::datagram` module from docs ([#3775]) +- process: updated example ([#3748]) +- sync: `Barrier` doc should use task, not thread ([#3780]) +- task: update documentation on `block_in_place` ([#3753]) + +[#2472]: https://github.com/tokio-rs/tokio/pull/2472 +[#3491]: https://github.com/tokio-rs/tokio/pull/3491 +[#3518]: https://github.com/tokio-rs/tokio/pull/3518 +[#3660]: https://github.com/tokio-rs/tokio/pull/3660 +[#3704]: https://github.com/tokio-rs/tokio/pull/3704 +[#3705]: https://github.com/tokio-rs/tokio/pull/3705 +[#3712]: https://github.com/tokio-rs/tokio/pull/3712 +[#3727]: https://github.com/tokio-rs/tokio/pull/3727 +[#3737]: https://github.com/tokio-rs/tokio/pull/3737 +[#3743]: https://github.com/tokio-rs/tokio/pull/3743 +[#3748]: https://github.com/tokio-rs/tokio/pull/3748 +[#3753]: https://github.com/tokio-rs/tokio/pull/3753 +[#3756]: https://github.com/tokio-rs/tokio/pull/3756 +[#3761]: https://github.com/tokio-rs/tokio/pull/3761 +[#3774]: https://github.com/tokio-rs/tokio/pull/3774 +[#3775]: https://github.com/tokio-rs/tokio/pull/3775 +[#3780]: https://github.com/tokio-rs/tokio/pull/3780 + # 1.5.0 (April 12, 2021) ### Added @@ -19,6 +65,7 @@ - rt: fix panic in `JoinHandle::abort()` when called from other threads ([#3672]) - sync: don't panic in `oneshot::try_recv` ([#3674]) - sync: fix notifications getting dropped on receiver drop ([#3652]) +- sync: fix `Semaphore` permit overflow calculation ([#3644]) ### Documented @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio" -version = "1.5.0" +version = "1.6.0" authors = ["Tokio Contributors <team@tokio.rs>"] description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio/1.5.0/tokio/" +documentation = "https://docs.rs/tokio/1.6.0/tokio/" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] categories = ["asynchronous", "network-programming"] @@ -85,7 +85,7 @@ version = "1" [features] default = [] -fs = [] +fs = ["libc"] full = ["fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"] io-std = [] io-util = ["memchr", "bytes"] @@ -107,14 +107,14 @@ features = ["std"] optional = true default-features = false [target."cfg(unix)".dependencies.libc] -version = "0.2.42" +version = "0.2.87" optional = true [target."cfg(unix)".dependencies.signal-hook-registry] version = "1.1.1" optional = true [target."cfg(unix)".dev-dependencies.libc] -version = "0.2.42" +version = "0.2.87" [target."cfg(unix)".dev-dependencies.nix] version = "0.19.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 5e53c3f..005767e 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -7,12 +7,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.5.0" +version = "1.6.0" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.5.0/tokio/" +documentation = "https://docs.rs/tokio/1.6.0/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ @@ -42,7 +42,7 @@ full = [ "time", ] -fs = [] +fs = ["libc"] io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true } tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] -libc = { version = "0.2.42", optional = true } +libc = { version = "0.2.87", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] -libc = { version = "0.2.42" } +libc = { version = "0.2.87" } nix = { version = "0.19.0" } [target.'cfg(windows)'.dependencies.winapi] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio/tokio-1.5.0.crate" + value: "https://static.crates.io/crates/tokio/tokio-1.6.0.crate" } - version: "1.5.0" + version: "1.6.0" license_type: NOTICE last_upgrade_date { year: 2021 - month: 4 - day: 21 + month: 5 + day: 26 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index 686fbda..6c266ef 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -20,15 +20,33 @@ "name": "tokio-test_device_test_tests_macros" }, { + "name": "tokio_device_test_tests__require_full" + }, + { "name": "tokio_device_test_tests_buffered" }, { + "name": "tokio_device_test_tests_io_async_fd" + }, + { "name": "tokio_device_test_tests_io_async_read" }, { + "name": "tokio_device_test_tests_io_chain" + }, + { + "name": "tokio_device_test_tests_io_copy" + }, + { "name": "tokio_device_test_tests_io_copy_bidirectional" }, { + "name": "tokio_device_test_tests_io_driver" + }, + { + "name": "tokio_device_test_tests_io_driver_drop" + }, + { "name": "tokio_device_test_tests_io_lines" }, { @@ -41,9 +59,24 @@ "name": "tokio_device_test_tests_io_read_buf" }, { + "name": "tokio_device_test_tests_io_read_exact" + }, + { + "name": "tokio_device_test_tests_io_read_line" + }, + { "name": "tokio_device_test_tests_io_read_to_end" }, { + "name": "tokio_device_test_tests_io_read_to_string" + }, + { + "name": "tokio_device_test_tests_io_read_until" + }, + { + "name": "tokio_device_test_tests_io_split" + }, + { "name": "tokio_device_test_tests_io_take" }, { @@ -62,12 +95,36 @@ "name": "tokio_device_test_tests_macros_join" }, { + "name": "tokio_device_test_tests_macros_pin" + }, + { + "name": "tokio_device_test_tests_macros_select" + }, + { + "name": "tokio_device_test_tests_macros_test" + }, + { + "name": "tokio_device_test_tests_macros_try_join" + }, + { + "name": "tokio_device_test_tests_net_bind_resource" + }, + { + "name": "tokio_device_test_tests_net_lookup_host" + }, + { "name": "tokio_device_test_tests_no_rt" }, { + "name": "tokio_device_test_tests_process_kill_on_drop" + }, + { "name": "tokio_device_test_tests_rt_basic" }, { + "name": "tokio_device_test_tests_rt_common" + }, + { "name": "tokio_device_test_tests_rt_threaded" }, { @@ -83,15 +140,36 @@ "name": "tokio_device_test_tests_sync_mpsc" }, { + "name": "tokio_device_test_tests_sync_mutex" + }, + { "name": "tokio_device_test_tests_sync_mutex_owned" }, { + "name": "tokio_device_test_tests_sync_notify" + }, + { + "name": "tokio_device_test_tests_sync_oneshot" + }, + { "name": "tokio_device_test_tests_sync_rwlock" }, { + "name": "tokio_device_test_tests_sync_semaphore" + }, + { + "name": "tokio_device_test_tests_sync_semaphore_owned" + }, + { "name": "tokio_device_test_tests_sync_watch" }, { + "name": "tokio_device_test_tests_task_abort" + }, + { + "name": "tokio_device_test_tests_task_blocking" + }, + { "name": "tokio_device_test_tests_task_local" }, { @@ -101,18 +179,39 @@ "name": "tokio_device_test_tests_tcp_accept" }, { + "name": "tokio_device_test_tests_tcp_connect" + }, + { "name": "tokio_device_test_tests_tcp_echo" }, { + "name": "tokio_device_test_tests_tcp_into_split" + }, + { "name": "tokio_device_test_tests_tcp_into_std" }, { + "name": "tokio_device_test_tests_tcp_peek" + }, + { "name": "tokio_device_test_tests_tcp_shutdown" }, { + "name": "tokio_device_test_tests_tcp_socket" + }, + { + "name": "tokio_device_test_tests_tcp_split" + }, + { "name": "tokio_device_test_tests_time_rt" }, { + "name": "tokio_device_test_tests_udp" + }, + { + "name": "tokio_device_test_tests_uds_cred" + }, + { "name": "tokio_device_test_tests_uds_split" } ] diff --git a/cargo2android.json b/cargo2android.json new file mode 100644 index 0000000..a4c19a2 --- /dev/null +++ b/cargo2android.json @@ -0,0 +1,11 @@ +{ + "apex-available": [ + "//apex_available:platform", + "com.android.resolv" + ], + "min_sdk_version": "29", + "features": "io-util,macros,rt-multi-thread,sync,net,fs,time", + "device": true, + "run": true, + "patch": "patches/Android.bp.patch" +}
\ No newline at end of file diff --git a/patches/Android.bp.patch b/patches/Android.bp.patch index 759d95f..645b3c3 100644 --- a/patches/Android.bp.patch +++ b/patches/Android.bp.patch @@ -1,310 +1,1026 @@ diff --git a/Android.bp b/Android.bp -index 6b8ca5b..222916b 100644 +index e2a61ba..d289f14 100644 --- a/Android.bp +++ b/Android.bp -@@ -50,6 +50,11 @@ rust_library { - "libpin_project_lite", +@@ -56,3 +56,1021 @@ rust_library { ], - proc_macros: ["libtokio_macros"], -+ apex_available: [ -+ "//apex_available:platform", -+ "com.android.resolv", -+ ], -+ min_sdk_version: "29", + min_sdk_version: "29", } - - rust_defaults { -@@ -61,6 +66,7 @@ rust_defaults { - features: [ - "bytes", - "fs", ++ ++rust_defaults { ++ name: "tokio_defaults_tokio", ++ crate_name: "tokio", ++ test_suites: ["general-tests"], ++ auto_gen_config: true, ++ edition: "2018", ++ features: [ ++ "bytes", ++ "fs", + "full", - "io-util", - "libc", - "macros", -@@ -108,36 +114,6 @@ rust_test { - srcs: ["tests/buffered.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_fs_file", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_file.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_fs_file", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_file.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_fs_link", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_link.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_fs_link", -- defaults: ["tokio_defaults"], -- srcs: ["tests/fs_link.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_io_async_read", - defaults: ["tokio_defaults"], -@@ -348,51 +324,6 @@ rust_test { - srcs: ["tests/no_rt.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_process_issue_2174", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_2174.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_process_issue_2174", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_2174.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_process_issue_42", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_42.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_process_issue_42", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_issue_42.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_process_smoke", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_smoke.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_process_smoke", -- defaults: ["tokio_defaults"], -- srcs: ["tests/process_smoke.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_rt_basic", - defaults: ["tokio_defaults"], -@@ -423,111 +354,6 @@ rust_test { - srcs: ["tests/rt_threaded.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_signal_ctrl_c", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_ctrl_c.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_ctrl_c", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_ctrl_c.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_drop_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_rt.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_drop_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_rt.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_drop_signal", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_signal.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_drop_signal", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_drop_signal.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_multi_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_multi_rt.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_multi_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_multi_rt.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_no_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_no_rt.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_no_rt", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_no_rt.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_notify_both", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_notify_both.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_notify_both", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_notify_both.rs"], --} -- --rust_test_host { -- name: "tokio_host_test_tests_signal_twice", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_twice.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_signal_twice", -- defaults: ["tokio_defaults"], -- srcs: ["tests/signal_twice.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_sync_barrier", - defaults: ["tokio_defaults"], -@@ -603,21 +429,6 @@ rust_test { - srcs: ["tests/sync_mutex_owned.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_sync_once_cell", -- defaults: ["tokio_defaults"], -- srcs: ["tests/sync_once_cell.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_sync_once_cell", -- defaults: ["tokio_defaults"], -- srcs: ["tests/sync_once_cell.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_sync_rwlock", - defaults: ["tokio_defaults"], -@@ -738,21 +549,6 @@ rust_test { - srcs: ["tests/tcp_shutdown.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_time_interval", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_interval.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_time_interval", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_interval.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_time_rt", - defaults: ["tokio_defaults"], -@@ -768,21 +564,6 @@ rust_test { - srcs: ["tests/time_rt.rs"], - } - --rust_test_host { -- name: "tokio_host_test_tests_time_timeout", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_timeout.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_time_timeout", -- defaults: ["tokio_defaults"], -- srcs: ["tests/time_timeout.rs"], --} -- - rust_test_host { - name: "tokio_host_test_tests_uds_split", - defaults: ["tokio_defaults"], -@@ -797,18 +578,3 @@ rust_test { - defaults: ["tokio_defaults"], - srcs: ["tests/uds_split.rs"], - } -- --rust_test_host { -- name: "tokio_host_test_tests_uds_stream", -- defaults: ["tokio_defaults"], -- srcs: ["tests/uds_stream.rs"], -- test_options: { -- unit_test: true, -- }, --} -- --rust_test { -- name: "tokio_device_test_tests_uds_stream", -- defaults: ["tokio_defaults"], -- srcs: ["tests/uds_stream.rs"], --} ++ "io-util", ++ "libc", ++ "macros", ++ "memchr", ++ "mio", ++ "net", ++ "num_cpus", ++ "rt", ++ "rt-multi-thread", ++ "sync", ++ "time", ++ "tokio-macros", ++ ], ++ cfgs: ["tokio_track_caller"], ++ rustlibs: [ ++ "libasync_stream", ++ "libbytes", ++ "libfutures", ++ "liblibc", ++ "libmemchr", ++ "libmio", ++ "libnix", ++ "libnum_cpus", ++ "libpin_project_lite", ++ "librand", ++ "libtokio", ++ "libtokio_stream", ++ "libtokio_test", ++ ], ++ proc_macros: ["libtokio_macros"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests__require_full", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/_require_full.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests__require_full", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/_require_full.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_buffered", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/buffered.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_buffered", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/buffered.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_async_fd", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_fd.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_async_fd", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_fd.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_async_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_read.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_async_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_async_read.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_chain", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_chain.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_chain", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_chain.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_copy", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_copy", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_copy_bidirectional", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy_bidirectional.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_copy_bidirectional", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_copy_bidirectional.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_driver", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_driver", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_driver_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver_drop.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_driver_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_driver_drop.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_lines", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_lines.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_lines", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_lines.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_mem_stream", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_mem_stream.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_mem_stream", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_mem_stream.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_buf.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_buf.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_exact", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_exact.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_exact", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_exact.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_line", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_line.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_line", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_line.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_to_end", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_end.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_to_end", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_end.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_to_string", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_string.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_to_string", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_to_string.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_read_until", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_until.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_read_until", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_read_until.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_split.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_take", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_take.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_take", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_take.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write_all", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_all.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write_all", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_all.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_buf.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write_buf", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_buf.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_io_write_int", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_int.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_io_write_int", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/io_write_int.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_join.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_join.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_pin", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_pin.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_pin", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_pin.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_select", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_select.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_select", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_select.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_test", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_test.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_test", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_test.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_macros_try_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_try_join.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_macros_try_join", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/macros_try_join.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_net_bind_resource", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_bind_resource.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_net_bind_resource", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_bind_resource.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_net_lookup_host", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_lookup_host.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_net_lookup_host", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/net_lookup_host.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_no_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/no_rt.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_no_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/no_rt.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_process_kill_on_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/process_kill_on_drop.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_process_kill_on_drop", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/process_kill_on_drop.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_rt_basic", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_basic.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_rt_basic", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_basic.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_rt_common", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_common.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_rt_common", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_common.rs"], ++} ++ ++ ++rust_test_host { ++ name: "tokio_host_test_tests_rt_threaded", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_threaded.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_rt_threaded", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/rt_threaded.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_barrier", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_barrier.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_barrier", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_barrier.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_broadcast", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_broadcast.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_broadcast", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_broadcast.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_errors", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_errors.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_errors", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_errors.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_mpsc", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mpsc.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_mpsc", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mpsc.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_mutex", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_mutex", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_mutex_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex_owned.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_mutex_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_mutex_owned.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_notify", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_notify.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_notify", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_notify.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_oneshot", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_oneshot.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_oneshot", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_oneshot.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_rwlock", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_rwlock.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_rwlock", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_rwlock.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_semaphore", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_semaphore", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_semaphore_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore_owned.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_semaphore_owned", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_semaphore_owned.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_sync_watch", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_watch.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_sync_watch", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/sync_watch.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_abort", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_abort.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_abort", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_abort.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_blocking", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_blocking.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_blocking", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_blocking.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_local", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_local", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_task_local_set", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local_set.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_task_local_set", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/task_local_set.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_accept", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_accept.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_accept", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_accept.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_connect", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_connect.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_connect", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_connect.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_echo", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_echo.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_echo", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_echo.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_into_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_into_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_split.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_into_std", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_std.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_into_std", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_into_std.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_peek", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_peek.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_peek", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_peek.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_shutdown", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_shutdown.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_shutdown", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_shutdown.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_socket", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_socket.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_socket", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_socket.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_tcp_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_tcp_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/tcp_split.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_time_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/time_rt.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_time_rt", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/time_rt.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_udp", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/udp.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_udp", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/udp.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_uds_cred", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_cred.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_uds_cred", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_cred.rs"], ++} ++ ++rust_test_host { ++ name: "tokio_host_test_tests_uds_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_split.rs"], ++ test_options: { ++ unit_test: true, ++ }, ++} ++ ++rust_test { ++ name: "tokio_device_test_tests_uds_split", ++ defaults: ["tokio_defaults_tokio"], ++ srcs: ["tests/uds_split.rs"], ++} diff --git a/patches/io_mem_stream.patch b/patches/io_mem_stream.patch new file mode 100644 index 0000000..c21ce18 --- /dev/null +++ b/patches/io_mem_stream.patch @@ -0,0 +1,12 @@ +diff --git a/tests/io_mem_stream.rs b/tests/io_mem_stream.rs +index 01baa53..520391a 100644 +--- a/tests/io_mem_stream.rs ++++ b/tests/io_mem_stream.rs +@@ -63,6 +63,7 @@ async fn disconnect() { + } + + #[tokio::test] ++#[cfg(not(target_os = "android"))] + async fn disconnect_reader() { + let (a, mut b) = duplex(2); + diff --git a/patches/rt_common.patch b/patches/rt_common.patch new file mode 100644 index 0000000..1444cfe --- /dev/null +++ b/patches/rt_common.patch @@ -0,0 +1,12 @@ +diff --git a/tests/rt_common.rs b/tests/rt_common.rs +index cb1d0f6..e5fc7a9 100644 +--- a/tests/rt_common.rs ++++ b/tests/rt_common.rs +@@ -647,6 +647,7 @@ rt_test! { + } + + #[test] ++ #[cfg(not(target_os = "android"))] + fn panic_in_task() { + let rt = rt(); + let (tx, rx) = oneshot::channel(); diff --git a/patches/task_blocking.rs b/patches/task_blocking.rs new file mode 100644 index 0000000..7f4f7d4 --- /dev/null +++ b/patches/task_blocking.rs @@ -0,0 +1,12 @@ +diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs +index 82bef8a..d9514d2 100644 +--- a/tests/task_blocking.rs ++++ b/tests/task_blocking.rs +@@ -114,6 +114,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { + } + + #[test] ++#[cfg(not(target_os = "android"))] + fn useful_panic_message_when_dropping_rt_in_rt() { + use std::panic::{catch_unwind, AssertUnwindSafe}; + diff --git a/src/fs/file.rs b/src/fs/file.rs index 5c06e73..abd6e8c 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -491,14 +491,18 @@ impl AsyncRead for File { loop { match inner.state { Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); + let buf = buf_cell.as_mut().unwrap(); if !buf.is_empty() { buf.copy_to(dst); - *buf_cell = Some(buf); return Ready(Ok(())); } + if let Some(x) = try_nonblocking_read(me.std.as_ref(), dst) { + return Ready(x); + } + + let mut buf = buf_cell.take().unwrap(); buf.ensure_capacity_for(dst); let std = me.std.clone(); @@ -756,3 +760,186 @@ impl Inner { } } } + +#[cfg(all(target_os = "linux", not(test)))] +pub(crate) fn try_nonblocking_read( + file: &crate::fs::sys::File, + dst: &mut ReadBuf<'_>, +) -> Option<std::io::Result<()>> { + use std::sync::atomic::{AtomicBool, Ordering}; + + static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true); + if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) { + return None; + } + let out = preadv2::preadv2_safe(file, dst, -1, preadv2::RWF_NOWAIT); + if let Err(err) = &out { + match err.raw_os_error() { + Some(libc::ENOSYS) => { + NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed); + return None; + } + Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None, + _ => {} + } + } + Some(out) +} + +#[cfg(any(not(target_os = "linux"), test))] +pub(crate) fn try_nonblocking_read( + _file: &crate::fs::sys::File, + _dst: &mut ReadBuf<'_>, +) -> Option<std::io::Result<()>> { + None +} + +#[cfg(target_os = "linux")] +mod preadv2 { + use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t}; + use std::os::unix::prelude::AsRawFd; + + use crate::io::ReadBuf; + + pub(crate) fn preadv2_safe( + file: &std::fs::File, + dst: &mut ReadBuf<'_>, + offset: off_t, + flags: c_int, + ) -> std::io::Result<()> { + unsafe { + /* We have to defend against buffer overflows manually here. The slice API makes + * this fairly straightforward. */ + let unfilled = dst.unfilled_mut(); + let mut iov = iovec { + iov_base: unfilled.as_mut_ptr() as *mut c_void, + iov_len: unfilled.len(), + }; + /* We take a File object rather than an fd as reading from a sensitive fd may confuse + * other unsafe code that assumes that only they have access to that fd. */ + let bytes_read = preadv2( + file.as_raw_fd(), + &mut iov as *mut iovec as *const iovec, + 1, + offset, + flags, + ); + if bytes_read < 0 { + Err(std::io::Error::last_os_error()) + } else { + /* preadv2 returns the number of bytes read, e.g. the number of bytes that have + * written into `unfilled`. So it's safe to assume that the data is now + * initialised */ + dst.assume_init(bytes_read as usize); + dst.advance(bytes_read as usize); + Ok(()) + } + } + } + + #[cfg(test)] + mod test { + use super::*; + + #[test] + fn test_preadv2_safe() { + use std::io::{Seek, Write}; + use std::mem::MaybeUninit; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let filename = tmp.path().join("file"); + const MESSAGE: &[u8] = b"Hello this is a test"; + { + let mut f = std::fs::File::create(&filename).unwrap(); + f.write_all(MESSAGE).unwrap(); + } + let f = std::fs::File::open(&filename).unwrap(); + + let mut buf = [MaybeUninit::<u8>::new(0); 50]; + let mut br = ReadBuf::uninit(&mut buf); + + // Basic use: + preadv2_safe(&f, &mut br, 0, 0).unwrap(); + assert_eq!(br.initialized().len(), MESSAGE.len()); + assert_eq!(br.filled(), MESSAGE); + + // Here we check that offset works, but also that appending to a non-empty buffer + // behaves correctly WRT initialisation. + preadv2_safe(&f, &mut br, 5, 0).unwrap(); + assert_eq!(br.initialized().len(), MESSAGE.len() * 2 - 5); + assert_eq!(br.filled(), b"Hello this is a test this is a test".as_ref()); + + // offset of -1 means use the current cursor. This has not been advanced by the + // previous reads because we specified an offset there. + preadv2_safe(&f, &mut br, -1, 0).unwrap(); + assert_eq!(br.remaining(), 0); + assert_eq!( + br.filled(), + b"Hello this is a test this is a testHello this is a".as_ref() + ); + + // but the offset should have been advanced by that read + br.clear(); + preadv2_safe(&f, &mut br, -1, 0).unwrap(); + assert_eq!(br.filled(), b" test"); + + // This should be in cache, so RWF_NOWAIT should work, but it not being in cache + // (EAGAIN) or not supported by the underlying filesystem (ENOTSUP) is fine too. + br.clear(); + match preadv2_safe(&f, &mut br, 0, RWF_NOWAIT) { + Ok(()) => assert_eq!(br.filled(), MESSAGE), + Err(e) => assert!(matches!( + e.raw_os_error(), + Some(libc::ENOTSUP) | Some(libc::EAGAIN) + )), + } + + // Test handling large offsets + { + // I hope the underlying filesystem supports sparse files + let mut w = std::fs::OpenOptions::new() + .write(true) + .open(&filename) + .unwrap(); + w.set_len(0x1_0000_0000).unwrap(); + w.seek(std::io::SeekFrom::Start(0x1_0000_0000)).unwrap(); + w.write_all(b"This is a Large File").unwrap(); + } + + br.clear(); + preadv2_safe(&f, &mut br, 0x1_0000_0008, 0).unwrap(); + assert_eq!(br.filled(), b"a Large File"); + } + } + + fn pos_to_lohi(offset: off_t) -> (c_long, c_long) { + // 64-bit offset is split over high and low 32-bits on 32-bit architectures. + // 64-bit architectures still have high and low arguments, but only the low + // one is inspected. See pos_from_hilo in linux/fs/read_write.c. + const HALF_LONG_BITS: usize = core::mem::size_of::<c_long>() * 8 / 2; + ( + offset as c_long, + // We want to shift this off_t value by size_of::<c_long>(). We can't do + // it in one shift because if they're both 64-bits we'd be doing u64 >> 64 + // which is implementation defined. Instead do it in two halves: + ((offset >> HALF_LONG_BITS) >> HALF_LONG_BITS) as c_long, + ) + } + + pub(crate) const RWF_NOWAIT: c_int = 0x00000008; + unsafe fn preadv2( + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + flags: c_int, + ) -> ssize_t { + // Call via libc::syscall rather than libc::preadv2. preadv2 is only supported by glibc + // and only since v2.26. By using syscall we don't need to worry about compatiblity with + // old glibc versions and it will work on Android and musl too. The downside is that you + // can't use `LD_PRELOAD` tricks any more to intercept these calls. + let (lo, hi) = pos_to_lohi(offset); + libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, lo, hi, flags) as ssize_t + } +} diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs index fa2d420..52451c6 100644 --- a/src/io/driver/mod.rs +++ b/src/io/driver/mod.rs @@ -273,7 +273,7 @@ cfg_not_rt! { /// This function panics if there is no current reactor set, or if the `rt` /// feature flag is not enabled. pub(super) fn current() -> Self { - panic!(crate::util::error::CONTEXT_MISSING_ERROR) + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs index 8251fe6..7350be6 100644 --- a/src/io/driver/registration.rs +++ b/src/io/driver/registration.rs @@ -14,8 +14,9 @@ cfg_io_driver! { /// that it will receive task notifications on readiness. This is the lowest /// level API for integrating with a reactor. /// - /// The association between an I/O resource is made by calling [`new`]. Once - /// the association is established, it remains established until the + /// The association between an I/O resource is made by calling + /// [`new_with_interest_and_handle`]. + /// Once the association is established, it remains established until the /// registration instance is dropped. /// /// A registration instance represents two separate readiness streams. One @@ -36,7 +37,7 @@ cfg_io_driver! { /// stream. The write readiness event stream is only for `Ready::writable()` /// events. /// - /// [`new`]: method@Self::new + /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs index 47ae558..a31e6db 100644 --- a/src/io/poll_evented.rs +++ b/src/io/poll_evented.rs @@ -10,10 +10,10 @@ cfg_io_driver! { /// [`std::io::Write`] traits with the reactor that drives it. /// /// `PollEvented` uses [`Registration`] internally to take a type that - /// implements [`mio::Evented`] as well as [`std::io::Read`] and or + /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or /// [`std::io::Write`] and associate it with a reactor that will drive it. /// - /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be + /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be /// used from within the future's execution model. As such, the /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] /// implementations using the underlying I/O resource as well as readiness @@ -40,13 +40,13 @@ cfg_io_driver! { /// [`poll_read_ready`] again will also indicate read readiness. /// /// When the operation is attempted and is unable to succeed due to the I/O - /// resource not being ready, the caller must call [`clear_read_ready`] or - /// [`clear_write_ready`]. This clears the readiness state until a new + /// resource not being ready, the caller must call `clear_read_ready` or + /// `clear_write_ready`. This clears the readiness state until a new /// readiness event is received. /// /// This allows the caller to implement additional functions. For example, /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and - /// [`clear_read_ready`]. + /// `clear_read_ready`. /// /// ## Platform-specific events /// @@ -54,17 +54,11 @@ cfg_io_driver! { /// These events are included as part of the read readiness event stream. The /// write readiness event stream is only for `Ready::writable()` events. /// - /// [`std::io::Read`]: trait@std::io::Read - /// [`std::io::Write`]: trait@std::io::Write - /// [`AsyncRead`]: trait@AsyncRead - /// [`AsyncWrite`]: trait@AsyncWrite - /// [`mio::Evented`]: trait@mio::Evented - /// [`Registration`]: struct@Registration - /// [`TcpListener`]: struct@crate::net::TcpListener - /// [`clear_read_ready`]: method@Self::clear_read_ready - /// [`clear_write_ready`]: method@Self::clear_write_ready - /// [`poll_read_ready`]: method@Self::poll_read_ready - /// [`poll_write_ready`]: method@Self::poll_write_ready + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`AsyncWrite`]: crate::io::AsyncWrite + /// [`TcpListener`]: crate::net::TcpListener + /// [`poll_read_ready`]: Registration::poll_read_ready + /// [`poll_write_ready`]: Registration::poll_write_ready pub(crate) struct PollEvented<E: Source> { io: Option<E>, registration: Registration, diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs index d011d82..2510ccd 100644 --- a/src/io/util/async_write_ext.rs +++ b/src/io/util/async_write_ext.rs @@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush}; use crate::io::util::shutdown::{shutdown, Shutdown}; use crate::io::util::write::{write, Write}; use crate::io::util::write_all::{write_all, WriteAll}; +use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf}; use crate::io::util::write_buf::{write_buf, WriteBuf}; use crate::io::util::write_int::{ WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le, @@ -159,7 +160,6 @@ cfg_io_util! { write_vectored(self, bufs) } - /// Writes a buffer into this writer, advancing the buffer's internal /// cursor. /// @@ -197,10 +197,11 @@ cfg_io_util! { /// /// # Examples /// - /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]: + /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]: /// /// [`File`]: crate::fs::File /// [`Buf`]: bytes::Buf + /// [`Cursor`]: std::io::Cursor /// /// ```no_run /// use tokio::io::{self, AsyncWriteExt}; @@ -233,6 +234,59 @@ cfg_io_util! { write_buf(self, src) } + /// Attempts to write an entire buffer into this writer + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> { + /// while buf.has_remaining() { + /// self.write_buf(&mut buf).await?; + /// } + /// } + /// ``` + /// + /// This method will continuously call [`write`] until + /// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not + /// return until the entire buffer has been successfully written or an error occurs. The + /// first error generated will be returned. + /// + /// The buffer is advanced after each chunk is successfully written. After failure, + /// `src.chunk()` will return the chunk that failed to write. + /// + /// # Examples + /// + /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]: + /// + /// [`File`]: crate::fs::File + /// [`Buf`]: bytes::Buf + /// [`Cursor`]: std::io::Cursor + /// + /// ```no_run + /// use tokio::io::{self, AsyncWriteExt}; + /// use tokio::fs::File; + /// + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// let mut buffer = Cursor::new(b"data to write"); + /// + /// file.write_all_buf(&mut buffer).await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`write`]: AsyncWriteExt::write + fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B> + where + Self: Sized + Unpin, + B: Buf, + { + write_all_buf(self, src) + } + /// Attempts to write an entire buffer into this writer. /// /// Equivalent to: diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs index 271f61b..cc65ef2 100644 --- a/src/io/util/buf_reader.rs +++ b/src/io/util/buf_reader.rs @@ -1,11 +1,11 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; -use std::io; +use std::io::{self, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{cmp, fmt}; +use std::{cmp, fmt, mem}; pin_project! { /// The `BufReader` struct adds buffering to any reader. @@ -30,6 +30,7 @@ pin_project! { pub(super) buf: Box<[u8]>, pub(super) pos: usize, pub(super) cap: usize, + pub(super) seek_state: SeekState, } } @@ -48,6 +49,7 @@ impl<R: AsyncRead> BufReader<R> { buf: buffer.into_boxed_slice(), pos: 0, cap: 0, + seek_state: SeekState::Init, } } @@ -141,6 +143,122 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch. + PendingOverflowed(i64), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seek to an offset, in bytes, in the underlying reader. +/// +/// The position used for seeking with `SeekFrom::Current(_)` is the +/// position the underlying reader would be at if the `BufReader` had no +/// internal buffer. +/// +/// Seeking always discards the internal buffer, even if the seek position +/// would otherwise fall within it. This guarantees that calling +/// `.into_inner()` immediately after a seek yields the underlying reader +/// at the same position. +/// +/// See [`AsyncSeek`] for more details. +/// +/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` +/// where `n` minus the internal buffer length overflows an `i64`, two +/// seeks will be performed instead of one. If the second seek returns +/// `Err`, the underlying reader will be left at the same position it would +/// have if you called `seek` with `SeekFrom::Current(0)`. +impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We needs to call seek operation multiple times. + // And we should always call both start_seek and poll_complete, + // as start_seek alone cannot guarantee that the operation will be completed. + // poll_complete receives a Context and returns a Poll, so it cannot be called + // inside start_seek. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) { + SeekState::Init => { + // 1.x AsyncSeek recommends calling poll_complete before start_seek. + // We don't have to guarantee that the value returned by + // poll_complete called without start_seek is correct, + // so we'll return 0. + return Poll::Ready(Ok(0)); + } + SeekState::Start(SeekFrom::Current(n)) => { + let remainder = (self.cap - self.pos) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::min_value() so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(offset))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } else { + // seek backwards by our remainder, and then by the offset + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(-remainder))?; + if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { + *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n); + return Poll::Pending; + } + + // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676 + self.as_mut().discard_buffer(); + + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(n))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + } + SeekState::PendingOverflowed(n) => { + if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() { + *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n); + return Poll::Pending; + } + + // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676 + self.as_mut().discard_buffer(); + + self.as_mut() + .get_pin_mut() + .start_seek(SeekFrom::Current(n))?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + SeekState::Start(pos) => { + // Seeking with Start/End doesn't care about our buffer length. + self.as_mut().get_pin_mut().start_seek(pos)?; + self.as_mut().get_pin_mut().poll_complete(cx)? + } + SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?, + }; + + match res { + Poll::Ready(res) => { + self.discard_buffer(); + Poll::Ready(Ok(res)) + } + Poll::Pending => { + *self.as_mut().project().seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> { fn poll_write( self: Pin<&mut Self>, diff --git a/src/io/util/buf_stream.rs b/src/io/util/buf_stream.rs index cc857e2..9238665 100644 --- a/src/io/util/buf_stream.rs +++ b/src/io/util/buf_stream.rs @@ -94,9 +94,11 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { buf: rbuf, pos, cap, + seek_state: rseek_state, }, buf: wbuf, written, + seek_state: wseek_state, } = b; BufStream { @@ -105,10 +107,12 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { inner, buf: wbuf, written, + seek_state: wseek_state, }, buf: rbuf, pos, cap, + seek_state: rseek_state, }, } } diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs index 5e3d4b7..4e8e493 100644 --- a/src/io/util/buf_writer.rs +++ b/src/io/util/buf_writer.rs @@ -1,9 +1,9 @@ use crate::io::util::DEFAULT_BUF_SIZE; -use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::fmt; -use std::io::{self, Write}; +use std::io::{self, SeekFrom, Write}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -34,6 +34,7 @@ pin_project! { pub(super) inner: W, pub(super) buf: Vec<u8>, pub(super) written: usize, + pub(super) seek_state: SeekState, } } @@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> { inner, buf: Vec::with_capacity(cap), written: 0, + seek_state: SeekState::Init, } } @@ -142,6 +144,62 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { } } +#[derive(Debug, Clone, Copy)] +pub(super) enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(SeekFrom), + /// Waiting for completion of poll_complete. + Pending, +} + +/// Seek to the offset, in bytes, in the underlying writer. +/// +/// Seeking always writes out the internal buffer before seeking. +impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> { + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + // We need to flush the internal buffer before seeking. + // It receives a `Context` and returns a `Poll`, so it cannot be called + // inside `start_seek`. + *self.project().seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let pos = match self.seek_state { + SeekState::Init => { + return self.project().inner.poll_complete(cx); + } + SeekState::Start(pos) => Some(pos), + SeekState::Pending => None, + }; + + // Flush the internal buffer before seeking. + ready!(self.as_mut().flush_buf(cx))?; + + let mut me = self.project(); + if let Some(pos) = pos { + // Ensure previous seeks have finished before starting a new one + ready!(me.inner.as_mut().poll_complete(cx))?; + if let Err(e) = me.inner.as_mut().start_seek(pos) { + *me.seek_state = SeekState::Init; + return Poll::Ready(Err(e)); + } + } + match me.inner.poll_complete(cx) { + Poll::Ready(res) => { + *me.seek_state = SeekState::Init; + Poll::Ready(res) + } + Poll::Pending => { + *me.seek_state = SeekState::Pending; + Poll::Pending + } + } + } +} + impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> { fn poll_read( self: Pin<&mut Self>, diff --git a/src/io/util/copy_bidirectional.rs b/src/io/util/copy_bidirectional.rs index cc43f0f..c93060b 100644 --- a/src/io/util/copy_bidirectional.rs +++ b/src/io/util/copy_bidirectional.rs @@ -104,6 +104,7 @@ where /// # Return value /// /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs index ed6a944..d02a453 100644 --- a/src/io/util/lines.rs +++ b/src/io/util/lines.rs @@ -128,7 +128,7 @@ where } } - Poll::Ready(Ok(Some(mem::replace(me.buf, String::new())))) + Poll::Ready(Ok(Some(mem::take(me.buf)))) } } diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs index e91a932..4eefe7b 100644 --- a/src/io/util/mem.rs +++ b/src/io/util/mem.rs @@ -16,6 +16,14 @@ use std::{ /// that can be used as in-memory IO types. Writing to one of the pairs will /// allow that data to be read from the other, and vice versa. /// +/// # Closing a `DuplexStream` +/// +/// If one end of the `DuplexStream` channel is dropped, any pending reads on +/// the other side will continue to read data until the buffer is drained, then +/// they will signal EOF by returning 0 bytes. Any writes to the other side, +/// including pending ones (that are waiting for free space in the buffer) will +/// return `Err(BrokenPipe)` immediately. +/// /// # Example /// /// ``` @@ -37,6 +45,7 @@ use std::{ /// # } /// ``` #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct DuplexStream { read: Arc<Mutex<Pipe>>, write: Arc<Mutex<Pipe>>, @@ -72,6 +81,7 @@ struct Pipe { /// /// The `max_buf_size` argument is the maximum amount of bytes that can be /// written to a side before the write returns `Poll::Pending`. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { let one = Arc::new(Mutex::new(Pipe::new(max_buf_size))); let two = Arc::new(Mutex::new(Pipe::new(max_buf_size))); @@ -134,7 +144,8 @@ impl AsyncWrite for DuplexStream { impl Drop for DuplexStream { fn drop(&mut self) { // notify the other side of the closure - self.write.lock().close(); + self.write.lock().close_write(); + self.read.lock().close_read(); } } @@ -151,12 +162,21 @@ impl Pipe { } } - fn close(&mut self) { + fn close_write(&mut self) { self.is_closed = true; + // needs to notify any readers that no more data will come if let Some(waker) = self.read_waker.take() { waker.wake(); } } + + fn close_read(&mut self) { + self.is_closed = true; + // needs to notify any writers that they have to abort + if let Some(waker) = self.write_waker.take() { + waker.wake(); + } + } } impl AsyncRead for Pipe { @@ -217,7 +237,7 @@ impl AsyncWrite for Pipe { mut self: Pin<&mut Self>, _: &mut task::Context<'_>, ) -> Poll<std::io::Result<()>> { - self.close(); + self.close_write(); Poll::Ready(Ok(())) } } diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs index ab38664..fd3dd0d 100644 --- a/src/io/util/mod.rs +++ b/src/io/util/mod.rs @@ -77,6 +77,7 @@ cfg_io_util! { mod write_vectored; mod write_all; mod write_buf; + mod write_all_buf; mod write_int; diff --git a/src/io/util/read_line.rs b/src/io/util/read_line.rs index d38ffaf..e641f51 100644 --- a/src/io/util/read_line.rs +++ b/src/io/util/read_line.rs @@ -36,7 +36,7 @@ where { ReadLine { reader, - buf: mem::replace(string, String::new()).into_bytes(), + buf: mem::take(string).into_bytes(), output: string, read: 0, _pin: PhantomPinned, @@ -99,7 +99,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>( read: &mut usize, ) -> Poll<io::Result<usize>> { let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read)); - let utf8_res = String::from_utf8(mem::replace(buf, Vec::new())); + let utf8_res = String::from_utf8(mem::take(buf)); // At this point both buf and output are empty. The allocation is in utf8_res. diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs index 2c17383..b3d82a2 100644 --- a/src/io/util/read_to_string.rs +++ b/src/io/util/read_to_string.rs @@ -37,7 +37,7 @@ pub(crate) fn read_to_string<'a, R>( where R: AsyncRead + ?Sized + Unpin, { - let buf = mem::replace(string, String::new()).into_bytes(); + let buf = mem::take(string).into_bytes(); ReadToString { reader, buf: VecWithInitialized::new(buf), diff --git a/src/io/util/split.rs b/src/io/util/split.rs index 4f3ce4e..9c2bb05 100644 --- a/src/io/util/split.rs +++ b/src/io/util/split.rs @@ -106,7 +106,7 @@ where me.buf.pop(); } - Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new())))) + Poll::Ready(Ok(Some(mem::take(me.buf)))) } } diff --git a/src/io/util/write_all_buf.rs b/src/io/util/write_all_buf.rs new file mode 100644 index 0000000..05af7fe --- /dev/null +++ b/src/io/util/write_all_buf.rs @@ -0,0 +1,56 @@ +use crate::io::AsyncWrite; + +use bytes::Buf; +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// A future to write some of the buffer to an `AsyncWrite`. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct WriteAllBuf<'a, W, B> { + writer: &'a mut W, + buf: &'a mut B, + #[pin] + _pin: PhantomPinned, + } +} + +/// Tries to write some bytes from the given `buf` to the writer in an +/// asynchronous manner, returning a future. +pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B> +where + W: AsyncWrite + Unpin, + B: Buf, +{ + WriteAllBuf { + writer, + buf, + _pin: PhantomPinned, + } +} + +impl<W, B> Future for WriteAllBuf<'_, W, B> +where + W: AsyncWrite + Unpin, + B: Buf, +{ + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let me = self.project(); + while me.buf.has_remaining() { + let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?); + me.buf.advance(n); + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + } + + Poll::Ready(Ok(())) + } +} diff --git a/src/macros/select.rs b/src/macros/select.rs index 3ba16b6..f98ebff 100644 --- a/src/macros/select.rs +++ b/src/macros/select.rs @@ -154,7 +154,7 @@ /// `select!` panics if all branches are disabled **and** there is no provided /// `else` branch. A branch is disabled when the provided `if` precondition /// returns `false` **or** when the pattern does not match the result of `<async -/// expression>. +/// expression>`. /// /// # Examples /// diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index e231e5d..2a367ef 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -563,6 +563,84 @@ impl TcpStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: TcpStream::try_read() + /// [`readable()`]: TcpStream::readable() + /// [`ready()`]: TcpStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + use std::io::Read; + + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + cfg_io_util! { /// Try to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. @@ -775,6 +853,68 @@ impl TcpStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } + /// Try to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: TcpStream::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { + use std::io::Write; + + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs index 19ee34a..c3046f1 100644 --- a/src/net/unix/mod.rs +++ b/src/net/unix/mod.rs @@ -1,5 +1,9 @@ //! Unix domain socket utility types +// This module does not currently provide any public API, but it was +// unintentionally defined as a public module. Hide it from the documentation +// instead of changing it to a private module to avoid breakage. +#[doc(hidden)] pub mod datagram; pub(crate) mod listener; diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index d797aae..917844b 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -271,6 +271,84 @@ impl UnixStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: UnixStream::try_read() + /// [`readable()`]: UnixStream::readable() + /// [`ready()`]: UnixStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + cfg_io_util! { /// Try to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. @@ -487,6 +565,68 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } + /// Try to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: UnixStream::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } + /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. /// /// This function is intended to be used to wrap a UnixStream from the diff --git a/src/process/mod.rs b/src/process/mod.rs index 00e39b0..96ceb6d 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -994,13 +994,22 @@ impl Child { /// If the caller wishes to explicitly control when the child's stdin /// handle is closed, they may `.take()` it before calling `.wait()`: /// - /// ```no_run + /// ``` + /// # #[cfg(not(unix))]fn main(){} + /// # #[cfg(unix)] /// use tokio::io::AsyncWriteExt; + /// # #[cfg(unix)] /// use tokio::process::Command; + /// # #[cfg(unix)] + /// use std::process::Stdio; /// + /// # #[cfg(unix)] /// #[tokio::main] /// async fn main() { - /// let mut child = Command::new("cat").spawn().unwrap(); + /// let mut child = Command::new("cat") + /// .stdin(Stdio::piped()) + /// .spawn() + /// .unwrap(); /// /// let mut stdin = child.stdin.take().unwrap(); /// tokio::spawn(async move { diff --git a/src/process/unix/driver.rs b/src/process/unix/driver.rs index 110b484..43b2efa 100644 --- a/src/process/unix/driver.rs +++ b/src/process/unix/driver.rs @@ -3,11 +3,8 @@ //! Process driver use crate::park::Park; -use crate::process::unix::orphan::ReapOrphanQueue; use crate::process::unix::GlobalOrphanQueue; -use crate::signal::unix::driver::Driver as SignalDriver; -use crate::signal::unix::{signal_with_handle, SignalKind}; -use crate::sync::watch; +use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; use std::io; use std::time::Duration; @@ -16,51 +13,20 @@ use std::time::Duration; #[derive(Debug)] pub(crate) struct Driver { park: SignalDriver, - inner: CoreDriver<watch::Receiver<()>, GlobalOrphanQueue>, -} - -#[derive(Debug)] -struct CoreDriver<S, Q> { - sigchild: S, - orphan_queue: Q, -} - -trait HasChanged { - fn has_changed(&mut self) -> bool; -} - -impl<T> HasChanged for watch::Receiver<T> { - fn has_changed(&mut self) -> bool { - self.try_has_changed().and_then(Result::ok).is_some() - } -} - -// ===== impl CoreDriver ===== - -impl<S, Q> CoreDriver<S, Q> -where - S: HasChanged, - Q: ReapOrphanQueue, -{ - fn process(&mut self) { - if self.sigchild.has_changed() { - self.orphan_queue.reap_orphans(); - } - } + signal_handle: SignalHandle, } // ===== impl Driver ===== impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. - pub(crate) fn new(park: SignalDriver) -> io::Result<Self> { - let sigchild = signal_with_handle(SignalKind::child(), park.handle())?; - let inner = CoreDriver { - sigchild, - orphan_queue: GlobalOrphanQueue, - }; + pub(crate) fn new(park: SignalDriver) -> Self { + let signal_handle = park.handle(); - Ok(Self { park, inner }) + Self { + park, + signal_handle, + } } } @@ -76,13 +42,13 @@ impl Park for Driver { fn park(&mut self) -> Result<(), Self::Error> { self.park.park()?; - self.inner.process(); + GlobalOrphanQueue::reap_orphans(&self.signal_handle); Ok(()) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { self.park.park_timeout(duration)?; - self.inner.process(); + GlobalOrphanQueue::reap_orphans(&self.signal_handle); Ok(()) } @@ -90,43 +56,3 @@ impl Park for Driver { self.park.shutdown() } } - -#[cfg(test)] -mod test { - use super::*; - use crate::process::unix::orphan::test::MockQueue; - - struct MockStream { - total_try_recv: usize, - values: Vec<Option<()>>, - } - - impl MockStream { - fn new(values: Vec<Option<()>>) -> Self { - Self { - total_try_recv: 0, - values, - } - } - } - - impl HasChanged for MockStream { - fn has_changed(&mut self) -> bool { - self.total_try_recv += 1; - self.values.remove(0).is_some() - } - } - - #[test] - fn no_reap_if_no_signal() { - let mut driver = CoreDriver { - sigchild: MockStream::new(vec![None]), - orphan_queue: MockQueue::<()>::new(), - }; - - driver.process(); - - assert_eq!(1, driver.sigchild.total_try_recv); - assert_eq!(0, driver.orphan_queue.total_reaps.get()); - } -} diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index 852a191..fab63dd 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -24,7 +24,7 @@ pub(crate) mod driver; pub(crate) mod orphan; -use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait}; +use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; mod reap; use reap::Reaper; @@ -32,6 +32,7 @@ use reap::Reaper; use crate::io::PollEvented; use crate::process::kill::Kill; use crate::process::SpawnedChild; +use crate::signal::unix::driver::Handle as SignalHandle; use crate::signal::unix::{signal, Signal, SignalKind}; use mio::event::Source; @@ -73,9 +74,9 @@ impl fmt::Debug for GlobalOrphanQueue { } } -impl ReapOrphanQueue for GlobalOrphanQueue { - fn reap_orphans(&self) { - ORPHAN_QUEUE.reap_orphans() +impl GlobalOrphanQueue { + fn reap_orphans(handle: &SignalHandle) { + ORPHAN_QUEUE.reap_orphans(handle) } } diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs index 8a1e127..07f0dcf 100644 --- a/src/process/unix/orphan.rs +++ b/src/process/unix/orphan.rs @@ -1,6 +1,9 @@ +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal_with_handle, SignalKind}; +use crate::sync::watch; use std::io; use std::process::ExitStatus; -use std::sync::Mutex; /// An interface for waiting on a process to exit. pub(crate) trait Wait { @@ -20,21 +23,8 @@ impl<T: Wait> Wait for &mut T { } } -/// An interface for reaping a set of orphaned processes. -pub(crate) trait ReapOrphanQueue { - /// Attempts to reap every process in the queue, ignoring any errors and - /// enqueueing any orphans which have not yet exited. - fn reap_orphans(&self); -} - -impl<T: ReapOrphanQueue> ReapOrphanQueue for &T { - fn reap_orphans(&self) { - (**self).reap_orphans() - } -} - /// An interface for queueing up an orphaned process so that it can be reaped. -pub(crate) trait OrphanQueue<T>: ReapOrphanQueue { +pub(crate) trait OrphanQueue<T> { /// Adds an orphan to the queue. fn push_orphan(&self, orphan: T); } @@ -48,50 +38,91 @@ impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { /// An implementation of `OrphanQueue`. #[derive(Debug)] pub(crate) struct OrphanQueueImpl<T> { + sigchild: Mutex<Option<watch::Receiver<()>>>, queue: Mutex<Vec<T>>, } impl<T> OrphanQueueImpl<T> { pub(crate) fn new() -> Self { Self { + sigchild: Mutex::new(None), queue: Mutex::new(Vec::new()), } } #[cfg(test)] fn len(&self) -> usize { - self.queue.lock().unwrap().len() + self.queue.lock().len() } -} -impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> { - fn push_orphan(&self, orphan: T) { - self.queue.lock().unwrap().push(orphan) + pub(crate) fn push_orphan(&self, orphan: T) + where + T: Wait, + { + self.queue.lock().push(orphan) } -} -impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> { - fn reap_orphans(&self) { - let mut queue = self.queue.lock().unwrap(); - let queue = &mut *queue; - - for i in (0..queue.len()).rev() { - match queue[i].try_wait() { - Ok(None) => {} - Ok(Some(_)) | Err(_) => { - // The stdlib handles interruption errors (EINTR) when polling a child process. - // All other errors represent invalid inputs or pids that have already been - // reaped, so we can drop the orphan in case an error is raised. - queue.swap_remove(i); + /// Attempts to reap every process in the queue, ignoring any errors and + /// enqueueing any orphans which have not yet exited. + pub(crate) fn reap_orphans(&self, handle: &SignalHandle) + where + T: Wait, + { + // If someone else is holding the lock, they will be responsible for draining + // the queue as necessary, so we can safely bail if that happens + if let Some(mut sigchild_guard) = self.sigchild.try_lock() { + match &mut *sigchild_guard { + Some(sigchild) => { + if sigchild.try_has_changed().and_then(Result::ok).is_some() { + drain_orphan_queue(self.queue.lock()); + } + } + None => { + let queue = self.queue.lock(); + + // Be lazy and only initialize the SIGCHLD listener if there + // are any orphaned processes in the queue. + if !queue.is_empty() { + // An errors shouldn't really happen here, but if it does it + // means that the signal driver isn't running, in + // which case there isn't anything we can + // register/initialize here, so we can try again later + if let Ok(sigchild) = signal_with_handle(SignalKind::child(), &handle) { + *sigchild_guard = Some(sigchild); + drain_orphan_queue(queue); + } + } } } } } } +fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) +where + T: Wait, +{ + for i in (0..queue.len()).rev() { + match queue[i].try_wait() { + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); + } + } + } + + drop(queue); +} + #[cfg(all(test, not(loom)))] pub(crate) mod test { use super::*; + use crate::io::driver::Driver as IoDriver; + use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + use crate::sync::watch; use std::cell::{Cell, RefCell}; use std::io; use std::os::unix::process::ExitStatusExt; @@ -100,14 +131,12 @@ pub(crate) mod test { pub(crate) struct MockQueue<W> { pub(crate) all_enqueued: RefCell<Vec<W>>, - pub(crate) total_reaps: Cell<usize>, } impl<W> MockQueue<W> { pub(crate) fn new() -> Self { Self { all_enqueued: RefCell::new(Vec::new()), - total_reaps: Cell::new(0), } } } @@ -118,12 +147,6 @@ pub(crate) mod test { } } - impl<W> ReapOrphanQueue for MockQueue<W> { - fn reap_orphans(&self) { - self.total_reaps.set(self.total_reaps.get() + 1); - } - } - struct MockWait { total_waits: Rc<Cell<usize>>, num_wait_until_status: usize, @@ -191,27 +214,107 @@ pub(crate) mod test { assert_eq!(orphanage.len(), 4); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 2); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 1); assert_eq!(third_waits.get(), 1); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 1); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 2); assert_eq!(third_waits.get(), 2); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 0); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 2); assert_eq!(third_waits.get(), 3); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); // Safe to reap when empty + // Safe to reap when empty + drain_orphan_queue(orphanage.queue.lock()); + } + + #[test] + fn no_reap_if_no_signal_received() { + let (tx, rx) = watch::channel(()); + + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + *orphanage.sigchild.lock() = Some(rx); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + tx.send(()).unwrap(); + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 1); + } + + #[test] + fn no_reap_if_signal_lock_held() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + let signal_guard = orphanage.sigchild.lock(); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + drop(signal_guard); + } + + #[test] + fn does_not_register_signal_if_queue_empty() { + let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); + let handle = signal_driver.handle(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); // Sanity + + // No register when queue empty + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_some()); + assert_eq!(waits.get(), 1); // Eager reap when registering listener + } + + #[test] + fn does_nothing_if_signal_could_not_be_registered() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + // Signal handler has "gone away", nothing to register or reap + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + assert_eq!(waits.get(), 0); } } diff --git a/src/process/unix/reap.rs b/src/process/unix/reap.rs index 5dc95e5..f7f4d3c 100644 --- a/src/process/unix/reap.rs +++ b/src/process/unix/reap.rs @@ -224,7 +224,6 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.total_waits); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Not yet exited, couldn't register interest the first time @@ -232,7 +231,6 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.total_waits); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Exited @@ -245,7 +243,6 @@ mod test { } assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.total_waits); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } @@ -260,7 +257,6 @@ mod test { grim.kill().unwrap(); assert_eq!(1, grim.total_kills); - assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } @@ -276,7 +272,6 @@ mod test { drop(grim); - assert_eq!(0, queue.total_reaps.get()); assert!(queue.all_enqueued.borrow().is_empty()); } @@ -294,7 +289,6 @@ mod test { let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); drop(grim); - assert_eq!(0, queue.total_reaps.get()); assert_eq!(1, queue.all_enqueued.borrow().len()); } diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 791e405..5c9b8ed 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -61,7 +61,7 @@ struct Shared { /// Prior to shutdown, we clean up JoinHandles by having each timed-out /// thread join on the previous timed-out thread. This is not strictly /// necessary but helps avoid Valgrind false positives, see - /// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666 + /// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666> /// for more information. last_exiting_thread: Option<thread::JoinHandle<()>>, /// This holds the JoinHandles for all running threads; on shutdown, the thread @@ -151,7 +151,7 @@ impl BlockingPool { self.spawner.inner.condvar.notify_all(); let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread); - let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new()); + let workers = std::mem::take(&mut shared.worker_threads); drop(shared); diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs index a0e8e23..7e45977 100644 --- a/src/runtime/driver.rs +++ b/src/runtime/driver.rs @@ -23,7 +23,7 @@ cfg_io_driver! { let io_handle = io_driver.handle(); let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; - let process_driver = create_process_driver(signal_driver)?; + let process_driver = create_process_driver(signal_driver); (Either::A(process_driver), Some(io_handle), signal_handle) } else { @@ -80,7 +80,7 @@ cfg_not_signal_internal! { cfg_process_driver! { type ProcessDriver = crate::process::unix::driver::Driver; - fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> { + fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { crate::process::unix::driver::Driver::new(signal_driver) } } @@ -89,8 +89,8 @@ cfg_not_process_driver! { cfg_io_driver! { type ProcessDriver = SignalDriver; - fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> { - Ok(signal_driver) + fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { + signal_driver } } } diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs index 9f7ff55..fb6dafd 100644 --- a/src/runtime/task/core.rs +++ b/src/runtime/task/core.rs @@ -279,7 +279,7 @@ impl<T: Future> CoreStage<T> { // Safety:: the caller ensures mutal exclusion to the field. match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { Stage::Finished(output) => output, - _ => panic!("unexpected task state"), + _ => panic!("JoinHandle polled after completion"), } }) } diff --git a/src/signal/unix.rs b/src/signal/unix.rs index cb1d1cc..f96b2f4 100644 --- a/src/signal/unix.rs +++ b/src/signal/unix.rs @@ -9,7 +9,6 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storag use crate::signal::RxFuture; use crate::sync::watch; -use libc::c_int; use mio::net::UnixStream; use std::io::{self, Error, ErrorKind, Write}; use std::pin::Pin; @@ -61,7 +60,7 @@ impl Init for OsExtraData { /// Represents the specific kind of signal to listen for. #[derive(Debug, Clone, Copy)] -pub struct SignalKind(c_int); +pub struct SignalKind(libc::c_int); impl SignalKind { /// Allows for listening to any valid OS signal. @@ -74,8 +73,14 @@ impl SignalKind { /// // let signum = libc::OS_SPECIFIC_SIGNAL; /// let kind = SignalKind::from_raw(signum); /// ``` - pub fn from_raw(signum: c_int) -> Self { - Self(signum) + // Use `std::os::raw::c_int` on public API to prevent leaking a non-stable + // type alias from libc. + // `libc::c_int` and `std::os::raw::c_int` are currently the same type, and are + // unlikely to change to other types, but technically libc can change this + // in the future minor version. + // See https://github.com/tokio-rs/tokio/issues/3767 for more. + pub fn from_raw(signum: std::os::raw::c_int) -> Self { + Self(signum as libc::c_int) } /// Represents the SIGALRM signal. @@ -208,7 +213,7 @@ impl Default for SignalInfo { /// 2. Wake up the driver by writing a byte to a pipe /// /// Those two operations should both be async-signal safe. -fn action(globals: Pin<&'static Globals>, signal: c_int) { +fn action(globals: Pin<&'static Globals>, signal: libc::c_int) { globals.record_event(signal as EventId); // Send a wakeup, ignore any errors (anything reasonably possible is @@ -222,7 +227,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) { /// /// This will register the signal handler if it hasn't already been registered, /// returning any error along the way if that fails. -fn signal_enable(signal: SignalKind, handle: Handle) -> io::Result<()> { +fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> { let signal = signal.0; if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { return Err(Error::new( @@ -352,7 +357,7 @@ pub struct Signal { /// * If the signal is one of /// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) pub fn signal(kind: SignalKind) -> io::Result<Signal> { - let rx = signal_with_handle(kind, Handle::current())?; + let rx = signal_with_handle(kind, &Handle::current())?; Ok(Signal { inner: RxFuture::new(rx), @@ -361,7 +366,7 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> { pub(crate) fn signal_with_handle( kind: SignalKind, - handle: Handle, + handle: &Handle, ) -> io::Result<watch::Receiver<()>> { // Turn the signal delivery on once we are ready for it signal_enable(kind, handle)?; @@ -457,14 +462,14 @@ mod tests { #[test] fn signal_enable_error_on_invalid_input() { - signal_enable(SignalKind::from_raw(-1), Handle::default()).unwrap_err(); + signal_enable(SignalKind::from_raw(-1), &Handle::default()).unwrap_err(); } #[test] fn signal_enable_error_on_forbidden_input() { signal_enable( SignalKind::from_raw(signal_hook_registry::FORBIDDEN[0]), - Handle::default(), + &Handle::default(), ) .unwrap_err(); } diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs index a8b291f..e3c95f6 100644 --- a/src/sync/barrier.rs +++ b/src/sync/barrier.rs @@ -2,7 +2,7 @@ use crate::sync::watch; use std::sync::Mutex; -/// A barrier enables multiple threads to synchronize the beginning of some computation. +/// A barrier enables multiple tasks to synchronize the beginning of some computation. /// /// ``` /// # #[tokio::main] @@ -52,10 +52,10 @@ struct BarrierState { } impl Barrier { - /// Creates a new barrier that can block a given number of threads. + /// Creates a new barrier that can block a given number of tasks. /// - /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all - /// threads at once when the `n`th thread calls `wait`. + /// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all + /// tasks at once when the `n`th task calls `wait`. pub fn new(mut n: usize) -> Barrier { let (waker, wait) = crate::sync::watch::channel(0); @@ -79,11 +79,11 @@ impl Barrier { /// Does not resolve until all tasks have rendezvoused here. /// - /// Barriers are re-usable after all threads have rendezvoused once, and can + /// Barriers are re-usable after all tasks have rendezvoused once, and can /// be used continuously. /// /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from - /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads + /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks /// will receive a result that will return `false` from `is_leader`. pub async fn wait(&self) -> BarrierWaitResult { // NOTE: we are taking a _synchronous_ lock here. @@ -129,14 +129,14 @@ impl Barrier { } } -/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused. +/// A `BarrierWaitResult` is returned by `wait` when all tasks in the `Barrier` have rendezvoused. #[derive(Debug, Clone)] pub struct BarrierWaitResult(bool); impl BarrierWaitResult { - /// Returns `true` if this thread from wait is the "leader thread". + /// Returns `true` if this task from wait is the "leader task". /// - /// Only one thread will have `true` returned from their result, all other threads will have + /// Only one task will have `true` returned from their result, all other tasks will have /// `false` returned. pub fn is_leader(&self) -> bool { self.0 diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d89a9dd..5f97c1a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -436,7 +436,7 @@ cfg_sync! { pub mod mpsc; mod mutex; - pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard}; + pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard}; pub(crate) mod notify; pub use notify::Notify; diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index 1f670bf..ce857d7 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -33,6 +33,22 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx<T, Semaphore>, } +/// Owned permit to send one value into the channel. +/// +/// This is identical to the [`Permit`] type, except that it moves the sender +/// rather than borrowing it. +/// +/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and +/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity +/// before generating a message to send. +/// +/// [`Permit`]: Permit +/// [`Sender::reserve_owned()`]: Sender::reserve_owned +/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned +pub struct OwnedPermit<T> { + chan: Option<chan::Tx<T, Semaphore>>, +} + /// Receive values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. @@ -229,10 +245,11 @@ impl<T> Receiver<T> { /// /// To guarantee that no messages are dropped, after calling `close()`, /// `recv()` must be called until `None` is returned. If there are - /// outstanding [`Permit`] values, the `recv` method will not return `None` - /// until those are released. + /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will + /// not return `None` until those are released. /// /// [`Permit`]: Permit + /// [`OwnedPermit`]: OwnedPermit /// /// # Examples /// @@ -624,12 +641,96 @@ impl<T> Sender<T> { /// } /// ``` pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> { + self.reserve_inner().await?; + Ok(Permit { chan: &self.chan }) + } + + /// Wait for channel capacity, moving the `Sender` and returning an owned + /// permit. Once capacity to send one message is available, it is reserved + /// for the caller. + /// + /// This moves the sender _by value_, and returns an owned permit that can + /// be used to send a message into the channel. Unlike [`Sender::reserve`], + /// this method may be used in cases where the permit must be valid for the + /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is + /// essentially a reference count increment, comparable to [`Arc::clone`]), + /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be + /// moved, it can be cloned prior to calling `reserve_owned`. + /// + /// If the channel is full, the function waits for the number of unreceived + /// messages to become less than the channel capacity. Capacity to send one + /// message is reserved for the caller. An [`OwnedPermit`] is returned to + /// track the reserved capacity. The [`send`] function on [`OwnedPermit`] + /// consumes the reserved capacity. + /// + /// Dropping the [`OwnedPermit`] without sending a message releases the + /// capacity back to the channel. + /// + /// # Examples + /// Sending a message using an [`OwnedPermit`]: + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity, moving the sender. + /// let permit = tx.reserve_owned().await.unwrap(); + /// + /// // Send a message, consuming the permit and returning + /// // the moved sender. + /// let tx = permit.send(123); + /// + /// // The value sent on the permit is received. + /// assert_eq!(rx.recv().await.unwrap(), 123); + /// + /// // The sender can now be used again. + /// tx.send(456).await.unwrap(); + /// } + /// ``` + /// + /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved + /// by value, it can be inexpensively cloned before calling `reserve_owned`: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Clone the sender and reserve capacity. + /// let permit = tx.clone().reserve_owned().await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Sending on the permit succeeds. + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// } + /// ``` + /// + /// [`Sender::reserve`]: Sender::reserve + /// [`OwnedPermit`]: OwnedPermit + /// [`send`]: OwnedPermit::send + /// [`Arc::clone`]: std::sync::Arc::clone + pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> { + self.reserve_inner().await?; + Ok(OwnedPermit { + chan: Some(self.chan), + }) + } + + async fn reserve_inner(&self) -> Result<(), SendError<()>> { match self.chan.semaphore().0.acquire(1).await { - Ok(_) => {} - Err(_) => return Err(SendError(())), + Ok(_) => Ok(()), + Err(_) => Err(SendError(())), } - - Ok(Permit { chan: &self.chan }) } /// Try to acquire a slot in the channel without waiting for the slot to become @@ -684,6 +785,72 @@ impl<T> Sender<T> { Ok(Permit { chan: &self.chan }) } + /// Try to acquire a slot in the channel without waiting for the slot to become + /// available, returning an owned permit. + /// + /// This moves the sender _by value_, and returns an owned permit that can + /// be used to send a message into the channel. Unlike [`Sender::try_reserve`], + /// this method may be used in cases where the permit must be valid for the + /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is + /// essentially a reference count increment, comparable to [`Arc::clone`]), + /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be + /// moved, it can be cloned prior to calling `try_reserve_owned`. + /// + /// If the channel is full this function will return a [`TrySendError`]. + /// Since the sender is taken by value, the `TrySendError` returned in this + /// case contains the sender, so that it may be used again. Otherwise, if + /// there is a slot available, this method will return an [`OwnedPermit`] + /// that can then be used to [`send`] on the channel with a guaranteed slot. + /// This function is similar to [`reserve_owned`] except it does not await + /// for the slot to become available. + /// + /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`OwnedPermit`]: OwnedPermit + /// [`send`]: OwnedPermit::send + /// [`reserve_owned`]: Sender::reserve_owned + /// [`Arc::clone`]: std::sync::Arc::clone + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.clone().try_reserve_owned().unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Trying to reserve an additional slot on the `tx` will + /// // fail because there is no capacity. + /// assert!(tx.try_reserve().is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// } + /// ``` + pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(_) => return Err(TrySendError::Full(self)), + } + + Ok(OwnedPermit { + chan: Some(self.chan), + }) + } + /// Returns `true` if senders belong to the same channel. /// /// # Examples @@ -804,6 +971,8 @@ impl<T> Drop for Permit<'_, T> { // Add the permit back to the semaphore semaphore.add_permit(); + // If this is the last sender for this channel, wake the receiver so + // that it can be notified that the channel is closed. if semaphore.is_closed() && semaphore.is_idle() { self.chan.wake_rx(); } @@ -817,3 +986,123 @@ impl<T> fmt::Debug for Permit<'_, T> { .finish() } } + +// ===== impl Permit ===== + +impl<T> OwnedPermit<T> { + /// Sends a value using the reserved capacity. + /// + /// Capacity for the message has already been reserved. The message is sent + /// to the receiver and the permit is consumed. The operation will succeed + /// even if the receiver half has been closed. See [`Receiver::close`] for + /// more details on performing a clean shutdown. + /// + /// Unlike [`Permit::send`], this method returns the [`Sender`] from which + /// the `OwnedPermit` was reserved. + /// + /// [`Receiver::close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.reserve_owned().await.unwrap(); + /// + /// // Send a message on the permit, returning the sender. + /// let tx = permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// // We may now reuse `tx` to send another message. + /// tx.send(789).await.unwrap(); + /// } + /// ``` + pub fn send(mut self, value: T) -> Sender<T> { + let chan = self.chan.take().unwrap_or_else(|| { + unreachable!("OwnedPermit channel is only taken when the permit is moved") + }); + chan.send(value); + + Sender { chan } + } + + /// Release the reserved capacity *without* sending a message, returning the + /// [`Sender`]. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel(1); + /// + /// // Clone the sender and reserve capacity + /// let permit = tx.clone().reserve_owned().await.unwrap(); + /// + /// // Trying to send on the original `tx` will fail, since the `permit` + /// // has reserved all the available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Release the permit without sending a message, returning the clone + /// // of the sender. + /// let tx2 = permit.release(); + /// + /// // We may now reuse `tx` to send another message. + /// tx.send(789).await.unwrap(); + /// # drop(rx); drop(tx2); + /// } + /// ``` + /// + /// [`Sender`]: Sender + pub fn release(mut self) -> Sender<T> { + use chan::Semaphore; + + let chan = self.chan.take().unwrap_or_else(|| { + unreachable!("OwnedPermit channel is only taken when the permit is moved") + }); + + // Add the permit back to the semaphore + chan.semaphore().add_permit(); + Sender { chan } + } +} + +impl<T> Drop for OwnedPermit<T> { + fn drop(&mut self) { + use chan::Semaphore; + + // Are we still holding onto the sender? + if let Some(chan) = self.chan.take() { + let semaphore = chan.semaphore(); + + // Add the permit back to the semaphore + semaphore.add_permit(); + + // If this `OwnedPermit` is holding the last sender for this + // channel, wake the receiver so that it can be notified that the + // channel is closed. + if semaphore.is_closed() && semaphore.is_idle() { + chan.wake_rx(); + } + } + + // Otherwise, do nothing. + } +} + +impl<T> fmt::Debug for OwnedPermit<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("OwnedPermit") + .field("chan", &self.chan) + .finish() + } +} diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index e7033f6..879e3dc 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -73,7 +73,7 @@ pub(super) mod block; mod bounded; -pub use self::bounded::{channel, Permit, Receiver, Sender}; +pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender}; mod chan; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 0a118e7..9fd7c91 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -4,9 +4,9 @@ use crate::sync::batch_semaphore as semaphore; use std::cell::UnsafeCell; use std::error::Error; -use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use std::{fmt, marker, mem}; /// An asynchronous `Mutex`-like type. /// @@ -160,6 +160,19 @@ pub struct OwnedMutexGuard<T: ?Sized> { lock: Arc<Mutex<T>>, } +/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`]. +/// +/// This can be used to hold a subfield of the protected data. +/// +/// [`MutexGuard::map`]: method@MutexGuard::map +#[must_use = "if unused the Mutex will immediately unlock"] +pub struct MappedMutexGuard<'a, T: ?Sized> { + s: &'a semaphore::Semaphore, + data: *mut T, + // Needed to tell the borrow checker that we are holding a `&mut T` + marker: marker::PhantomData<&'a mut T>, +} + // As long as T: Send, it's fine to send and share Mutex<T> between threads. // If T was not Send, sending and sharing a Mutex<T> would be bad, since you can // access T through Mutex<T>. @@ -167,6 +180,8 @@ unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {} unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {} unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {} unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {} +unsafe impl<'a, T> Sync for MappedMutexGuard<'a, T> where T: ?Sized + Sync + 'a {} +unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a {} /// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and /// [`RwLock::try_write`] functions. @@ -451,6 +466,103 @@ where // === impl MutexGuard === +impl<'a, T: ?Sized> MutexGuard<'a, T> { + /// Makes a new [`MappedMutexGuard`] for a component of the locked data. + /// + /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MutexGuard::map(...)`. A method + /// would interfere with methods of the same name on the contents of the locked data. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Mutex, MutexGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let foo = Mutex::new(Foo(1)); + /// + /// { + /// let mut mapped = MutexGuard::map(foo.lock().await, |f| &mut f.0); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *foo.lock().await); + /// # } + /// ``` + /// + /// [`MutexGuard`]: struct@MutexGuard + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = &this.lock.s; + mem::forget(this); + MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The + /// original guard is returned if the closure returns `None`. + /// + /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MutexGuard::try_map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Mutex, MutexGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let foo = Mutex::new(Foo(1)); + /// + /// { + /// let mut mapped = MutexGuard::try_map(foo.lock().await, |f| Some(&mut f.0)) + /// .expect("should not fail"); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *foo.lock().await); + /// # } + /// ``` + /// + /// [`MutexGuard`]: struct@MutexGuard + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = &this.lock.s; + mem::forget(this); + Ok(MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + impl<T: ?Sized> Drop for MutexGuard<'_, T> { fn drop(&mut self) { self.lock.s.release(1) @@ -514,3 +626,88 @@ impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> { fmt::Display::fmt(&**self, f) } } + +// === impl MappedMutexGuard === + +impl<'a, T: ?Sized> MappedMutexGuard<'a, T> { + /// Makes a new [`MappedMutexGuard`] for a component of the locked data. + /// + /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MappedMutexGuard::map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = this.s; + mem::forget(this); + MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The + /// original guard is returned if the closure returns `None`. + /// + /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex. + /// + /// This is an associated function that needs to be used as `MappedMutexGuard::try_map(...)`. A + /// method would interfere with methods of the same name on the contents of the locked data. + /// + /// [`MappedMutexGuard`]: struct@MappedMutexGuard + #[inline] + pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = this.s; + mem::forget(this); + Ok(MappedMutexGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + +impl<'a, T: ?Sized> Drop for MappedMutexGuard<'a, T> { + fn drop(&mut self) { + self.s.release(1) + } +} + +impl<'a, T: ?Sized> Deref for MappedMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.data } + } +} + +impl<'a, T: ?Sized> DerefMut for MappedMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for MappedMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} diff --git a/src/sync/notify.rs b/src/sync/notify.rs index 2d30da9..5d2132f 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -192,6 +192,10 @@ fn inc_num_notify_waiters_calls(data: usize) -> usize { data + (1 << NOTIFY_WAITERS_SHIFT) } +fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) { + data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst); +} + impl Notify { /// Create a new `Notify`, initialized without a permit. /// @@ -394,11 +398,9 @@ impl Notify { let curr = self.state.load(SeqCst); if let EMPTY | NOTIFIED = get_state(curr) { - // There are no waiting tasks. In this case, no synchronization is - // established between `notify` and `notified().await`. - // All we need to do is increment the number of times this - // method was called. - self.state.store(inc_num_notify_waiters_calls(curr), SeqCst); + // There are no waiting tasks. All we need to do is increment the + // number of times this method was called. + atomic_inc_num_notify_waiters_calls(&self.state); return; } diff --git a/src/sync/tests/loom_notify.rs b/src/sync/tests/loom_notify.rs index 4be949a..d484a75 100644 --- a/src/sync/tests/loom_notify.rs +++ b/src/sync/tests/loom_notify.rs @@ -33,12 +33,41 @@ fn notify_waiters() { tx.notify_waiters(); }); - th.join().unwrap(); - block_on(async { notified1.await; notified2.await; }); + + th.join().unwrap(); + }); +} + +#[test] +fn notify_waiters_and_one() { + loom::model(|| { + let notify = Arc::new(Notify::new()); + let tx1 = notify.clone(); + let tx2 = notify.clone(); + + let th1 = thread::spawn(move || { + tx1.notify_waiters(); + }); + + let th2 = thread::spawn(move || { + tx2.notify_one(); + }); + + let th3 = thread::spawn(move || { + let notified = notify.notified(); + + block_on(async { + notified.await; + }); + }); + + th1.join().unwrap(); + th2.join().unwrap(); + th3.join().unwrap(); }); } diff --git a/src/sync/watch.rs b/src/sync/watch.rs index bf6f0ac..db65e5a 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -205,7 +205,7 @@ impl<T> Receiver<T> { // not memory access. shared.ref_count_rx.fetch_add(1, Relaxed); - Self { version, shared } + Self { shared, version } } /// Returns a reference to the most recently sent value diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 28bbcdb..e4fe254 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -5,19 +5,24 @@ cfg_rt_multi_thread! { /// blocking the executor. /// /// In general, issuing a blocking call or performing a lot of compute in a - /// future without yielding is not okay, as it may prevent the executor from - /// driving other futures forward. This function runs the closure on the - /// current thread by having the thread temporarily cease from being a core - /// thread, and turns it into a blocking thread. See the [CPU-bound tasks - /// and blocking code][blocking] section for more information. - /// - /// Although this function avoids starving other independently spawned - /// tasks, any other code running concurrently in the same task will be - /// suspended during the call to `block_in_place`. This can happen e.g. when - /// using the [`join!`] macro. To avoid this issue, use [`spawn_blocking`] - /// instead. - /// - /// Note that this function can only be used when using the `multi_thread` runtime. + /// future without yielding is problematic, as it may prevent the executor + /// from driving other tasks forward. Calling this function informs the + /// executor that the currently executing task is about to block the thread, + /// so the executor is able to hand off any other tasks it has to a new + /// worker thread before that happens. See the [CPU-bound tasks and blocking + /// code][blocking] section for more information. + /// + /// Be aware that although this function avoids starving other independently + /// spawned tasks, any other code running concurrently in the same task will + /// be suspended during the call to `block_in_place`. This can happen e.g. + /// when using the [`join!`] macro. To avoid this issue, use + /// [`spawn_blocking`] instead of `block_in_place`. + /// + /// Note that this function cannot be used within a [`current_thread`] runtime + /// because in this case there are no other worker threads to hand off tasks + /// to. On the other hand, calling the function outside a runtime is + /// allowed. In this case, `block_in_place` just calls the provided closure + /// normally. /// /// Code running behind `block_in_place` cannot be cancelled. When you shut /// down the executor, it will wait indefinitely for all blocking operations @@ -43,6 +48,28 @@ cfg_rt_multi_thread! { /// }); /// # } /// ``` + /// + /// Code running inside `block_in_place` may use `block_on` to reenter the + /// async context. + /// + /// ``` + /// use tokio::task; + /// use tokio::runtime::Handle; + /// + /// # async fn docs() { + /// task::block_in_place(move || { + /// Handle::current().block_on(async move { + /// // do something async + /// }); + /// }); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if called from a [`current_thread`] runtime. + /// + /// [`current_thread`]: fn@crate::runtime::Builder::new_current_thread pub fn block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R, diff --git a/src/task/mod.rs b/src/task/mod.rs index abae818..7255535 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -122,6 +122,11 @@ //! Instead, Tokio provides two APIs for running blocking operations in an //! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`]. //! +//! Be aware that if you call a non-async method from async code, that non-async +//! method is still inside the asynchronous context, so you should also avoid +//! blocking operations there. This includes destructors of objects destroyed in +//! async code. +//! //! #### spawn_blocking //! //! The `task::spawn_blocking` function is similar to the `task::spawn` function diff --git a/src/task/unconstrained.rs b/src/task/unconstrained.rs index 4a62f81..31c732b 100644 --- a/src/task/unconstrained.rs +++ b/src/task/unconstrained.rs @@ -5,6 +5,7 @@ use std::task::{Context, Poll}; pin_project! { /// Future for the [`unconstrained`](unconstrained) method. + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] #[must_use = "Unconstrained does nothing unless polled"] pub struct Unconstrained<F> { #[pin] @@ -38,6 +39,7 @@ where /// otherwise. /// /// See also the usage example in the [task module](index.html#unconstrained). +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub fn unconstrained<F>(inner: F) -> Unconstrained<F> { Unconstrained { inner } } diff --git a/src/time/clock.rs b/src/time/clock.rs index 8957800..c5ef86b 100644 --- a/src/time/clock.rs +++ b/src/time/clock.rs @@ -120,22 +120,11 @@ cfg_test_util! { /// Panics if time is not frozen or if called from outside of the Tokio /// runtime. pub async fn advance(duration: Duration) { - use crate::future::poll_fn; - use std::task::Poll; - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + let until = clock.now() + duration; clock.advance(duration); - let mut yielded = false; - poll_fn(|cx| { - if yielded { - Poll::Ready(()) - } else { - yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending - } - }).await; + crate::time::sleep_until(until).await; } /// Return the current instant, factoring in frozen time. diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs index e630fa8..08edab3 100644 --- a/src/time/driver/entry.rs +++ b/src/time/driver/entry.rs @@ -85,7 +85,7 @@ const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; /// requires only the driver lock. pub(super) struct StateCell { /// Holds either the scheduled expiration time for this timer, or (if the - /// timer has been fired and is unregistered), [`u64::max_value()`]. + /// timer has been fired and is unregistered), `u64::max_value()`. state: AtomicU64, /// If the timer is fired (an Acquire order read on state shows /// `u64::max_value()`), holds the result that should be returned from diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs index 9a05a54..77b4358 100644 --- a/src/time/driver/handle.rs +++ b/src/time/driver/handle.rs @@ -76,7 +76,7 @@ cfg_not_rt! { /// lazy, and so outside executed inside the runtime successfully without /// panicking. pub(crate) fn current() -> Self { - panic!(crate::util::error::CONTEXT_MISSING_ERROR) + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) } } } diff --git a/src/time/interval.rs b/src/time/interval.rs index 20cfcec..4b1c6f6 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -176,4 +176,9 @@ impl Interval { // Return the current instant Poll::Ready(now) } + + /// Returns the period of the interval. + pub fn period(&self) -> Duration { + self.period + } } diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs index dd00e14..480ea09 100644 --- a/src/util/linked_list.rs +++ b/src/util/linked_list.rs @@ -77,7 +77,7 @@ pub(crate) struct Pointers<T> { /// #[repr(C)]. /// /// See this link for more information: -/// https://github.com/rust-lang/rust/pull/82834 +/// <https://github.com/rust-lang/rust/pull/82834> #[repr(C)] struct PointersInner<T> { /// The previous node in the list. null if there is no previous node. @@ -93,7 +93,7 @@ struct PointersInner<T> { next: Option<NonNull<T>>, /// This type is !Unpin due to the heuristic from: - /// https://github.com/rust-lang/rust/pull/82834 + /// <https://github.com/rust-lang/rust/pull/82834> _pin: PhantomPinned, } diff --git a/src/util/rand.rs b/src/util/rand.rs index 5660103..17b3ec1 100644 --- a/src/util/rand.rs +++ b/src/util/rand.rs @@ -3,10 +3,10 @@ use std::cell::Cell; /// Fast random number generate /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. -/// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's -/// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf +/// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's +/// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> /// This generator passes the SmallCrush suite, part of TestU01 framework: -/// http://simul.iro.umontreal.ca/testu01/tu01.html +/// <http://simul.iro.umontreal.ca/testu01/tu01.html> #[derive(Debug)] pub(crate) struct FastRand { one: Cell<u32>, diff --git a/src/util/slab.rs b/src/util/slab.rs index efc72e1..2ddaa6c 100644 --- a/src/util/slab.rs +++ b/src/util/slab.rs @@ -296,7 +296,7 @@ impl<T> Slab<T> { // Remove the slots vector from the page. This is done so that the // freeing process is done outside of the lock's critical section. - let vec = mem::replace(&mut slots.slots, vec![]); + let vec = mem::take(&mut slots.slots); slots.head = 0; // Drop the lock so we can drop the vector outside the lock below. diff --git a/tests/fs_file.rs b/tests/fs_file.rs index bf2f1d7..1f4760e 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -22,6 +22,27 @@ async fn basic_read() { assert_eq!(n, HELLO.len()); assert_eq!(&buf[..n], HELLO); + + // Drop the data from the cache to stimulate uncached codepath on Linux (see preadv2 in + // file.rs) + #[cfg(target_os = "linux")] + { + use std::os::unix::io::AsRawFd; + nix::unistd::fsync(tempfile.as_raw_fd()).unwrap(); + nix::fcntl::posix_fadvise( + tempfile.as_raw_fd(), + 0, + 0, + nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED, + ) + .unwrap(); + } + + let mut file = File::open(tempfile.path()).await.unwrap(); + let n = file.read(&mut buf).await.unwrap(); + + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); } #[tokio::test] diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs new file mode 100644 index 0000000..ac5f11c --- /dev/null +++ b/tests/io_buf_reader.rs @@ -0,0 +1,362 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_reader.rs + +use futures::task::{noop_waker_ref, Context, Poll}; +use std::cmp; +use std::io::{self, Cursor}; +use std::pin::Pin; +use tokio::io::{ + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, + ReadBuf, SeekFrom, +}; + +macro_rules! run_fill_buf { + ($reader:expr) => {{ + let mut cx = Context::from_waker(noop_waker_ref()); + loop { + if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + break x; + } + } + }}; +} + +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} + +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { + inner, + ready_read: false, + ready_fill_buf: false, + } + } +} + +impl AsyncRead for MaybePending<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { + return Poll::Ready(Ok(&[])); + } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; + } +} + +#[tokio::test] +async fn test_buffered_reader() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); +} + +#[tokio::test] +async fn test_buffered_reader_seek() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); + + assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert!(reader + .seek(SeekFrom::Current(i64::min_value())) + .await + .is_err()); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]); + Pin::new(&mut reader).consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); +} + +#[tokio::test] +async fn test_buffered_reader_seek_underflow() { + // gimmick reader that yields its position modulo 256 for each byte + struct PositionReader { + pos: u64, + } + impl AsyncRead for PositionReader { + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let b = buf.initialize_unfilled(); + let len = b.len(); + for x in b { + *x = self.pos as u8; + self.pos = self.pos.wrapping_add(1); + } + buf.advance(len); + Poll::Ready(Ok(())) + } + } + impl AsyncSeek for PositionReader { + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + match pos { + SeekFrom::Start(n) => { + self.pos = n; + } + SeekFrom::Current(n) => { + self.pos = self.pos.wrapping_add(n as u64); + } + SeekFrom::End(n) => { + self.pos = u64::max_value().wrapping_add(n as u64); + } + } + Ok(()) + } + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> { + Poll::Ready(Ok(self.pos)) + } + } + + let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 }); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1, 2, 3, 4][..]); + assert_eq!( + reader.seek(SeekFrom::End(-5)).await.unwrap(), + u64::max_value() - 5 + ); + assert_eq!(run_fill_buf!(reader).unwrap().len(), 5); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!( + reader + .seek(SeekFrom::Current(i64::min_value())) + .await + .unwrap(), + expected + ); + assert_eq!(run_fill_buf!(reader).unwrap().len(), 5); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); + assert_eq!(reader.get_ref().pos, expected); +} + +#[tokio::test] +async fn test_short_reads() { + /// A dummy reader intended at testing short-reads propagation. + struct ShortReader { + lengths: Vec<usize>, + } + + impl AsyncRead for ShortReader { + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + if !self.lengths.is_empty() { + buf.advance(self.lengths.remove(0)); + } + Poll::Ready(Ok(())) + } + } + + let inner = ShortReader { + lengths: vec![0, 1, 2, 0, 1, 0], + }; + let mut reader = BufReader::new(inner); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 2); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); +} + +#[tokio::test] +async fn maybe_pending() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); +} + +#[tokio::test] +async fn maybe_pending_buf_read() { + let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); + let mut reader = BufReader::with_capacity(2, inner); + let mut v = Vec::new(); + reader.read_until(3, &mut v).await.unwrap(); + assert_eq!(v, [0, 1, 2, 3]); + v.clear(); + reader.read_until(1, &mut v).await.unwrap(); + assert_eq!(v, [1]); + v.clear(); + reader.read_until(8, &mut v).await.unwrap(); + assert_eq!(v, [0]); + v.clear(); + reader.read_until(9, &mut v).await.unwrap(); + assert_eq!(v, []); +} + +// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 +#[tokio::test] +async fn maybe_pending_seek() { + struct MaybePendingSeek<'a> { + inner: Cursor<&'a [u8]>, + ready: bool, + seek_res: Option<io::Result<()>>, + } + + impl<'a> MaybePendingSeek<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { + inner: Cursor::new(inner), + ready: true, + seek_res: None, + } + } + } + + impl AsyncRead for MaybePendingSeek<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } + } + + impl AsyncBufRead for MaybePendingSeek<'_> { + fn poll_fill_buf( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<&[u8]>> { + let this: *mut Self = &mut *self as *mut _; + Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.inner).consume(amt) + } + } + + impl AsyncSeek for MaybePendingSeek<'_> { + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos)); + Ok(()) + } + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + if self.ready { + self.ready = false; + self.seek_res.take().unwrap_or(Ok(()))?; + Pin::new(&mut self.inner).poll_complete(cx) + } else { + self.ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + + assert_eq!(reader.seek(SeekFrom::Current(3)).await.unwrap(), 3); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert!(reader + .seek(SeekFrom::Current(i64::min_value())) + .await + .is_err()); + assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]); + Pin::new(&mut reader).consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); +} diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs new file mode 100644 index 0000000..6f4f10a --- /dev/null +++ b/tests/io_buf_writer.rs @@ -0,0 +1,251 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_writer.rs + +use futures::task::{Context, Poll}; +use std::io::{self, Cursor}; +use std::pin::Pin; +use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom}; + +struct MaybePending { + inner: Vec<u8>, + ready: bool, +} + +impl MaybePending { + fn new(inner: Vec<u8>) -> Self { + Self { + inner, + ready: false, + } + } +} + +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +#[tokio::test] +async fn buf_writer() { + let mut writer = BufWriter::with_capacity(2, Vec::new()); + + writer.write(&[0, 1]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[2]).await.unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[3]).await.unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[4]).await.unwrap(); + writer.write(&[5]).await.unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[6]).await.unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + writer.write(&[7, 8]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + writer.write(&[9, 10, 11]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[tokio::test] +async fn buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, Vec::new()); + w.write(&[0, 1]).await.unwrap(); + assert_eq!(*w.get_ref(), []); + w.flush().await.unwrap(); + let w = w.into_inner(); + assert_eq!(w, [0, 1]); +} + +#[tokio::test] +async fn buf_writer_seek() { + let mut w = BufWriter::with_capacity(3, Cursor::new(Vec::new())); + w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); + w.write_all(&[6, 7]).await.unwrap(); + assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2); + w.write_all(&[8, 9]).await.unwrap(); + w.flush().await.unwrap(); + assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} + +#[tokio::test] +async fn maybe_pending_buf_writer() { + let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); + + writer.write(&[0, 1]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + writer.write(&[2]).await.unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + writer.write(&[3]).await.unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + writer.write(&[4]).await.unwrap(); + writer.write(&[5]).await.unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + writer.write(&[6]).await.unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); + + writer.write(&[7, 8]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); + + writer.write(&[9, 10, 11]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!( + writer.get_ref().inner, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + ); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!( + &writer.get_ref().inner, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + ); +} + +#[tokio::test] +async fn maybe_pending_buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); + w.write(&[0, 1]).await.unwrap(); + assert_eq!(&w.get_ref().inner, &[]); + w.flush().await.unwrap(); + let w = w.into_inner().inner; + assert_eq!(w, [0, 1]); +} + +#[tokio::test] +async fn maybe_pending_buf_writer_seek() { + struct MaybePendingSeek { + inner: Cursor<Vec<u8>>, + ready_write: bool, + ready_seek: bool, + seek_res: Option<io::Result<()>>, + } + + impl MaybePendingSeek { + fn new(inner: Vec<u8>) -> Self { + Self { + inner: Cursor::new(inner), + ready_write: false, + ready_seek: false, + seek_res: None, + } + } + } + + impl AsyncWrite for MaybePendingSeek { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.ready_write { + self.ready_write = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready_write = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + } + + impl AsyncSeek for MaybePendingSeek { + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos)); + Ok(()) + } + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + if self.ready_seek { + self.ready_seek = false; + self.seek_res.take().unwrap_or(Ok(()))?; + Pin::new(&mut self.inner).poll_complete(cx) + } else { + self.ready_seek = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(Vec::new())); + w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); + w.write_all(&[6, 7]).await.unwrap(); + assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8); + assert_eq!( + &w.get_ref().inner.get_ref()[..], + &[0, 1, 2, 3, 4, 5, 6, 7][..] + ); + assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2); + w.write_all(&[8, 9]).await.unwrap(); + w.flush().await.unwrap(); + assert_eq!( + &w.into_inner().inner.into_inner()[..], + &[0, 1, 8, 9, 4, 5, 6, 7] + ); +} diff --git a/tests/io_mem_stream.rs b/tests/io_mem_stream.rs index 3335214..520391a 100644 --- a/tests/io_mem_stream.rs +++ b/tests/io_mem_stream.rs @@ -63,6 +63,26 @@ async fn disconnect() { } #[tokio::test] +#[cfg(not(target_os = "android"))] +async fn disconnect_reader() { + let (a, mut b) = duplex(2); + + let t1 = tokio::spawn(async move { + // this will block, as not all data fits into duplex + b.write_all(b"ping").await.unwrap_err(); + }); + + let t2 = tokio::spawn(async move { + // here we drop the reader side, and we expect the writer in the other + // task to exit with an error + drop(a); + }); + + t2.await.unwrap(); + t1.await.unwrap(); +} + +#[tokio::test] async fn max_write_size() { let (mut a, mut b) = duplex(32); @@ -73,11 +93,11 @@ async fn max_write_size() { assert_eq!(n, 4); }); - let t2 = tokio::spawn(async move { - let mut buf = [0u8; 4]; - b.read_exact(&mut buf).await.unwrap(); - }); + let mut buf = [0u8; 4]; + b.read_exact(&mut buf).await.unwrap(); t1.await.unwrap(); - t2.await.unwrap(); + + // drop b only after task t1 finishes writing + drop(b); } diff --git a/tests/io_write_all_buf.rs b/tests/io_write_all_buf.rs new file mode 100644 index 0000000..b49a58e --- /dev/null +++ b/tests/io_write_all_buf.rs @@ -0,0 +1,96 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio_test::{assert_err, assert_ok}; + +use bytes::{Buf, Bytes, BytesMut}; +use std::cmp; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn write_all_buf() { + struct Wr { + buf: BytesMut, + cnt: usize, + } + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + let n = cmp::min(4, buf.len()); + dbg!(buf); + let buf = &buf[0..n]; + + self.cnt += 1; + self.buf.extend(buf); + Ok(buf.len()).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + } + + let mut wr = Wr { + buf: BytesMut::with_capacity(64), + cnt: 0, + }; + + let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world")); + + assert_ok!(wr.write_all_buf(&mut buf).await); + assert_eq!(wr.buf, b"helloworld"[..]); + // expect 4 writes, [hell],[o],[worl],[d] + assert_eq!(wr.cnt, 4); + assert_eq!(buf.has_remaining(), false); +} + +#[tokio::test] +async fn write_buf_err() { + /// Error out after writing the first 4 bytes + struct Wr { + cnt: usize, + } + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.cnt += 1; + if self.cnt == 2 { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "whoops"))); + } + Poll::Ready(Ok(4)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + } + + let mut wr = Wr { cnt: 0 }; + + let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world")); + + assert_err!(wr.write_all_buf(&mut buf).await); + assert_eq!( + buf.copy_to_bytes(buf.remaining()), + Bytes::from_static(b"oworld") + ); +} diff --git a/tests/macros_select.rs b/tests/macros_select.rs index ea06d51..07c2b81 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -359,9 +359,6 @@ async fn join_with_select() { async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; - let sleep = time::sleep(Duration::from_millis(50)); - tokio::pin!(sleep); - tokio::select! { _ = time::sleep(Duration::from_millis(50)), if false => { panic!("if condition ignored") diff --git a/tests/macros_test.rs b/tests/macros_test.rs index 8396398..f5bc5a0 100644 --- a/tests/macros_test.rs +++ b/tests/macros_test.rs @@ -21,7 +21,20 @@ async fn test_macro_is_resilient_to_shadowing() { // https://github.com/tokio-rs/tokio/issues/3403 #[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline. #[tokio::main] -async fn unused_braces_main() { println!("hello") } +pub async fn unused_braces_main() { println!("hello") } #[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline. #[tokio::test] async fn unused_braces_test() { assert_eq!(1 + 1, 2) } + +// https://github.com/tokio-rs/tokio/pull/3766#issuecomment-835508651 +#[std::prelude::v1::test] +fn trait_method() { + trait A { + fn f(self); + } + impl A for () { + #[tokio::main] + async fn f(self) {} + } + ().f() +} diff --git a/tests/rt_common.rs b/tests/rt_common.rs index cb1d0f6..e5fc7a9 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -647,6 +647,7 @@ rt_test! { } #[test] + #[cfg(not(target_os = "android"))] fn panic_in_task() { let rt = rt(); let (tx, rx) = oneshot::channel(); diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index 82bef8a..d9514d2 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -114,6 +114,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { } #[test] +#[cfg(not(target_os = "android"))] fn useful_panic_message_when_dropping_rt_in_rt() { use std::panic::{catch_unwind, AssertUnwindSafe}; diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index e34c2bb..0b5d12a 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -55,7 +55,7 @@ async fn try_read_write() { tokio::task::yield_now().await; } - // Fill the write buffer + // Fill the write buffer using non-vectored I/O loop { // Still ready let mut writable = task::spawn(client.writable()); @@ -75,7 +75,7 @@ async fn try_read_write() { let mut writable = task::spawn(client.writable()); assert_pending!(writable.poll()); - // Drain the socket from the server end + // Drain the socket from the server end using non-vectored I/O let mut read = vec![0; written.len()]; let mut i = 0; @@ -92,6 +92,51 @@ async fn try_read_write() { assert_eq!(read, written); } + written.clear(); + client.writable().await.unwrap(); + + // Fill the write buffer using vectored I/O + let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect(); + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write_vectored(&data_bufs) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end using vectored I/O + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await.unwrap(); + + let mut bufs: Vec<_> = read[i..] + .chunks_mut(0x10000) + .map(io::IoSliceMut::new) + .collect(); + match server.try_read_vectored(&mut bufs) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + // Now, we listen for shutdown drop(client); diff --git a/tests/time_pause.rs b/tests/time_pause.rs index bc84ac5..d1834af 100644 --- a/tests/time_pause.rs +++ b/tests/time_pause.rs @@ -3,8 +3,14 @@ use rand::SeedableRng; use rand::{rngs::StdRng, Rng}; -use tokio::time::{self, Duration, Instant}; -use tokio_test::assert_err; +use tokio::time::{self, Duration, Instant, Sleep}; +use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task}; + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; #[tokio::test] async fn pause_time_in_main() { @@ -57,3 +63,162 @@ async fn paused_time_stress_run() -> Vec<Duration> { times } + +#[tokio::test(start_paused = true)] +async fn advance_after_poll() { + time::sleep(ms(1)).await; + + let start = Instant::now(); + + let mut sleep = task::spawn(time::sleep_until(start + ms(300))); + + assert_pending!(sleep.poll()); + + let before = Instant::now(); + time::advance(ms(100)).await; + assert_elapsed!(before, ms(100)); + + assert_pending!(sleep.poll()); +} + +#[tokio::test(start_paused = true)] +async fn sleep_no_poll() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let mut sleep = task::spawn(time::sleep_until(start + ms(300))); + + let before = Instant::now(); + time::advance(ms(100)).await; + assert_elapsed!(before, ms(100)); + + assert_pending!(sleep.poll()); +} + +enum State { + Begin, + AwaitingAdvance(Pin<Box<dyn Future<Output = ()>>>), + AfterAdvance, +} + +struct Tester { + sleep: Pin<Box<Sleep>>, + state: State, + before: Option<Instant>, + poll: bool, +} + +impl Future for Tester { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match &mut self.state { + State::Begin => { + if self.poll { + assert_pending!(self.sleep.as_mut().poll(cx)); + } + self.before = Some(Instant::now()); + let advance_fut = Box::pin(time::advance(ms(100))); + self.state = State::AwaitingAdvance(advance_fut); + self.poll(cx) + } + State::AwaitingAdvance(ref mut advance_fut) => match advance_fut.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + self.state = State::AfterAdvance; + self.poll(cx) + } + }, + State::AfterAdvance => { + assert_elapsed!(self.before.unwrap(), ms(100)); + + assert_pending!(self.sleep.as_mut().poll(cx)); + + Poll::Ready(()) + } + } + } +} + +#[tokio::test(start_paused = true)] +async fn sleep_same_task() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let sleep = Box::pin(time::sleep_until(start + ms(300))); + + Tester { + sleep, + state: State::Begin, + before: None, + poll: true, + } + .await; +} + +#[tokio::test(start_paused = true)] +async fn sleep_same_task_no_poll() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let sleep = Box::pin(time::sleep_until(start + ms(300))); + + Tester { + sleep, + state: State::Begin, + before: None, + poll: false, + } + .await; +} + +#[tokio::test(start_paused = true)] +async fn interval() { + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let mut i = task::spawn(time::interval_at(start, ms(300))); + + assert_ready_eq!(poll_next(&mut i), start); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(100)).await; + assert_elapsed!(before, ms(100)); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(200)).await; + assert_elapsed!(before, ms(200)); + assert_ready_eq!(poll_next(&mut i), start + ms(300)); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(400)).await; + assert_elapsed!(before, ms(400)); + assert_ready_eq!(poll_next(&mut i), start + ms(600)); + assert_pending!(poll_next(&mut i)); + + let before = Instant::now(); + time::advance(ms(500)).await; + assert_elapsed!(before, ms(500)); + assert_ready_eq!(poll_next(&mut i), start + ms(900)); + assert_ready_eq!(poll_next(&mut i), start + ms(1200)); + assert_pending!(poll_next(&mut i)); +} + +fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> { + interval.enter(|cx, mut interval| interval.poll_tick(cx)) +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs index 2736258..9c04d22 100644 --- a/tests/time_sleep.rs +++ b/tests/time_sleep.rs @@ -7,22 +7,7 @@ use std::task::Context; use futures::task::noop_waker_ref; use tokio::time::{self, Duration, Instant}; -use tokio_test::{assert_pending, assert_ready, task}; - -macro_rules! assert_elapsed { - ($now:expr, $ms:expr) => {{ - let elapsed = $now.elapsed(); - let lower = ms($ms); - - // Handles ms rounding - assert!( - elapsed >= lower && elapsed <= lower + ms(1), - "actual = {:?}, expected = {:?}", - elapsed, - lower - ); - }}; -} +use tokio_test::{assert_elapsed, assert_pending, assert_ready, task}; #[tokio::test] async fn immediate_sleep() { @@ -32,7 +17,7 @@ async fn immediate_sleep() { // Ready! time::sleep_until(now).await; - assert_elapsed!(now, 0); + assert_elapsed!(now, ms(1)); } #[tokio::test] @@ -60,10 +45,11 @@ async fn delayed_sleep_level_0() { for &i in &[1, 10, 60] { let now = Instant::now(); + let dur = ms(i); - time::sleep_until(now + ms(i)).await; + time::sleep_until(now + dur).await; - assert_elapsed!(now, i); + assert_elapsed!(now, dur); } } @@ -77,7 +63,7 @@ async fn sub_ms_delayed_sleep() { time::sleep_until(deadline).await; - assert_elapsed!(now, 1); + assert_elapsed!(now, ms(1)); } } @@ -90,7 +76,7 @@ async fn delayed_sleep_wrapping_level_0() { let now = Instant::now(); time::sleep_until(now + ms(60)).await; - assert_elapsed!(now, 60); + assert_elapsed!(now, ms(60)); } #[tokio::test] @@ -107,7 +93,7 @@ async fn reset_future_sleep_before_fire() { sleep.as_mut().reset(Instant::now() + ms(200)); sleep.await; - assert_elapsed!(now, 200); + assert_elapsed!(now, ms(200)); } #[tokio::test] @@ -124,7 +110,7 @@ async fn reset_past_sleep_before_turn() { sleep.as_mut().reset(now + ms(80)); sleep.await; - assert_elapsed!(now, 80); + assert_elapsed!(now, ms(80)); } #[tokio::test] @@ -143,7 +129,7 @@ async fn reset_past_sleep_before_fire() { sleep.as_mut().reset(now + ms(80)); sleep.await; - assert_elapsed!(now, 80); + assert_elapsed!(now, ms(80)); } #[tokio::test] @@ -154,11 +140,11 @@ async fn reset_future_sleep_after_fire() { let mut sleep = Box::pin(time::sleep_until(now + ms(100))); sleep.as_mut().await; - assert_elapsed!(now, 100); + assert_elapsed!(now, ms(100)); sleep.as_mut().reset(now + ms(110)); sleep.await; - assert_elapsed!(now, 110); + assert_elapsed!(now, ms(110)); } #[tokio::test] diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs index c528620..2754e84 100644 --- a/tests/uds_stream.rs +++ b/tests/uds_stream.rs @@ -90,7 +90,7 @@ async fn try_read_write() -> std::io::Result<()> { tokio::task::yield_now().await; } - // Fill the write buffer + // Fill the write buffer using non-vectored I/O loop { // Still ready let mut writable = task::spawn(client.writable()); @@ -110,7 +110,7 @@ async fn try_read_write() -> std::io::Result<()> { let mut writable = task::spawn(client.writable()); assert_pending!(writable.poll()); - // Drain the socket from the server end + // Drain the socket from the server end using non-vectored I/O let mut read = vec![0; written.len()]; let mut i = 0; @@ -127,6 +127,51 @@ async fn try_read_write() -> std::io::Result<()> { assert_eq!(read, written); } + written.clear(); + client.writable().await.unwrap(); + + // Fill the write buffer using vectored I/O + let msg_bufs: Vec<_> = msg.chunks(3).map(io::IoSlice::new).collect(); + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write_vectored(&msg_bufs) { + Ok(n) => written.extend(&msg[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end using vectored I/O + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await?; + + let mut bufs: Vec<_> = read[i..] + .chunks_mut(0x10000) + .map(io::IoSliceMut::new) + .collect(); + match server.try_read_vectored(&mut bufs) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + // Now, we listen for shutdown drop(client); |