aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Huang <huangluke@google.com>2021-05-30 08:40:50 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-05-30 08:40:50 +0000
commit0d66e18fb85a0d694d52ca0e873d162c070094a0 (patch)
tree9960fb5406e537c303b85517c25ac6c65d47d634
parent55513e0bfafb02059e7c43f7accedaf0e651bbac (diff)
parentd4230f61dfcffdae2dda70d2103abf5699616382 (diff)
downloadtokio-0d66e18fb85a0d694d52ca0e873d162c070094a0.tar.gz
Merge "Upgrade rust/crates/tokio to 1.6.0 and use cargo2android.json to generate bp file" am: d4230f61df
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1718994 Change-Id: I6f3bef3d4f45e33471fc70d0e7648912a994705a
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp628
-rw-r--r--CHANGELOG.md47
-rw-r--r--Cargo.toml10
-rw-r--r--Cargo.toml.orig10
-rw-r--r--METADATA8
-rw-r--r--TEST_MAPPING99
-rw-r--r--cargo2android.json11
-rw-r--r--patches/Android.bp.patch1324
-rw-r--r--patches/io_mem_stream.patch12
-rw-r--r--patches/rt_common.patch12
-rw-r--r--patches/task_blocking.rs12
-rw-r--r--src/fs/file.rs191
-rw-r--r--src/io/driver/mod.rs2
-rw-r--r--src/io/driver/registration.rs7
-rw-r--r--src/io/poll_evented.rs26
-rw-r--r--src/io/util/async_write_ext.rs58
-rw-r--r--src/io/util/buf_reader.rs124
-rw-r--r--src/io/util/buf_stream.rs4
-rw-r--r--src/io/util/buf_writer.rs62
-rw-r--r--src/io/util/copy_bidirectional.rs1
-rw-r--r--src/io/util/lines.rs2
-rw-r--r--src/io/util/mem.rs26
-rw-r--r--src/io/util/mod.rs1
-rw-r--r--src/io/util/read_line.rs4
-rw-r--r--src/io/util/read_to_string.rs2
-rw-r--r--src/io/util/split.rs2
-rw-r--r--src/io/util/write_all_buf.rs56
-rw-r--r--src/macros/select.rs2
-rw-r--r--src/net/tcp/stream.rs140
-rw-r--r--src/net/unix/mod.rs4
-rw-r--r--src/net/unix/stream.rs140
-rw-r--r--src/process/mod.rs13
-rw-r--r--src/process/unix/driver.rs94
-rw-r--r--src/process/unix/mod.rs9
-rw-r--r--src/process/unix/orphan.rs195
-rw-r--r--src/process/unix/reap.rs6
-rw-r--r--src/runtime/blocking/pool.rs4
-rw-r--r--src/runtime/driver.rs8
-rw-r--r--src/runtime/task/core.rs2
-rw-r--r--src/signal/unix.rs25
-rw-r--r--src/sync/barrier.rs18
-rw-r--r--src/sync/mod.rs2
-rw-r--r--src/sync/mpsc/bounded.rs301
-rw-r--r--src/sync/mpsc/mod.rs2
-rw-r--r--src/sync/mutex.rs199
-rw-r--r--src/sync/notify.rs12
-rw-r--r--src/sync/tests/loom_notify.rs33
-rw-r--r--src/sync/watch.rs2
-rw-r--r--src/task/blocking.rs53
-rw-r--r--src/task/mod.rs5
-rw-r--r--src/task/unconstrained.rs2
-rw-r--r--src/time/clock.rs15
-rw-r--r--src/time/driver/entry.rs2
-rw-r--r--src/time/driver/handle.rs2
-rw-r--r--src/time/interval.rs5
-rw-r--r--src/util/linked_list.rs4
-rw-r--r--src/util/rand.rs6
-rw-r--r--src/util/slab.rs2
-rw-r--r--tests/fs_file.rs21
-rw-r--r--tests/io_buf_reader.rs362
-rw-r--r--tests/io_buf_writer.rs251
-rw-r--r--tests/io_mem_stream.rs30
-rw-r--r--tests/io_write_all_buf.rs96
-rw-r--r--tests/macros_select.rs3
-rw-r--r--tests/macros_test.rs15
-rw-r--r--tests/rt_common.rs1
-rw-r--r--tests/task_blocking.rs1
-rw-r--r--tests/tcp_stream.rs49
-rw-r--r--tests/time_pause.rs169
-rw-r--r--tests/time_sleep.rs38
-rw-r--r--tests/uds_stream.rs49
72 files changed, 4452 insertions, 683 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index b75e29b..13071e3 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "a5ee2f0d3d78daa01e2c6c12d22b82474dc5c32a"
+ "sha1": "580dc9594c8e42b7d2dec60447f35238b8dfcf35"
}
}
diff --git a/Android.bp b/Android.bp
index 222916b..d289f14 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,4 @@
-// This file is generated by cargo2android.py --device --run --features io-util,macros,rt-multi-thread,sync,net,fs,time --tests --patch=patches/Android.bp.patch.
+// This file is generated by cargo2android.py --config cargo2android.json.
// Do not modify this file as changes will be overridden on upgrade.
package {
@@ -58,7 +58,7 @@ rust_library {
}
rust_defaults {
- name: "tokio_defaults",
+ name: "tokio_defaults_tokio",
crate_name: "tokio",
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -100,8 +100,23 @@ rust_defaults {
}
rust_test_host {
+ name: "tokio_host_test_tests__require_full",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/_require_full.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests__require_full",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/_require_full.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_buffered",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/buffered.rs"],
test_options: {
unit_test: true,
@@ -110,13 +125,28 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_buffered",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/buffered.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_io_async_fd",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_async_fd.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_async_fd",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_async_fd.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_io_async_read",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_async_read.rs"],
test_options: {
unit_test: true,
@@ -125,13 +155,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_async_read",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_async_read.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_io_chain",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_chain.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_chain",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_chain.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_io_copy",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_copy.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_copy",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_copy.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_io_copy_bidirectional",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_copy_bidirectional.rs"],
test_options: {
unit_test: true,
@@ -140,13 +200,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_copy_bidirectional",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_copy_bidirectional.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_io_driver",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_driver.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_driver",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_driver.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_io_driver_drop",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_driver_drop.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_driver_drop",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_driver_drop.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_io_lines",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_lines.rs"],
test_options: {
unit_test: true,
@@ -155,13 +245,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_lines",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_lines.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_mem_stream",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_mem_stream.rs"],
test_options: {
unit_test: true,
@@ -170,13 +260,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_mem_stream",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_mem_stream.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_read",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_read.rs"],
test_options: {
unit_test: true,
@@ -185,13 +275,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_read",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_read.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_read_buf",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_read_buf.rs"],
test_options: {
unit_test: true,
@@ -200,13 +290,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_read_buf",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_read_buf.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_io_read_exact",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_exact.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_read_exact",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_exact.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_io_read_line",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_line.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_read_line",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_line.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_io_read_to_end",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_read_to_end.rs"],
test_options: {
unit_test: true,
@@ -215,13 +335,58 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_read_to_end",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_read_to_end.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_io_read_to_string",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_to_string.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_read_to_string",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_to_string.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_io_read_until",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_until.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_read_until",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_read_until.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_io_split",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_split.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_io_split",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/io_split.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_io_take",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_take.rs"],
test_options: {
unit_test: true,
@@ -230,13 +395,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_take",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_take.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_write",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write.rs"],
test_options: {
unit_test: true,
@@ -245,13 +410,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_write",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_write_all",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write_all.rs"],
test_options: {
unit_test: true,
@@ -260,13 +425,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_write_all",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write_all.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_write_buf",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write_buf.rs"],
test_options: {
unit_test: true,
@@ -275,13 +440,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_write_buf",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write_buf.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_io_write_int",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write_int.rs"],
test_options: {
unit_test: true,
@@ -290,13 +455,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_io_write_int",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/io_write_int.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_macros_join",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/macros_join.rs"],
test_options: {
unit_test: true,
@@ -305,13 +470,103 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_macros_join",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/macros_join.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_macros_pin",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_pin.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_macros_pin",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_pin.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_macros_select",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_select.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_macros_select",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_select.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_macros_test",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_test.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_macros_test",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_test.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_macros_try_join",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_try_join.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_macros_try_join",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/macros_try_join.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_net_bind_resource",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/net_bind_resource.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_net_bind_resource",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/net_bind_resource.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_net_lookup_host",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/net_lookup_host.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_net_lookup_host",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/net_lookup_host.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_no_rt",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/no_rt.rs"],
test_options: {
unit_test: true,
@@ -320,13 +575,28 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_no_rt",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/no_rt.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_process_kill_on_drop",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/process_kill_on_drop.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_process_kill_on_drop",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/process_kill_on_drop.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_rt_basic",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/rt_basic.rs"],
test_options: {
unit_test: true,
@@ -335,13 +605,29 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_rt_basic",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/rt_basic.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_rt_common",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/rt_common.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_rt_common",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/rt_common.rs"],
+}
+
+
+rust_test_host {
name: "tokio_host_test_tests_rt_threaded",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/rt_threaded.rs"],
test_options: {
unit_test: true,
@@ -350,13 +636,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_rt_threaded",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/rt_threaded.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_sync_barrier",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_barrier.rs"],
test_options: {
unit_test: true,
@@ -365,13 +651,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_barrier",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_barrier.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_sync_broadcast",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_broadcast.rs"],
test_options: {
unit_test: true,
@@ -380,13 +666,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_broadcast",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_broadcast.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_sync_errors",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_errors.rs"],
test_options: {
unit_test: true,
@@ -395,13 +681,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_errors",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_errors.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_sync_mpsc",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_mpsc.rs"],
test_options: {
unit_test: true,
@@ -410,13 +696,28 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_mpsc",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_mpsc.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_sync_mutex",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_mutex.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_sync_mutex",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_mutex.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_sync_mutex_owned",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_mutex_owned.rs"],
test_options: {
unit_test: true,
@@ -425,13 +726,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_mutex_owned",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_mutex_owned.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_sync_notify",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_notify.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_sync_notify",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_notify.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_sync_oneshot",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_oneshot.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_sync_oneshot",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_oneshot.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_sync_rwlock",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_rwlock.rs"],
test_options: {
unit_test: true,
@@ -440,13 +771,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_rwlock",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_rwlock.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_sync_semaphore",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_semaphore.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_sync_semaphore",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_semaphore.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_sync_semaphore_owned",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_semaphore_owned.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_sync_semaphore_owned",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/sync_semaphore_owned.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_sync_watch",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_watch.rs"],
test_options: {
unit_test: true,
@@ -455,13 +816,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_sync_watch",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/sync_watch.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_task_abort",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/task_abort.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_task_abort",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/task_abort.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_task_blocking",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/task_blocking.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_task_blocking",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/task_blocking.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_task_local",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/task_local.rs"],
test_options: {
unit_test: true,
@@ -470,13 +861,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_task_local",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/task_local.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_task_local_set",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/task_local_set.rs"],
test_options: {
unit_test: true,
@@ -485,13 +876,13 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_task_local_set",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/task_local_set.rs"],
}
rust_test_host {
name: "tokio_host_test_tests_tcp_accept",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_accept.rs"],
test_options: {
unit_test: true,
@@ -500,13 +891,28 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_tcp_accept",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_accept.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_tcp_connect",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_connect.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_tcp_connect",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_connect.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_tcp_echo",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_echo.rs"],
test_options: {
unit_test: true,
@@ -515,13 +921,28 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_tcp_echo",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_echo.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_tcp_into_split",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_into_split.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_tcp_into_split",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_into_split.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_tcp_into_std",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_into_std.rs"],
test_options: {
unit_test: true,
@@ -530,13 +951,28 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_tcp_into_std",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_into_std.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_tcp_peek",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_peek.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_tcp_peek",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_peek.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_tcp_shutdown",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_shutdown.rs"],
test_options: {
unit_test: true,
@@ -545,13 +981,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_tcp_shutdown",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/tcp_shutdown.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_tcp_socket",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_socket.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_tcp_socket",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_socket.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_tcp_split",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_split.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_tcp_split",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/tcp_split.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_time_rt",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/time_rt.rs"],
test_options: {
unit_test: true,
@@ -560,13 +1026,43 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_time_rt",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/time_rt.rs"],
}
rust_test_host {
+ name: "tokio_host_test_tests_udp",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/udp.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_udp",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/udp.rs"],
+}
+
+rust_test_host {
+ name: "tokio_host_test_tests_uds_cred",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/uds_cred.rs"],
+ test_options: {
+ unit_test: true,
+ },
+}
+
+rust_test {
+ name: "tokio_device_test_tests_uds_cred",
+ defaults: ["tokio_defaults_tokio"],
+ srcs: ["tests/uds_cred.rs"],
+}
+
+rust_test_host {
name: "tokio_host_test_tests_uds_split",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/uds_split.rs"],
test_options: {
unit_test: true,
@@ -575,6 +1071,6 @@ rust_test_host {
rust_test {
name: "tokio_device_test_tests_uds_split",
- defaults: ["tokio_defaults"],
+ defaults: ["tokio_defaults_tokio"],
srcs: ["tests/uds_split.rs"],
}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0808920..2349b0d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,49 @@
+# 1.6.0 (May 14, 2021)
+
+### Added
+
+- fs: try doing a non-blocking read before punting to the threadpool ([#3518])
+- io: add `write_all_buf` to `AsyncWriteExt` ([#3737])
+- io: implement `AsyncSeek` for `BufReader`, `BufWriter`, and `BufStream` ([#3491])
+- net: support non-blocking vectored I/O ([#3761])
+- sync: add `mpsc::Sender::{reserve_owned, try_reserve_owned}` ([#3704])
+- sync: add a `MutexGuard::map` method that returns a `MappedMutexGuard` ([#2472])
+- time: add getter for Interval's period ([#3705])
+
+### Fixed
+
+- io: wake pending writers on `DuplexStream` close ([#3756])
+- process: avoid redundant effort to reap orphan processes ([#3743])
+- signal: use `std::os::raw::c_int` instead of `libc::c_int` on public API ([#3774])
+- sync: preserve permit state in `notify_waiters` ([#3660])
+- task: update `JoinHandle` panic message ([#3727])
+- time: prevent `time::advance` from going too far ([#3712])
+
+### Documented
+
+- net: hide `net::unix::datagram` module from docs ([#3775])
+- process: updated example ([#3748])
+- sync: `Barrier` doc should use task, not thread ([#3780])
+- task: update documentation on `block_in_place` ([#3753])
+
+[#2472]: https://github.com/tokio-rs/tokio/pull/2472
+[#3491]: https://github.com/tokio-rs/tokio/pull/3491
+[#3518]: https://github.com/tokio-rs/tokio/pull/3518
+[#3660]: https://github.com/tokio-rs/tokio/pull/3660
+[#3704]: https://github.com/tokio-rs/tokio/pull/3704
+[#3705]: https://github.com/tokio-rs/tokio/pull/3705
+[#3712]: https://github.com/tokio-rs/tokio/pull/3712
+[#3727]: https://github.com/tokio-rs/tokio/pull/3727
+[#3737]: https://github.com/tokio-rs/tokio/pull/3737
+[#3743]: https://github.com/tokio-rs/tokio/pull/3743
+[#3748]: https://github.com/tokio-rs/tokio/pull/3748
+[#3753]: https://github.com/tokio-rs/tokio/pull/3753
+[#3756]: https://github.com/tokio-rs/tokio/pull/3756
+[#3761]: https://github.com/tokio-rs/tokio/pull/3761
+[#3774]: https://github.com/tokio-rs/tokio/pull/3774
+[#3775]: https://github.com/tokio-rs/tokio/pull/3775
+[#3780]: https://github.com/tokio-rs/tokio/pull/3780
+
# 1.5.0 (April 12, 2021)
### Added
@@ -19,6 +65,7 @@
- rt: fix panic in `JoinHandle::abort()` when called from other threads ([#3672])
- sync: don't panic in `oneshot::try_recv` ([#3674])
- sync: fix notifications getting dropped on receiver drop ([#3652])
+- sync: fix `Semaphore` permit overflow calculation ([#3644])
### Documented
diff --git a/Cargo.toml b/Cargo.toml
index 05ab658..85a245a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio"
-version = "1.5.0"
+version = "1.6.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio/1.5.0/tokio/"
+documentation = "https://docs.rs/tokio/1.6.0/tokio/"
readme = "README.md"
keywords = ["io", "async", "non-blocking", "futures"]
categories = ["asynchronous", "network-programming"]
@@ -85,7 +85,7 @@ version = "1"
[features]
default = []
-fs = []
+fs = ["libc"]
full = ["fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"]
io-std = []
io-util = ["memchr", "bytes"]
@@ -107,14 +107,14 @@ features = ["std"]
optional = true
default-features = false
[target."cfg(unix)".dependencies.libc]
-version = "0.2.42"
+version = "0.2.87"
optional = true
[target."cfg(unix)".dependencies.signal-hook-registry]
version = "1.1.1"
optional = true
[target."cfg(unix)".dev-dependencies.libc]
-version = "0.2.42"
+version = "0.2.87"
[target."cfg(unix)".dev-dependencies.nix]
version = "0.19.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 5e53c3f..005767e 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
-version = "1.5.0"
+version = "1.6.0"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
-documentation = "https://docs.rs/tokio/1.5.0/tokio/"
+documentation = "https://docs.rs/tokio/1.6.0/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
@@ -42,7 +42,7 @@ full = [
"time",
]
-fs = []
+fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
@@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full
[target.'cfg(unix)'.dependencies]
-libc = { version = "0.2.42", optional = true }
+libc = { version = "0.2.87", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }
[target.'cfg(unix)'.dev-dependencies]
-libc = { version = "0.2.42" }
+libc = { version = "0.2.87" }
nix = { version = "0.19.0" }
[target.'cfg(windows)'.dependencies.winapi]
diff --git a/METADATA b/METADATA
index 0a2c7af..0dcfe0b 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio/tokio-1.5.0.crate"
+ value: "https://static.crates.io/crates/tokio/tokio-1.6.0.crate"
}
- version: "1.5.0"
+ version: "1.6.0"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 4
- day: 21
+ month: 5
+ day: 26
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 686fbda..6c266ef 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -20,15 +20,33 @@
"name": "tokio-test_device_test_tests_macros"
},
{
+ "name": "tokio_device_test_tests__require_full"
+ },
+ {
"name": "tokio_device_test_tests_buffered"
},
{
+ "name": "tokio_device_test_tests_io_async_fd"
+ },
+ {
"name": "tokio_device_test_tests_io_async_read"
},
{
+ "name": "tokio_device_test_tests_io_chain"
+ },
+ {
+ "name": "tokio_device_test_tests_io_copy"
+ },
+ {
"name": "tokio_device_test_tests_io_copy_bidirectional"
},
{
+ "name": "tokio_device_test_tests_io_driver"
+ },
+ {
+ "name": "tokio_device_test_tests_io_driver_drop"
+ },
+ {
"name": "tokio_device_test_tests_io_lines"
},
{
@@ -41,9 +59,24 @@
"name": "tokio_device_test_tests_io_read_buf"
},
{
+ "name": "tokio_device_test_tests_io_read_exact"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_line"
+ },
+ {
"name": "tokio_device_test_tests_io_read_to_end"
},
{
+ "name": "tokio_device_test_tests_io_read_to_string"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_until"
+ },
+ {
+ "name": "tokio_device_test_tests_io_split"
+ },
+ {
"name": "tokio_device_test_tests_io_take"
},
{
@@ -62,12 +95,36 @@
"name": "tokio_device_test_tests_macros_join"
},
{
+ "name": "tokio_device_test_tests_macros_pin"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_select"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_test"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_try_join"
+ },
+ {
+ "name": "tokio_device_test_tests_net_bind_resource"
+ },
+ {
+ "name": "tokio_device_test_tests_net_lookup_host"
+ },
+ {
"name": "tokio_device_test_tests_no_rt"
},
{
+ "name": "tokio_device_test_tests_process_kill_on_drop"
+ },
+ {
"name": "tokio_device_test_tests_rt_basic"
},
{
+ "name": "tokio_device_test_tests_rt_common"
+ },
+ {
"name": "tokio_device_test_tests_rt_threaded"
},
{
@@ -83,15 +140,36 @@
"name": "tokio_device_test_tests_sync_mpsc"
},
{
+ "name": "tokio_device_test_tests_sync_mutex"
+ },
+ {
"name": "tokio_device_test_tests_sync_mutex_owned"
},
{
+ "name": "tokio_device_test_tests_sync_notify"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_oneshot"
+ },
+ {
"name": "tokio_device_test_tests_sync_rwlock"
},
{
+ "name": "tokio_device_test_tests_sync_semaphore"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_semaphore_owned"
+ },
+ {
"name": "tokio_device_test_tests_sync_watch"
},
{
+ "name": "tokio_device_test_tests_task_abort"
+ },
+ {
+ "name": "tokio_device_test_tests_task_blocking"
+ },
+ {
"name": "tokio_device_test_tests_task_local"
},
{
@@ -101,18 +179,39 @@
"name": "tokio_device_test_tests_tcp_accept"
},
{
+ "name": "tokio_device_test_tests_tcp_connect"
+ },
+ {
"name": "tokio_device_test_tests_tcp_echo"
},
{
+ "name": "tokio_device_test_tests_tcp_into_split"
+ },
+ {
"name": "tokio_device_test_tests_tcp_into_std"
},
{
+ "name": "tokio_device_test_tests_tcp_peek"
+ },
+ {
"name": "tokio_device_test_tests_tcp_shutdown"
},
{
+ "name": "tokio_device_test_tests_tcp_socket"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_split"
+ },
+ {
"name": "tokio_device_test_tests_time_rt"
},
{
+ "name": "tokio_device_test_tests_udp"
+ },
+ {
+ "name": "tokio_device_test_tests_uds_cred"
+ },
+ {
"name": "tokio_device_test_tests_uds_split"
}
]
diff --git a/cargo2android.json b/cargo2android.json
new file mode 100644
index 0000000..a4c19a2
--- /dev/null
+++ b/cargo2android.json
@@ -0,0 +1,11 @@
+{
+ "apex-available": [
+ "//apex_available:platform",
+ "com.android.resolv"
+ ],
+ "min_sdk_version": "29",
+ "features": "io-util,macros,rt-multi-thread,sync,net,fs,time",
+ "device": true,
+ "run": true,
+ "patch": "patches/Android.bp.patch"
+} \ No newline at end of file
diff --git a/patches/Android.bp.patch b/patches/Android.bp.patch
index 759d95f..645b3c3 100644
--- a/patches/Android.bp.patch
+++ b/patches/Android.bp.patch
@@ -1,310 +1,1026 @@
diff --git a/Android.bp b/Android.bp
-index 6b8ca5b..222916b 100644
+index e2a61ba..d289f14 100644
--- a/Android.bp
+++ b/Android.bp
-@@ -50,6 +50,11 @@ rust_library {
- "libpin_project_lite",
+@@ -56,3 +56,1021 @@ rust_library {
],
- proc_macros: ["libtokio_macros"],
-+ apex_available: [
-+ "//apex_available:platform",
-+ "com.android.resolv",
-+ ],
-+ min_sdk_version: "29",
+ min_sdk_version: "29",
}
-
- rust_defaults {
-@@ -61,6 +66,7 @@ rust_defaults {
- features: [
- "bytes",
- "fs",
++
++rust_defaults {
++ name: "tokio_defaults_tokio",
++ crate_name: "tokio",
++ test_suites: ["general-tests"],
++ auto_gen_config: true,
++ edition: "2018",
++ features: [
++ "bytes",
++ "fs",
+ "full",
- "io-util",
- "libc",
- "macros",
-@@ -108,36 +114,6 @@ rust_test {
- srcs: ["tests/buffered.rs"],
- }
-
--rust_test_host {
-- name: "tokio_host_test_tests_fs_file",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/fs_file.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_fs_file",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/fs_file.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_fs_link",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/fs_link.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_fs_link",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/fs_link.rs"],
--}
--
- rust_test_host {
- name: "tokio_host_test_tests_io_async_read",
- defaults: ["tokio_defaults"],
-@@ -348,51 +324,6 @@ rust_test {
- srcs: ["tests/no_rt.rs"],
- }
-
--rust_test_host {
-- name: "tokio_host_test_tests_process_issue_2174",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/process_issue_2174.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_process_issue_2174",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/process_issue_2174.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_process_issue_42",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/process_issue_42.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_process_issue_42",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/process_issue_42.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_process_smoke",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/process_smoke.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_process_smoke",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/process_smoke.rs"],
--}
--
- rust_test_host {
- name: "tokio_host_test_tests_rt_basic",
- defaults: ["tokio_defaults"],
-@@ -423,111 +354,6 @@ rust_test {
- srcs: ["tests/rt_threaded.rs"],
- }
-
--rust_test_host {
-- name: "tokio_host_test_tests_signal_ctrl_c",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_ctrl_c.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_ctrl_c",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_ctrl_c.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_signal_drop_rt",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_drop_rt.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_drop_rt",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_drop_rt.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_signal_drop_signal",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_drop_signal.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_drop_signal",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_drop_signal.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_signal_multi_rt",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_multi_rt.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_multi_rt",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_multi_rt.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_signal_no_rt",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_no_rt.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_no_rt",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_no_rt.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_signal_notify_both",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_notify_both.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_notify_both",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_notify_both.rs"],
--}
--
--rust_test_host {
-- name: "tokio_host_test_tests_signal_twice",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_twice.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_signal_twice",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/signal_twice.rs"],
--}
--
- rust_test_host {
- name: "tokio_host_test_tests_sync_barrier",
- defaults: ["tokio_defaults"],
-@@ -603,21 +429,6 @@ rust_test {
- srcs: ["tests/sync_mutex_owned.rs"],
- }
-
--rust_test_host {
-- name: "tokio_host_test_tests_sync_once_cell",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/sync_once_cell.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_sync_once_cell",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/sync_once_cell.rs"],
--}
--
- rust_test_host {
- name: "tokio_host_test_tests_sync_rwlock",
- defaults: ["tokio_defaults"],
-@@ -738,21 +549,6 @@ rust_test {
- srcs: ["tests/tcp_shutdown.rs"],
- }
-
--rust_test_host {
-- name: "tokio_host_test_tests_time_interval",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/time_interval.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_time_interval",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/time_interval.rs"],
--}
--
- rust_test_host {
- name: "tokio_host_test_tests_time_rt",
- defaults: ["tokio_defaults"],
-@@ -768,21 +564,6 @@ rust_test {
- srcs: ["tests/time_rt.rs"],
- }
-
--rust_test_host {
-- name: "tokio_host_test_tests_time_timeout",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/time_timeout.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_time_timeout",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/time_timeout.rs"],
--}
--
- rust_test_host {
- name: "tokio_host_test_tests_uds_split",
- defaults: ["tokio_defaults"],
-@@ -797,18 +578,3 @@ rust_test {
- defaults: ["tokio_defaults"],
- srcs: ["tests/uds_split.rs"],
- }
--
--rust_test_host {
-- name: "tokio_host_test_tests_uds_stream",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/uds_stream.rs"],
-- test_options: {
-- unit_test: true,
-- },
--}
--
--rust_test {
-- name: "tokio_device_test_tests_uds_stream",
-- defaults: ["tokio_defaults"],
-- srcs: ["tests/uds_stream.rs"],
--}
++ "io-util",
++ "libc",
++ "macros",
++ "memchr",
++ "mio",
++ "net",
++ "num_cpus",
++ "rt",
++ "rt-multi-thread",
++ "sync",
++ "time",
++ "tokio-macros",
++ ],
++ cfgs: ["tokio_track_caller"],
++ rustlibs: [
++ "libasync_stream",
++ "libbytes",
++ "libfutures",
++ "liblibc",
++ "libmemchr",
++ "libmio",
++ "libnix",
++ "libnum_cpus",
++ "libpin_project_lite",
++ "librand",
++ "libtokio",
++ "libtokio_stream",
++ "libtokio_test",
++ ],
++ proc_macros: ["libtokio_macros"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests__require_full",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/_require_full.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests__require_full",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/_require_full.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_buffered",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/buffered.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_buffered",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/buffered.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_async_fd",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_async_fd.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_async_fd",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_async_fd.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_async_read",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_async_read.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_async_read",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_async_read.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_chain",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_chain.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_chain",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_chain.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_copy",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_copy.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_copy",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_copy.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_copy_bidirectional",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_copy_bidirectional.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_copy_bidirectional",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_copy_bidirectional.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_driver",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_driver.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_driver",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_driver.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_driver_drop",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_driver_drop.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_driver_drop",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_driver_drop.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_lines",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_lines.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_lines",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_lines.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_mem_stream",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_mem_stream.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_mem_stream",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_mem_stream.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read_buf",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_buf.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read_buf",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_buf.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read_exact",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_exact.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read_exact",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_exact.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read_line",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_line.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read_line",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_line.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read_to_end",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_to_end.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read_to_end",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_to_end.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read_to_string",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_to_string.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read_to_string",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_to_string.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_read_until",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_until.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_read_until",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_read_until.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_split.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_split.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_take",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_take.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_take",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_take.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_write",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_write",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_write_all",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write_all.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_write_all",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write_all.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_write_buf",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write_buf.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_write_buf",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write_buf.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_io_write_int",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write_int.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_io_write_int",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/io_write_int.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_macros_join",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_join.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_macros_join",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_join.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_macros_pin",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_pin.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_macros_pin",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_pin.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_macros_select",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_select.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_macros_select",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_select.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_macros_test",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_test.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_macros_test",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_test.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_macros_try_join",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_try_join.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_macros_try_join",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/macros_try_join.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_net_bind_resource",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/net_bind_resource.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_net_bind_resource",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/net_bind_resource.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_net_lookup_host",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/net_lookup_host.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_net_lookup_host",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/net_lookup_host.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_no_rt",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/no_rt.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_no_rt",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/no_rt.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_process_kill_on_drop",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/process_kill_on_drop.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_process_kill_on_drop",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/process_kill_on_drop.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_rt_basic",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/rt_basic.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_rt_basic",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/rt_basic.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_rt_common",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/rt_common.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_rt_common",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/rt_common.rs"],
++}
++
++
++rust_test_host {
++ name: "tokio_host_test_tests_rt_threaded",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/rt_threaded.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_rt_threaded",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/rt_threaded.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_barrier",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_barrier.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_barrier",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_barrier.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_broadcast",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_broadcast.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_broadcast",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_broadcast.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_errors",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_errors.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_errors",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_errors.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_mpsc",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_mpsc.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_mpsc",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_mpsc.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_mutex",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_mutex.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_mutex",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_mutex.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_mutex_owned",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_mutex_owned.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_mutex_owned",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_mutex_owned.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_notify",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_notify.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_notify",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_notify.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_oneshot",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_oneshot.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_oneshot",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_oneshot.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_rwlock",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_rwlock.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_rwlock",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_rwlock.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_semaphore",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_semaphore.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_semaphore",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_semaphore.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_semaphore_owned",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_semaphore_owned.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_semaphore_owned",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_semaphore_owned.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_sync_watch",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_watch.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_sync_watch",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/sync_watch.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_task_abort",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_abort.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_task_abort",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_abort.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_task_blocking",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_blocking.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_task_blocking",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_blocking.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_task_local",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_local.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_task_local",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_local.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_task_local_set",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_local_set.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_task_local_set",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/task_local_set.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_accept",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_accept.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_accept",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_accept.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_connect",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_connect.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_connect",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_connect.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_echo",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_echo.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_echo",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_echo.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_into_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_into_split.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_into_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_into_split.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_into_std",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_into_std.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_into_std",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_into_std.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_peek",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_peek.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_peek",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_peek.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_shutdown",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_shutdown.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_shutdown",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_shutdown.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_socket",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_socket.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_socket",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_socket.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_tcp_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_split.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_tcp_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/tcp_split.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_time_rt",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/time_rt.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_time_rt",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/time_rt.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_udp",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/udp.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_udp",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/udp.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_uds_cred",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/uds_cred.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_uds_cred",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/uds_cred.rs"],
++}
++
++rust_test_host {
++ name: "tokio_host_test_tests_uds_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/uds_split.rs"],
++ test_options: {
++ unit_test: true,
++ },
++}
++
++rust_test {
++ name: "tokio_device_test_tests_uds_split",
++ defaults: ["tokio_defaults_tokio"],
++ srcs: ["tests/uds_split.rs"],
++}
diff --git a/patches/io_mem_stream.patch b/patches/io_mem_stream.patch
new file mode 100644
index 0000000..c21ce18
--- /dev/null
+++ b/patches/io_mem_stream.patch
@@ -0,0 +1,12 @@
+diff --git a/tests/io_mem_stream.rs b/tests/io_mem_stream.rs
+index 01baa53..520391a 100644
+--- a/tests/io_mem_stream.rs
++++ b/tests/io_mem_stream.rs
+@@ -63,6 +63,7 @@ async fn disconnect() {
+ }
+
+ #[tokio::test]
++#[cfg(not(target_os = "android"))]
+ async fn disconnect_reader() {
+ let (a, mut b) = duplex(2);
+
diff --git a/patches/rt_common.patch b/patches/rt_common.patch
new file mode 100644
index 0000000..1444cfe
--- /dev/null
+++ b/patches/rt_common.patch
@@ -0,0 +1,12 @@
+diff --git a/tests/rt_common.rs b/tests/rt_common.rs
+index cb1d0f6..e5fc7a9 100644
+--- a/tests/rt_common.rs
++++ b/tests/rt_common.rs
+@@ -647,6 +647,7 @@ rt_test! {
+ }
+
+ #[test]
++ #[cfg(not(target_os = "android"))]
+ fn panic_in_task() {
+ let rt = rt();
+ let (tx, rx) = oneshot::channel();
diff --git a/patches/task_blocking.rs b/patches/task_blocking.rs
new file mode 100644
index 0000000..7f4f7d4
--- /dev/null
+++ b/patches/task_blocking.rs
@@ -0,0 +1,12 @@
+diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
+index 82bef8a..d9514d2 100644
+--- a/tests/task_blocking.rs
++++ b/tests/task_blocking.rs
+@@ -114,6 +114,7 @@ fn can_enter_basic_rt_from_within_block_in_place() {
+ }
+
+ #[test]
++#[cfg(not(target_os = "android"))]
+ fn useful_panic_message_when_dropping_rt_in_rt() {
+ use std::panic::{catch_unwind, AssertUnwindSafe};
+
diff --git a/src/fs/file.rs b/src/fs/file.rs
index 5c06e73..abd6e8c 100644
--- a/src/fs/file.rs
+++ b/src/fs/file.rs
@@ -491,14 +491,18 @@ impl AsyncRead for File {
loop {
match inner.state {
Idle(ref mut buf_cell) => {
- let mut buf = buf_cell.take().unwrap();
+ let buf = buf_cell.as_mut().unwrap();
if !buf.is_empty() {
buf.copy_to(dst);
- *buf_cell = Some(buf);
return Ready(Ok(()));
}
+ if let Some(x) = try_nonblocking_read(me.std.as_ref(), dst) {
+ return Ready(x);
+ }
+
+ let mut buf = buf_cell.take().unwrap();
buf.ensure_capacity_for(dst);
let std = me.std.clone();
@@ -756,3 +760,186 @@ impl Inner {
}
}
}
+
+#[cfg(all(target_os = "linux", not(test)))]
+pub(crate) fn try_nonblocking_read(
+ file: &crate::fs::sys::File,
+ dst: &mut ReadBuf<'_>,
+) -> Option<std::io::Result<()>> {
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);
+ if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
+ return None;
+ }
+ let out = preadv2::preadv2_safe(file, dst, -1, preadv2::RWF_NOWAIT);
+ if let Err(err) = &out {
+ match err.raw_os_error() {
+ Some(libc::ENOSYS) => {
+ NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
+ return None;
+ }
+ Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None,
+ _ => {}
+ }
+ }
+ Some(out)
+}
+
+#[cfg(any(not(target_os = "linux"), test))]
+pub(crate) fn try_nonblocking_read(
+ _file: &crate::fs::sys::File,
+ _dst: &mut ReadBuf<'_>,
+) -> Option<std::io::Result<()>> {
+ None
+}
+
+#[cfg(target_os = "linux")]
+mod preadv2 {
+ use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t};
+ use std::os::unix::prelude::AsRawFd;
+
+ use crate::io::ReadBuf;
+
+ pub(crate) fn preadv2_safe(
+ file: &std::fs::File,
+ dst: &mut ReadBuf<'_>,
+ offset: off_t,
+ flags: c_int,
+ ) -> std::io::Result<()> {
+ unsafe {
+ /* We have to defend against buffer overflows manually here. The slice API makes
+ * this fairly straightforward. */
+ let unfilled = dst.unfilled_mut();
+ let mut iov = iovec {
+ iov_base: unfilled.as_mut_ptr() as *mut c_void,
+ iov_len: unfilled.len(),
+ };
+ /* We take a File object rather than an fd as reading from a sensitive fd may confuse
+ * other unsafe code that assumes that only they have access to that fd. */
+ let bytes_read = preadv2(
+ file.as_raw_fd(),
+ &mut iov as *mut iovec as *const iovec,
+ 1,
+ offset,
+ flags,
+ );
+ if bytes_read < 0 {
+ Err(std::io::Error::last_os_error())
+ } else {
+ /* preadv2 returns the number of bytes read, e.g. the number of bytes that have
+ * written into `unfilled`. So it's safe to assume that the data is now
+ * initialised */
+ dst.assume_init(bytes_read as usize);
+ dst.advance(bytes_read as usize);
+ Ok(())
+ }
+ }
+ }
+
+ #[cfg(test)]
+ mod test {
+ use super::*;
+
+ #[test]
+ fn test_preadv2_safe() {
+ use std::io::{Seek, Write};
+ use std::mem::MaybeUninit;
+ use tempfile::tempdir;
+
+ let tmp = tempdir().unwrap();
+ let filename = tmp.path().join("file");
+ const MESSAGE: &[u8] = b"Hello this is a test";
+ {
+ let mut f = std::fs::File::create(&filename).unwrap();
+ f.write_all(MESSAGE).unwrap();
+ }
+ let f = std::fs::File::open(&filename).unwrap();
+
+ let mut buf = [MaybeUninit::<u8>::new(0); 50];
+ let mut br = ReadBuf::uninit(&mut buf);
+
+ // Basic use:
+ preadv2_safe(&f, &mut br, 0, 0).unwrap();
+ assert_eq!(br.initialized().len(), MESSAGE.len());
+ assert_eq!(br.filled(), MESSAGE);
+
+ // Here we check that offset works, but also that appending to a non-empty buffer
+ // behaves correctly WRT initialisation.
+ preadv2_safe(&f, &mut br, 5, 0).unwrap();
+ assert_eq!(br.initialized().len(), MESSAGE.len() * 2 - 5);
+ assert_eq!(br.filled(), b"Hello this is a test this is a test".as_ref());
+
+ // offset of -1 means use the current cursor. This has not been advanced by the
+ // previous reads because we specified an offset there.
+ preadv2_safe(&f, &mut br, -1, 0).unwrap();
+ assert_eq!(br.remaining(), 0);
+ assert_eq!(
+ br.filled(),
+ b"Hello this is a test this is a testHello this is a".as_ref()
+ );
+
+ // but the offset should have been advanced by that read
+ br.clear();
+ preadv2_safe(&f, &mut br, -1, 0).unwrap();
+ assert_eq!(br.filled(), b" test");
+
+ // This should be in cache, so RWF_NOWAIT should work, but it not being in cache
+ // (EAGAIN) or not supported by the underlying filesystem (ENOTSUP) is fine too.
+ br.clear();
+ match preadv2_safe(&f, &mut br, 0, RWF_NOWAIT) {
+ Ok(()) => assert_eq!(br.filled(), MESSAGE),
+ Err(e) => assert!(matches!(
+ e.raw_os_error(),
+ Some(libc::ENOTSUP) | Some(libc::EAGAIN)
+ )),
+ }
+
+ // Test handling large offsets
+ {
+ // I hope the underlying filesystem supports sparse files
+ let mut w = std::fs::OpenOptions::new()
+ .write(true)
+ .open(&filename)
+ .unwrap();
+ w.set_len(0x1_0000_0000).unwrap();
+ w.seek(std::io::SeekFrom::Start(0x1_0000_0000)).unwrap();
+ w.write_all(b"This is a Large File").unwrap();
+ }
+
+ br.clear();
+ preadv2_safe(&f, &mut br, 0x1_0000_0008, 0).unwrap();
+ assert_eq!(br.filled(), b"a Large File");
+ }
+ }
+
+ fn pos_to_lohi(offset: off_t) -> (c_long, c_long) {
+ // 64-bit offset is split over high and low 32-bits on 32-bit architectures.
+ // 64-bit architectures still have high and low arguments, but only the low
+ // one is inspected. See pos_from_hilo in linux/fs/read_write.c.
+ const HALF_LONG_BITS: usize = core::mem::size_of::<c_long>() * 8 / 2;
+ (
+ offset as c_long,
+ // We want to shift this off_t value by size_of::<c_long>(). We can't do
+ // it in one shift because if they're both 64-bits we'd be doing u64 >> 64
+ // which is implementation defined. Instead do it in two halves:
+ ((offset >> HALF_LONG_BITS) >> HALF_LONG_BITS) as c_long,
+ )
+ }
+
+ pub(crate) const RWF_NOWAIT: c_int = 0x00000008;
+ unsafe fn preadv2(
+ fd: c_int,
+ iov: *const iovec,
+ iovcnt: c_int,
+ offset: off_t,
+ flags: c_int,
+ ) -> ssize_t {
+ // Call via libc::syscall rather than libc::preadv2. preadv2 is only supported by glibc
+ // and only since v2.26. By using syscall we don't need to worry about compatibility with
+ // old glibc versions and it will work on Android and musl too. The downside is that you
+ // can't use `LD_PRELOAD` tricks any more to intercept these calls.
+ let (lo, hi) = pos_to_lohi(offset);
+ libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, lo, hi, flags) as ssize_t
+ }
+}
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index fa2d420..52451c6 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -273,7 +273,7 @@ cfg_not_rt! {
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
pub(super) fn current() -> Self {
- panic!(crate::util::error::CONTEXT_MISSING_ERROR)
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
diff --git a/src/io/driver/registration.rs b/src/io/driver/registration.rs
index 8251fe6..7350be6 100644
--- a/src/io/driver/registration.rs
+++ b/src/io/driver/registration.rs
@@ -14,8 +14,9 @@ cfg_io_driver! {
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
- /// The association between an I/O resource is made by calling [`new`]. Once
- /// the association is established, it remains established until the
+ /// The association between an I/O resource is made by calling
+ /// [`new_with_interest_and_handle`].
+ /// Once the association is established, it remains established until the
/// registration instance is dropped.
///
/// A registration instance represents two separate readiness streams. One
@@ -36,7 +37,7 @@ cfg_io_driver! {
/// stream. The write readiness event stream is only for `Ready::writable()`
/// events.
///
- /// [`new`]: method@Self::new
+ /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
/// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 47ae558..a31e6db 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -10,10 +10,10 @@ cfg_io_driver! {
/// [`std::io::Write`] traits with the reactor that drives it.
///
/// `PollEvented` uses [`Registration`] internally to take a type that
- /// implements [`mio::Evented`] as well as [`std::io::Read`] and or
+ /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or
/// [`std::io::Write`] and associate it with a reactor that will drive it.
///
- /// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
+ /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
/// used from within the future's execution model. As such, the
/// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
/// implementations using the underlying I/O resource as well as readiness
@@ -40,13 +40,13 @@ cfg_io_driver! {
/// [`poll_read_ready`] again will also indicate read readiness.
///
/// When the operation is attempted and is unable to succeed due to the I/O
- /// resource not being ready, the caller must call [`clear_read_ready`] or
- /// [`clear_write_ready`]. This clears the readiness state until a new
+ /// resource not being ready, the caller must call `clear_read_ready` or
+ /// `clear_write_ready`. This clears the readiness state until a new
/// readiness event is received.
///
/// This allows the caller to implement additional functions. For example,
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
- /// [`clear_read_ready`].
+ /// `clear_read_ready`.
///
/// ## Platform-specific events
///
@@ -54,17 +54,11 @@ cfg_io_driver! {
/// These events are included as part of the read readiness event stream. The
/// write readiness event stream is only for `Ready::writable()` events.
///
- /// [`std::io::Read`]: trait@std::io::Read
- /// [`std::io::Write`]: trait@std::io::Write
- /// [`AsyncRead`]: trait@AsyncRead
- /// [`AsyncWrite`]: trait@AsyncWrite
- /// [`mio::Evented`]: trait@mio::Evented
- /// [`Registration`]: struct@Registration
- /// [`TcpListener`]: struct@crate::net::TcpListener
- /// [`clear_read_ready`]: method@Self::clear_read_ready
- /// [`clear_write_ready`]: method@Self::clear_write_ready
- /// [`poll_read_ready`]: method@Self::poll_read_ready
- /// [`poll_write_ready`]: method@Self::poll_write_ready
+ /// [`AsyncRead`]: crate::io::AsyncRead
+ /// [`AsyncWrite`]: crate::io::AsyncWrite
+ /// [`TcpListener`]: crate::net::TcpListener
+ /// [`poll_read_ready`]: Registration::poll_read_ready
+ /// [`poll_write_ready`]: Registration::poll_write_ready
pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs
index d011d82..2510ccd 100644
--- a/src/io/util/async_write_ext.rs
+++ b/src/io/util/async_write_ext.rs
@@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
+use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf};
use crate::io::util::write_buf::{write_buf, WriteBuf};
use crate::io::util::write_int::{
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
@@ -159,7 +160,6 @@ cfg_io_util! {
write_vectored(self, bufs)
}
-
/// Writes a buffer into this writer, advancing the buffer's internal
/// cursor.
///
@@ -197,10 +197,11 @@ cfg_io_util! {
///
/// # Examples
///
- /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
+ /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]:
///
/// [`File`]: crate::fs::File
/// [`Buf`]: bytes::Buf
+ /// [`Cursor`]: std::io::Cursor
///
/// ```no_run
/// use tokio::io::{self, AsyncWriteExt};
@@ -233,6 +234,59 @@ cfg_io_util! {
write_buf(self, src)
}
+ /// Attempts to write an entire buffer into this writer
+ ///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> {
+ /// while buf.has_remaining() {
+ /// self.write_buf(&mut buf).await?;
+ /// }
+ /// }
+ /// ```
+ ///
+ /// This method will continuously call [`write`] until
+ /// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not
+ /// return until the entire buffer has been successfully written or an error occurs. The
+ /// first error generated will be returned.
+ ///
+ /// The buffer is advanced after each chunk is successfully written. After failure,
+ /// `src.chunk()` will return the chunk that failed to write.
+ ///
+ /// # Examples
+ ///
+ /// [`File`] implements [`AsyncWrite`] and [`Cursor`]`<&[u8]>` implements [`Buf`]:
+ ///
+ /// [`File`]: crate::fs::File
+ /// [`Buf`]: bytes::Buf
+ /// [`Cursor`]: std::io::Cursor
+ ///
+ /// ```no_run
+ /// use tokio::io::{self, AsyncWriteExt};
+ /// use tokio::fs::File;
+ ///
+ /// use std::io::Cursor;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// let mut buffer = Cursor::new(b"data to write");
+ ///
+ /// file.write_all_buf(&mut buffer).await?;
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// [`write`]: AsyncWriteExt::write
+ fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B>
+ where
+ Self: Sized + Unpin,
+ B: Buf,
+ {
+ write_all_buf(self, src)
+ }
+
/// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index 271f61b..cc65ef2 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -1,11 +1,11 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
-use std::io;
+use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::{cmp, fmt};
+use std::{cmp, fmt, mem};
pin_project! {
/// The `BufReader` struct adds buffering to any reader.
@@ -30,6 +30,7 @@ pin_project! {
pub(super) buf: Box<[u8]>,
pub(super) pos: usize,
pub(super) cap: usize,
+ pub(super) seek_state: SeekState,
}
}
@@ -48,6 +49,7 @@ impl<R: AsyncRead> BufReader<R> {
buf: buffer.into_boxed_slice(),
pos: 0,
cap: 0,
+ seek_state: SeekState::Init,
}
}
@@ -141,6 +143,122 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
}
}
+#[derive(Debug, Clone, Copy)]
+pub(super) enum SeekState {
+ /// start_seek has not been called.
+ Init,
+ /// start_seek has been called, but poll_complete has not yet been called.
+ Start(SeekFrom),
+ /// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch.
+ PendingOverflowed(i64),
+ /// Waiting for completion of poll_complete.
+ Pending,
+}
+
+/// Seek to an offset, in bytes, in the underlying reader.
+///
+/// The position used for seeking with `SeekFrom::Current(_)` is the
+/// position the underlying reader would be at if the `BufReader` had no
+/// internal buffer.
+///
+/// Seeking always discards the internal buffer, even if the seek position
+/// would otherwise fall within it. This guarantees that calling
+/// `.into_inner()` immediately after a seek yields the underlying reader
+/// at the same position.
+///
+/// See [`AsyncSeek`] for more details.
+///
+/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+/// where `n` minus the internal buffer length overflows an `i64`, two
+/// seeks will be performed instead of one. If the second seek returns
+/// `Err`, the underlying reader will be left at the same position it would
+/// have if you called `seek` with `SeekFrom::Current(0)`.
+impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ // We need to call the seek operation multiple times.
+ // And we should always call both start_seek and poll_complete,
+ // as start_seek alone cannot guarantee that the operation will be completed.
+ // poll_complete receives a Context and returns a Poll, so it cannot be called
+ // inside start_seek.
+ *self.project().seek_state = SeekState::Start(pos);
+ Ok(())
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) {
+ SeekState::Init => {
+ // 1.x AsyncSeek recommends calling poll_complete before start_seek.
+ // We don't have to guarantee that the value returned by
+ // poll_complete called without start_seek is correct,
+ // so we'll return 0.
+ return Poll::Ready(Ok(0));
+ }
+ SeekState::Start(SeekFrom::Current(n)) => {
+ let remainder = (self.cap - self.pos) as i64;
+ // it should be safe to assume that remainder fits within an i64 as the alternative
+ // means we managed to allocate 8 exbibytes and that's absurd.
+ // But it's not out of the realm of possibility for some weird underlying reader to
+ // support seeking by i64::min_value() so we need to handle underflow when subtracting
+ // remainder.
+ if let Some(offset) = n.checked_sub(remainder) {
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(offset))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ } else {
+ // seek backwards by our remainder, and then by the offset
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(-remainder))?;
+ if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
+ *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
+ return Poll::Pending;
+ }
+
+ // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
+ self.as_mut().discard_buffer();
+
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(n))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ }
+ SeekState::PendingOverflowed(n) => {
+ if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
+ *self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
+ return Poll::Pending;
+ }
+
+ // https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
+ self.as_mut().discard_buffer();
+
+ self.as_mut()
+ .get_pin_mut()
+ .start_seek(SeekFrom::Current(n))?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ SeekState::Start(pos) => {
+ // Seeking with Start/End doesn't care about our buffer length.
+ self.as_mut().get_pin_mut().start_seek(pos)?;
+ self.as_mut().get_pin_mut().poll_complete(cx)?
+ }
+ SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?,
+ };
+
+ match res {
+ Poll::Ready(res) => {
+ self.discard_buffer();
+ Poll::Ready(Ok(res))
+ }
+ Poll::Pending => {
+ *self.as_mut().project().seek_state = SeekState::Pending;
+ Poll::Pending
+ }
+ }
+ }
+}
+
impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: Pin<&mut Self>,
diff --git a/src/io/util/buf_stream.rs b/src/io/util/buf_stream.rs
index cc857e2..9238665 100644
--- a/src/io/util/buf_stream.rs
+++ b/src/io/util/buf_stream.rs
@@ -94,9 +94,11 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
buf: rbuf,
pos,
cap,
+ seek_state: rseek_state,
},
buf: wbuf,
written,
+ seek_state: wseek_state,
} = b;
BufStream {
@@ -105,10 +107,12 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
inner,
buf: wbuf,
written,
+ seek_state: wseek_state,
},
buf: rbuf,
pos,
cap,
+ seek_state: rseek_state,
},
}
}
diff --git a/src/io/util/buf_writer.rs b/src/io/util/buf_writer.rs
index 5e3d4b7..4e8e493 100644
--- a/src/io/util/buf_writer.rs
+++ b/src/io/util/buf_writer.rs
@@ -1,9 +1,9 @@
use crate::io::util::DEFAULT_BUF_SIZE;
-use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use pin_project_lite::pin_project;
use std::fmt;
-use std::io::{self, Write};
+use std::io::{self, SeekFrom, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -34,6 +34,7 @@ pin_project! {
pub(super) inner: W,
pub(super) buf: Vec<u8>,
pub(super) written: usize,
+ pub(super) seek_state: SeekState,
}
}
@@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> {
inner,
buf: Vec::with_capacity(cap),
written: 0,
+ seek_state: SeekState::Init,
}
}
@@ -142,6 +144,62 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}
+#[derive(Debug, Clone, Copy)]
+pub(super) enum SeekState {
+ /// start_seek has not been called.
+ Init,
+ /// start_seek has been called, but poll_complete has not yet been called.
+ Start(SeekFrom),
+ /// Waiting for completion of poll_complete.
+ Pending,
+}
+
+/// Seek to the offset, in bytes, in the underlying writer.
+///
+/// Seeking always writes out the internal buffer before seeking.
+impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ // We need to flush the internal buffer before seeking.
+ // It receives a `Context` and returns a `Poll`, so it cannot be called
+ // inside `start_seek`.
+ *self.project().seek_state = SeekState::Start(pos);
+ Ok(())
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let pos = match self.seek_state {
+ SeekState::Init => {
+ return self.project().inner.poll_complete(cx);
+ }
+ SeekState::Start(pos) => Some(pos),
+ SeekState::Pending => None,
+ };
+
+ // Flush the internal buffer before seeking.
+ ready!(self.as_mut().flush_buf(cx))?;
+
+ let mut me = self.project();
+ if let Some(pos) = pos {
+ // Ensure previous seeks have finished before starting a new one
+ ready!(me.inner.as_mut().poll_complete(cx))?;
+ if let Err(e) = me.inner.as_mut().start_seek(pos) {
+ *me.seek_state = SeekState::Init;
+ return Poll::Ready(Err(e));
+ }
+ }
+ match me.inner.poll_complete(cx) {
+ Poll::Ready(res) => {
+ *me.seek_state = SeekState::Init;
+ Poll::Ready(res)
+ }
+ Poll::Pending => {
+ *me.seek_state = SeekState::Pending;
+ Poll::Pending
+ }
+ }
+ }
+}
+
impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
diff --git a/src/io/util/copy_bidirectional.rs b/src/io/util/copy_bidirectional.rs
index cc43f0f..c93060b 100644
--- a/src/io/util/copy_bidirectional.rs
+++ b/src/io/util/copy_bidirectional.rs
@@ -104,6 +104,7 @@ where
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs
index ed6a944..d02a453 100644
--- a/src/io/util/lines.rs
+++ b/src/io/util/lines.rs
@@ -128,7 +128,7 @@ where
}
}
- Poll::Ready(Ok(Some(mem::replace(me.buf, String::new()))))
+ Poll::Ready(Ok(Some(mem::take(me.buf))))
}
}
diff --git a/src/io/util/mem.rs b/src/io/util/mem.rs
index e91a932..4eefe7b 100644
--- a/src/io/util/mem.rs
+++ b/src/io/util/mem.rs
@@ -16,6 +16,14 @@ use std::{
/// that can be used as in-memory IO types. Writing to one of the pairs will
/// allow that data to be read from the other, and vice versa.
///
+/// # Closing a `DuplexStream`
+///
+/// If one end of the `DuplexStream` channel is dropped, any pending reads on
+/// the other side will continue to read data until the buffer is drained, then
+/// they will signal EOF by returning 0 bytes. Any writes to the other side,
+/// including pending ones (that are waiting for free space in the buffer) will
+/// return `Err(BrokenPipe)` immediately.
+///
/// # Example
///
/// ```
@@ -37,6 +45,7 @@ use std::{
/// # }
/// ```
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct DuplexStream {
read: Arc<Mutex<Pipe>>,
write: Arc<Mutex<Pipe>>,
@@ -72,6 +81,7 @@ struct Pipe {
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a side before the write returns `Poll::Pending`.
+#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
@@ -134,7 +144,8 @@ impl AsyncWrite for DuplexStream {
impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
- self.write.lock().close();
+ self.write.lock().close_write();
+ self.read.lock().close_read();
}
}
@@ -151,12 +162,21 @@ impl Pipe {
}
}
- fn close(&mut self) {
+ fn close_write(&mut self) {
self.is_closed = true;
+ // needs to notify any readers that no more data will come
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
}
+
+ fn close_read(&mut self) {
+ self.is_closed = true;
+ // needs to notify any writers that they have to abort
+ if let Some(waker) = self.write_waker.take() {
+ waker.wake();
+ }
+ }
}
impl AsyncRead for Pipe {
@@ -217,7 +237,7 @@ impl AsyncWrite for Pipe {
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
- self.close();
+ self.close_write();
Poll::Ready(Ok(()))
}
}
diff --git a/src/io/util/mod.rs b/src/io/util/mod.rs
index ab38664..fd3dd0d 100644
--- a/src/io/util/mod.rs
+++ b/src/io/util/mod.rs
@@ -77,6 +77,7 @@ cfg_io_util! {
mod write_vectored;
mod write_all;
mod write_buf;
+ mod write_all_buf;
mod write_int;
diff --git a/src/io/util/read_line.rs b/src/io/util/read_line.rs
index d38ffaf..e641f51 100644
--- a/src/io/util/read_line.rs
+++ b/src/io/util/read_line.rs
@@ -36,7 +36,7 @@ where
{
ReadLine {
reader,
- buf: mem::replace(string, String::new()).into_bytes(),
+ buf: mem::take(string).into_bytes(),
output: string,
read: 0,
_pin: PhantomPinned,
@@ -99,7 +99,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
read: &mut usize,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
- let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
+ let utf8_res = String::from_utf8(mem::take(buf));
// At this point both buf and output are empty. The allocation is in utf8_res.
diff --git a/src/io/util/read_to_string.rs b/src/io/util/read_to_string.rs
index 2c17383..b3d82a2 100644
--- a/src/io/util/read_to_string.rs
+++ b/src/io/util/read_to_string.rs
@@ -37,7 +37,7 @@ pub(crate) fn read_to_string<'a, R>(
where
R: AsyncRead + ?Sized + Unpin,
{
- let buf = mem::replace(string, String::new()).into_bytes();
+ let buf = mem::take(string).into_bytes();
ReadToString {
reader,
buf: VecWithInitialized::new(buf),
diff --git a/src/io/util/split.rs b/src/io/util/split.rs
index 4f3ce4e..9c2bb05 100644
--- a/src/io/util/split.rs
+++ b/src/io/util/split.rs
@@ -106,7 +106,7 @@ where
me.buf.pop();
}
- Poll::Ready(Ok(Some(mem::replace(me.buf, Vec::new()))))
+ Poll::Ready(Ok(Some(mem::take(me.buf))))
}
}
diff --git a/src/io/util/write_all_buf.rs b/src/io/util/write_all_buf.rs
new file mode 100644
index 0000000..05af7fe
--- /dev/null
+++ b/src/io/util/write_all_buf.rs
@@ -0,0 +1,56 @@
+use crate::io::AsyncWrite;
+
+use bytes::Buf;
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A future to write some of the buffer to an `AsyncWrite`.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct WriteAllBuf<'a, W, B> {
+ writer: &'a mut W,
+ buf: &'a mut B,
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+/// Tries to write some bytes from the given `buf` to the writer in an
+/// asynchronous manner, returning a future.
+pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B>
+where
+ W: AsyncWrite + Unpin,
+ B: Buf,
+{
+ WriteAllBuf {
+ writer,
+ buf,
+ _pin: PhantomPinned,
+ }
+}
+
+impl<W, B> Future for WriteAllBuf<'_, W, B>
+where
+ W: AsyncWrite + Unpin,
+ B: Buf,
+{
+ type Output = io::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let me = self.project();
+ while me.buf.has_remaining() {
+ let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?);
+ me.buf.advance(n);
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/src/macros/select.rs b/src/macros/select.rs
index 3ba16b6..f98ebff 100644
--- a/src/macros/select.rs
+++ b/src/macros/select.rs
@@ -154,7 +154,7 @@
/// `select!` panics if all branches are disabled **and** there is no provided
/// `else` branch. A branch is disabled when the provided `if` precondition
/// returns `false` **or** when the pattern does not match the result of `<async
-/// expression>.
+/// expression>`.
///
/// # Examples
///
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index e231e5d..2a367ef 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -563,6 +563,84 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
+ /// Try to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: TcpStream::try_read()
+ /// [`readable()`]: TcpStream::readable()
+ /// [`ready()`]: TcpStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io::{self, IoSliceMut};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf_a = [0; 512];
+ /// let mut buf_b = [0; 1024];
+ /// let mut bufs = [
+ /// IoSliceMut::new(&mut buf_a),
+ /// IoSliceMut::new(&mut buf_b),
+ /// ];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read_vectored(&mut bufs) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ use std::io::Read;
+
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
+ }
+
cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
@@ -775,6 +853,68 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
+ /// Try to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+    /// from possibly being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: TcpStream::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write_vectored(&bufs) {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ use std::io::Write;
+
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs
index 19ee34a..c3046f1 100644
--- a/src/net/unix/mod.rs
+++ b/src/net/unix/mod.rs
@@ -1,5 +1,9 @@
//! Unix domain socket utility types
+// This module does not currently provide any public API, but it was
+// unintentionally defined as a public module. Hide it from the documentation
+// instead of changing it to a private module to avoid breakage.
+#[doc(hidden)]
pub mod datagram;
pub(crate) mod listener;
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index d797aae..917844b 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -271,6 +271,84 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
+ /// Try to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: UnixStream::try_read()
+ /// [`readable()`]: UnixStream::readable()
+ /// [`ready()`]: UnixStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io::{self, IoSliceMut};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf_a = [0; 512];
+ /// let mut buf_b = [0; 1024];
+ /// let mut bufs = [
+ /// IoSliceMut::new(&mut buf_a),
+ /// IoSliceMut::new(&mut buf_b),
+ /// ];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read_vectored(&mut bufs) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
+ }
+
cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
@@ -487,6 +565,68 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
+ /// Try to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+    /// from possibly being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: UnixStream::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write_vectored(&bufs) {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
+ }
+
/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
diff --git a/src/process/mod.rs b/src/process/mod.rs
index 00e39b0..96ceb6d 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -994,13 +994,22 @@ impl Child {
/// If the caller wishes to explicitly control when the child's stdin
/// handle is closed, they may `.take()` it before calling `.wait()`:
///
- /// ```no_run
+ /// ```
+ /// # #[cfg(not(unix))]fn main(){}
+ /// # #[cfg(unix)]
/// use tokio::io::AsyncWriteExt;
+ /// # #[cfg(unix)]
/// use tokio::process::Command;
+ /// # #[cfg(unix)]
+ /// use std::process::Stdio;
///
+ /// # #[cfg(unix)]
/// #[tokio::main]
/// async fn main() {
- /// let mut child = Command::new("cat").spawn().unwrap();
+ /// let mut child = Command::new("cat")
+ /// .stdin(Stdio::piped())
+ /// .spawn()
+ /// .unwrap();
///
/// let mut stdin = child.stdin.take().unwrap();
/// tokio::spawn(async move {
diff --git a/src/process/unix/driver.rs b/src/process/unix/driver.rs
index 110b484..43b2efa 100644
--- a/src/process/unix/driver.rs
+++ b/src/process/unix/driver.rs
@@ -3,11 +3,8 @@
//! Process driver
use crate::park::Park;
-use crate::process::unix::orphan::ReapOrphanQueue;
use crate::process::unix::GlobalOrphanQueue;
-use crate::signal::unix::driver::Driver as SignalDriver;
-use crate::signal::unix::{signal_with_handle, SignalKind};
-use crate::sync::watch;
+use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
use std::io;
use std::time::Duration;
@@ -16,51 +13,20 @@ use std::time::Duration;
#[derive(Debug)]
pub(crate) struct Driver {
park: SignalDriver,
- inner: CoreDriver<watch::Receiver<()>, GlobalOrphanQueue>,
-}
-
-#[derive(Debug)]
-struct CoreDriver<S, Q> {
- sigchild: S,
- orphan_queue: Q,
-}
-
-trait HasChanged {
- fn has_changed(&mut self) -> bool;
-}
-
-impl<T> HasChanged for watch::Receiver<T> {
- fn has_changed(&mut self) -> bool {
- self.try_has_changed().and_then(Result::ok).is_some()
- }
-}
-
-// ===== impl CoreDriver =====
-
-impl<S, Q> CoreDriver<S, Q>
-where
- S: HasChanged,
- Q: ReapOrphanQueue,
-{
- fn process(&mut self) {
- if self.sigchild.has_changed() {
- self.orphan_queue.reap_orphans();
- }
- }
+ signal_handle: SignalHandle,
}
// ===== impl Driver =====
impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
- pub(crate) fn new(park: SignalDriver) -> io::Result<Self> {
- let sigchild = signal_with_handle(SignalKind::child(), park.handle())?;
- let inner = CoreDriver {
- sigchild,
- orphan_queue: GlobalOrphanQueue,
- };
+ pub(crate) fn new(park: SignalDriver) -> Self {
+ let signal_handle = park.handle();
- Ok(Self { park, inner })
+ Self {
+ park,
+ signal_handle,
+ }
}
}
@@ -76,13 +42,13 @@ impl Park for Driver {
fn park(&mut self) -> Result<(), Self::Error> {
self.park.park()?;
- self.inner.process();
+ GlobalOrphanQueue::reap_orphans(&self.signal_handle);
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.park.park_timeout(duration)?;
- self.inner.process();
+ GlobalOrphanQueue::reap_orphans(&self.signal_handle);
Ok(())
}
@@ -90,43 +56,3 @@ impl Park for Driver {
self.park.shutdown()
}
}
-
-#[cfg(test)]
-mod test {
- use super::*;
- use crate::process::unix::orphan::test::MockQueue;
-
- struct MockStream {
- total_try_recv: usize,
- values: Vec<Option<()>>,
- }
-
- impl MockStream {
- fn new(values: Vec<Option<()>>) -> Self {
- Self {
- total_try_recv: 0,
- values,
- }
- }
- }
-
- impl HasChanged for MockStream {
- fn has_changed(&mut self) -> bool {
- self.total_try_recv += 1;
- self.values.remove(0).is_some()
- }
- }
-
- #[test]
- fn no_reap_if_no_signal() {
- let mut driver = CoreDriver {
- sigchild: MockStream::new(vec![None]),
- orphan_queue: MockQueue::<()>::new(),
- };
-
- driver.process();
-
- assert_eq!(1, driver.sigchild.total_try_recv);
- assert_eq!(0, driver.orphan_queue.total_reaps.get());
- }
-}
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs
index 852a191..fab63dd 100644
--- a/src/process/unix/mod.rs
+++ b/src/process/unix/mod.rs
@@ -24,7 +24,7 @@
pub(crate) mod driver;
pub(crate) mod orphan;
-use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait};
+use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;
@@ -32,6 +32,7 @@ use reap::Reaper;
use crate::io::PollEvented;
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
+use crate::signal::unix::driver::Handle as SignalHandle;
use crate::signal::unix::{signal, Signal, SignalKind};
use mio::event::Source;
@@ -73,9 +74,9 @@ impl fmt::Debug for GlobalOrphanQueue {
}
}
-impl ReapOrphanQueue for GlobalOrphanQueue {
- fn reap_orphans(&self) {
- ORPHAN_QUEUE.reap_orphans()
+impl GlobalOrphanQueue {
+ fn reap_orphans(handle: &SignalHandle) {
+ ORPHAN_QUEUE.reap_orphans(handle)
}
}
diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs
index 8a1e127..07f0dcf 100644
--- a/src/process/unix/orphan.rs
+++ b/src/process/unix/orphan.rs
@@ -1,6 +1,9 @@
+use crate::loom::sync::{Mutex, MutexGuard};
+use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::signal::unix::{signal_with_handle, SignalKind};
+use crate::sync::watch;
use std::io;
use std::process::ExitStatus;
-use std::sync::Mutex;
/// An interface for waiting on a process to exit.
pub(crate) trait Wait {
@@ -20,21 +23,8 @@ impl<T: Wait> Wait for &mut T {
}
}
-/// An interface for reaping a set of orphaned processes.
-pub(crate) trait ReapOrphanQueue {
- /// Attempts to reap every process in the queue, ignoring any errors and
- /// enqueueing any orphans which have not yet exited.
- fn reap_orphans(&self);
-}
-
-impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
- fn reap_orphans(&self) {
- (**self).reap_orphans()
- }
-}
-
/// An interface for queueing up an orphaned process so that it can be reaped.
-pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
+pub(crate) trait OrphanQueue<T> {
/// Adds an orphan to the queue.
fn push_orphan(&self, orphan: T);
}
@@ -48,50 +38,91 @@ impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
/// An implementation of `OrphanQueue`.
#[derive(Debug)]
pub(crate) struct OrphanQueueImpl<T> {
+ sigchild: Mutex<Option<watch::Receiver<()>>>,
queue: Mutex<Vec<T>>,
}
impl<T> OrphanQueueImpl<T> {
pub(crate) fn new() -> Self {
Self {
+ sigchild: Mutex::new(None),
queue: Mutex::new(Vec::new()),
}
}
#[cfg(test)]
fn len(&self) -> usize {
- self.queue.lock().unwrap().len()
+ self.queue.lock().len()
}
-}
-impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
- fn push_orphan(&self, orphan: T) {
- self.queue.lock().unwrap().push(orphan)
+ pub(crate) fn push_orphan(&self, orphan: T)
+ where
+ T: Wait,
+ {
+ self.queue.lock().push(orphan)
}
-}
-impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
- fn reap_orphans(&self) {
- let mut queue = self.queue.lock().unwrap();
- let queue = &mut *queue;
-
- for i in (0..queue.len()).rev() {
- match queue[i].try_wait() {
- Ok(None) => {}
- Ok(Some(_)) | Err(_) => {
- // The stdlib handles interruption errors (EINTR) when polling a child process.
- // All other errors represent invalid inputs or pids that have already been
- // reaped, so we can drop the orphan in case an error is raised.
- queue.swap_remove(i);
+ /// Attempts to reap every process in the queue, ignoring any errors and
+ /// enqueueing any orphans which have not yet exited.
+ pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
+ where
+ T: Wait,
+ {
+ // If someone else is holding the lock, they will be responsible for draining
+ // the queue as necessary, so we can safely bail if that happens
+ if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
+ match &mut *sigchild_guard {
+ Some(sigchild) => {
+ if sigchild.try_has_changed().and_then(Result::ok).is_some() {
+ drain_orphan_queue(self.queue.lock());
+ }
+ }
+ None => {
+ let queue = self.queue.lock();
+
+ // Be lazy and only initialize the SIGCHLD listener if there
+ // are any orphaned processes in the queue.
+ if !queue.is_empty() {
+                    // An error shouldn't really happen here, but if it does it
+ // means that the signal driver isn't running, in
+ // which case there isn't anything we can
+ // register/initialize here, so we can try again later
+ if let Ok(sigchild) = signal_with_handle(SignalKind::child(), &handle) {
+ *sigchild_guard = Some(sigchild);
+ drain_orphan_queue(queue);
+ }
+ }
}
}
}
}
}
+fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
+where
+ T: Wait,
+{
+ for i in (0..queue.len()).rev() {
+ match queue[i].try_wait() {
+ Ok(None) => {}
+ Ok(Some(_)) | Err(_) => {
+ // The stdlib handles interruption errors (EINTR) when polling a child process.
+ // All other errors represent invalid inputs or pids that have already been
+ // reaped, so we can drop the orphan in case an error is raised.
+ queue.swap_remove(i);
+ }
+ }
+ }
+
+ drop(queue);
+}
+
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
use super::*;
+ use crate::io::driver::Driver as IoDriver;
+ use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
+ use crate::sync::watch;
use std::cell::{Cell, RefCell};
use std::io;
use std::os::unix::process::ExitStatusExt;
@@ -100,14 +131,12 @@ pub(crate) mod test {
pub(crate) struct MockQueue<W> {
pub(crate) all_enqueued: RefCell<Vec<W>>,
- pub(crate) total_reaps: Cell<usize>,
}
impl<W> MockQueue<W> {
pub(crate) fn new() -> Self {
Self {
all_enqueued: RefCell::new(Vec::new()),
- total_reaps: Cell::new(0),
}
}
}
@@ -118,12 +147,6 @@ pub(crate) mod test {
}
}
- impl<W> ReapOrphanQueue for MockQueue<W> {
- fn reap_orphans(&self) {
- self.total_reaps.set(self.total_reaps.get() + 1);
- }
- }
-
struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
@@ -191,27 +214,107 @@ pub(crate) mod test {
assert_eq!(orphanage.len(), 4);
- orphanage.reap_orphans();
+ drain_orphan_queue(orphanage.queue.lock());
assert_eq!(orphanage.len(), 2);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 1);
assert_eq!(third_waits.get(), 1);
assert_eq!(fourth_waits.get(), 1);
- orphanage.reap_orphans();
+ drain_orphan_queue(orphanage.queue.lock());
assert_eq!(orphanage.len(), 1);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 2);
assert_eq!(third_waits.get(), 2);
assert_eq!(fourth_waits.get(), 1);
- orphanage.reap_orphans();
+ drain_orphan_queue(orphanage.queue.lock());
assert_eq!(orphanage.len(), 0);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 2);
assert_eq!(third_waits.get(), 3);
assert_eq!(fourth_waits.get(), 1);
- orphanage.reap_orphans(); // Safe to reap when empty
+ // Safe to reap when empty
+ drain_orphan_queue(orphanage.queue.lock());
+ }
+
+ #[test]
+ fn no_reap_if_no_signal_received() {
+ let (tx, rx) = watch::channel(());
+
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ *orphanage.sigchild.lock() = Some(rx);
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ tx.send(()).unwrap();
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 1);
+ }
+
+ #[test]
+ fn no_reap_if_signal_lock_held() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ let signal_guard = orphanage.sigchild.lock();
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ drop(signal_guard);
+ }
+
+ #[test]
+ fn does_not_register_signal_if_queue_empty() {
+ let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
+ let handle = signal_driver.handle();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none()); // Sanity
+
+ // No register when queue empty
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_some());
+ assert_eq!(waits.get(), 1); // Eager reap when registering listener
+ }
+
+ #[test]
+ fn does_nothing_if_signal_could_not_be_registered() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ // Signal handler has "gone away", nothing to register or reap
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+ assert_eq!(waits.get(), 0);
}
}
diff --git a/src/process/unix/reap.rs b/src/process/unix/reap.rs
index 5dc95e5..f7f4d3c 100644
--- a/src/process/unix/reap.rs
+++ b/src/process/unix/reap.rs
@@ -224,7 +224,6 @@ mod test {
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(1, grim.signal.total_polls);
assert_eq!(1, grim.total_waits);
- assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Not yet exited, couldn't register interest the first time
@@ -232,7 +231,6 @@ mod test {
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(3, grim.signal.total_polls);
assert_eq!(3, grim.total_waits);
- assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Exited
@@ -245,7 +243,6 @@ mod test {
}
assert_eq!(4, grim.signal.total_polls);
assert_eq!(4, grim.total_waits);
- assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
}
@@ -260,7 +257,6 @@ mod test {
grim.kill().unwrap();
assert_eq!(1, grim.total_kills);
- assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
}
@@ -276,7 +272,6 @@ mod test {
drop(grim);
- assert_eq!(0, queue.total_reaps.get());
assert!(queue.all_enqueued.borrow().is_empty());
}
@@ -294,7 +289,6 @@ mod test {
let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
drop(grim);
- assert_eq!(0, queue.total_reaps.get());
assert_eq!(1, queue.all_enqueued.borrow().len());
}
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 791e405..5c9b8ed 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -61,7 +61,7 @@ struct Shared {
/// Prior to shutdown, we clean up JoinHandles by having each timed-out
/// thread join on the previous timed-out thread. This is not strictly
/// necessary but helps avoid Valgrind false positives, see
- /// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666
+ /// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666>
/// for more information.
last_exiting_thread: Option<thread::JoinHandle<()>>,
/// This holds the JoinHandles for all running threads; on shutdown, the thread
@@ -151,7 +151,7 @@ impl BlockingPool {
self.spawner.inner.condvar.notify_all();
let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
- let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new());
+ let workers = std::mem::take(&mut shared.worker_threads);
drop(shared);
diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs
index a0e8e23..7e45977 100644
--- a/src/runtime/driver.rs
+++ b/src/runtime/driver.rs
@@ -23,7 +23,7 @@ cfg_io_driver! {
let io_handle = io_driver.handle();
let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
- let process_driver = create_process_driver(signal_driver)?;
+ let process_driver = create_process_driver(signal_driver);
(Either::A(process_driver), Some(io_handle), signal_handle)
} else {
@@ -80,7 +80,7 @@ cfg_not_signal_internal! {
cfg_process_driver! {
type ProcessDriver = crate::process::unix::driver::Driver;
- fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
+ fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
crate::process::unix::driver::Driver::new(signal_driver)
}
}
@@ -89,8 +89,8 @@ cfg_not_process_driver! {
cfg_io_driver! {
type ProcessDriver = SignalDriver;
- fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
- Ok(signal_driver)
+ fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
+ signal_driver
}
}
}
diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs
index 9f7ff55..fb6dafd 100644
--- a/src/runtime/task/core.rs
+++ b/src/runtime/task/core.rs
@@ -279,7 +279,7 @@ impl<T: Future> CoreStage<T> {
// Safety:: the caller ensures mutal exclusion to the field.
match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
Stage::Finished(output) => output,
- _ => panic!("unexpected task state"),
+ _ => panic!("JoinHandle polled after completion"),
}
})
}
diff --git a/src/signal/unix.rs b/src/signal/unix.rs
index cb1d1cc..f96b2f4 100644
--- a/src/signal/unix.rs
+++ b/src/signal/unix.rs
@@ -9,7 +9,6 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storag
use crate::signal::RxFuture;
use crate::sync::watch;
-use libc::c_int;
use mio::net::UnixStream;
use std::io::{self, Error, ErrorKind, Write};
use std::pin::Pin;
@@ -61,7 +60,7 @@ impl Init for OsExtraData {
/// Represents the specific kind of signal to listen for.
#[derive(Debug, Clone, Copy)]
-pub struct SignalKind(c_int);
+pub struct SignalKind(libc::c_int);
impl SignalKind {
/// Allows for listening to any valid OS signal.
@@ -74,8 +73,14 @@ impl SignalKind {
/// // let signum = libc::OS_SPECIFIC_SIGNAL;
/// let kind = SignalKind::from_raw(signum);
/// ```
- pub fn from_raw(signum: c_int) -> Self {
- Self(signum)
+ // Use `std::os::raw::c_int` on public API to prevent leaking a non-stable
+ // type alias from libc.
+ // `libc::c_int` and `std::os::raw::c_int` are currently the same type, and are
+ // unlikely to change to other types, but technically libc can change this
+ // in the future minor version.
+ // See https://github.com/tokio-rs/tokio/issues/3767 for more.
+ pub fn from_raw(signum: std::os::raw::c_int) -> Self {
+ Self(signum as libc::c_int)
}
/// Represents the SIGALRM signal.
@@ -208,7 +213,7 @@ impl Default for SignalInfo {
/// 2. Wake up the driver by writing a byte to a pipe
///
/// Those two operations should both be async-signal safe.
-fn action(globals: Pin<&'static Globals>, signal: c_int) {
+fn action(globals: Pin<&'static Globals>, signal: libc::c_int) {
globals.record_event(signal as EventId);
// Send a wakeup, ignore any errors (anything reasonably possible is
@@ -222,7 +227,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) {
///
/// This will register the signal handler if it hasn't already been registered,
/// returning any error along the way if that fails.
-fn signal_enable(signal: SignalKind, handle: Handle) -> io::Result<()> {
+fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> {
let signal = signal.0;
if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) {
return Err(Error::new(
@@ -352,7 +357,7 @@ pub struct Signal {
/// * If the signal is one of
/// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics)
pub fn signal(kind: SignalKind) -> io::Result<Signal> {
- let rx = signal_with_handle(kind, Handle::current())?;
+ let rx = signal_with_handle(kind, &Handle::current())?;
Ok(Signal {
inner: RxFuture::new(rx),
@@ -361,7 +366,7 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
pub(crate) fn signal_with_handle(
kind: SignalKind,
- handle: Handle,
+ handle: &Handle,
) -> io::Result<watch::Receiver<()>> {
// Turn the signal delivery on once we are ready for it
signal_enable(kind, handle)?;
@@ -457,14 +462,14 @@ mod tests {
#[test]
fn signal_enable_error_on_invalid_input() {
- signal_enable(SignalKind::from_raw(-1), Handle::default()).unwrap_err();
+ signal_enable(SignalKind::from_raw(-1), &Handle::default()).unwrap_err();
}
#[test]
fn signal_enable_error_on_forbidden_input() {
signal_enable(
SignalKind::from_raw(signal_hook_registry::FORBIDDEN[0]),
- Handle::default(),
+ &Handle::default(),
)
.unwrap_err();
}
diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
index a8b291f..e3c95f6 100644
--- a/src/sync/barrier.rs
+++ b/src/sync/barrier.rs
@@ -2,7 +2,7 @@ use crate::sync::watch;
use std::sync::Mutex;
-/// A barrier enables multiple threads to synchronize the beginning of some computation.
+/// A barrier enables multiple tasks to synchronize the beginning of some computation.
///
/// ```
/// # #[tokio::main]
@@ -52,10 +52,10 @@ struct BarrierState {
}
impl Barrier {
- /// Creates a new barrier that can block a given number of threads.
+ /// Creates a new barrier that can block a given number of tasks.
///
- /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
- /// threads at once when the `n`th thread calls `wait`.
+ /// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all
+ /// tasks at once when the `n`th task calls `wait`.
pub fn new(mut n: usize) -> Barrier {
let (waker, wait) = crate::sync::watch::channel(0);
@@ -79,11 +79,11 @@ impl Barrier {
/// Does not resolve until all tasks have rendezvoused here.
///
- /// Barriers are re-usable after all threads have rendezvoused once, and can
+ /// Barriers are re-usable after all tasks have rendezvoused once, and can
/// be used continuously.
///
/// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from
- /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads
+ /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks
/// will receive a result that will return `false` from `is_leader`.
pub async fn wait(&self) -> BarrierWaitResult {
// NOTE: we are taking a _synchronous_ lock here.
@@ -129,14 +129,14 @@ impl Barrier {
}
}
-/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused.
+/// A `BarrierWaitResult` is returned by `wait` when all tasks in the `Barrier` have rendezvoused.
#[derive(Debug, Clone)]
pub struct BarrierWaitResult(bool);
impl BarrierWaitResult {
- /// Returns `true` if this thread from wait is the "leader thread".
+ /// Returns `true` if this task from wait is the "leader task".
///
- /// Only one thread will have `true` returned from their result, all other threads will have
+ /// Only one task will have `true` returned from their result, all other tasks will have
/// `false` returned.
pub fn is_leader(&self) -> bool {
self.0
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index d89a9dd..5f97c1a 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -436,7 +436,7 @@ cfg_sync! {
pub mod mpsc;
mod mutex;
- pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard};
+ pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard};
pub(crate) mod notify;
pub use notify::Notify;
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index 1f670bf..ce857d7 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -33,6 +33,22 @@ pub struct Permit<'a, T> {
chan: &'a chan::Tx<T, Semaphore>,
}
+/// Owned permit to send one value into the channel.
+///
+/// This is identical to the [`Permit`] type, except that it moves the sender
+/// rather than borrowing it.
+///
+/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
+/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
+/// before generating a message to send.
+///
+/// [`Permit`]: Permit
+/// [`Sender::reserve_owned()`]: Sender::reserve_owned
+/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
+pub struct OwnedPermit<T> {
+ chan: Option<chan::Tx<T, Semaphore>>,
+}
+
/// Receive values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
@@ -229,10 +245,11 @@ impl<T> Receiver<T> {
///
/// To guarantee that no messages are dropped, after calling `close()`,
/// `recv()` must be called until `None` is returned. If there are
- /// outstanding [`Permit`] values, the `recv` method will not return `None`
- /// until those are released.
+ /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
+ /// not return `None` until those are released.
///
/// [`Permit`]: Permit
+ /// [`OwnedPermit`]: OwnedPermit
///
/// # Examples
///
@@ -624,12 +641,96 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
+ self.reserve_inner().await?;
+ Ok(Permit { chan: &self.chan })
+ }
+
+ /// Wait for channel capacity, moving the `Sender` and returning an owned
+ /// permit. Once capacity to send one message is available, it is reserved
+ /// for the caller.
+ ///
+ /// This moves the sender _by value_, and returns an owned permit that can
+ /// be used to send a message into the channel. Unlike [`Sender::reserve`],
+ /// this method may be used in cases where the permit must be valid for the
+ /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
+ /// essentially a reference count increment, comparable to [`Arc::clone`]),
+ /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
+ /// moved, it can be cloned prior to calling `reserve_owned`.
+ ///
+ /// If the channel is full, the function waits for the number of unreceived
+ /// messages to become less than the channel capacity. Capacity to send one
+ /// message is reserved for the caller. An [`OwnedPermit`] is returned to
+ /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
+ /// consumes the reserved capacity.
+ ///
+ /// Dropping the [`OwnedPermit`] without sending a message releases the
+ /// capacity back to the channel.
+ ///
+ /// # Examples
+ /// Sending a message using an [`OwnedPermit`]:
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity, moving the sender.
+ /// let permit = tx.reserve_owned().await.unwrap();
+ ///
+ /// // Send a message, consuming the permit and returning
+ /// // the moved sender.
+ /// let tx = permit.send(123);
+ ///
+ /// // The value sent on the permit is received.
+ /// assert_eq!(rx.recv().await.unwrap(), 123);
+ ///
+ /// // The sender can now be used again.
+ /// tx.send(456).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
+ /// by value, it can be inexpensively cloned before calling `reserve_owned`:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Clone the sender and reserve capacity.
+ /// let permit = tx.clone().reserve_owned().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Sending on the permit succeeds.
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ /// }
+ /// ```
+ ///
+ /// [`Sender::reserve`]: Sender::reserve
+ /// [`OwnedPermit`]: OwnedPermit
+ /// [`send`]: OwnedPermit::send
+ /// [`Arc::clone`]: std::sync::Arc::clone
+ pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
+ self.reserve_inner().await?;
+ Ok(OwnedPermit {
+ chan: Some(self.chan),
+ })
+ }
+
+ async fn reserve_inner(&self) -> Result<(), SendError<()>> {
match self.chan.semaphore().0.acquire(1).await {
- Ok(_) => {}
- Err(_) => return Err(SendError(())),
+ Ok(_) => Ok(()),
+ Err(_) => Err(SendError(())),
}
-
- Ok(Permit { chan: &self.chan })
}
/// Try to acquire a slot in the channel without waiting for the slot to become
@@ -684,6 +785,72 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}
+ /// Try to acquire a slot in the channel without waiting for the slot to become
+ /// available, returning an owned permit.
+ ///
+ /// This moves the sender _by value_, and returns an owned permit that can
+ /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
+ /// this method may be used in cases where the permit must be valid for the
+ /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
+ /// essentially a reference count increment, comparable to [`Arc::clone`]),
+ /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
+ /// moved, it can be cloned prior to calling `try_reserve_owned`.
+ ///
+ /// If the channel is full this function will return a [`TrySendError`].
+ /// Since the sender is taken by value, the `TrySendError` returned in this
+ /// case contains the sender, so that it may be used again. Otherwise, if
+ /// there is a slot available, this method will return an [`OwnedPermit`]
+ /// that can then be used to [`send`] on the channel with a guaranteed slot.
+    /// This function is similar to [`reserve_owned`] except it does not wait
+    /// for the slot to become available.
+ ///
+ /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
+ /// to the channel.
+ ///
+ /// [`OwnedPermit`]: OwnedPermit
+ /// [`send`]: OwnedPermit::send
+ /// [`reserve_owned`]: Sender::reserve_owned
+ /// [`Arc::clone`]: std::sync::Arc::clone
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.clone().try_reserve_owned().unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Trying to reserve an additional slot on the `tx` will
+ /// // fail because there is no capacity.
+ /// assert!(tx.try_reserve().is_err());
+ ///
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// }
+ /// ```
+ pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(_) => return Err(TrySendError::Full(self)),
+ }
+
+ Ok(OwnedPermit {
+ chan: Some(self.chan),
+ })
+ }
+
/// Returns `true` if senders belong to the same channel.
///
/// # Examples
@@ -804,6 +971,8 @@ impl<T> Drop for Permit<'_, T> {
// Add the permit back to the semaphore
semaphore.add_permit();
+ // If this is the last sender for this channel, wake the receiver so
+ // that it can be notified that the channel is closed.
if semaphore.is_closed() && semaphore.is_idle() {
self.chan.wake_rx();
}
@@ -817,3 +986,123 @@ impl<T> fmt::Debug for Permit<'_, T> {
.finish()
}
}
+
+// ===== impl OwnedPermit =====
+
+impl<T> OwnedPermit<T> {
+ /// Sends a value using the reserved capacity.
+ ///
+ /// Capacity for the message has already been reserved. The message is sent
+ /// to the receiver and the permit is consumed. The operation will succeed
+ /// even if the receiver half has been closed. See [`Receiver::close`] for
+ /// more details on performing a clean shutdown.
+ ///
+ /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
+ /// the `OwnedPermit` was reserved.
+ ///
+ /// [`Receiver::close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve_owned().await.unwrap();
+ ///
+ /// // Send a message on the permit, returning the sender.
+ /// let tx = permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// // We may now reuse `tx` to send another message.
+ /// tx.send(789).await.unwrap();
+ /// }
+ /// ```
+ pub fn send(mut self, value: T) -> Sender<T> {
+ let chan = self.chan.take().unwrap_or_else(|| {
+ unreachable!("OwnedPermit channel is only taken when the permit is moved")
+ });
+ chan.send(value);
+
+ Sender { chan }
+ }
+
+ /// Release the reserved capacity *without* sending a message, returning the
+ /// [`Sender`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = mpsc::channel(1);
+ ///
+ /// // Clone the sender and reserve capacity
+ /// let permit = tx.clone().reserve_owned().await.unwrap();
+ ///
+ /// // Trying to send on the original `tx` will fail, since the `permit`
+ /// // has reserved all the available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Release the permit without sending a message, returning the clone
+ /// // of the sender.
+ /// let tx2 = permit.release();
+ ///
+ /// // We may now reuse `tx` to send another message.
+ /// tx.send(789).await.unwrap();
+ /// # drop(rx); drop(tx2);
+ /// }
+ /// ```
+ ///
+ /// [`Sender`]: Sender
+ pub fn release(mut self) -> Sender<T> {
+ use chan::Semaphore;
+
+ let chan = self.chan.take().unwrap_or_else(|| {
+ unreachable!("OwnedPermit channel is only taken when the permit is moved")
+ });
+
+ // Add the permit back to the semaphore
+ chan.semaphore().add_permit();
+ Sender { chan }
+ }
+}
+
+impl<T> Drop for OwnedPermit<T> {
+ fn drop(&mut self) {
+ use chan::Semaphore;
+
+ // Are we still holding onto the sender?
+ if let Some(chan) = self.chan.take() {
+ let semaphore = chan.semaphore();
+
+ // Add the permit back to the semaphore
+ semaphore.add_permit();
+
+ // If this `OwnedPermit` is holding the last sender for this
+ // channel, wake the receiver so that it can be notified that the
+ // channel is closed.
+ if semaphore.is_closed() && semaphore.is_idle() {
+ chan.wake_rx();
+ }
+ }
+
+ // Otherwise, do nothing.
+ }
+}
+
+impl<T> fmt::Debug for OwnedPermit<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("OwnedPermit")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs
index e7033f6..879e3dc 100644
--- a/src/sync/mpsc/mod.rs
+++ b/src/sync/mpsc/mod.rs
@@ -73,7 +73,7 @@
pub(super) mod block;
mod bounded;
-pub use self::bounded::{channel, Permit, Receiver, Sender};
+pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender};
mod chan;
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 0a118e7..9fd7c91 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -4,9 +4,9 @@ use crate::sync::batch_semaphore as semaphore;
use std::cell::UnsafeCell;
use std::error::Error;
-use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
+use std::{fmt, marker, mem};
/// An asynchronous `Mutex`-like type.
///
@@ -160,6 +160,19 @@ pub struct OwnedMutexGuard<T: ?Sized> {
lock: Arc<Mutex<T>>,
}
+/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`].
+///
+/// This can be used to hold a subfield of the protected data.
+///
+/// [`MutexGuard::map`]: method@MutexGuard::map
+#[must_use = "if unused the Mutex will immediately unlock"]
+pub struct MappedMutexGuard<'a, T: ?Sized> {
+ s: &'a semaphore::Semaphore,
+ data: *mut T,
+ // Needed to tell the borrow checker that we are holding a `&mut T`
+ marker: marker::PhantomData<&'a mut T>,
+}
+
// As long as T: Send, it's fine to send and share Mutex<T> between threads.
// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can
// access T through Mutex<T>.
@@ -167,6 +180,8 @@ unsafe impl<T> Send for Mutex<T> where T: ?Sized + Send {}
unsafe impl<T> Sync for Mutex<T> where T: ?Sized + Send {}
unsafe impl<T> Sync for MutexGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
+unsafe impl<'a, T> Sync for MappedMutexGuard<'a, T> where T: ?Sized + Sync + 'a {}
+unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a {}
/// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and
/// [`RwLock::try_write`] functions.
@@ -451,6 +466,103 @@ where
// === impl MutexGuard ===
+impl<'a, T: ?Sized> MutexGuard<'a, T> {
+ /// Makes a new [`MappedMutexGuard`] for a component of the locked data.
+ ///
+ /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex.
+ ///
+ /// This is an associated function that needs to be used as `MutexGuard::map(...)`. A method
+ /// would interfere with methods of the same name on the contents of the locked data.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::{Mutex, MutexGuard};
+ ///
+ /// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+ /// struct Foo(u32);
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let foo = Mutex::new(Foo(1));
+ ///
+ /// {
+ /// let mut mapped = MutexGuard::map(foo.lock().await, |f| &mut f.0);
+ /// *mapped = 2;
+ /// }
+ ///
+ /// assert_eq!(Foo(2), *foo.lock().await);
+ /// # }
+ /// ```
+ ///
+ /// [`MutexGuard`]: struct@MutexGuard
+ /// [`MappedMutexGuard`]: struct@MappedMutexGuard
+ #[inline]
+ pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U>
+ where
+ F: FnOnce(&mut T) -> &mut U,
+ {
+ let data = f(&mut *this) as *mut U;
+ let s = &this.lock.s;
+ mem::forget(this);
+ MappedMutexGuard {
+ s,
+ data,
+ marker: marker::PhantomData,
+ }
+ }
+
+ /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The
+ /// original guard is returned if the closure returns `None`.
+ ///
+ /// This operation cannot fail as the [`MutexGuard`] passed in already locked the mutex.
+ ///
+ /// This is an associated function that needs to be used as `MutexGuard::try_map(...)`. A
+ /// method would interfere with methods of the same name on the contents of the locked data.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::{Mutex, MutexGuard};
+ ///
+ /// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+ /// struct Foo(u32);
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// let foo = Mutex::new(Foo(1));
+ ///
+ /// {
+ /// let mut mapped = MutexGuard::try_map(foo.lock().await, |f| Some(&mut f.0))
+ /// .expect("should not fail");
+ /// *mapped = 2;
+ /// }
+ ///
+ /// assert_eq!(Foo(2), *foo.lock().await);
+ /// # }
+ /// ```
+ ///
+ /// [`MutexGuard`]: struct@MutexGuard
+ /// [`MappedMutexGuard`]: struct@MappedMutexGuard
+ #[inline]
+ pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self>
+ where
+ F: FnOnce(&mut T) -> Option<&mut U>,
+ {
+ let data = match f(&mut *this) {
+ Some(data) => data as *mut U,
+ None => return Err(this),
+ };
+ let s = &this.lock.s;
+ mem::forget(this);
+ Ok(MappedMutexGuard {
+ s,
+ data,
+ marker: marker::PhantomData,
+ })
+ }
+}
+
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.lock.s.release(1)
@@ -514,3 +626,88 @@ impl<T: ?Sized + fmt::Display> fmt::Display for OwnedMutexGuard<T> {
fmt::Display::fmt(&**self, f)
}
}
+
+// === impl MappedMutexGuard ===
+
+impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
+ /// Makes a new [`MappedMutexGuard`] for a component of the locked data.
+ ///
+ /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex.
+ ///
+ /// This is an associated function that needs to be used as `MappedMutexGuard::map(...)`. A
+ /// method would interfere with methods of the same name on the contents of the locked data.
+ ///
+ /// [`MappedMutexGuard`]: struct@MappedMutexGuard
+ #[inline]
+ pub fn map<U, F>(mut this: Self, f: F) -> MappedMutexGuard<'a, U>
+ where
+ F: FnOnce(&mut T) -> &mut U,
+ {
+ let data = f(&mut *this) as *mut U;
+ let s = this.s;
+ mem::forget(this);
+ MappedMutexGuard {
+ s,
+ data,
+ marker: marker::PhantomData,
+ }
+ }
+
+ /// Attempts to make a new [`MappedMutexGuard`] for a component of the locked data. The
+ /// original guard is returned if the closure returns `None`.
+ ///
+ /// This operation cannot fail as the [`MappedMutexGuard`] passed in already locked the mutex.
+ ///
+ /// This is an associated function that needs to be used as `MappedMutexGuard::try_map(...)`. A
+ /// method would interfere with methods of the same name on the contents of the locked data.
+ ///
+ /// [`MappedMutexGuard`]: struct@MappedMutexGuard
+ #[inline]
+ pub fn try_map<U, F>(mut this: Self, f: F) -> Result<MappedMutexGuard<'a, U>, Self>
+ where
+ F: FnOnce(&mut T) -> Option<&mut U>,
+ {
+ let data = match f(&mut *this) {
+ Some(data) => data as *mut U,
+ None => return Err(this),
+ };
+ let s = this.s;
+ mem::forget(this);
+ Ok(MappedMutexGuard {
+ s,
+ data,
+ marker: marker::PhantomData,
+ })
+ }
+}
+
+impl<'a, T: ?Sized> Drop for MappedMutexGuard<'a, T> {
+ fn drop(&mut self) {
+ self.s.release(1)
+ }
+}
+
+impl<'a, T: ?Sized> Deref for MappedMutexGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &Self::Target {
+ unsafe { &*self.data }
+ }
+}
+
+impl<'a, T: ?Sized> DerefMut for MappedMutexGuard<'a, T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ unsafe { &mut *self.data }
+ }
+}
+
+impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&**self, f)
+ }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for MappedMutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&**self, f)
+ }
+}
diff --git a/src/sync/notify.rs b/src/sync/notify.rs
index 2d30da9..5d2132f 100644
--- a/src/sync/notify.rs
+++ b/src/sync/notify.rs
@@ -192,6 +192,10 @@ fn inc_num_notify_waiters_calls(data: usize) -> usize {
data + (1 << NOTIFY_WAITERS_SHIFT)
}
+fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
+ data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
+}
+
impl Notify {
/// Create a new `Notify`, initialized without a permit.
///
@@ -394,11 +398,9 @@ impl Notify {
let curr = self.state.load(SeqCst);
if let EMPTY | NOTIFIED = get_state(curr) {
- // There are no waiting tasks. In this case, no synchronization is
- // established between `notify` and `notified().await`.
- // All we need to do is increment the number of times this
- // method was called.
- self.state.store(inc_num_notify_waiters_calls(curr), SeqCst);
+ // There are no waiting tasks. All we need to do is increment the
+ // number of times this method was called.
+ atomic_inc_num_notify_waiters_calls(&self.state);
return;
}
diff --git a/src/sync/tests/loom_notify.rs b/src/sync/tests/loom_notify.rs
index 4be949a..d484a75 100644
--- a/src/sync/tests/loom_notify.rs
+++ b/src/sync/tests/loom_notify.rs
@@ -33,12 +33,41 @@ fn notify_waiters() {
tx.notify_waiters();
});
- th.join().unwrap();
-
block_on(async {
notified1.await;
notified2.await;
});
+
+ th.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_waiters_and_one() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let tx1 = notify.clone();
+ let tx2 = notify.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.notify_waiters();
+ });
+
+ let th2 = thread::spawn(move || {
+ tx2.notify_one();
+ });
+
+ let th3 = thread::spawn(move || {
+ let notified = notify.notified();
+
+ block_on(async {
+ notified.await;
+ });
+ });
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ th3.join().unwrap();
});
}
diff --git a/src/sync/watch.rs b/src/sync/watch.rs
index bf6f0ac..db65e5a 100644
--- a/src/sync/watch.rs
+++ b/src/sync/watch.rs
@@ -205,7 +205,7 @@ impl<T> Receiver<T> {
// not memory access.
shared.ref_count_rx.fetch_add(1, Relaxed);
- Self { version, shared }
+ Self { shared, version }
}
/// Returns a reference to the most recently sent value
diff --git a/src/task/blocking.rs b/src/task/blocking.rs
index 28bbcdb..e4fe254 100644
--- a/src/task/blocking.rs
+++ b/src/task/blocking.rs
@@ -5,19 +5,24 @@ cfg_rt_multi_thread! {
/// blocking the executor.
///
/// In general, issuing a blocking call or performing a lot of compute in a
- /// future without yielding is not okay, as it may prevent the executor from
- /// driving other futures forward. This function runs the closure on the
- /// current thread by having the thread temporarily cease from being a core
- /// thread, and turns it into a blocking thread. See the [CPU-bound tasks
- /// and blocking code][blocking] section for more information.
- ///
- /// Although this function avoids starving other independently spawned
- /// tasks, any other code running concurrently in the same task will be
- /// suspended during the call to `block_in_place`. This can happen e.g. when
- /// using the [`join!`] macro. To avoid this issue, use [`spawn_blocking`]
- /// instead.
- ///
- /// Note that this function can only be used when using the `multi_thread` runtime.
+ /// future without yielding is problematic, as it may prevent the executor
+ /// from driving other tasks forward. Calling this function informs the
+ /// executor that the currently executing task is about to block the thread,
+ /// so the executor is able to hand off any other tasks it has to a new
+ /// worker thread before that happens. See the [CPU-bound tasks and blocking
+ /// code][blocking] section for more information.
+ ///
+ /// Be aware that although this function avoids starving other independently
+ /// spawned tasks, any other code running concurrently in the same task will
+ /// be suspended during the call to `block_in_place`. This can happen e.g.
+ /// when using the [`join!`] macro. To avoid this issue, use
+ /// [`spawn_blocking`] instead of `block_in_place`.
+ ///
+ /// Note that this function cannot be used within a [`current_thread`] runtime
+ /// because in this case there are no other worker threads to hand off tasks
+ /// to. On the other hand, calling the function outside a runtime is
+ /// allowed. In this case, `block_in_place` just calls the provided closure
+ /// normally.
///
/// Code running behind `block_in_place` cannot be cancelled. When you shut
/// down the executor, it will wait indefinitely for all blocking operations
@@ -43,6 +48,28 @@ cfg_rt_multi_thread! {
/// });
/// # }
/// ```
+ ///
+ /// Code running inside `block_in_place` may use `block_on` to reenter the
+ /// async context.
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use tokio::runtime::Handle;
+ ///
+ /// # async fn docs() {
+ /// task::block_in_place(move || {
+ /// Handle::current().block_on(async move {
+ /// // do something async
+ /// });
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called from a [`current_thread`] runtime.
+ ///
+ /// [`current_thread`]: fn@crate::runtime::Builder::new_current_thread
pub fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
diff --git a/src/task/mod.rs b/src/task/mod.rs
index abae818..7255535 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -122,6 +122,11 @@
//! Instead, Tokio provides two APIs for running blocking operations in an
//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`].
//!
+//! Be aware that if you call a non-async method from async code, that non-async
+//! method is still inside the asynchronous context, so you should also avoid
+//! blocking operations there. This includes destructors of objects destroyed in
+//! async code.
+//!
//! #### spawn_blocking
//!
//! The `task::spawn_blocking` function is similar to the `task::spawn` function
diff --git a/src/task/unconstrained.rs b/src/task/unconstrained.rs
index 4a62f81..31c732b 100644
--- a/src/task/unconstrained.rs
+++ b/src/task/unconstrained.rs
@@ -5,6 +5,7 @@ use std::task::{Context, Poll};
pin_project! {
/// Future for the [`unconstrained`](unconstrained) method.
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
#[must_use = "Unconstrained does nothing unless polled"]
pub struct Unconstrained<F> {
#[pin]
@@ -38,6 +39,7 @@ where
/// otherwise.
///
/// See also the usage example in the [task module](index.html#unconstrained).
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
Unconstrained { inner }
}
diff --git a/src/time/clock.rs b/src/time/clock.rs
index 8957800..c5ef86b 100644
--- a/src/time/clock.rs
+++ b/src/time/clock.rs
@@ -120,22 +120,11 @@ cfg_test_util! {
/// Panics if time is not frozen or if called from outside of the Tokio
/// runtime.
pub async fn advance(duration: Duration) {
- use crate::future::poll_fn;
- use std::task::Poll;
-
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
+ let until = clock.now() + duration;
clock.advance(duration);
- let mut yielded = false;
- poll_fn(|cx| {
- if yielded {
- Poll::Ready(())
- } else {
- yielded = true;
- cx.waker().wake_by_ref();
- Poll::Pending
- }
- }).await;
+ crate::time::sleep_until(until).await;
}
/// Return the current instant, factoring in frozen time.
diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs
index e630fa8..08edab3 100644
--- a/src/time/driver/entry.rs
+++ b/src/time/driver/entry.rs
@@ -85,7 +85,7 @@ const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
/// requires only the driver lock.
pub(super) struct StateCell {
/// Holds either the scheduled expiration time for this timer, or (if the
- /// timer has been fired and is unregistered), [`u64::max_value()`].
+ /// timer has been fired and is unregistered), `u64::max_value()`.
state: AtomicU64,
/// If the timer is fired (an Acquire order read on state shows
/// `u64::max_value()`), holds the result that should be returned from
diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs
index 9a05a54..77b4358 100644
--- a/src/time/driver/handle.rs
+++ b/src/time/driver/handle.rs
@@ -76,7 +76,7 @@ cfg_not_rt! {
/// lazy, and so outside executed inside the runtime successfully without
/// panicking.
pub(crate) fn current() -> Self {
- panic!(crate::util::error::CONTEXT_MISSING_ERROR)
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
diff --git a/src/time/interval.rs b/src/time/interval.rs
index 20cfcec..4b1c6f6 100644
--- a/src/time/interval.rs
+++ b/src/time/interval.rs
@@ -176,4 +176,9 @@ impl Interval {
// Return the current instant
Poll::Ready(now)
}
+
+ /// Returns the period of the interval.
+ pub fn period(&self) -> Duration {
+ self.period
+ }
}
diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs
index dd00e14..480ea09 100644
--- a/src/util/linked_list.rs
+++ b/src/util/linked_list.rs
@@ -77,7 +77,7 @@ pub(crate) struct Pointers<T> {
/// #[repr(C)].
///
/// See this link for more information:
-/// https://github.com/rust-lang/rust/pull/82834
+/// <https://github.com/rust-lang/rust/pull/82834>
#[repr(C)]
struct PointersInner<T> {
/// The previous node in the list. null if there is no previous node.
@@ -93,7 +93,7 @@ struct PointersInner<T> {
next: Option<NonNull<T>>,
/// This type is !Unpin due to the heuristic from:
- /// https://github.com/rust-lang/rust/pull/82834
+ /// <https://github.com/rust-lang/rust/pull/82834>
_pin: PhantomPinned,
}
diff --git a/src/util/rand.rs b/src/util/rand.rs
index 5660103..17b3ec1 100644
--- a/src/util/rand.rs
+++ b/src/util/rand.rs
@@ -3,10 +3,10 @@ use std::cell::Cell;
/// Fast random number generate
///
/// Implement xorshift64+: 2 32-bit xorshift sequences added together.
-/// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's
-/// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
+/// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
+/// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
/// This generator passes the SmallCrush suite, part of TestU01 framework:
-/// http://simul.iro.umontreal.ca/testu01/tu01.html
+/// <http://simul.iro.umontreal.ca/testu01/tu01.html>
#[derive(Debug)]
pub(crate) struct FastRand {
one: Cell<u32>,
diff --git a/src/util/slab.rs b/src/util/slab.rs
index efc72e1..2ddaa6c 100644
--- a/src/util/slab.rs
+++ b/src/util/slab.rs
@@ -296,7 +296,7 @@ impl<T> Slab<T> {
// Remove the slots vector from the page. This is done so that the
// freeing process is done outside of the lock's critical section.
- let vec = mem::replace(&mut slots.slots, vec![]);
+ let vec = mem::take(&mut slots.slots);
slots.head = 0;
// Drop the lock so we can drop the vector outside the lock below.
diff --git a/tests/fs_file.rs b/tests/fs_file.rs
index bf2f1d7..1f4760e 100644
--- a/tests/fs_file.rs
+++ b/tests/fs_file.rs
@@ -22,6 +22,27 @@ async fn basic_read() {
assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);
+
+ // Drop the data from the cache to stimulate uncached codepath on Linux (see preadv2 in
+ // file.rs)
+ #[cfg(target_os = "linux")]
+ {
+ use std::os::unix::io::AsRawFd;
+ nix::unistd::fsync(tempfile.as_raw_fd()).unwrap();
+ nix::fcntl::posix_fadvise(
+ tempfile.as_raw_fd(),
+ 0,
+ 0,
+ nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
+ )
+ .unwrap();
+ }
+
+ let mut file = File::open(tempfile.path()).await.unwrap();
+ let n = file.read(&mut buf).await.unwrap();
+
+ assert_eq!(n, HELLO.len());
+ assert_eq!(&buf[..n], HELLO);
}
#[tokio::test]
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
new file mode 100644
index 0000000..ac5f11c
--- /dev/null
+++ b/tests/io_buf_reader.rs
@@ -0,0 +1,362 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_reader.rs
+
+use futures::task::{noop_waker_ref, Context, Poll};
+use std::cmp;
+use std::io::{self, Cursor};
+use std::pin::Pin;
+use tokio::io::{
+ AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader,
+ ReadBuf, SeekFrom,
+};
+
+macro_rules! run_fill_buf {
+ ($reader:expr) => {{
+ let mut cx = Context::from_waker(noop_waker_ref());
+ loop {
+ if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ break x;
+ }
+ }
+ }};
+}
+
+struct MaybePending<'a> {
+ inner: &'a [u8],
+ ready_read: bool,
+ ready_fill_buf: bool,
+}
+
+impl<'a> MaybePending<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self {
+ inner,
+ ready_read: false,
+ ready_fill_buf: false,
+ }
+ }
+}
+
+impl AsyncRead for MaybePending<'_> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ if self.ready_read {
+ self.ready_read = false;
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ } else {
+ self.ready_read = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+}
+
+impl AsyncBufRead for MaybePending<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ if self.ready_fill_buf {
+ self.ready_fill_buf = false;
+ if self.inner.is_empty() {
+ return Poll::Ready(Ok(&[]));
+ }
+ let len = cmp::min(2, self.inner.len());
+ Poll::Ready(Ok(&self.inner[0..len]))
+ } else {
+ self.ready_fill_buf = true;
+ Poll::Pending
+ }
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.inner = &self.inner[amt..];
+ }
+}
+
+#[tokio::test]
+async fn test_buffered_reader() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, inner);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+}
+
+#[tokio::test]
+async fn test_buffered_reader_seek() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+
+ assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
+ assert!(reader
+ .seek(SeekFrom::Current(i64::min_value()))
+ .await
+ .is_err());
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
+ assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]);
+ Pin::new(&mut reader).consume(1);
+ assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
+}
+
+#[tokio::test]
+async fn test_buffered_reader_seek_underflow() {
+ // gimmick reader that yields its position modulo 256 for each byte
+ struct PositionReader {
+ pos: u64,
+ }
+ impl AsyncRead for PositionReader {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ let b = buf.initialize_unfilled();
+ let len = b.len();
+ for x in b {
+ *x = self.pos as u8;
+ self.pos = self.pos.wrapping_add(1);
+ }
+ buf.advance(len);
+ Poll::Ready(Ok(()))
+ }
+ }
+ impl AsyncSeek for PositionReader {
+ fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ match pos {
+ SeekFrom::Start(n) => {
+ self.pos = n;
+ }
+ SeekFrom::Current(n) => {
+ self.pos = self.pos.wrapping_add(n as u64);
+ }
+ SeekFrom::End(n) => {
+ self.pos = u64::max_value().wrapping_add(n as u64);
+ }
+ }
+ Ok(())
+ }
+ fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ Poll::Ready(Ok(self.pos))
+ }
+ }
+
+ let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 });
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1, 2, 3, 4][..]);
+ assert_eq!(
+ reader.seek(SeekFrom::End(-5)).await.unwrap(),
+ u64::max_value() - 5
+ );
+ assert_eq!(run_fill_buf!(reader).unwrap().len(), 5);
+ // the following seek will require two underlying seeks
+ let expected = 9_223_372_036_854_775_802;
+ assert_eq!(
+ reader
+ .seek(SeekFrom::Current(i64::min_value()))
+ .await
+ .unwrap(),
+ expected
+ );
+ assert_eq!(run_fill_buf!(reader).unwrap().len(), 5);
+ // seeking to 0 should empty the buffer.
+ assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
+ assert_eq!(reader.get_ref().pos, expected);
+}
+
+#[tokio::test]
+async fn test_short_reads() {
+ /// A dummy reader intended at testing short-reads propagation.
+ struct ShortReader {
+ lengths: Vec<usize>,
+ }
+
+ impl AsyncRead for ShortReader {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ if !self.lengths.is_empty() {
+ buf.advance(self.lengths.remove(0));
+ }
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ let inner = ShortReader {
+ lengths: vec![0, 1, 2, 0, 1, 0],
+ };
+ let mut reader = BufReader::new(inner);
+ let mut buf = [0, 0];
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+}
+
+#[tokio::test]
+async fn maybe_pending() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+}
+
+#[tokio::test]
+async fn maybe_pending_buf_read() {
+ let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
+ let mut reader = BufReader::with_capacity(2, inner);
+ let mut v = Vec::new();
+ reader.read_until(3, &mut v).await.unwrap();
+ assert_eq!(v, [0, 1, 2, 3]);
+ v.clear();
+ reader.read_until(1, &mut v).await.unwrap();
+ assert_eq!(v, [1]);
+ v.clear();
+ reader.read_until(8, &mut v).await.unwrap();
+ assert_eq!(v, [0]);
+ v.clear();
+ reader.read_until(9, &mut v).await.unwrap();
+ assert_eq!(v, []);
+}
+
+// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
+#[tokio::test]
+async fn maybe_pending_seek() {
+ struct MaybePendingSeek<'a> {
+ inner: Cursor<&'a [u8]>,
+ ready: bool,
+ seek_res: Option<io::Result<()>>,
+ }
+
+ impl<'a> MaybePendingSeek<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self {
+ inner: Cursor::new(inner),
+ ready: true,
+ seek_res: None,
+ }
+ }
+ }
+
+ impl AsyncRead for MaybePendingSeek<'_> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+ }
+
+ impl AsyncBufRead for MaybePendingSeek<'_> {
+ fn poll_fill_buf(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<&[u8]>> {
+ let this: *mut Self = &mut *self as *mut _;
+ Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ Pin::new(&mut self.inner).consume(amt)
+ }
+ }
+
+ impl AsyncSeek for MaybePendingSeek<'_> {
+ fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos));
+ Ok(())
+ }
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ if self.ready {
+ self.ready = false;
+ self.seek_res.take().unwrap_or(Ok(()))?;
+ Pin::new(&mut self.inner).poll_complete(cx)
+ } else {
+ self.ready = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ }
+
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+
+ assert_eq!(reader.seek(SeekFrom::Current(3)).await.unwrap(), 3);
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
+ assert!(reader
+ .seek(SeekFrom::Current(i64::min_value()))
+ .await
+ .is_err());
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
+ assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
+ assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]);
+ Pin::new(&mut reader).consume(1);
+ assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
+}
diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs
new file mode 100644
index 0000000..6f4f10a
--- /dev/null
+++ b/tests/io_buf_writer.rs
@@ -0,0 +1,251 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_writer.rs
+
+use futures::task::{Context, Poll};
+use std::io::{self, Cursor};
+use std::pin::Pin;
+use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, SeekFrom};
+
+struct MaybePending {
+ inner: Vec<u8>,
+ ready: bool,
+}
+
+impl MaybePending {
+ fn new(inner: Vec<u8>) -> Self {
+ Self {
+ inner,
+ ready: false,
+ }
+ }
+}
+
+impl AsyncWrite for MaybePending {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+}
+
+#[tokio::test]
+async fn buf_writer() {
+ let mut writer = BufWriter::with_capacity(2, Vec::new());
+
+ writer.write(&[0, 1]).await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ writer.write(&[2]).await.unwrap();
+ assert_eq!(writer.buffer(), [2]);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ writer.write(&[3]).await.unwrap();
+ assert_eq!(writer.buffer(), [2, 3]);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ writer.flush().await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
+
+ writer.write(&[4]).await.unwrap();
+ writer.write(&[5]).await.unwrap();
+ assert_eq!(writer.buffer(), [4, 5]);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
+
+ writer.write(&[6]).await.unwrap();
+ assert_eq!(writer.buffer(), [6]);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]);
+
+ writer.write(&[7, 8]).await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]);
+
+ writer.write(&[9, 10, 11]).await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+
+ writer.flush().await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+}
+
+#[tokio::test]
+async fn buf_writer_inner_flushes() {
+ let mut w = BufWriter::with_capacity(3, Vec::new());
+ w.write(&[0, 1]).await.unwrap();
+ assert_eq!(*w.get_ref(), []);
+ w.flush().await.unwrap();
+ let w = w.into_inner();
+ assert_eq!(w, [0, 1]);
+}
+
+#[tokio::test]
+async fn buf_writer_seek() {
+ let mut w = BufWriter::with_capacity(3, Cursor::new(Vec::new()));
+ w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap();
+ w.write_all(&[6, 7]).await.unwrap();
+ assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8);
+ assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
+ assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2);
+ w.write_all(&[8, 9]).await.unwrap();
+ w.flush().await.unwrap();
+ assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
+}
+
+#[tokio::test]
+async fn maybe_pending_buf_writer() {
+ let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));
+
+ writer.write(&[0, 1]).await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ writer.write(&[2]).await.unwrap();
+ assert_eq!(writer.buffer(), [2]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ writer.write(&[3]).await.unwrap();
+ assert_eq!(writer.buffer(), [2, 3]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ writer.flush().await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]);
+
+ writer.write(&[4]).await.unwrap();
+ writer.write(&[5]).await.unwrap();
+ assert_eq!(writer.buffer(), [4, 5]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]);
+
+ writer.write(&[6]).await.unwrap();
+ assert_eq!(writer.buffer(), [6]);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]);
+
+ writer.write(&[7, 8]).await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]);
+
+ writer.write(&[9, 10, 11]).await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(
+ writer.get_ref().inner,
+ &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+ );
+
+ writer.flush().await.unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(
+ &writer.get_ref().inner,
+ &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+ );
+}
+
+#[tokio::test]
+async fn maybe_pending_buf_writer_inner_flushes() {
+ let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));
+ w.write(&[0, 1]).await.unwrap();
+ assert_eq!(&w.get_ref().inner, &[]);
+ w.flush().await.unwrap();
+ let w = w.into_inner().inner;
+ assert_eq!(w, [0, 1]);
+}
+
+#[tokio::test]
+async fn maybe_pending_buf_writer_seek() {
+ struct MaybePendingSeek {
+ inner: Cursor<Vec<u8>>,
+ ready_write: bool,
+ ready_seek: bool,
+ seek_res: Option<io::Result<()>>,
+ }
+
+ impl MaybePendingSeek {
+ fn new(inner: Vec<u8>) -> Self {
+ Self {
+ inner: Cursor::new(inner),
+ ready_write: false,
+ ready_seek: false,
+ seek_res: None,
+ }
+ }
+ }
+
+ impl AsyncWrite for MaybePendingSeek {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready_write {
+ self.ready_write = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready_write = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+ }
+
+ impl AsyncSeek for MaybePendingSeek {
+ fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos));
+ Ok(())
+ }
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ if self.ready_seek {
+ self.ready_seek = false;
+ self.seek_res.take().unwrap_or(Ok(()))?;
+ Pin::new(&mut self.inner).poll_complete(cx)
+ } else {
+ self.ready_seek = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ }
+
+ let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(Vec::new()));
+ w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap();
+ w.write_all(&[6, 7]).await.unwrap();
+ assert_eq!(w.seek(SeekFrom::Current(0)).await.unwrap(), 8);
+ assert_eq!(
+ &w.get_ref().inner.get_ref()[..],
+ &[0, 1, 2, 3, 4, 5, 6, 7][..]
+ );
+ assert_eq!(w.seek(SeekFrom::Start(2)).await.unwrap(), 2);
+ w.write_all(&[8, 9]).await.unwrap();
+ w.flush().await.unwrap();
+ assert_eq!(
+ &w.into_inner().inner.into_inner()[..],
+ &[0, 1, 8, 9, 4, 5, 6, 7]
+ );
+}
diff --git a/tests/io_mem_stream.rs b/tests/io_mem_stream.rs
index 3335214..520391a 100644
--- a/tests/io_mem_stream.rs
+++ b/tests/io_mem_stream.rs
@@ -63,6 +63,26 @@ async fn disconnect() {
}
#[tokio::test]
+#[cfg(not(target_os = "android"))]
+async fn disconnect_reader() {
+ let (a, mut b) = duplex(2);
+
+ let t1 = tokio::spawn(async move {
+ // this will block, as not all data fits into duplex
+ b.write_all(b"ping").await.unwrap_err();
+ });
+
+ let t2 = tokio::spawn(async move {
+ // here we drop the reader side, and we expect the writer in the other
+ // task to exit with an error
+ drop(a);
+ });
+
+ t2.await.unwrap();
+ t1.await.unwrap();
+}
+
+#[tokio::test]
async fn max_write_size() {
let (mut a, mut b) = duplex(32);
@@ -73,11 +93,11 @@ async fn max_write_size() {
assert_eq!(n, 4);
});
- let t2 = tokio::spawn(async move {
- let mut buf = [0u8; 4];
- b.read_exact(&mut buf).await.unwrap();
- });
+ let mut buf = [0u8; 4];
+ b.read_exact(&mut buf).await.unwrap();
t1.await.unwrap();
- t2.await.unwrap();
+
+ // drop b only after task t1 finishes writing
+ drop(b);
}
diff --git a/tests/io_write_all_buf.rs b/tests/io_write_all_buf.rs
new file mode 100644
index 0000000..b49a58e
--- /dev/null
+++ b/tests/io_write_all_buf.rs
@@ -0,0 +1,96 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio_test::{assert_err, assert_ok};
+
+use bytes::{Buf, Bytes, BytesMut};
+use std::cmp;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[tokio::test]
+async fn write_all_buf() {
+ struct Wr {
+ buf: BytesMut,
+ cnt: usize,
+ }
+
+ impl AsyncWrite for Wr {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let n = cmp::min(4, buf.len());
+ dbg!(buf);
+ let buf = &buf[0..n];
+
+ self.cnt += 1;
+ self.buf.extend(buf);
+ Ok(buf.len()).into()
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+ }
+
+ let mut wr = Wr {
+ buf: BytesMut::with_capacity(64),
+ cnt: 0,
+ };
+
+ let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world"));
+
+ assert_ok!(wr.write_all_buf(&mut buf).await);
+ assert_eq!(wr.buf, b"helloworld"[..]);
+ // expect 4 writes, [hell],[o],[worl],[d]
+ assert_eq!(wr.cnt, 4);
+ assert_eq!(buf.has_remaining(), false);
+}
+
+#[tokio::test]
+async fn write_buf_err() {
+ /// Error out after writing the first 4 bytes
+ struct Wr {
+ cnt: usize,
+ }
+
+ impl AsyncWrite for Wr {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ _buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.cnt += 1;
+ if self.cnt == 2 {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "whoops")));
+ }
+ Poll::Ready(Ok(4))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+ }
+
+ let mut wr = Wr { cnt: 0 };
+
+ let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world"));
+
+ assert_err!(wr.write_all_buf(&mut buf).await);
+ assert_eq!(
+ buf.copy_to_bytes(buf.remaining()),
+ Bytes::from_static(b"oworld")
+ );
+}
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index ea06d51..07c2b81 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -359,9 +359,6 @@ async fn join_with_select() {
async fn use_future_in_if_condition() {
use tokio::time::{self, Duration};
- let sleep = time::sleep(Duration::from_millis(50));
- tokio::pin!(sleep);
-
tokio::select! {
_ = time::sleep(Duration::from_millis(50)), if false => {
panic!("if condition ignored")
diff --git a/tests/macros_test.rs b/tests/macros_test.rs
index 8396398..f5bc5a0 100644
--- a/tests/macros_test.rs
+++ b/tests/macros_test.rs
@@ -21,7 +21,20 @@ async fn test_macro_is_resilient_to_shadowing() {
// https://github.com/tokio-rs/tokio/issues/3403
#[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline.
#[tokio::main]
-async fn unused_braces_main() { println!("hello") }
+pub async fn unused_braces_main() { println!("hello") }
#[rustfmt::skip] // this `rustfmt::skip` is necessary because unused_braces does not warn if the block contains newline.
#[tokio::test]
async fn unused_braces_test() { assert_eq!(1 + 1, 2) }
+
+// https://github.com/tokio-rs/tokio/pull/3766#issuecomment-835508651
+#[std::prelude::v1::test]
+fn trait_method() {
+ trait A {
+ fn f(self);
+ }
+ impl A for () {
+ #[tokio::main]
+ async fn f(self) {}
+ }
+ ().f()
+}
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index cb1d0f6..e5fc7a9 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -647,6 +647,7 @@ rt_test! {
}
#[test]
+ #[cfg(not(target_os = "android"))]
fn panic_in_task() {
let rt = rt();
let (tx, rx) = oneshot::channel();
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
index 82bef8a..d9514d2 100644
--- a/tests/task_blocking.rs
+++ b/tests/task_blocking.rs
@@ -114,6 +114,7 @@ fn can_enter_basic_rt_from_within_block_in_place() {
}
#[test]
+#[cfg(not(target_os = "android"))]
fn useful_panic_message_when_dropping_rt_in_rt() {
use std::panic::{catch_unwind, AssertUnwindSafe};
diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs
index e34c2bb..0b5d12a 100644
--- a/tests/tcp_stream.rs
+++ b/tests/tcp_stream.rs
@@ -55,7 +55,7 @@ async fn try_read_write() {
tokio::task::yield_now().await;
}
- // Fill the write buffer
+ // Fill the write buffer using non-vectored I/O
loop {
// Still ready
let mut writable = task::spawn(client.writable());
@@ -75,7 +75,7 @@ async fn try_read_write() {
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());
- // Drain the socket from the server end
+ // Drain the socket from the server end using non-vectored I/O
let mut read = vec![0; written.len()];
let mut i = 0;
@@ -92,6 +92,51 @@ async fn try_read_write() {
assert_eq!(read, written);
}
+ written.clear();
+ client.writable().await.unwrap();
+
+ // Fill the write buffer using vectored I/O
+ let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write_vectored(&data_bufs) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end using vectored I/O
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await.unwrap();
+
+ let mut bufs: Vec<_> = read[i..]
+ .chunks_mut(0x10000)
+ .map(io::IoSliceMut::new)
+ .collect();
+ match server.try_read_vectored(&mut bufs) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
// Now, we listen for shutdown
drop(client);
diff --git a/tests/time_pause.rs b/tests/time_pause.rs
index bc84ac5..d1834af 100644
--- a/tests/time_pause.rs
+++ b/tests/time_pause.rs
@@ -3,8 +3,14 @@
use rand::SeedableRng;
use rand::{rngs::StdRng, Rng};
-use tokio::time::{self, Duration, Instant};
-use tokio_test::assert_err;
+use tokio::time::{self, Duration, Instant, Sleep};
+use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task};
+
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
#[tokio::test]
async fn pause_time_in_main() {
@@ -57,3 +63,162 @@ async fn paused_time_stress_run() -> Vec<Duration> {
times
}
+
+#[tokio::test(start_paused = true)]
+async fn advance_after_poll() {
+ time::sleep(ms(1)).await;
+
+ let start = Instant::now();
+
+ let mut sleep = task::spawn(time::sleep_until(start + ms(300)));
+
+ assert_pending!(sleep.poll());
+
+ let before = Instant::now();
+ time::advance(ms(100)).await;
+ assert_elapsed!(before, ms(100));
+
+ assert_pending!(sleep.poll());
+}
+
+#[tokio::test(start_paused = true)]
+async fn sleep_no_poll() {
+ let start = Instant::now();
+
+ // TODO: Skip this
+ time::advance(ms(1)).await;
+
+ let mut sleep = task::spawn(time::sleep_until(start + ms(300)));
+
+ let before = Instant::now();
+ time::advance(ms(100)).await;
+ assert_elapsed!(before, ms(100));
+
+ assert_pending!(sleep.poll());
+}
+
+enum State {
+ Begin,
+ AwaitingAdvance(Pin<Box<dyn Future<Output = ()>>>),
+ AfterAdvance,
+}
+
+struct Tester {
+ sleep: Pin<Box<Sleep>>,
+ state: State,
+ before: Option<Instant>,
+ poll: bool,
+}
+
+impl Future for Tester {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match &mut self.state {
+ State::Begin => {
+ if self.poll {
+ assert_pending!(self.sleep.as_mut().poll(cx));
+ }
+ self.before = Some(Instant::now());
+ let advance_fut = Box::pin(time::advance(ms(100)));
+ self.state = State::AwaitingAdvance(advance_fut);
+ self.poll(cx)
+ }
+ State::AwaitingAdvance(ref mut advance_fut) => match advance_fut.as_mut().poll(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(()) => {
+ self.state = State::AfterAdvance;
+ self.poll(cx)
+ }
+ },
+ State::AfterAdvance => {
+ assert_elapsed!(self.before.unwrap(), ms(100));
+
+ assert_pending!(self.sleep.as_mut().poll(cx));
+
+ Poll::Ready(())
+ }
+ }
+ }
+}
+
+#[tokio::test(start_paused = true)]
+async fn sleep_same_task() {
+ let start = Instant::now();
+
+ // TODO: Skip this
+ time::advance(ms(1)).await;
+
+ let sleep = Box::pin(time::sleep_until(start + ms(300)));
+
+ Tester {
+ sleep,
+ state: State::Begin,
+ before: None,
+ poll: true,
+ }
+ .await;
+}
+
+#[tokio::test(start_paused = true)]
+async fn sleep_same_task_no_poll() {
+ let start = Instant::now();
+
+ // TODO: Skip this
+ time::advance(ms(1)).await;
+
+ let sleep = Box::pin(time::sleep_until(start + ms(300)));
+
+ Tester {
+ sleep,
+ state: State::Begin,
+ before: None,
+ poll: false,
+ }
+ .await;
+}
+
+#[tokio::test(start_paused = true)]
+async fn interval() {
+ let start = Instant::now();
+
+ // TODO: Skip this
+ time::advance(ms(1)).await;
+
+ let mut i = task::spawn(time::interval_at(start, ms(300)));
+
+ assert_ready_eq!(poll_next(&mut i), start);
+ assert_pending!(poll_next(&mut i));
+
+ let before = Instant::now();
+ time::advance(ms(100)).await;
+ assert_elapsed!(before, ms(100));
+ assert_pending!(poll_next(&mut i));
+
+ let before = Instant::now();
+ time::advance(ms(200)).await;
+ assert_elapsed!(before, ms(200));
+ assert_ready_eq!(poll_next(&mut i), start + ms(300));
+ assert_pending!(poll_next(&mut i));
+
+ let before = Instant::now();
+ time::advance(ms(400)).await;
+ assert_elapsed!(before, ms(400));
+ assert_ready_eq!(poll_next(&mut i), start + ms(600));
+ assert_pending!(poll_next(&mut i));
+
+ let before = Instant::now();
+ time::advance(ms(500)).await;
+ assert_elapsed!(before, ms(500));
+ assert_ready_eq!(poll_next(&mut i), start + ms(900));
+ assert_ready_eq!(poll_next(&mut i), start + ms(1200));
+ assert_pending!(poll_next(&mut i));
+}
+
+fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
+ interval.enter(|cx, mut interval| interval.poll_tick(cx))
+}
+
+fn ms(n: u64) -> Duration {
+ Duration::from_millis(n)
+}
diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs
index 2736258..9c04d22 100644
--- a/tests/time_sleep.rs
+++ b/tests/time_sleep.rs
@@ -7,22 +7,7 @@ use std::task::Context;
use futures::task::noop_waker_ref;
use tokio::time::{self, Duration, Instant};
-use tokio_test::{assert_pending, assert_ready, task};
-
-macro_rules! assert_elapsed {
- ($now:expr, $ms:expr) => {{
- let elapsed = $now.elapsed();
- let lower = ms($ms);
-
- // Handles ms rounding
- assert!(
- elapsed >= lower && elapsed <= lower + ms(1),
- "actual = {:?}, expected = {:?}",
- elapsed,
- lower
- );
- }};
-}
+use tokio_test::{assert_elapsed, assert_pending, assert_ready, task};
#[tokio::test]
async fn immediate_sleep() {
@@ -32,7 +17,7 @@ async fn immediate_sleep() {
// Ready!
time::sleep_until(now).await;
- assert_elapsed!(now, 0);
+ assert_elapsed!(now, ms(1));
}
#[tokio::test]
@@ -60,10 +45,11 @@ async fn delayed_sleep_level_0() {
for &i in &[1, 10, 60] {
let now = Instant::now();
+ let dur = ms(i);
- time::sleep_until(now + ms(i)).await;
+ time::sleep_until(now + dur).await;
- assert_elapsed!(now, i);
+ assert_elapsed!(now, dur);
}
}
@@ -77,7 +63,7 @@ async fn sub_ms_delayed_sleep() {
time::sleep_until(deadline).await;
- assert_elapsed!(now, 1);
+ assert_elapsed!(now, ms(1));
}
}
@@ -90,7 +76,7 @@ async fn delayed_sleep_wrapping_level_0() {
let now = Instant::now();
time::sleep_until(now + ms(60)).await;
- assert_elapsed!(now, 60);
+ assert_elapsed!(now, ms(60));
}
#[tokio::test]
@@ -107,7 +93,7 @@ async fn reset_future_sleep_before_fire() {
sleep.as_mut().reset(Instant::now() + ms(200));
sleep.await;
- assert_elapsed!(now, 200);
+ assert_elapsed!(now, ms(200));
}
#[tokio::test]
@@ -124,7 +110,7 @@ async fn reset_past_sleep_before_turn() {
sleep.as_mut().reset(now + ms(80));
sleep.await;
- assert_elapsed!(now, 80);
+ assert_elapsed!(now, ms(80));
}
#[tokio::test]
@@ -143,7 +129,7 @@ async fn reset_past_sleep_before_fire() {
sleep.as_mut().reset(now + ms(80));
sleep.await;
- assert_elapsed!(now, 80);
+ assert_elapsed!(now, ms(80));
}
#[tokio::test]
@@ -154,11 +140,11 @@ async fn reset_future_sleep_after_fire() {
let mut sleep = Box::pin(time::sleep_until(now + ms(100)));
sleep.as_mut().await;
- assert_elapsed!(now, 100);
+ assert_elapsed!(now, ms(100));
sleep.as_mut().reset(now + ms(110));
sleep.await;
- assert_elapsed!(now, 110);
+ assert_elapsed!(now, ms(110));
}
#[tokio::test]
diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs
index c528620..2754e84 100644
--- a/tests/uds_stream.rs
+++ b/tests/uds_stream.rs
@@ -90,7 +90,7 @@ async fn try_read_write() -> std::io::Result<()> {
tokio::task::yield_now().await;
}
- // Fill the write buffer
+ // Fill the write buffer using non-vectored I/O
loop {
// Still ready
let mut writable = task::spawn(client.writable());
@@ -110,7 +110,7 @@ async fn try_read_write() -> std::io::Result<()> {
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());
- // Drain the socket from the server end
+ // Drain the socket from the server end using non-vectored I/O
let mut read = vec![0; written.len()];
let mut i = 0;
@@ -127,6 +127,51 @@ async fn try_read_write() -> std::io::Result<()> {
assert_eq!(read, written);
}
+ written.clear();
+ client.writable().await.unwrap();
+
+ // Fill the write buffer using vectored I/O
+ let msg_bufs: Vec<_> = msg.chunks(3).map(io::IoSlice::new).collect();
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write_vectored(&msg_bufs) {
+ Ok(n) => written.extend(&msg[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end using vectored I/O
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await?;
+
+ let mut bufs: Vec<_> = read[i..]
+ .chunks_mut(0x10000)
+ .map(io::IoSliceMut::new)
+ .collect();
+ match server.try_read_vectored(&mut bufs) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
// Now, we listen for shutdown
drop(client);