author     Martin Geisler <mgeisler@google.com>  2024-04-09 20:15:33 +0200
committer  Martin Geisler <mgeisler@google.com>  2024-04-09 20:52:10 +0200
commit     e1020e5c50ebac3e39b4ec0a42be20959ad40b88 (patch)
tree       97accf9565802ec9fb0d7a12b6e920d83bf03e22
parent     1f81bb10fd38b377c9934d1bc429868ba28f50f0 (diff)
download   oneshot-uniffi-upstream.tar.gz

Import 'oneshot-uniffi' crate (upstream)

Request Document: go/android-rust-importing-crates
For CL Reviewers: go/android3p#cl-review
For Build Team: go/ab-third-party-imports
Bug: http://b/330717829
Test: m liboneshot_uniffi
Change-Id: I934958fc68736c85a39b309165a4660a88bb0026
-rw-r--r--  .cargo_vcs_info.json  6
-rw-r--r--  .github/workflows/build-and-test.yml  40
-rw-r--r--  .github/workflows/style-sanity.yml  55
-rw-r--r--  .gitignore  2
-rw-r--r--  CHANGELOG.md  69
-rw-r--r--  Cargo.toml  64
l---------  LICENSE  1
-rw-r--r--  LICENSE-APACHE  201
-rw-r--r--  LICENSE-MIT  19
-rw-r--r--  METADATA  21
-rw-r--r--  MODULE_LICENSE_APACHE2  0
-rw-r--r--  OWNERS  2
-rw-r--r--  README.md  94
-rw-r--r--  benches/benches.rs  122
-rw-r--r--  cargo_embargo.json  8
-rwxr-xr-x  check_mem_leaks.sh  13
-rw-r--r--  examples/recv_before_send.rs  18
-rw-r--r--  examples/recv_before_send_then_drop_sender.rs  18
-rw-r--r--  examples/recv_ref_before_send.rs  18
-rw-r--r--  examples/recv_ref_before_send_then_drop_sender.rs  18
-rw-r--r--  examples/recv_timeout_before_send.rs  18
-rw-r--r--  examples/recv_timeout_before_send_then_drop_sender.rs  18
-rw-r--r--  examples/recv_with_dropped_sender.rs  11
-rw-r--r--  examples/send_before_recv.rs  11
-rw-r--r--  examples/send_then_drop_receiver.rs  7
-rw-r--r--  examples/send_with_dropped_receiver.rs  8
-rw-r--r--  src/errors.rs  147
-rw-r--r--  src/lib.rs  1242
-rw-r--r--  src/loombox.rs  151
-rw-r--r--  tests/assert_mem.rs  37
-rw-r--r--  tests/async.rs  128
-rw-r--r--  tests/future.rs  65
-rw-r--r--  tests/helpers/mod.rs  63
-rw-r--r--  tests/helpers/waker.rs  64
-rw-r--r--  tests/loom.rs  223
-rw-r--r--  tests/raw.rs  46
-rw-r--r--  tests/sync.rs  343
37 files changed, 3371 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..018637a
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,6 @@
+{
+ "git": {
+ "sha1": "eb8f8ebacb9c38861d88923830a53715ef733fa4"
+ },
+ "path_in_vcs": ""
+}
\ No newline at end of file
diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml
new file mode 100644
index 0000000..6aeb92a
--- /dev/null
+++ b/.github/workflows/build-and-test.yml
@@ -0,0 +1,40 @@
+name: Cargo build and test
+on: [pull_request, workflow_dispatch]
+env:
+ CARGO_TERM_COLOR: always
+ RUSTFLAGS: "--deny warnings "
+jobs:
+ build-and-test:
+ strategy:
+ matrix:
+ os: [ubuntu-latest, macos-latest, windows-latest]
+ rust: [stable, beta]
+ include:
+ - os: ubuntu-latest
+ rust: nightly
+ - os: ubuntu-latest
+ rust: 1.65.0
+ runs-on: ${{ matrix.os }}
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ - name: Install Rust
+ uses: ATiltedTree/setup-rust@v1.0.4
+ with:
+ rust-version: ${{ matrix.rust }}
+
+ - name: Install cargo-hack
+ uses: taiki-e/install-action@cargo-hack
+
+ - name: Build
+ run: cargo build
+
+ - name: Test
+ run: cargo hack --feature-powerset test
+
+ - name: Test with artificial delay
+ run: RUSTFLAGS+="--cfg oneshot_test_delay" cargo hack --feature-powerset test
+
+ - name: Test with loom
+ run: RUSTFLAGS+="--cfg loom" LOOM_MAX_BRANCHES=100000 cargo hack --feature-powerset test --test sync --test loom
diff --git a/.github/workflows/style-sanity.yml b/.github/workflows/style-sanity.yml
new file mode 100644
index 0000000..8458efd
--- /dev/null
+++ b/.github/workflows/style-sanity.yml
@@ -0,0 +1,55 @@
+name: Rust linting, formatting and audit
+on:
+ pull_request:
+ paths:
+ - .github/workflows/*.yml
+ - '**/*.rs'
+ - Cargo.toml
+ - Cargo.lock
+ workflow_dispatch:
+jobs:
+ clippy-linting:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+
+ - uses: actions-rs/toolchain@v1.0.6
+ with:
+ toolchain: stable
+ components: clippy
+ override: true
+
+ - name: Clippy check
+ run: |
+ export RUSTFLAGS="--deny warnings"
+ time cargo clippy --verbose
+
+ check-formatting:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+
+ - uses: actions-rs/toolchain@v1.0.6
+ with:
+ toolchain: stable
+ components: rustfmt
+ override: true
+
+ - name: Check formatting
+ run: |
+ rustfmt --version
+ cargo fmt -- --check
+
+ audit:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+
+ - name: Install cargo-audit
+ uses: actions-rs/install@v0.1.2
+ with:
+ crate: cargo-audit
+ version: latest
+
+ - name: Audit
+ run: cargo audit
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..96ef6c0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target
+Cargo.lock
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..c4c9283
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,69 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
+and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+
+### Categories each change falls into
+
+* **Added**: for new features.
+* **Changed**: for changes in existing functionality.
+* **Deprecated**: for soon-to-be removed features.
+* **Removed**: for now removed features.
+* **Fixed**: for any bug fixes.
+* **Security**: in case of vulnerabilities.
+
+
+## [Unreleased]
+
+
+## [0.1.6] - 2023-09-14
+### Added
+* Add `into_raw` and `from_raw` methods on both `Sender` and `Receiver`. Allows passing `oneshot`
+ channels over FFI without an extra layer of heap allocation.
+
+
+## [0.1.5] - 2022-09-01
+### Fixed
+- Handle the UNPARKING state correctly in all recv methods. `try_recv` will now not panic
+ if used on a `Receiver` that is being unparked from an async wait. The other `recv` methods
+ will still panic (as they should), but with a better error message.
+
+
+## [0.1.4] - 2022-08-30
+### Changed
+- Upgrade to Rust edition 2021. Also increases the MSRV to Rust 1.60.
+- Add null-pointer optimization to `Sender`, `Receiver` and `SendError`.
+ This reduces the call stack size of Sender::send and it makes
+ `Option<Sender>` and `Option<Receiver>` pointer sized (#18).
+- Relax the memory ordering of all atomic operations from `SeqCst` to the most appropriate
+ lower ordering (#17 + #20).
+
+### Fixed
+- Fix undefined behavior due to multiple mutable references to the same channel instance (#18).
+- Fix race condition that could happen during unparking of a receiving `Receiver` (#17 + #20).
+
+
+## [0.1.3] - 2021-11-23
+### Fixed
+- Keep the *last* `Waker` in `Future::poll`, not the *first* one. Stops breaking the contract
+ on how futures should work.
+
+
+## [0.1.2] - 2020-08-11
+### Fixed
+- Fix unreachable code panic that happened if the `Receiver` of an empty but open channel was
+ polled and then dropped.
+
+
+## [0.1.1] - 2020-05-10
+Initial implementation. Supports basically all the (for now) intended functionality.
+Sender is as lock-free as I think it can get and the receiver can both do thread blocking
+and be awaited asynchronously. The receiver also has a wait-free `try_recv` method.
+
+The crate has two features. They are activated by default, but the user can opt out of async
+support as well as usage of libstd (making the crate `no_std` but still requiring liballoc)
+
+
+## [0.1.0] - 2019-05-30
+Name reserved on crates.io by someone other than the author of this crate.
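
The 0.1.6 entry above introduces `into_raw` and `from_raw` for passing channel endpoints over FFI. A minimal sketch of that round trip, assuming the crate is used under the `oneshot` path as in the bundled README and examples, and using only the `Sender` signatures visible in src/lib.rs further below:

```rust
fn main() {
    let (sender, receiver) = oneshot::channel::<u32>();

    // Hand the sender across a (hypothetical) FFI boundary as an opaque
    // pointer; no extra heap allocation is needed beyond the channel itself.
    let raw: *mut () = sender.into_raw();

    // SAFETY: `raw` came from `Sender::<u32>::into_raw` and is used to
    // reconstruct exactly one Sender, as `from_raw` requires.
    let sender = unsafe { oneshot::Sender::<u32>::from_raw(raw) };

    sender.send(42).unwrap();
    assert_eq!(receiver.recv(), Ok(42));
}
```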
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..cba7266
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,64 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2021"
+rust-version = "1.60.0"
+name = "oneshot-uniffi"
+version = "0.1.6"
+authors = ["Linus Färnstrand <faern@faern.net>"]
+description = """
+Patched version of oneshot specifically for the UniFFI project.
+
+This removes the `loom` target and dependency which helps with UniFFI's downstream consumers.
+"""
+readme = "README.md"
+keywords = [
+ "oneshot",
+ "spsc",
+ "async",
+ "sync",
+ "channel",
+]
+categories = [
+ "asynchronous",
+ "concurrency",
+]
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/faern/oneshot"
+
+[[bench]]
+name = "benches"
+harness = false
+
+[dev-dependencies.async-std]
+version = "1"
+features = ["attributes"]
+
+[dev-dependencies.criterion]
+version = "0.3"
+
+[dev-dependencies.tokio]
+version = "1"
+features = [
+ "rt",
+ "rt-multi-thread",
+ "macros",
+ "time",
+]
+
+[features]
+async = []
+default = [
+ "std",
+ "async",
+]
+std = []
diff --git a/LICENSE b/LICENSE
new file mode 120000
index 0000000..6b579aa
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1 @@
+LICENSE-APACHE
\ No newline at end of file
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..16fe87b
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 0000000..9cf1062
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,19 @@
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..0fc6c63
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,21 @@
+name: "oneshot-uniffi"
+description: "()"
+third_party {
+ identifier {
+ type: "crates.io"
+ value: "oneshot-uniffi"
+ }
+ identifier {
+ type: "Archive"
+ value: "https://static.crates.io/crates/oneshot-uniffi/oneshot-uniffi-0.1.6.crate"
+ primary_source: true
+ }
+ version: "0.1.6"
+ # Dual-licensed, using the least restrictive per go/thirdpartylicenses#same.
+ license_type: NOTICE
+ last_upgrade_date {
+ year: 2024
+ month: 3
+ day: 21
+ }
+}
diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_APACHE2
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..48bea6e
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1,2 @@
+# Bug component: 688011
+include platform/prebuilts/rust:main:/OWNERS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..535f011
--- /dev/null
+++ b/README.md
@@ -0,0 +1,94 @@
+# oneshot
+
+Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
+can only transport a single message. This has a few nice outcomes. One thing is that
+the implementation can be very efficient, utilizing the knowledge that there will
+only be one message. But more importantly, it allows the API to be expressed in such
+a way that certain edge cases that you don't want to care about when only sending a
+single message on a channel do not exist. For example: The sender can't be copied
+or cloned, and the send method takes ownership and consumes the sender.
+So you are guaranteed, at the type level, that there can only be one message sent.
+
+The sender's send method is non-blocking, and potentially lock- and wait-free.
+See documentation on [Sender::send] for situations where it might not be fully wait-free.
+The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
+limited thread blocking receive operations. The receiver also implements `Future` and
+supports asynchronously awaiting the message.
+
+
+## Examples
+
+This example sets up a background worker that processes requests coming in on a standard
+mpsc channel and replies on a oneshot channel provided with each request. The worker can
+be interacted with both from sync and async contexts since the oneshot receiver
+can receive both blocking and async.
+
+```rust
+use std::sync::mpsc;
+use std::thread;
+use std::time::Duration;
+
+type Request = String;
+
+// Starts a background thread performing some computation on requests sent to it.
+// Delivers the response back over a oneshot channel.
+fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
+ let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
+ thread::spawn(move || {
+ for (request_data, response_sender) in request_receiver.iter() {
+ let compute_operation = || request_data.len();
+ let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
+ }
+ });
+ request_sender
+}
+
+let processor = spawn_processing_thread();
+
+// If compiled with `std` the library can receive messages with timeout on regular threads
+#[cfg(feature = "std")] {
+ let (response_sender, response_receiver) = oneshot::channel();
+ let request = Request::from("data from sync thread");
+
+ processor.send((request, response_sender)).expect("Processor down");
+ match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
+ Ok(result) => println!("Processor returned {}", result),
+ Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
+ Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
+ }
+}
+
+// If compiled with the `async` feature, the `Receiver` can be awaited in an async context
+#[cfg(feature = "async")] {
+ tokio::runtime::Runtime::new()
+ .unwrap()
+ .block_on(async move {
+ let (response_sender, response_receiver) = oneshot::channel();
+ let request = Request::from("data from sync thread");
+
+ processor.send((request, response_sender)).expect("Processor down");
+ match response_receiver.await { // <- Receive on the oneshot channel asynchronously
+ Ok(result) => println!("Processor returned {}", result),
+ Err(_e) => panic!("Processor exited"),
+ }
+ });
+}
+```
+
+## Sync vs async
+
+The main motivation for writing this library was that there were no (known to me) channel
+implementations allowing you to seamlessly send messages between a normal thread and an async
+task, or the other way around. If message passing is the way you are communicating, of course
+that should work smoothly between the sync and async parts of the program!
+
+This library achieves that by having a fast and cheap send operation that can
+be used in both sync threads and async tasks. The receiver has both thread blocking
+receive methods for synchronous usage, and implements `Future` for asynchronous usage.
+
+The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
+in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
+be possible to use this library with any executor.
+
+
+License: MIT OR Apache-2.0
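
A minimal sketch of the "Sync vs async" point above: one function hands back a `oneshot::Receiver`, and the caller chooses between blocking on it and awaiting it. It assumes the same `oneshot` path and `tokio` dev-dependency used in the README example.

```rust
fn start_work() -> oneshot::Receiver<u64> {
    let (tx, rx) = oneshot::channel();
    // The send side works the same from a plain thread or an async task.
    std::thread::spawn(move || {
        let _ = tx.send(42);
    });
    rx
}

fn main() {
    // Synchronous caller: block the thread (needs the `std` feature).
    assert_eq!(start_work().recv(), Ok(42));

    // Asynchronous caller: await the receiver (needs the `async` feature).
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        assert_eq!(start_work().await, Ok(42));
    });
}
```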
diff --git a/benches/benches.rs b/benches/benches.rs
new file mode 100644
index 0000000..438d46a
--- /dev/null
+++ b/benches/benches.rs
@@ -0,0 +1,122 @@
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use std::mem;
+use std::time::{Duration, Instant};
+
+criterion_group!(benches, bench);
+criterion_main!(benches);
+
+macro_rules! bench_send_and_recv {
+ ($c:expr, $($type:ty => $value:expr);+) => {
+ // Sanity check that all $values are of $type.
+ $(let _: $type = $value;)*
+ {
+ let mut group = $c.benchmark_group("create_channel");
+ $(group.bench_function(stringify!($type), |b| {
+ b.iter(oneshot::channel::<$type>)
+ });)*
+ group.finish();
+ }
+ {
+ let mut group = $c.benchmark_group("create_and_send");
+ $(group.bench_function(stringify!($type), |b| {
+ b.iter(|| {
+ let (sender, _receiver) = oneshot::channel();
+ sender.send(black_box($value)).unwrap()
+ });
+ });)*
+ group.finish();
+ }
+ {
+ let mut group = $c.benchmark_group("create_and_send_on_closed");
+ $(group.bench_function(stringify!($type), |b| {
+ b.iter(|| {
+ let (sender, _) = oneshot::channel();
+ sender.send(black_box($value)).unwrap_err()
+ });
+ });)*
+ group.finish();
+ }
+ {
+ let mut group = $c.benchmark_group("create_send_and_recv");
+ $(group.bench_function(stringify!($type), |b| {
+ b.iter(|| {
+ let (sender, receiver) = oneshot::channel();
+ sender.send(black_box($value)).unwrap();
+ receiver.recv().unwrap()
+ });
+ });)*
+ group.finish();
+ }
+ {
+ let mut group = $c.benchmark_group("create_send_and_recv_ref");
+ $(group.bench_function(stringify!($type), |b| {
+ b.iter(|| {
+ let (sender, receiver) = oneshot::channel();
+ sender.send(black_box($value)).unwrap();
+ receiver.recv_ref().unwrap()
+ });
+ });)*
+ group.finish();
+ }
+ };
+}
+
+fn bench(c: &mut Criterion) {
+ bench_send_and_recv!(c,
+ () => ();
+ u8 => 7u8;
+ usize => 9876usize;
+ u128 => 1234567u128;
+ [u8; 64] => [0b10101010u8; 64];
+ [u8; 4096] => [0b10101010u8; 4096]
+ );
+
+ bench_try_recv(c);
+ bench_recv_deadline_now(c);
+ bench_recv_timeout_zero(c);
+}
+
+fn bench_try_recv(c: &mut Criterion) {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ c.bench_function("try_recv_empty", |b| {
+ b.iter(|| receiver.try_recv().unwrap_err())
+ });
+ mem::drop(sender);
+ c.bench_function("try_recv_empty_closed", |b| {
+ b.iter(|| receiver.try_recv().unwrap_err())
+ });
+}
+
+fn bench_recv_deadline_now(c: &mut Criterion) {
+ let now = Instant::now();
+ {
+ let (_sender, receiver) = oneshot::channel::<u128>();
+ c.bench_function("recv_deadline_now", |b| {
+ b.iter(|| receiver.recv_deadline(now).unwrap_err())
+ });
+ }
+ {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ mem::drop(sender);
+ c.bench_function("recv_deadline_now_closed", |b| {
+ b.iter(|| receiver.recv_deadline(now).unwrap_err())
+ });
+ }
+}
+
+fn bench_recv_timeout_zero(c: &mut Criterion) {
+ let zero = Duration::from_nanos(0);
+ {
+ let (_sender, receiver) = oneshot::channel::<u128>();
+ c.bench_function("recv_timeout_zero", |b| {
+ b.iter(|| receiver.recv_timeout(zero).unwrap_err())
+ });
+ }
+ {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ mem::drop(sender);
+ c.bench_function("recv_timeout_zero_closed", |b| {
+ b.iter(|| receiver.recv_timeout(zero).unwrap_err())
+ });
+ }
+}
diff --git a/cargo_embargo.json b/cargo_embargo.json
new file mode 100644
index 0000000..762838b
--- /dev/null
+++ b/cargo_embargo.json
@@ -0,0 +1,8 @@
+{
+ "run_cargo": false,
+ "module_visibility": {
+ "liboneshot_uniffi": [
+ "//external/rust/crates/uniffi_core"
+ ]
+ }
+}
diff --git a/check_mem_leaks.sh b/check_mem_leaks.sh
new file mode 100755
index 0000000..5a10835
--- /dev/null
+++ b/check_mem_leaks.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+
+set -eu
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+cd "$SCRIPT_DIR"
+
+for example_path in examples/*.rs; do
+ example_filename=$(basename -- $example_path)
+ example=${example_filename%.*}
+ echo $example
+ cargo valgrind run --example "$example"
+done
diff --git a/examples/recv_before_send.rs b/examples/recv_before_send.rs
new file mode 100644
index 0000000..2eda3dd
--- /dev/null
+++ b/examples/recv_before_send.rs
@@ -0,0 +1,18 @@
+#[cfg(feature = "std")]
+fn main() {
+ use std::thread;
+ use std::time::Duration;
+
+ let (sender, receiver) = oneshot::channel();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ sender.send(9u128).unwrap();
+ });
+ assert_eq!(receiver.recv(), Ok(9));
+ t.join().unwrap();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/recv_before_send_then_drop_sender.rs b/examples/recv_before_send_then_drop_sender.rs
new file mode 100644
index 0000000..aea7d66
--- /dev/null
+++ b/examples/recv_before_send_then_drop_sender.rs
@@ -0,0 +1,18 @@
+#[cfg(feature = "std")]
+fn main() {
+ use std::thread;
+ use std::time::Duration;
+
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ std::mem::drop(sender);
+ });
+ assert!(receiver.recv().is_err());
+ t.join().unwrap();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/recv_ref_before_send.rs b/examples/recv_ref_before_send.rs
new file mode 100644
index 0000000..6ed74dd
--- /dev/null
+++ b/examples/recv_ref_before_send.rs
@@ -0,0 +1,18 @@
+#[cfg(feature = "std")]
+fn main() {
+ use std::thread;
+ use std::time::Duration;
+
+ let (sender, receiver) = oneshot::channel();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ sender.send(9u128).unwrap();
+ });
+ assert_eq!(receiver.recv_ref(), Ok(9));
+ t.join().unwrap();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/recv_ref_before_send_then_drop_sender.rs b/examples/recv_ref_before_send_then_drop_sender.rs
new file mode 100644
index 0000000..75ff3d6
--- /dev/null
+++ b/examples/recv_ref_before_send_then_drop_sender.rs
@@ -0,0 +1,18 @@
+#[cfg(feature = "std")]
+fn main() {
+ use std::thread;
+ use std::time::Duration;
+
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ std::mem::drop(sender);
+ });
+ assert!(receiver.recv_ref().is_err());
+ t.join().unwrap();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/recv_timeout_before_send.rs b/examples/recv_timeout_before_send.rs
new file mode 100644
index 0000000..85a2ac8
--- /dev/null
+++ b/examples/recv_timeout_before_send.rs
@@ -0,0 +1,18 @@
+#[cfg(feature = "std")]
+fn main() {
+ use std::thread;
+ use std::time::Duration;
+
+ let (sender, receiver) = oneshot::channel();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ sender.send(9u128).unwrap();
+ });
+ assert_eq!(receiver.recv_timeout(Duration::from_millis(100)), Ok(9));
+ t.join().unwrap();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/recv_timeout_before_send_then_drop_sender.rs b/examples/recv_timeout_before_send_then_drop_sender.rs
new file mode 100644
index 0000000..32c31fc
--- /dev/null
+++ b/examples/recv_timeout_before_send_then_drop_sender.rs
@@ -0,0 +1,18 @@
+#[cfg(feature = "std")]
+fn main() {
+ use std::thread;
+ use std::time::Duration;
+
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ std::mem::drop(sender);
+ });
+ assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err());
+ t.join().unwrap();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/recv_with_dropped_sender.rs b/examples/recv_with_dropped_sender.rs
new file mode 100644
index 0000000..f7a7171
--- /dev/null
+++ b/examples/recv_with_dropped_sender.rs
@@ -0,0 +1,11 @@
+#[cfg(feature = "std")]
+fn main() {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ std::mem::drop(sender);
+ receiver.recv().unwrap_err();
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/send_before_recv.rs b/examples/send_before_recv.rs
new file mode 100644
index 0000000..c31ba65
--- /dev/null
+++ b/examples/send_before_recv.rs
@@ -0,0 +1,11 @@
+#[cfg(feature = "std")]
+fn main() {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+ assert_eq!(receiver.recv(), Ok(19i128));
+}
+
+#[cfg(not(feature = "std"))]
+fn main() {
+ panic!("This example is only for when the \"std\" feature is used");
+}
diff --git a/examples/send_then_drop_receiver.rs b/examples/send_then_drop_receiver.rs
new file mode 100644
index 0000000..941c508
--- /dev/null
+++ b/examples/send_then_drop_receiver.rs
@@ -0,0 +1,7 @@
+use std::mem;
+
+fn main() {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+ mem::drop(receiver);
+}
diff --git a/examples/send_with_dropped_receiver.rs b/examples/send_with_dropped_receiver.rs
new file mode 100644
index 0000000..19bfa38
--- /dev/null
+++ b/examples/send_with_dropped_receiver.rs
@@ -0,0 +1,8 @@
+use std::mem;
+
+fn main() {
+ let (sender, receiver) = oneshot::channel();
+ mem::drop(receiver);
+ let send_error = sender.send(5u128).unwrap_err();
+ assert_eq!(send_error.into_inner(), 5);
+}
diff --git a/src/errors.rs b/src/errors.rs
new file mode 100644
index 0000000..1fd0de1
--- /dev/null
+++ b/src/errors.rs
@@ -0,0 +1,147 @@
+use super::{dealloc, Channel};
+use core::fmt;
+use core::mem;
+use core::ptr::NonNull;
+
+/// An error returned when trying to send on a closed channel. Returned from
+/// [`Sender::send`](crate::Sender::send) if the corresponding [`Receiver`](crate::Receiver)
+/// has already been dropped.
+///
+/// The message that could not be sent can be retrieved again with [`SendError::into_inner`].
+pub struct SendError<T> {
+ channel_ptr: NonNull<Channel<T>>,
+}
+
+unsafe impl<T: Send> Send for SendError<T> {}
+unsafe impl<T: Sync> Sync for SendError<T> {}
+
+impl<T> SendError<T> {
+ /// # Safety
+ ///
+ /// By calling this function, the caller semantically transfers ownership of the
+ /// channel's resources to the created `SendError`. Thus the caller must ensure that the
+ /// pointer is not used in a way which would violate this ownership transfer. Moreover,
+ /// the caller must assert that the channel contains a valid, initialized message.
+ pub(crate) const unsafe fn new(channel_ptr: NonNull<Channel<T>>) -> Self {
+ Self { channel_ptr }
+ }
+
+ /// Consumes the error and returns the message that failed to be sent.
+ #[inline]
+ pub fn into_inner(self) -> T {
+ let channel_ptr = self.channel_ptr;
+
+ // Don't run destructor if we consumed ourselves. Freeing happens here.
+ mem::forget(self);
+
+ // SAFETY: we have ownership of the channel
+ let channel: &Channel<T> = unsafe { channel_ptr.as_ref() };
+
+ // SAFETY: we know that the message is initialized according to the safety requirements of
+ // `new`
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: we own the channel
+ unsafe { dealloc(channel_ptr) };
+
+ message
+ }
+
+ /// Get a reference to the message that failed to be sent.
+ #[inline]
+ pub fn as_inner(&self) -> &T {
+ unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
+ }
+}
+
+impl<T> Drop for SendError<T> {
+ fn drop(&mut self) {
+ // SAFETY: we have ownership of the channel and require that the message is initialized
+ // upon construction
+ unsafe {
+ self.channel_ptr.as_ref().drop_message();
+ dealloc(self.channel_ptr);
+ }
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "sending on a closed channel".fmt(f)
+ }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "SendError<{}>(_)", stringify!(T))
+ }
+}
+
+#[cfg(feature = "std")]
+impl<T> std::error::Error for SendError<T> {}
+
+/// An error returned from the blocking [`Receiver::recv`](crate::Receiver::recv) method.
+///
+/// The receive operation can only fail if the corresponding [`Sender`](crate::Sender) was dropped
+/// before sending any message, or if a message has already been sent and received on the channel.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub struct RecvError;
+
+impl fmt::Display for RecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "receiving on a closed channel".fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for RecvError {}
+
+/// An error returned when failing to receive a message in the non-blocking
+/// [`Receiver::try_recv`](crate::Receiver::try_recv).
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum TryRecvError {
+ /// The channel is still open, but there was no message present in it.
+ Empty,
+
+ /// The channel is closed. Either the sender was dropped before sending any message, or the
+ /// message has already been extracted from the receiver.
+ Disconnected,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let msg = match self {
+ TryRecvError::Empty => "receiving on an empty channel",
+ TryRecvError::Disconnected => "receiving on a closed channel",
+ };
+ msg.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for TryRecvError {}
+
+/// An error returned when failing to receive a message in
+/// [`Receiver::recv_timeout`](crate::Receiver::recv_timeout).
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum RecvTimeoutError {
+ /// No message arrived on the channel before the timeout was reached. The channel is still open.
+ Timeout,
+
+ /// The channel is closed. Either the sender was dropped before sending any message, or the
+ /// message has already been extracted from the receiver.
+ Disconnected,
+}
+
+impl fmt::Display for RecvTimeoutError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let msg = match self {
+ RecvTimeoutError::Timeout => "timed out waiting on channel",
+ RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
+ };
+ msg.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for RecvTimeoutError {}
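
As a usage note for the error types above, here is a minimal sketch distinguishing the two `TryRecvError` variants with the wait-free `try_recv` documented in src/lib.rs below; it assumes the `oneshot` crate path used by the bundled examples.

```rust
use oneshot::TryRecvError;

fn main() {
    let (sender, receiver) = oneshot::channel::<String>();

    // Sender still alive, nothing sent yet: Empty.
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

    sender.send("hello".to_string()).unwrap();

    // The message is present; taking it disconnects the channel.
    assert_eq!(receiver.try_recv().as_deref(), Ok("hello"));

    // Further attempts report Disconnected rather than Empty.
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
}
```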
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..8da012b
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,1242 @@
+//! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
+//! can only transport a single message. This has a few nice outcomes. One thing is that
+//! the implementation can be very efficient, utilizing the knowledge that there will
+//! only be one message. But more importantly, it allows the API to be expressed in such
+//! a way that certain edge cases that you don't want to care about when only sending a
+//! single message on a channel do not exist. For example: The sender can't be copied
+//! or cloned, and the send method takes ownership and consumes the sender.
+//! So you are guaranteed, at the type level, that there can only be one message sent.
+//!
+//! The sender's send method is non-blocking, and potentially lock- and wait-free.
+//! See documentation on [Sender::send] for situations where it might not be fully wait-free.
+//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
+//! limited thread blocking receive operations. The receiver also implements `Future` and
+//! supports asynchronously awaiting the message.
+//!
+//!
+//! # Examples
+//!
+//! This example sets up a background worker that processes requests coming in on a standard
+//! mpsc channel and replies on a oneshot channel provided with each request. The worker can
+//! be interacted with both from sync and async contexts since the oneshot receiver
+//! supports both blocking and async receiving.
+//!
+//! ```rust
+//! use std::sync::mpsc;
+//! use std::thread;
+//! use std::time::Duration;
+//!
+//! type Request = String;
+//!
+//! // Starts a background thread performing some computation on requests sent to it.
+//! // Delivers the response back over a oneshot channel.
+//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
+//! let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
+//! thread::spawn(move || {
+//! for (request_data, response_sender) in request_receiver.iter() {
+//! let compute_operation = || request_data.len();
+//! let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
+//! }
+//! });
+//! request_sender
+//! }
+//!
+//! let processor = spawn_processing_thread();
+//!
+//! // If compiled with `std` the library can receive messages with timeout on regular threads
+//! #[cfg(feature = "std")] {
+//! let (response_sender, response_receiver) = oneshot::channel();
+//! let request = Request::from("data from sync thread");
+//!
+//! processor.send((request, response_sender)).expect("Processor down");
+//! match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
+//! Ok(result) => println!("Processor returned {}", result),
+//! Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
+//! Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
+//! }
+//! }
+//!
+//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
+//! #[cfg(feature = "async")] {
+//! tokio::runtime::Runtime::new()
+//! .unwrap()
+//! .block_on(async move {
+//! let (response_sender, response_receiver) = oneshot::channel();
+//! let request = Request::from("data from sync thread");
+//!
+//! processor.send((request, response_sender)).expect("Processor down");
+//! match response_receiver.await { // <- Receive on the oneshot channel asynchronously
+//! Ok(result) => println!("Processor returned {}", result),
+//! Err(_e) => panic!("Processor exited"),
+//! }
+//! });
+//! }
+//! ```
+//!
+//! # Sync vs async
+//!
+//! The main motivation for writing this library was that there were no (known to me) channel
+//! implementations allowing you to seamlessly send messages between a normal thread and an async
+//! task, or the other way around. If message passing is the way you are communicating, of course
+//! that should work smoothly between the sync and async parts of the program!
+//!
+//! This library achieves that by having a fast and cheap send operation that can
+//! be used in both sync threads and async tasks. The receiver has both thread blocking
+//! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
+//!
+//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
+//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
+//! be possible to use this library with any executor.
+//!
+
+// # Implementation description
+//
+// When a channel is created via the channel function, it creates a single heap allocation
+// containing:
+// * A one byte atomic integer that represents the current channel state,
+// * Uninitialized memory to fit the message,
+// * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
+//
+// The size of the waker depends on which features are activated; it ranges from 0 to 24 bytes[1].
+// So with all features enabled (the default) each channel allocates 25 bytes plus the size of the
+// message, plus any padding needed to get correct memory alignment.
+//
+// The Sender and Receiver only hold a raw pointer to the heap channel object. The last endpoint
+// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
+// be consumed or dropped signals via the state that it is gone, and the second one sees this and
+// frees the memory.
+//
+// ## Footnotes
+//
+// [1]: Mind that the waker takes zero bytes only when all features are disabled, making it
+// impossible to *wait* for the message. `try_recv` is the only available method in this scenario.
+
+#![deny(rust_2018_idioms)]
+#![cfg_attr(not(feature = "std"), no_std)]
+
+#[cfg(not(loom))]
+extern crate alloc;
+
+use core::{
+ marker::PhantomData,
+ mem::{self, MaybeUninit},
+ ptr::{self, NonNull},
+};
+
+#[cfg(not(loom))]
+use core::{
+ cell::UnsafeCell,
+ sync::atomic::{fence, AtomicU8, Ordering::*},
+};
+#[cfg(loom)]
+use loom::{
+ cell::UnsafeCell,
+ sync::atomic::{fence, AtomicU8, Ordering::*},
+};
+
+#[cfg(all(feature = "async", not(loom)))]
+use core::hint;
+#[cfg(all(feature = "async", loom))]
+use loom::hint;
+
+#[cfg(feature = "async")]
+use core::{
+ pin::Pin,
+ task::{self, Poll},
+};
+#[cfg(feature = "std")]
+use std::time::{Duration, Instant};
+
+#[cfg(feature = "std")]
+mod thread {
+ #[cfg(not(loom))]
+ pub use std::thread::{current, park, park_timeout, yield_now, Thread};
+
+ #[cfg(loom)]
+ pub use loom::thread::{current, park, yield_now, Thread};
+
+ // loom does not support parking with a timeout. So we just
+ // yield. This means that the "park" will "spuriously" wake up
+ // way too early. But the code should properly handle this.
+ // One thing to note is that very short timeouts are needed
+ // when using loom, since otherwise the looping will cause
+ // an overflow in loom.
+ #[cfg(loom)]
+ pub fn park_timeout(_timeout: std::time::Duration) {
+ loom::thread::yield_now()
+ }
+}
+
+#[cfg(loom)]
+mod loombox;
+#[cfg(not(loom))]
+use alloc::boxed::Box;
+#[cfg(loom)]
+use loombox::Box;
+
+mod errors;
+pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError};
+
+/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ // Allocate the channel on the heap and get the pointer.
+ // The last endpoint of the channel to be alive is responsible for freeing the channel
+ // and dropping any object that might have been written to it.
+
+ let channel_ptr = Box::into_raw(Box::new(Channel::new()));
+
+ // SAFETY: `channel_ptr` came from a Box and thus is not null
+ let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) };
+
+ (
+ Sender {
+ channel_ptr,
+ _invariant: PhantomData,
+ },
+ Receiver { channel_ptr },
+ )
+}
+
+#[derive(Debug)]
+pub struct Sender<T> {
+ channel_ptr: NonNull<Channel<T>>,
+ // In reality we want contravariance, however we can't obtain that.
+ //
+ // Consider the following scenario:
+ // ```
+ // let (mut tx, rx) = channel::<&'short u8>();
+ // let (tx2, rx2) = channel::<&'long u8>();
+ //
+ // tx = tx2;
+ //
+ // // Pretend short_ref is some &'short u8
+ // tx.send(short_ref).unwrap();
+ // let long_ref = rx2.recv().unwrap();
+ // ```
+ //
+ // If this type were covariant then we could safely extend lifetimes, which is not okay.
+ // Hence, we enforce invariance.
+ _invariant: PhantomData<fn(T) -> T>,
+}
+
+#[derive(Debug)]
+pub struct Receiver<T> {
+ // Covariance is the right choice here. Consider the example presented in Sender, and you'll
+ // see that if we replaced `rx` instead then we would get the expected behavior
+ channel_ptr: NonNull<Channel<T>>,
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Send for Receiver<T> {}
+impl<T> Unpin for Receiver<T> {}
+
+impl<T> Sender<T> {
+ /// Sends `message` over the channel to the corresponding [`Receiver`].
+ ///
+ /// Returns an error if the receiver has already been dropped. The message can
+ /// be extracted from the error.
+ ///
+ /// This method is lock-free and wait-free when sending on a channel that the
+ /// receiver is currently not receiving on. If the receiver is receiving during the send
+ /// operation this method includes waking up the thread/task. Unparking a thread involves
+ /// a mutex in Rust's standard library at the time of writing this.
+ /// How lock-free the waking of an async task is
+ /// depends on your executor. If this method returns a `SendError`, please mind that dropping
+ /// the error involves running any drop implementation on the message type, and freeing the
+ /// channel's heap allocation, which might or might not be lock-free.
+ pub fn send(self, message: T) -> Result<(), SendError<T>> {
+ let channel_ptr = self.channel_ptr;
+
+ // Don't run our Drop implementation if send was called, any cleanup now happens here
+ mem::forget(self);
+
+ // SAFETY: The channel exists on the heap for the entire duration of this method and we
+ // only ever acquire shared references to it. Note that if the receiver disconnects it
+ // does not free the channel.
+ let channel = unsafe { channel_ptr.as_ref() };
+
+ // Write the message into the channel on the heap.
+ // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
+ // state, and since we're responsible for setting that state, we can guarantee that we have
+ // exclusive access to this memory location to perform this write.
+ unsafe { channel.write_message(message) };
+
+ // Set the state to signal there is a message on the channel.
+ // ORDERING: we use release ordering to ensure the write of the message is visible to the
+ // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
+ // and thus we do not need acquire ordering. The RECEIVING branch manages synchronization
+ // independent of this operation.
+ //
+ // EMPTY + 1 = MESSAGE
+ // RECEIVING + 1 = UNPARKING
+ // DISCONNECTED + 1 = invalid, however this state is never observed
+ match channel.state.fetch_add(1, Release) {
+ // The receiver is alive and has not started waiting. Send done.
+ EMPTY => Ok(()),
+ // The receiver is waiting. Wake it up so it can return the message.
+ RECEIVING => {
+ // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
+ // taking of the waker from being ordered before this operation.
+ fence(Acquire);
+
+ // Take the waker, but critically do not unpark it. If we unparked now, then the
+ // receiving thread could still observe the UNPARKING state and re-park, meaning
+ // that after we change to the MESSAGE state, it would remain parked indefinitely
+ // or until a spurious wakeup.
+ // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
+ // does not access the waker while in this state, nor does it free the channel
+ // allocation in this state.
+ let waker = unsafe { channel.take_waker() };
+
+ // ORDERING: this ordering serves two purposes: it synchronizes with the acquire load
+ // in the receiving thread, ensuring that both our read of the waker and write of
+ // the message happen-before the taking of the message and freeing of the channel.
+ // Furthermore, we need acquire ordering to ensure the unparking of the receiver
+ // happens after the channel state is updated.
+ channel.state.swap(MESSAGE, AcqRel);
+
+ // Note: it is possible that between the store above and this statement that
+ // the receiving thread is spuriously unparked, takes the message, and frees
+ // the channel allocation. However, we took ownership of the channel out of
+ // that allocation, and freeing the channel does not drop the waker since the
+ // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
+ // whether or not the receive has completed by this point.
+ waker.unpark();
+
+ Ok(())
+ }
+ // The receiver was already dropped. The error is responsible for freeing the channel.
+ // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
+ // we can transfer exclusive ownership of the channel's resources to the error.
+ // Moreover, since we just placed the message in the channel, the channel contains a
+ // valid message.
+ DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Consumes the Sender, returning a raw pointer to the channel on the heap.
+ ///
+ /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
+ /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw].
+ /// Memory will leak if the Sender is never reconstructed.
+ pub fn into_raw(self) -> *mut () {
+ let raw = self.channel_ptr.as_ptr() as *mut ();
+ mem::forget(self);
+ raw
+ }
+
+ /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender.
+ ///
+ /// # Safety
+ ///
+ /// This pointer must have come from [`Sender<T>::into_raw`] with the same message type, `T`.
+ /// At most one Sender must exist for a channel at any point in time.
+ /// Constructing multiple Senders from the same raw pointer leads to undefined behavior.
+ pub unsafe fn from_raw(raw: *mut ()) -> Self {
+ Self {
+ channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
+ _invariant: PhantomData,
+ }
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
+ // DISCONNECTED states. If we are in the MESSAGE state, then we called
+ // mem::forget(self), so we should not be in this function call. If we are in the
+ // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
+ // unreachable, or was dropped and observed that our side was still alive, and thus didn't
+ // free the channel.
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // Set the channel state to disconnected and read what state the receiver was in
+ // ORDERING: we don't need release ordering here since there are no modifications we
+ // need to make visible to other thread, and the Err(RECEIVING) branch handles
+ // synchronization independent of this cmpxchg
+ //
+ // EMPTY ^ 001 = DISCONNECTED
+ // RECEIVING ^ 001 = UNPARKING
+ // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
+ match channel.state.fetch_xor(0b001, Relaxed) {
+ // The receiver has not started waiting, nor is it dropped.
+ EMPTY => (),
+ // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
+ RECEIVING => {
+ // See comments in Sender::send
+
+ fence(Acquire);
+
+ let waker = unsafe { channel.take_waker() };
+
+ // We still need release ordering here to make sure our read of the waker happens
+ // before this, and acquire ordering to ensure the unparking of the receiver
+ // happens after this.
+ channel.state.swap(DISCONNECTED, AcqRel);
+
+ // The Acquire ordering above ensures that the write of the DISCONNECTED state
+ // happens-before unparking the receiver.
+ waker.unpark();
+ }
+ // The receiver was already dropped. We are responsible for freeing the channel.
+ DISCONNECTED => {
+ // SAFETY: when the receiver switches the state to DISCONNECTED they have received
+ // the message or will no longer be trying to receive the message, and have
+ // observed that the sender is still alive, meaning that we're responsible for
+ // freeing the channel allocation.
+ unsafe { dealloc(self.channel_ptr) };
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Checks if there is a message in the channel without blocking. Returns:
+ /// * `Ok(message)` if there was a message in the channel.
+ /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
+ /// * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
+ /// message has already been extracted by a previous receive call.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// This method is completely lock-free and wait-free. The only thing it does is an atomic
+ /// integer load of the channel state. If there is a message in the channel, it additionally
+ /// performs one atomic integer store and copies the message from the heap to the stack in
+ /// order to return it.
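+ ///
+ /// A minimal sketch:
+ ///
+ /// ```
+ /// use oneshot::TryRecvError;
+ ///
+ /// let (sender, receiver) = oneshot::channel();
+ /// // Nothing sent yet.
+ /// assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
+ /// sender.send(19).unwrap();
+ /// // The message is available, and taking it disconnects the channel.
+ /// assert_eq!(receiver.try_recv(), Ok(19));
+ /// assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
+ /// ```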
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ // SAFETY: The channel will not be freed while this method is still running.
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // ORDERING: we use acquire ordering to synchronize with the store of the message.
+ match channel.state.load(Acquire) {
+ MESSAGE => {
+ // It's okay to break up the load and store since once we're in the message state
+ // the sender no longer modifies the state
+ // ORDERING: at this point the sender has done its job and is no longer active, so
+ // we don't need to make any side effects visible to it
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we are in the MESSAGE state so the message is present
+ Ok(unsafe { channel.take_message() })
+ }
+ EMPTY => Err(TryRecvError::Empty),
+ DISCONNECTED => Err(TryRecvError::Disconnected),
+ #[cfg(feature = "async")]
+ RECEIVING | UNPARKING => Err(TryRecvError::Empty),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
+ /// disconnected.
+ ///
+ /// This method will always block the current thread if there is no data available and it is
+ /// still possible for the message to be sent. Once the message is sent to the corresponding
+ /// [`Sender`], then this receiver will wake up and return that message.
+ ///
+ /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
+ /// this call is blocking, this call will wake up and return `Err` to indicate that the message
+ /// can never be received on this channel.
+ ///
+ /// If a sent message has already been extracted from this channel, this method will return
+ /// an error.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
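+ ///
+ /// A minimal sketch, receiving a message sent from another thread:
+ ///
+ /// ```
+ /// let (sender, receiver) = oneshot::channel();
+ /// let t = std::thread::spawn(move || sender.send("hello").unwrap());
+ /// // Blocks until the message arrives.
+ /// assert_eq!(receiver.recv(), Ok("hello"));
+ /// t.join().unwrap();
+ /// ```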
+ #[cfg(feature = "std")]
+ pub fn recv(self) -> Result<T, RecvError> {
+ // Note that we don't need to worry about changing the state to disconnected or setting the
+ // state to an invalid value at any point in this function because we take ownership of
+ // self, and this function does not exit until the message has been received or both side
+ // of the channel are inactive and cleaned up.
+
+ let channel_ptr = self.channel_ptr;
+
+ // Don't run our Drop implementation, since this receiving method consumes `self` and
+ // handles cleanup itself.
+ mem::forget(self);
+
+ // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
+ // is still alive, meaning that even if the sender was dropped then it would have observed
+ // the fact that we're still alive and left the responsibility of deallocating the
+ // channel to us, so channel_ptr is valid
+ let channel = unsafe { channel_ptr.as_ref() };
+
+ // ORDERING: we use acquire ordering to synchronize with the write of the message in the
+ // case that it's available
+ match channel.state.load(Acquire) {
+ // The sender is alive but has not sent anything yet. We prepare to park.
+ EMPTY => {
+ // Conditionally add a delay here to help the tests trigger the edge cases where
+ // the sender manages to be dropped or send something before we are able to store
+ // our waker object in the channel.
+ #[cfg(oneshot_test_delay)]
+ std::thread::sleep(std::time::Duration::from_millis(10));
+
+ // Write our waker instance to the channel.
+ // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
+ // try to access the waker until it sees the state set to RECEIVING below
+ unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
+
+ // Switch the state to RECEIVING. We need to do this in one atomic step in case the
+ // sender disconnected or sent the message while we wrote the waker to memory. We
+ // don't need to do a compare exchange here however because if the original state
+ // was not EMPTY, then the sender has either finished sending the message or is
+ // being dropped, so the RECEIVING state will never be observed after we return.
+ // ORDERING: we use release ordering so the sender can synchronize with our writing
+ // of the waker to memory. The individual branches handle any additional
+ // synchronization.
+ match channel.state.swap(RECEIVING, Release) {
+ // We stored our waker, now we park until the sender has changed the state
+ EMPTY => loop {
+ thread::park();
+
+ // ORDERING: synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender sent the message while we were parked.
+ MESSAGE => {
+ // SAFETY: we are in the message state so the message is valid
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: the Sender delegates the responsibility of deallocating
+ // the channel to us upon sending the message
+ unsafe { dealloc(channel_ptr) };
+
+ break Ok(message);
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => {
+ // SAFETY: the Sender doesn't deallocate the channel allocation in
+ // its drop implementation if we're receiving
+ unsafe { dealloc(channel_ptr) };
+
+ break Err(RecvError);
+ }
+ // State did not change, spurious wakeup, park again.
+ RECEIVING | UNPARKING => (),
+ _ => unreachable!(),
+ }
+ },
+ // The sender sent the message while we prepared to park.
+ MESSAGE => {
+ // ORDERING: Synchronize with the write of the message. This branch is
+ // unlikely to be taken, so it's likely more efficient to use a fence here
+ // instead of AcqRel ordering on the RMW operation
+ fence(Acquire);
+
+ // SAFETY: we started in the empty state and the sender switched us to the
+ // message state. This means that it did not take the waker, so we're
+ // responsible for dropping it.
+ unsafe { channel.drop_waker() };
+
+ // SAFETY: we are in the message state so the message is valid
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: the Sender delegates the responsibility of deallocating the
+ // channel to us upon sending the message
+ unsafe { dealloc(channel_ptr) };
+
+ Ok(message)
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ DISCONNECTED => {
+ // SAFETY: we started in the empty state and the sender switched us to the
+ // disconnected state. It does not take the waker when it does this so we
+ // need to drop it.
+ unsafe { channel.drop_waker() };
+
+ // SAFETY: the sender does not deallocate the channel if it switches from
+ // empty to disconnected so we need to free the allocation
+ unsafe { dealloc(channel_ptr) };
+
+ Err(RecvError)
+ }
+ _ => unreachable!(),
+ }
+ }
+ // The sender already sent the message.
+ MESSAGE => {
+ // SAFETY: we are in the message state so the message is valid
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: we are already in the message state so the sender has been forgotten
+ // and it's our job to clean up resources
+ unsafe { dealloc(channel_ptr) };
+
+ Ok(message)
+ }
+ // The sender was dropped before sending anything, or we already received the message.
+ DISCONNECTED => {
+ // SAFETY: the sender does not deallocate the channel if it switches from empty to
+ // disconnected so we need to free the allocation
+ unsafe { dealloc(channel_ptr) };
+
+ Err(RecvError)
+ }
+ // The receiver must have been `Future::poll`ed prior to this call.
+ #[cfg(feature = "async")]
+ RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
+ /// disconnected. This is a non-consuming version of [`Receiver::recv`], but with slightly
+ /// worse performance. Prefer [`Receiver::recv`] if your code allows consuming the receiver.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
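+ ///
+ /// A minimal sketch:
+ ///
+ /// ```
+ /// let (sender, receiver) = oneshot::channel();
+ /// sender.send(1u8).unwrap();
+ /// // `recv_ref` does not consume the receiver...
+ /// assert_eq!(receiver.recv_ref(), Ok(1));
+ /// // ...but once the message has been taken, further receives report the disconnect.
+ /// assert_eq!(receiver.recv_ref(), Err(oneshot::RecvError));
+ /// ```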
+ #[cfg(feature = "std")]
+ pub fn recv_ref(&self) -> Result<T, RecvError> {
+ self.start_recv_ref(RecvError, |channel| {
+ loop {
+ thread::park();
+
+ // ORDERING: we use acquire ordering to synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender sent the message while we were parked.
+ // We take the message and mark the channel disconnected.
+ MESSAGE => {
+ // ORDERING: the sender is inactive at this point so we don't need to make
+ // any reads or writes visible to the sending thread
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we were just in the message state so the message is valid
+ break Ok(unsafe { channel.take_message() });
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => break Err(RecvError),
+ // State did not change, spurious wakeup, park again.
+ RECEIVING | UNPARKING => (),
+ _ => unreachable!(),
+ }
+ }
+ })
+ }
+
+ /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
+ /// * `Ok(message)` if there was a message in the channel before the timeout was reached.
+ /// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
+ /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
+ /// has already been extracted by a previous receive call.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this
+ /// point in the future, this falls back to an indefinitely blocking receive operation.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
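+ ///
+ /// A minimal sketch:
+ ///
+ /// ```
+ /// use oneshot::RecvTimeoutError;
+ /// use std::time::Duration;
+ ///
+ /// let (sender, receiver) = oneshot::channel::<u8>();
+ /// // The sender is alive but silent, so this times out.
+ /// assert_eq!(
+ ///     receiver.recv_timeout(Duration::from_millis(10)),
+ ///     Err(RecvTimeoutError::Timeout)
+ /// );
+ /// sender.send(3).unwrap();
+ /// assert_eq!(receiver.recv_timeout(Duration::from_millis(10)), Ok(3));
+ /// ```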
+ #[cfg(feature = "std")]
+ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.recv_deadline(deadline),
+ None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
+ }
+ }
+
+ /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
+ /// * `Ok(message)` if there was a message in the channel before the deadline was reached.
+ /// * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
+ /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
+ /// has already been extracted by a previous receive call.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
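+ ///
+ /// A minimal sketch:
+ ///
+ /// ```
+ /// use oneshot::RecvTimeoutError;
+ /// use std::time::{Duration, Instant};
+ ///
+ /// let (sender, receiver) = oneshot::channel::<u8>();
+ /// let deadline = Instant::now() + Duration::from_millis(10);
+ /// // Nothing is sent before the deadline, so this times out.
+ /// assert_eq!(receiver.recv_deadline(deadline), Err(RecvTimeoutError::Timeout));
+ /// drop(sender);
+ /// // With the sender gone, the channel reports the disconnect instead.
+ /// assert_eq!(
+ ///     receiver.recv_deadline(Instant::now() + Duration::from_secs(1)),
+ ///     Err(RecvTimeoutError::Disconnected)
+ /// );
+ /// ```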
+ #[cfg(feature = "std")]
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ /// # Safety
+ ///
+ /// If the sender is unparking us after a message send, the message must already have been
+ /// written to the channel and an acquire memory barrier issued before calling this function
+ #[cold]
+ unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
+ loop {
+ thread::park();
+
+ // ORDERING: The caller has already synchronized with any message write (see the
+ // safety requirements above), so a relaxed load is enough here.
+ match channel.state.load(Relaxed) {
+ MESSAGE => {
+ // ORDERING: the sender has been dropped, so this update only
+ // needs to be visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+ break Ok(channel.take_message());
+ }
+ DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
+ // The sender is still unparking us. We keep looping when we observe EMPTY, since
+ // the timed-out receive path eagerly set the state back to EMPTY before calling
+ // this function.
+ EMPTY => (),
+ _ => unreachable!(),
+ }
+ }
+ }
+
+ self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
+ loop {
+ match deadline.checked_duration_since(Instant::now()) {
+ Some(timeout) => {
+ thread::park_timeout(timeout);
+
+ // ORDERING: synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender sent the message while we were parked.
+ MESSAGE => {
+ // ORDERING: the sender has been `mem::forget`-ed so this update
+ // only needs to be visible to us.
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we either are in the message state or were just in the
+ // message state
+ break Ok(unsafe { channel.take_message() });
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
+ // State did not change, spurious wakeup, park again.
+ RECEIVING | UNPARKING => (),
+ _ => unreachable!(),
+ }
+ }
+ None => {
+ // ORDERING: synchronize with the write of the message
+ match channel.state.swap(EMPTY, Acquire) {
+ // We reached the end of the timeout without receiving a message
+ RECEIVING => {
+ // SAFETY: we were in the receiving state and are now in the empty
+ // state, so the sender has not and will not try to read the waker,
+ // so we have exclusive access to drop it.
+ unsafe { channel.drop_waker() };
+
+ break Err(RecvTimeoutError::Timeout);
+ }
+ // The sender sent the message while we were parked.
+ MESSAGE => {
+ // Same safety and ordering as the Some branch
+
+ channel.state.store(DISCONNECTED, Relaxed);
+ break Ok(unsafe { channel.take_message() });
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => {
+ // ORDERING: we were originally in the disconnected state meaning
+ // that the sender is inactive and no longer observing the state,
+ // so we only need to change it back to DISCONNECTED for if the
+ // receiver is dropped or a recv* method is called again
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ break Err(RecvTimeoutError::Disconnected);
+ }
+ // The sender sent the message and started unparking us
+ UNPARKING => {
+ // We were in the UNPARKING state and are now in the EMPTY state.
+ // We wait to be properly unparked and to observe if the sender
+ // sets MESSAGE or DISCONNECTED state.
+ // SAFETY: The load above has synchronized with any message write.
+ break unsafe { wait_for_unpark(channel) };
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+ })
+ }
+
+ /// Begins the process of receiving on the channel by reference. If the message is already
+ /// ready, or the sender has disconnected, then this function returns the appropriate
+ /// Result immediately. Otherwise, it writes the waker to memory, checks once more whether
+ /// the sender has finished or disconnected, and then calls `finish`. `finish` is thus
+ /// responsible for cleaning up the channel's resources appropriately before it returns,
+ /// such as destroying the waker.
+ #[cfg(feature = "std")]
+ #[inline]
+ fn start_recv_ref<E>(
+ &self,
+ disconnected_error: E,
+ finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
+ ) -> Result<T, E> {
+ // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
+ // is still alive, meaning that even if the sender was dropped then it would have observed
+ // the fact that we're still alive and left the responsibility of deallocating the
+ // channel to us, so `self.channel` is valid
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // ORDERING: synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender is alive but has not sent anything yet. We prepare to park.
+ EMPTY => {
+ // Conditionally add a delay here to help the tests trigger the edge cases where
+ // the sender manages to be dropped or send something before we are able to store
+ // our waker object in the channel.
+ #[cfg(oneshot_test_delay)]
+ std::thread::sleep(std::time::Duration::from_millis(10));
+
+ // Write our waker instance to the channel.
+ // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
+ // try to access the waker until it sees the state set to RECEIVING below
+ unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
+
+ // ORDERING: we use release ordering on success so the sender can synchronize with
+ // our write of the waker. We use relaxed ordering on failure since the sender does
+ // not need to synchronize with our write and the individual match arms handle any
+ // additional synchronization
+ match channel
+ .state
+ .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
+ {
+ // We stored our waker, now we delegate to the callback to finish the receive
+ // operation
+ Ok(_) => finish(channel),
+ // The sender sent the message while we prepared to finish
+ Err(MESSAGE) => {
+ // See comments in `recv` for ordering and safety
+
+ fence(Acquire);
+
+ unsafe { channel.drop_waker() };
+
+ // ORDERING: the sender has been `mem::forget`-ed so this update only
+ // needs to be visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: The MESSAGE state tells us there is a correctly initialized
+ // message
+ Ok(unsafe { channel.take_message() })
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ Err(DISCONNECTED) => {
+ // See comments in `recv` for safety
+ unsafe { channel.drop_waker() };
+ Err(disconnected_error)
+ }
+ _ => unreachable!(),
+ }
+ }
+ // The sender sent the message. We take the message and mark the channel disconnected.
+ MESSAGE => {
+ // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
+ // visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we are in the message state so the message is valid
+ Ok(unsafe { channel.take_message() })
+ }
+ // The sender was dropped before sending anything, or we already received the message.
+ DISCONNECTED => Err(disconnected_error),
+ // The receiver must have been `Future::poll`ed prior to this call.
+ #[cfg(feature = "async")]
+ RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Consumes the Receiver, returning a raw pointer to the channel on the heap.
+ ///
+ /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
+ /// to do with the returned pointer is to later reconstruct the Receiver with
+ /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed.
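+ ///
+ /// A minimal sketch of the `into_raw`/`from_raw` round trip:
+ ///
+ /// ```
+ /// let (sender, receiver) = oneshot::channel::<u32>();
+ /// let raw = receiver.into_raw();
+ /// sender.send(7).unwrap();
+ /// // SAFETY: `raw` came from `Receiver::<u32>::into_raw` and exactly one Receiver is
+ /// // reconstructed from it.
+ /// let receiver = unsafe { oneshot::Receiver::<u32>::from_raw(raw) };
+ /// assert_eq!(receiver.try_recv(), Ok(7));
+ /// ```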
+ pub fn into_raw(self) -> *mut () {
+ let raw = self.channel_ptr.as_ptr() as *mut ();
+ mem::forget(self);
+ raw
+ }
+
+ /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver.
+ ///
+ /// # Safety
+ ///
+ /// This pointer must have come from [`Receiver<T>::into_raw`] with the same message type, `T`.
+ /// At most one Receiver must exist for a channel at any point in time.
+ /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior.
+ pub unsafe fn from_raw(raw: *mut ()) -> Self {
+ Self {
+ channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
+ }
+ }
+}
+
+#[cfg(feature = "async")]
+impl<T> core::future::Future for Receiver<T> {
+ type Output = Result<T, RecvError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
+ // is still alive, meaning that even if the sender was dropped then it would have observed
+ // the fact that we're still alive and left the responsibility of deallocating the
+ // channel to us, so `self.channel` is valid
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // ORDERING: we use acquire ordering to synchronize with the store of the message.
+ match channel.state.load(Acquire) {
+ // The sender is alive but has not sent anything yet.
+ EMPTY => {
+ // SAFETY: We can't be in the forbidden states, and there is no waker in the channel.
+ unsafe { channel.write_async_waker(cx) }
+ }
+ // We were polled again while waiting for the sender. Replace the waker with the new one.
+ RECEIVING => {
+ // ORDERING: We use relaxed ordering on both success and failure since we have not
+ // written anything above that must be released, and the individual match arms
+ // handle any additional synchronization.
+ match channel
+ .state
+ .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
+ {
+ // We successfully changed the state back to EMPTY. Replace the waker.
+ // This is the most likely branch to be taken, which is why we don't use any
+ // memory barriers in the compare_exchange above.
+ Ok(_) => {
+ // SAFETY: We wrote the waker in a previous call to poll. We do not need
+ // a memory barrier since the previous write here was by ourselves.
+ unsafe { channel.drop_waker() };
+ // SAFETY: We can't be in the forbidden states, and there is no waker in the channel.
+ unsafe { channel.write_async_waker(cx) }
+ }
+ // The sender sent the message while we prepared to replace the waker.
+ // We take the message and mark the channel disconnected.
+ // The sender has already taken the waker.
+ Err(MESSAGE) => {
+ // ORDERING: Synchronize with the write of the message. This branch is
+ // unlikely to be taken.
+ channel.state.swap(DISCONNECTED, Acquire);
+ // SAFETY: The state tells us the sender has initialized the message.
+ Poll::Ready(Ok(unsafe { channel.take_message() }))
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ // The sender has taken the waker already.
+ Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
+ // The sender is currently waking us up.
+ Err(UNPARKING) => {
+ // We can't trust that the old waker that the sender has access to
+ // is honored by the async runtime at this point. So we wake ourselves
+ // up to get polled instantly again.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ _ => unreachable!(),
+ }
+ }
+ // The sender sent the message.
+ MESSAGE => {
+ // ORDERING: the sender has been dropped so this update only needs to be
+ // visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+ Poll::Ready(Ok(unsafe { channel.take_message() }))
+ }
+ // The sender was dropped before sending anything, or we already received the message.
+ DISCONNECTED => Poll::Ready(Err(RecvError)),
+ // The sender has observed the RECEIVING state and is currently reading the waker from
+ // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
+ // state. We busy loop here since we know the sender is done very soon.
+ UNPARKING => loop {
+ hint::spin_loop();
+ // ORDERING: The load above has already synchronized with the write of the message.
+ match channel.state.load(Relaxed) {
+ MESSAGE => {
+ // ORDERING: the sender has been dropped, so this update only
+ // needs to be visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+ // SAFETY: We observed the MESSAGE state
+ break Poll::Ready(Ok(unsafe { channel.take_message() }));
+ }
+ DISCONNECTED => break Poll::Ready(Err(RecvError)),
+ UNPARKING => (),
+ _ => unreachable!(),
+ }
+ },
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ // SAFETY: since the receiving side is still alive the sender would have observed that and
+ // left deallocating the channel allocation to us.
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // Set the channel state to disconnected and read what state the channel was in.
+ match channel.state.swap(DISCONNECTED, Acquire) {
+ // The sender has not sent anything, nor is it dropped.
+ EMPTY => (),
+ // The sender already sent something. We must drop it, and free the channel.
+ MESSAGE => {
+ // SAFETY: we are in the message state so the message is initialized
+ unsafe { channel.drop_message() };
+
+ // SAFETY: see safety comment at top of function
+ unsafe { dealloc(self.channel_ptr) };
+ }
+ // The receiver has been polled.
+ #[cfg(feature = "async")]
+ RECEIVING => {
+ // TODO: figure this out when async is fixed
+ unsafe { channel.drop_waker() };
+ }
+ // The sender was already dropped. We are responsible for freeing the channel.
+ DISCONNECTED => {
+ // SAFETY: see safety comment at top of function
+ unsafe { dealloc(self.channel_ptr) };
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+/// All the values that the `Channel::state` field can have during the lifetime of a channel.
+mod states {
+ // These values are very explicitly chosen so that we can replace some cmpxchg calls with
+ // fetch_* calls.
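+ // For example, `Sender::drop` relies on a single `fetch_xor(0b001)` mapping
+ // EMPTY (0b011) -> DISCONNECTED (0b010) and RECEIVING (0b000) -> UNPARKING (0b001),
+ // so no compare_exchange loop is needed there.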
+
+ /// The initial channel state. Active while both endpoints are still alive, no message has been
+ /// sent, and the receiver is not receiving.
+ pub const EMPTY: u8 = 0b011;
+ /// A message has been sent to the channel, but the receiver has not yet read it.
+ pub const MESSAGE: u8 = 0b100;
+ /// No message has yet been sent on the channel, but the receiver is currently receiving.
+ pub const RECEIVING: u8 = 0b000;
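+ /// The sender has observed the RECEIVING state and is currently taking the receiver's waker
+ /// in order to wake it up.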
+ #[cfg(any(feature = "std", feature = "async"))]
+ pub const UNPARKING: u8 = 0b001;
+ /// The channel has been closed. This means that either the sender or receiver has been dropped,
+ /// or the message sent to the channel has already been received. Since this is a oneshot
+ /// channel, it is disconnected after the one message it is supposed to hold has been
+ /// transmitted.
+ pub const DISCONNECTED: u8 = 0b010;
+}
+use states::*;
+
+/// Internal channel data structure. The `channel` function allocates and puts one instance
+/// of this struct on the heap for each oneshot channel instance. The struct holds:
+/// * The current state of the channel.
+/// * The message in the channel. This memory is uninitialized until the message is sent.
+/// * The waker instance for the thread or task that is currently receiving on this channel.
+/// This memory is uninitialized until the receiver starts receiving.
+struct Channel<T> {
+ state: AtomicU8,
+ message: UnsafeCell<MaybeUninit<T>>,
+ waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
+}
+
+impl<T> Channel<T> {
+ pub fn new() -> Self {
+ Self {
+ state: AtomicU8::new(EMPTY),
+ message: UnsafeCell::new(MaybeUninit::uninit()),
+ waker: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn message(&self) -> &MaybeUninit<T> {
+ #[cfg(loom)]
+ {
+ self.message.with(|ptr| &*ptr)
+ }
+
+ #[cfg(not(loom))]
+ {
+ &*self.message.get()
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn with_message_mut<F>(&self, op: F)
+ where
+ F: FnOnce(&mut MaybeUninit<T>),
+ {
+ #[cfg(loom)]
+ {
+ self.message.with_mut(|ptr| op(&mut *ptr))
+ }
+
+ #[cfg(not(loom))]
+ {
+ op(&mut *self.message.get())
+ }
+ }
+
+ #[inline(always)]
+ #[cfg(any(feature = "std", feature = "async"))]
+ unsafe fn with_waker_mut<F>(&self, op: F)
+ where
+ F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
+ {
+ #[cfg(loom)]
+ {
+ self.waker.with_mut(|ptr| op(&mut *ptr))
+ }
+
+ #[cfg(not(loom))]
+ {
+ op(&mut *self.waker.get())
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn write_message(&self, message: T) {
+ self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
+ }
+
+ #[inline(always)]
+ unsafe fn take_message(&self) -> T {
+ #[cfg(loom)]
+ {
+ self.message.with(|ptr| ptr::read(ptr)).assume_init()
+ }
+
+ #[cfg(not(loom))]
+ {
+ ptr::read(self.message.get()).assume_init()
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn drop_message(&self) {
+ self.with_message_mut(|slot| slot.assume_init_drop());
+ }
+
+ #[cfg(any(feature = "std", feature = "async"))]
+ #[inline(always)]
+ unsafe fn write_waker(&self, waker: ReceiverWaker) {
+ self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
+ }
+
+ #[inline(always)]
+ unsafe fn take_waker(&self) -> ReceiverWaker {
+ #[cfg(loom)]
+ {
+ self.waker.with(|ptr| ptr::read(ptr)).assume_init()
+ }
+
+ #[cfg(not(loom))]
+ {
+ ptr::read(self.waker.get()).assume_init()
+ }
+ }
+
+ #[cfg(any(feature = "std", feature = "async"))]
+ #[inline(always)]
+ unsafe fn drop_waker(&self) {
+ self.with_waker_mut(|slot| slot.assume_init_drop());
+ }
+
+ /// # Safety
+ ///
+ /// * `Channel::waker` must not have a waker stored in it when calling this method.
+ /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
+ #[cfg(feature = "async")]
+ unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
+ // Write our thread instance to the channel.
+ // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
+ // try to access the waker until it sees the state set to RECEIVING below
+ self.write_waker(ReceiverWaker::task_waker(cx));
+
+ // ORDERING: we use release ordering on success so the sender can synchronize with
+ // our write of the waker. We use relaxed ordering on failure since the sender does
+ // not need to synchronize with our write and the individual match arms handle any
+ // additional synchronization
+ match self
+ .state
+ .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
+ {
+ // We stored our waker, now we return and let the sender wake us up
+ Ok(_) => Poll::Pending,
+ // The sender sent the message while we prepared to park.
+ // We take the message and mark the channel disconnected.
+ Err(MESSAGE) => {
+ // ORDERING: Synchronize with the write of the message. This branch is
+ // unlikely to be taken, so it's likely more efficient to use a fence here
+ // instead of AcqRel ordering on the compare_exchange operation
+ fence(Acquire);
+
+ // SAFETY: we started in the EMPTY state and the sender switched us to the
+ // MESSAGE state. This means that it did not take the waker, so we're
+ // responsible for dropping it.
+ self.drop_waker();
+
+ // ORDERING: sender does not exist, so this update only needs to be visible to us
+ self.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: The MESSAGE state tells us there is a correctly initialized message
+ Poll::Ready(Ok(self.take_message()))
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ Err(DISCONNECTED) => {
+ // SAFETY: we started in the EMPTY state and the sender switched us to the
+ // DISCONNECTED state. This means that it did not take the waker, so we're
+ // responsible for dropping it.
+ self.drop_waker();
+ Poll::Ready(Err(RecvError))
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+enum ReceiverWaker {
+ /// The receiver is waiting synchronously. Its thread is parked.
+ #[cfg(feature = "std")]
+ Thread(thread::Thread),
+ /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
+ #[cfg(feature = "async")]
+ Task(task::Waker),
+ /// A little hack to keep this enum from being an uninhabited type when no features are enabled.
+ #[cfg(not(any(feature = "async", feature = "std")))]
+ _Uninhabited,
+}
+
+impl ReceiverWaker {
+ #[cfg(feature = "std")]
+ pub fn current_thread() -> Self {
+ Self::Thread(thread::current())
+ }
+
+ #[cfg(feature = "async")]
+ pub fn task_waker(cx: &task::Context<'_>) -> Self {
+ Self::Task(cx.waker().clone())
+ }
+
+ pub fn unpark(self) {
+ match self {
+ #[cfg(feature = "std")]
+ ReceiverWaker::Thread(thread) => thread.unpark(),
+ #[cfg(feature = "async")]
+ ReceiverWaker::Task(waker) => waker.wake(),
+ #[cfg(not(any(feature = "async", feature = "std")))]
+ ReceiverWaker::_Uninhabited => unreachable!(),
+ }
+ }
+}
+
+#[cfg(not(loom))]
+#[test]
+fn receiver_waker_size() {
+ let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) {
+ (false, false) => 0,
+ (false, true) => 16,
+ (true, false) => 8,
+ (true, true) => 16,
+ };
+ assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
+}
+
+#[cfg(all(feature = "std", feature = "async"))]
+const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
+ "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
+
+#[inline]
+pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
+ drop(Box::from_raw(channel.as_ptr()))
+}
diff --git a/src/loombox.rs b/src/loombox.rs
new file mode 100644
index 0000000..615db30
--- /dev/null
+++ b/src/loombox.rs
@@ -0,0 +1,151 @@
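+//! A minimal `Box` replacement backed by `loom::alloc`, so that the loom model checker can
+//! track the channel's heap allocation during the concurrency tests.
+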
+use core::{borrow, fmt, hash, mem, ptr};
+use loom::alloc;
+
+pub struct Box<T: ?Sized> {
+ ptr: *mut T,
+}
+
+impl<T> Box<T> {
+ pub fn new(value: T) -> Self {
+ let layout = alloc::Layout::new::<T>();
+ let ptr = unsafe { alloc::alloc(layout) } as *mut T;
+ unsafe { ptr::write(ptr, value) };
+ Self { ptr }
+ }
+}
+
+impl<T: ?Sized> Box<T> {
+ #[inline]
+ pub fn into_raw(b: Box<T>) -> *mut T {
+ let ptr = b.ptr;
+ mem::forget(b);
+ ptr
+ }
+
+ pub const unsafe fn from_raw(ptr: *mut T) -> Box<T> {
+ Self { ptr }
+ }
+}
+
+impl<T: ?Sized> Drop for Box<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let size = mem::size_of_val(&*self.ptr);
+ let align = mem::align_of_val(&*self.ptr);
+ let layout = alloc::Layout::from_size_align(size, align).unwrap();
+ ptr::drop_in_place(self.ptr);
+ alloc::dealloc(self.ptr as *mut u8, layout);
+ }
+ }
+}
+
+unsafe impl<T: Send> Send for Box<T> {}
+unsafe impl<T: Sync> Sync for Box<T> {}
+
+impl<T: ?Sized> core::ops::Deref for Box<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &*self.ptr }
+ }
+}
+
+impl<T: ?Sized> core::ops::DerefMut for Box<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.ptr }
+ }
+}
+
+impl<T: ?Sized> borrow::Borrow<T> for Box<T> {
+ fn borrow(&self) -> &T {
+ &**self
+ }
+}
+
+impl<T: ?Sized> borrow::BorrowMut<T> for Box<T> {
+ fn borrow_mut(&mut self) -> &mut T {
+ &mut **self
+ }
+}
+
+impl<T: ?Sized> AsRef<T> for Box<T> {
+ fn as_ref(&self) -> &T {
+ &**self
+ }
+}
+
+impl<T: ?Sized> AsMut<T> for Box<T> {
+ fn as_mut(&mut self) -> &mut T {
+ &mut **self
+ }
+}
+
+impl<T: fmt::Display + ?Sized> fmt::Display for Box<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&**self, f)
+ }
+}
+
+impl<T: fmt::Debug + ?Sized> fmt::Debug for Box<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&**self, f)
+ }
+}
+
+impl<T: Clone> Clone for Box<T> {
+ #[inline]
+ fn clone(&self) -> Box<T> {
+ Self::new(self.as_ref().clone())
+ }
+}
+
+impl<T: ?Sized + PartialEq> PartialEq for Box<T> {
+ #[inline]
+ fn eq(&self, other: &Box<T>) -> bool {
+ PartialEq::eq(&**self, &**other)
+ }
+
+ #[allow(clippy::partialeq_ne_impl)]
+ #[inline]
+ fn ne(&self, other: &Box<T>) -> bool {
+ PartialEq::ne(&**self, &**other)
+ }
+}
+
+impl<T: ?Sized + Eq> Eq for Box<T> {}
+
+impl<T: ?Sized + PartialOrd> PartialOrd for Box<T> {
+ #[inline]
+ fn partial_cmp(&self, other: &Box<T>) -> Option<core::cmp::Ordering> {
+ PartialOrd::partial_cmp(&**self, &**other)
+ }
+ #[inline]
+ fn lt(&self, other: &Box<T>) -> bool {
+ PartialOrd::lt(&**self, &**other)
+ }
+ #[inline]
+ fn le(&self, other: &Box<T>) -> bool {
+ PartialOrd::le(&**self, &**other)
+ }
+ #[inline]
+ fn ge(&self, other: &Box<T>) -> bool {
+ PartialOrd::ge(&**self, &**other)
+ }
+ #[inline]
+ fn gt(&self, other: &Box<T>) -> bool {
+ PartialOrd::gt(&**self, &**other)
+ }
+}
+
+impl<T: ?Sized + Ord> Ord for Box<T> {
+ #[inline]
+ fn cmp(&self, other: &Box<T>) -> core::cmp::Ordering {
+ Ord::cmp(&**self, &**other)
+ }
+}
+
+impl<T: ?Sized + hash::Hash> hash::Hash for Box<T> {
+ fn hash<H: hash::Hasher>(&self, state: &mut H) {
+ (**self).hash(state);
+ }
+}
diff --git a/tests/assert_mem.rs b/tests/assert_mem.rs
new file mode 100644
index 0000000..a993ad7
--- /dev/null
+++ b/tests/assert_mem.rs
@@ -0,0 +1,37 @@
+use oneshot::{Receiver, Sender};
+use std::mem;
+
+/// Just a sanity check that both channel endpoints stay the size of a single pointer.
+#[test]
+fn channel_endpoints_single_pointer() {
+ const PTR_SIZE: usize = mem::size_of::<*const ()>();
+
+ assert_eq!(mem::size_of::<Sender<()>>(), PTR_SIZE);
+ assert_eq!(mem::size_of::<Receiver<()>>(), PTR_SIZE);
+
+ assert_eq!(mem::size_of::<Sender<u8>>(), PTR_SIZE);
+ assert_eq!(mem::size_of::<Receiver<u8>>(), PTR_SIZE);
+
+ assert_eq!(mem::size_of::<Sender<[u8; 1024]>>(), PTR_SIZE);
+ assert_eq!(mem::size_of::<Receiver<[u8; 1024]>>(), PTR_SIZE);
+
+ assert_eq!(mem::size_of::<Option<Sender<[u8; 1024]>>>(), PTR_SIZE);
+ assert_eq!(mem::size_of::<Option<Receiver<[u8; 1024]>>>(), PTR_SIZE);
+}
+
+/// Check that the `SendError` stays small. Useful to automatically detect if it is refactored
+/// to become large. We do not want the stack requirement for calling `Sender::send` to grow.
+#[test]
+fn error_sizes() {
+ const PTR_SIZE: usize = mem::size_of::<usize>();
+
+ assert_eq!(mem::size_of::<oneshot::SendError<()>>(), PTR_SIZE);
+ assert_eq!(mem::size_of::<oneshot::SendError<u8>>(), PTR_SIZE);
+ assert_eq!(mem::size_of::<oneshot::SendError<[u8; 1024]>>(), PTR_SIZE);
+
+ // The type returned from `Sender::send` is also just pointer sized
+ assert_eq!(
+ mem::size_of::<Result<(), oneshot::SendError<[u8; 1024]>>>(),
+ PTR_SIZE
+ );
+}
diff --git a/tests/async.rs b/tests/async.rs
new file mode 100644
index 0000000..e7633aa
--- /dev/null
+++ b/tests/async.rs
@@ -0,0 +1,128 @@
+#![cfg(all(feature = "async", not(loom)))]
+
+use core::mem;
+use core::time::Duration;
+
+mod helpers;
+use helpers::DropCounter;
+
+#[tokio::test]
+async fn send_before_await_tokio() {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+ assert_eq!(receiver.await, Ok(19i128));
+}
+
+#[async_std::test]
+async fn send_before_await_async_std() {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+ assert_eq!(receiver.await, Ok(19i128));
+}
+
+#[tokio::test]
+async fn await_with_dropped_sender_tokio() {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ mem::drop(sender);
+ receiver.await.unwrap_err();
+}
+
+#[async_std::test]
+async fn await_with_dropped_sender_async_std() {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ mem::drop(sender);
+ receiver.await.unwrap_err();
+}
+
+#[tokio::test]
+async fn await_before_send_tokio() {
+ let (sender, receiver) = oneshot::channel();
+ let (message, counter) = DropCounter::new(79u128);
+ let t = tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ sender.send(message)
+ });
+ let returned_message = receiver.await.unwrap();
+ assert_eq!(counter.count(), 0);
+ assert_eq!(*returned_message.value(), 79u128);
+ mem::drop(returned_message);
+ assert_eq!(counter.count(), 1);
+ t.await.unwrap().unwrap();
+}
+
+#[async_std::test]
+async fn await_before_send_async_std() {
+ let (sender, receiver) = oneshot::channel();
+ let (message, counter) = DropCounter::new(79u128);
+ let t = async_std::task::spawn(async move {
+ async_std::task::sleep(Duration::from_millis(10)).await;
+ sender.send(message)
+ });
+ let returned_message = receiver.await.unwrap();
+ assert_eq!(counter.count(), 0);
+ assert_eq!(*returned_message.value(), 79u128);
+ mem::drop(returned_message);
+ assert_eq!(counter.count(), 1);
+ t.await.unwrap();
+}
+
+#[tokio::test]
+async fn await_before_send_then_drop_sender_tokio() {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = tokio::spawn(async {
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ mem::drop(sender);
+ });
+ assert!(receiver.await.is_err());
+ t.await.unwrap();
+}
+
+#[async_std::test]
+async fn await_before_send_then_drop_sender_async_std() {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = async_std::task::spawn(async {
+ async_std::task::sleep(Duration::from_millis(10)).await;
+ mem::drop(sender);
+ });
+ assert!(receiver.await.is_err());
+ t.await;
+}
+
+// Tests that the Receiver handles being used synchronously even after being polled
+#[tokio::test]
+async fn poll_future_and_then_try_recv() {
+ use core::future::Future;
+ use core::pin::Pin;
+ use core::task::{self, Poll};
+
+ struct StupidReceiverFuture(oneshot::Receiver<()>);
+
+ impl Future for StupidReceiverFuture {
+ type Output = Result<(), oneshot::RecvError>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let poll_result = Future::poll(Pin::new(&mut self.0), cx);
+ self.0.try_recv().expect_err("Should never be a message");
+ poll_result
+ }
+ }
+
+ let (sender, receiver) = oneshot::channel();
+ let t = tokio::spawn(async {
+ tokio::time::sleep(Duration::from_millis(20)).await;
+ mem::drop(sender);
+ });
+ StupidReceiverFuture(receiver).await.unwrap_err();
+ t.await.unwrap();
+}
+
+#[tokio::test]
+async fn poll_receiver_then_drop_it() {
+ let (sender, receiver) = oneshot::channel::<()>();
+ // This will poll the receiver and then give up after 100 ms.
+ tokio::time::timeout(Duration::from_millis(100), receiver)
+ .await
+ .unwrap_err();
+ // Make sure the receiver has been dropped by the runtime.
+ assert!(sender.send(()).is_err());
+}
diff --git a/tests/future.rs b/tests/future.rs
new file mode 100644
index 0000000..3895946
--- /dev/null
+++ b/tests/future.rs
@@ -0,0 +1,65 @@
+#![cfg(feature = "async")]
+
+use core::{future, mem, pin, task};
+
+#[cfg(loom)]
+pub use loom::sync::{Arc, Mutex};
+#[cfg(not(loom))]
+pub use std::sync::{Arc, Mutex};
+
+mod helpers;
+use helpers::maybe_loom_model;
+
+#[test]
+fn multiple_receiver_polls_keeps_only_latest_waker() {
+ #[derive(Default)]
+ struct MockWaker {
+ cloned: usize,
+ dropped: usize,
+ }
+
+ fn clone_mock_waker(waker: *const ()) -> task::RawWaker {
+ let mock_waker = unsafe { Arc::from_raw(waker as *const Mutex<MockWaker>) };
+ mock_waker.lock().unwrap().cloned += 1;
+ let new_waker =
+ task::RawWaker::new(Arc::into_raw(mock_waker.clone()) as *const (), &VTABLE);
+ mem::forget(mock_waker);
+ new_waker
+ }
+
+ fn drop_mock_waker(waker: *const ()) {
+ let mock_waker = unsafe { Arc::from_raw(waker as *const Mutex<MockWaker>) };
+ mock_waker.lock().unwrap().dropped += 1;
+ }
+
+ const VTABLE: task::RawWakerVTable =
+ task::RawWakerVTable::new(clone_mock_waker, |_| (), |_| (), drop_mock_waker);
+
+ maybe_loom_model(|| {
+ let mock_waker1 = Arc::new(Mutex::new(MockWaker::default()));
+ let raw_waker1 =
+ task::RawWaker::new(Arc::into_raw(mock_waker1.clone()) as *const (), &VTABLE);
+ let waker1 = unsafe { task::Waker::from_raw(raw_waker1) };
+ let mut context1 = task::Context::from_waker(&waker1);
+
+ let (_sender, mut receiver) = oneshot::channel::<()>();
+
+ let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context1);
+ assert_eq!(poll_result, task::Poll::Pending);
+ assert_eq!(mock_waker1.lock().unwrap().cloned, 1);
+ assert_eq!(mock_waker1.lock().unwrap().dropped, 0);
+
+ let mock_waker2 = Arc::new(Mutex::new(MockWaker::default()));
+ let raw_waker2 =
+ task::RawWaker::new(Arc::into_raw(mock_waker2.clone()) as *const (), &VTABLE);
+ let waker2 = unsafe { task::Waker::from_raw(raw_waker2) };
+ let mut context2 = task::Context::from_waker(&waker2);
+
+ let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context2);
+ assert_eq!(poll_result, task::Poll::Pending);
+ assert_eq!(mock_waker2.lock().unwrap().cloned, 1);
+ assert_eq!(mock_waker2.lock().unwrap().dropped, 0);
+ assert_eq!(mock_waker1.lock().unwrap().cloned, 1);
+ assert_eq!(mock_waker1.lock().unwrap().dropped, 1);
+ });
+}
diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs
new file mode 100644
index 0000000..1b14539
--- /dev/null
+++ b/tests/helpers/mod.rs
@@ -0,0 +1,63 @@
+#![allow(dead_code)]
+
+extern crate alloc;
+
+#[cfg(not(loom))]
+use alloc::sync::Arc;
+#[cfg(not(loom))]
+use core::sync::atomic::{AtomicUsize, Ordering::SeqCst};
+#[cfg(loom)]
+use loom::sync::{
+ atomic::{AtomicUsize, Ordering::SeqCst},
+ Arc,
+};
+
+#[cfg(loom)]
+pub mod waker;
+
+pub fn maybe_loom_model(test: impl Fn() + Sync + Send + 'static) {
+ #[cfg(loom)]
+ loom::model(test);
+ #[cfg(not(loom))]
+ test();
+}
+
+pub struct DropCounter<T> {
+ drop_count: Arc<AtomicUsize>,
+ value: Option<T>,
+}
+
+pub struct DropCounterHandle(Arc<AtomicUsize>);
+
+impl<T> DropCounter<T> {
+ pub fn new(value: T) -> (Self, DropCounterHandle) {
+ let drop_count = Arc::new(AtomicUsize::new(0));
+ (
+ Self {
+ drop_count: drop_count.clone(),
+ value: Some(value),
+ },
+ DropCounterHandle(drop_count),
+ )
+ }
+
+ pub fn value(&self) -> &T {
+ self.value.as_ref().unwrap()
+ }
+
+ pub fn into_value(mut self) -> T {
+ self.value.take().unwrap()
+ }
+}
+
+impl DropCounterHandle {
+ pub fn count(&self) -> usize {
+ self.0.load(SeqCst)
+ }
+}
+
+impl<T> Drop for DropCounter<T> {
+ fn drop(&mut self) {
+ self.drop_count.fetch_add(1, SeqCst);
+ }
+}
diff --git a/tests/helpers/waker.rs b/tests/helpers/waker.rs
new file mode 100644
index 0000000..2e3f1be
--- /dev/null
+++ b/tests/helpers/waker.rs
@@ -0,0 +1,64 @@
+//! Creates a Waker that can be observed from tests.
+
+use std::mem::forget;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
+use std::task::{RawWaker, RawWakerVTable, Waker};
+
+#[derive(Default)]
+pub struct WakerHandle {
+ clone_count: AtomicU32,
+ drop_count: AtomicU32,
+ wake_count: AtomicU32,
+}
+
+impl WakerHandle {
+ pub fn clone_count(&self) -> u32 {
+ self.clone_count.load(Ordering::Relaxed)
+ }
+
+ pub fn drop_count(&self) -> u32 {
+ self.drop_count.load(Ordering::Relaxed)
+ }
+
+ pub fn wake_count(&self) -> u32 {
+ self.wake_count.load(Ordering::Relaxed)
+ }
+}
+
+pub fn waker() -> (Waker, Arc<WakerHandle>) {
+ let waker_handle = Arc::new(WakerHandle::default());
+ let waker_handle_ptr = Arc::into_raw(waker_handle.clone());
+ let raw_waker = RawWaker::new(waker_handle_ptr as *const _, waker_vtable());
+ (unsafe { Waker::from_raw(raw_waker) }, waker_handle)
+}
+
+pub(super) fn waker_vtable() -> &'static RawWakerVTable {
+ &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
+}
+
+unsafe fn clone_raw(data: *const ()) -> RawWaker {
+ let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _);
+ handle.clone_count.fetch_add(1, Ordering::Relaxed);
+ forget(handle.clone());
+ forget(handle);
+ RawWaker::new(data, waker_vtable())
+}
+
+unsafe fn wake_raw(data: *const ()) {
+ let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _);
+ handle.wake_count.fetch_add(1, Ordering::Relaxed);
+ handle.drop_count.fetch_add(1, Ordering::Relaxed);
+}
+
+unsafe fn wake_by_ref_raw(data: *const ()) {
+ let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _);
+ handle.wake_count.fetch_add(1, Ordering::Relaxed);
+ forget(handle)
+}
+
+unsafe fn drop_raw(data: *const ()) {
+ let handle: Arc<WakerHandle> = Arc::from_raw(data as *const _);
+ handle.drop_count.fetch_add(1, Ordering::Relaxed);
+ drop(handle)
+}
diff --git a/tests/loom.rs b/tests/loom.rs
new file mode 100644
index 0000000..a7625a4
--- /dev/null
+++ b/tests/loom.rs
@@ -0,0 +1,223 @@
+#![cfg(loom)]
+
+use oneshot::TryRecvError;
+
+use loom::hint;
+use loom::thread;
+#[cfg(feature = "async")]
+use std::future::Future;
+#[cfg(feature = "async")]
+use std::pin::Pin;
+#[cfg(feature = "async")]
+use std::task::{self, Poll};
+#[cfg(feature = "std")]
+use std::time::Duration;
+
+mod helpers;
+
+#[test]
+fn try_recv() {
+ loom::model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+
+ let t = thread::spawn(move || loop {
+ match receiver.try_recv() {
+ Ok(msg) => break msg,
+ Err(TryRecvError::Empty) => hint::spin_loop(),
+ Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
+ }
+ });
+
+ assert!(sender.send(19).is_ok());
+ assert_eq!(t.join().unwrap(), 19);
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn send_recv_different_threads() {
+ loom::model(|| {
+ let (sender, receiver) = oneshot::channel();
+ let t2 = thread::spawn(move || {
+ assert_eq!(receiver.recv_timeout(Duration::from_millis(1)), Ok(9));
+ });
+ let t1 = thread::spawn(move || {
+ sender.send(9u128).unwrap();
+ });
+ t1.join().unwrap();
+ t2.join().unwrap();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_drop_sender_different_threads() {
+ loom::model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t2 = thread::spawn(move || {
+ assert!(receiver.recv_timeout(Duration::from_millis(0)).is_err());
+ });
+ let t1 = thread::spawn(move || {
+ drop(sender);
+ });
+ t1.join().unwrap();
+ t2.join().unwrap();
+ })
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn async_recv() {
+ loom::model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t1 = thread::spawn(move || {
+ sender.send(987).unwrap();
+ });
+ assert_eq!(loom::future::block_on(receiver), Ok(987));
+ t1.join().unwrap();
+ })
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn send_then_poll() {
+ loom::model(|| {
+ let (sender, mut receiver) = oneshot::channel::<u128>();
+ sender.send(1234).unwrap();
+
+ let (waker, waker_handle) = helpers::waker::waker();
+ let mut context = task::Context::from_waker(&waker);
+
+ assert_eq!(
+ Pin::new(&mut receiver).poll(&mut context),
+ Poll::Ready(Ok(1234))
+ );
+ assert_eq!(waker_handle.clone_count(), 0);
+ assert_eq!(waker_handle.drop_count(), 0);
+ assert_eq!(waker_handle.wake_count(), 0);
+ })
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn poll_then_send() {
+ loom::model(|| {
+ let (sender, mut receiver) = oneshot::channel::<u128>();
+
+ let (waker, waker_handle) = helpers::waker::waker();
+ let mut context = task::Context::from_waker(&waker);
+
+ assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
+ assert_eq!(waker_handle.clone_count(), 1);
+ assert_eq!(waker_handle.drop_count(), 0);
+ assert_eq!(waker_handle.wake_count(), 0);
+
+ sender.send(1234).unwrap();
+ assert_eq!(waker_handle.clone_count(), 1);
+ assert_eq!(waker_handle.drop_count(), 1);
+ assert_eq!(waker_handle.wake_count(), 1);
+
+ assert_eq!(
+ Pin::new(&mut receiver).poll(&mut context),
+ Poll::Ready(Ok(1234))
+ );
+ assert_eq!(waker_handle.clone_count(), 1);
+ assert_eq!(waker_handle.drop_count(), 1);
+ assert_eq!(waker_handle.wake_count(), 1);
+ })
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn poll_with_different_wakers() {
+ loom::model(|| {
+ let (sender, mut receiver) = oneshot::channel::<u128>();
+
+ let (waker1, waker_handle1) = helpers::waker::waker();
+ let mut context1 = task::Context::from_waker(&waker1);
+
+ assert_eq!(Pin::new(&mut receiver).poll(&mut context1), Poll::Pending);
+ assert_eq!(waker_handle1.clone_count(), 1);
+ assert_eq!(waker_handle1.drop_count(), 0);
+ assert_eq!(waker_handle1.wake_count(), 0);
+
+ let (waker2, waker_handle2) = helpers::waker::waker();
+ let mut context2 = task::Context::from_waker(&waker2);
+
+ assert_eq!(Pin::new(&mut receiver).poll(&mut context2), Poll::Pending);
+ assert_eq!(waker_handle1.clone_count(), 1);
+ assert_eq!(waker_handle1.drop_count(), 1);
+ assert_eq!(waker_handle1.wake_count(), 0);
+
+ assert_eq!(waker_handle2.clone_count(), 1);
+ assert_eq!(waker_handle2.drop_count(), 0);
+ assert_eq!(waker_handle2.wake_count(), 0);
+
+ // Sending should cause the waker from the latest poll to be woken up
+ sender.send(1234).unwrap();
+ assert_eq!(waker_handle1.clone_count(), 1);
+ assert_eq!(waker_handle1.drop_count(), 1);
+ assert_eq!(waker_handle1.wake_count(), 0);
+
+ assert_eq!(waker_handle2.clone_count(), 1);
+ assert_eq!(waker_handle2.drop_count(), 1);
+ assert_eq!(waker_handle2.wake_count(), 1);
+ })
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn poll_then_try_recv() {
+ loom::model(|| {
+ let (_sender, mut receiver) = oneshot::channel::<u128>();
+
+ let (waker, waker_handle) = helpers::waker::waker();
+ let mut context = task::Context::from_waker(&waker);
+
+ assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
+ assert_eq!(waker_handle.clone_count(), 1);
+ assert_eq!(waker_handle.drop_count(), 0);
+ assert_eq!(waker_handle.wake_count(), 0);
+
+ assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
+
+ assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
+ assert_eq!(waker_handle.clone_count(), 2);
+ assert_eq!(waker_handle.drop_count(), 1);
+ assert_eq!(waker_handle.wake_count(), 0);
+ })
+}
+
+#[cfg(feature = "async")]
+#[test]
+fn poll_then_try_recv_while_sending() {
+ loom::model(|| {
+ let (sender, mut receiver) = oneshot::channel::<u128>();
+
+ let (waker, waker_handle) = helpers::waker::waker();
+ let mut context = task::Context::from_waker(&waker);
+
+ assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
+ assert_eq!(waker_handle.clone_count(), 1);
+ assert_eq!(waker_handle.drop_count(), 0);
+ assert_eq!(waker_handle.wake_count(), 0);
+
+ let t = thread::spawn(move || {
+ sender.send(1234).unwrap();
+ });
+
+ let msg = loop {
+ match receiver.try_recv() {
+ Ok(msg) => break msg,
+ Err(TryRecvError::Empty) => hint::spin_loop(),
+ Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
+ }
+ };
+ assert_eq!(msg, 1234);
+ assert_eq!(waker_handle.clone_count(), 1);
+ assert_eq!(waker_handle.drop_count(), 1);
+ assert_eq!(waker_handle.wake_count(), 1);
+
+ t.join().unwrap();
+ })
+}
diff --git a/tests/raw.rs b/tests/raw.rs
new file mode 100644
index 0000000..e38dc45
--- /dev/null
+++ b/tests/raw.rs
@@ -0,0 +1,46 @@
+#![cfg(not(loom))]
+
+use oneshot::{channel, Receiver, Sender};
+
+#[test]
+fn test_raw_sender() {
+ let (sender, receiver) = channel::<u32>();
+ let raw = sender.into_raw();
+ let recreated = unsafe { Sender::<u32>::from_raw(raw) };
+ recreated
+ .send(100)
+ .unwrap_or_else(|e| panic!("error sending after into_raw/from_raw roundtrip: {e}"));
+ assert_eq!(receiver.try_recv(), Ok(100))
+}
+
+#[test]
+fn test_raw_receiver() {
+ let (sender, receiver) = channel::<u32>();
+ let raw = receiver.into_raw();
+ sender.send(100).unwrap();
+ let recreated = unsafe { Receiver::<u32>::from_raw(raw) };
+ assert_eq!(
+ recreated
+ .try_recv()
+ .unwrap_or_else(|e| panic!("error receiving after into_raw/from_raw roundtrip: {e}")),
+ 100
+ )
+}
+
+#[test]
+fn test_raw_sender_and_receiver() {
+ let (sender, receiver) = channel::<u32>();
+ let raw_receiver = receiver.into_raw();
+ let raw_sender = sender.into_raw();
+
+ let recreated_sender = unsafe { Sender::<u32>::from_raw(raw_sender) };
+ recreated_sender.send(100).unwrap();
+
+ let recreated_receiver = unsafe { Receiver::<u32>::from_raw(raw_receiver) };
+ assert_eq!(
+ recreated_receiver
+ .try_recv()
+ .unwrap_or_else(|e| panic!("error receiving after into_raw/from_raw roundtrip: {e}")),
+ 100
+ )
+}
diff --git a/tests/sync.rs b/tests/sync.rs
new file mode 100644
index 0000000..c6ba081
--- /dev/null
+++ b/tests/sync.rs
@@ -0,0 +1,343 @@
+use core::mem;
+use oneshot::TryRecvError;
+
+#[cfg(feature = "std")]
+use oneshot::{RecvError, RecvTimeoutError};
+#[cfg(feature = "std")]
+use std::time::{Duration, Instant};
+
+#[cfg(feature = "std")]
+mod thread {
+ #[cfg(loom)]
+ pub use loom::thread::spawn;
+ #[cfg(not(loom))]
+ pub use std::thread::{sleep, spawn};
+
+ #[cfg(loom)]
+ pub fn sleep(_timeout: core::time::Duration) {
+ loom::thread::yield_now()
+ }
+}
+
+mod helpers;
+use helpers::{maybe_loom_model, DropCounter};
+
+#[test]
+fn send_before_try_recv() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+
+ assert_eq!(receiver.try_recv(), Ok(19i128));
+ assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
+ #[cfg(feature = "std")]
+ {
+ assert_eq!(receiver.recv_ref(), Err(RecvError));
+ assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err());
+ }
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn send_before_recv() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<()>();
+ assert!(sender.send(()).is_ok());
+ assert_eq!(receiver.recv(), Ok(()));
+ });
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u8>();
+ assert!(sender.send(19).is_ok());
+ assert_eq!(receiver.recv(), Ok(19));
+ });
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u64>();
+ assert!(sender.send(21).is_ok());
+ assert_eq!(receiver.recv(), Ok(21));
+ });
+ // FIXME: This test does not work with loom. Once the channel object grows larger than
+ // ~500 bytes, an atomic read of the state fails with "signal: 10, SIGBUS: access to
+ // undefined memory".
+ #[cfg(not(loom))]
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<[u8; 4096]>();
+ assert!(sender.send([0b10101010; 4096]).is_ok());
+ assert!(receiver.recv().unwrap()[..] == [0b10101010; 4096][..]);
+ });
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn send_before_recv_ref() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+
+ assert_eq!(receiver.recv_ref(), Ok(19i128));
+ assert_eq!(receiver.recv_ref(), Err(RecvError));
+ assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
+ assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err());
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn send_before_recv_timeout() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+
+ let start = Instant::now();
+ let timeout = Duration::from_secs(1);
+ assert_eq!(receiver.recv_timeout(timeout), Ok(19i128));
+ assert!(start.elapsed() < Duration::from_millis(100));
+
+ assert!(receiver.recv_timeout(timeout).is_err());
+ assert!(receiver.try_recv().is_err());
+ assert!(receiver.recv().is_err());
+ })
+}
+
+#[test]
+fn send_then_drop_receiver() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ assert!(sender.send(19i128).is_ok());
+ mem::drop(receiver);
+ })
+}
+
+#[test]
+fn send_with_dropped_receiver() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ mem::drop(receiver);
+ let send_error = sender.send(5u128).unwrap_err();
+ assert_eq!(*send_error.as_inner(), 5);
+ assert_eq!(send_error.into_inner(), 5);
+ })
+}
+
+#[test]
+fn try_recv_with_dropped_sender() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ mem::drop(sender);
+ receiver.try_recv().unwrap_err();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_with_dropped_sender() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ mem::drop(sender);
+ receiver.recv().unwrap_err();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_before_send() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ sender.send(9u128).unwrap();
+ });
+ assert_eq!(receiver.recv(), Ok(9));
+ t.join().unwrap();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_timeout_before_send() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(2));
+ sender.send(9u128).unwrap();
+ });
+ assert_eq!(receiver.recv_timeout(Duration::from_secs(1)), Ok(9));
+ t.join().unwrap();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_before_send_then_drop_sender() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(10));
+ mem::drop(sender);
+ });
+ assert!(receiver.recv().is_err());
+ t.join().unwrap();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_timeout_before_send_then_drop_sender() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(10));
+ mem::drop(sender);
+ });
+ assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err());
+ t.join().unwrap();
+ })
+}
+
+#[test]
+fn try_recv() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
+ mem::drop(sender)
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn try_recv_then_drop_receiver() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel::<u128>();
+ let t1 = thread::spawn(move || {
+ let _ = sender.send(42);
+ });
+ let t2 = thread::spawn(move || {
+ assert!(matches!(
+ receiver.try_recv(),
+ Ok(42) | Err(TryRecvError::Empty)
+ ));
+ mem::drop(receiver);
+ });
+ t1.join().unwrap();
+ t2.join().unwrap();
+ })
+}
+
+#[cfg(feature = "std")]
+#[test]
+fn recv_deadline_and_timeout_no_time() {
+ maybe_loom_model(|| {
+ let (_sender, receiver) = oneshot::channel::<u128>();
+
+ let start = Instant::now();
+ assert_eq!(
+ receiver.recv_deadline(start),
+ Err(RecvTimeoutError::Timeout)
+ );
+ assert!(start.elapsed() < Duration::from_millis(200));
+
+ let start = Instant::now();
+ assert_eq!(
+ receiver.recv_timeout(Duration::from_millis(0)),
+ Err(RecvTimeoutError::Timeout)
+ );
+ assert!(start.elapsed() < Duration::from_millis(200));
+ })
+}
+
+// This test doesn't give meaningful results when run with both oneshot_test_delay and loom
+#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))]
+#[test]
+fn recv_deadline_time_should_elapse() {
+ maybe_loom_model(|| {
+ let (_sender, receiver) = oneshot::channel::<u128>();
+
+ let start = Instant::now();
+ #[cfg(not(loom))]
+ let timeout = Duration::from_millis(100);
+ #[cfg(loom)]
+ let timeout = Duration::from_millis(1);
+ assert_eq!(
+ receiver.recv_deadline(start + timeout),
+ Err(RecvTimeoutError::Timeout)
+ );
+ assert!(start.elapsed() > timeout);
+ assert!(start.elapsed() < timeout * 3);
+ })
+}
+
+#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))]
+#[test]
+fn recv_timeout_time_should_elapse() {
+ maybe_loom_model(|| {
+ let (_sender, receiver) = oneshot::channel::<u128>();
+
+ let start = Instant::now();
+ #[cfg(not(loom))]
+ let timeout = Duration::from_millis(100);
+ #[cfg(loom)]
+ let timeout = Duration::from_millis(1);
+
+ assert_eq!(
+ receiver.recv_timeout(timeout),
+ Err(RecvTimeoutError::Timeout)
+ );
+ assert!(start.elapsed() > timeout);
+ assert!(start.elapsed() < timeout * 3);
+ })
+}
+
+#[cfg(not(loom))]
+#[test]
+fn non_send_type_can_be_used_on_same_thread() {
+ use std::ptr;
+
+ #[derive(Debug, Eq, PartialEq)]
+ struct NotSend(*mut ());
+
+ let (sender, receiver) = oneshot::channel();
+ sender.send(NotSend(ptr::null_mut())).unwrap();
+ let reply = receiver.try_recv().unwrap();
+ assert_eq!(reply, NotSend(ptr::null_mut()));
+}
+
+#[test]
+fn message_in_channel_dropped_on_receiver_drop() {
+ maybe_loom_model(|| {
+ let (sender, receiver) = oneshot::channel();
+ let (message, counter) = DropCounter::new(());
+ assert_eq!(counter.count(), 0);
+ sender.send(message).unwrap();
+ assert_eq!(counter.count(), 0);
+ mem::drop(receiver);
+ assert_eq!(counter.count(), 1);
+ })
+}
+
+#[test]
+fn send_error_drops_message_correctly() {
+ maybe_loom_model(|| {
+ let (sender, _) = oneshot::channel();
+ let (message, counter) = DropCounter::new(());
+
+ let send_error = sender.send(message).unwrap_err();
+ assert_eq!(counter.count(), 0);
+ mem::drop(send_error);
+ assert_eq!(counter.count(), 1);
+ });
+}
+
+#[test]
+fn send_error_drops_message_correctly_on_into_inner() {
+ maybe_loom_model(|| {
+ let (sender, _) = oneshot::channel();
+ let (message, counter) = DropCounter::new(());
+
+ let send_error = sender.send(message).unwrap_err();
+ assert_eq!(counter.count(), 0);
+ let message = send_error.into_inner();
+ assert_eq!(counter.count(), 0);
+ mem::drop(message);
+ assert_eq!(counter.count(), 1);
+ });
+}