aboutsummaryrefslogtreecommitdiff
path: root/src/sync/mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mpsc.rs')
-rw-r--r--src/sync/mpsc.rs59
1 files changed, 50 insertions, 9 deletions
diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs
index 55ed5c4..fd48c72 100644
--- a/src/sync/mpsc.rs
+++ b/src/sync/mpsc.rs
@@ -44,7 +44,7 @@ enum State<T> {
pub struct PollSender<T> {
sender: Option<Sender<T>>,
state: State<T>,
- acquire: ReusableBoxFuture<'static, Result<OwnedPermit<T>, PollSendError<T>>>,
+ acquire: PollSenderFuture<T>,
}
// Creates a future for acquiring a permit from the underlying channel. This is used to ensure
@@ -64,13 +64,56 @@ async fn make_acquire_future<T>(
}
}
-impl<T: Send + 'static> PollSender<T> {
+type InnerFuture<'a, T> = ReusableBoxFuture<'a, Result<OwnedPermit<T>, PollSendError<T>>>;
+
+#[derive(Debug)]
+// TODO: This should be replace with a type_alias_impl_trait to eliminate `'static` and all the transmutes
+struct PollSenderFuture<T>(InnerFuture<'static, T>);
+
+impl<T> PollSenderFuture<T> {
+ /// Create with an empty inner future with no `Send` bound.
+ fn empty() -> Self {
+ // We don't use `make_acquire_future` here because our relaxed bounds on `T` are not
+ // compatible with the transitive bounds required by `Sender<T>`.
+ Self(ReusableBoxFuture::new(async { unreachable!() }))
+ }
+}
+
+impl<T: Send> PollSenderFuture<T> {
+ /// Create with an empty inner future.
+ fn new() -> Self {
+ let v = InnerFuture::new(make_acquire_future(None));
+ // This is safe because `make_acquire_future(None)` is actually `'static`
+ Self(unsafe { mem::transmute::<InnerFuture<'_, T>, InnerFuture<'static, T>>(v) })
+ }
+
+ /// Poll the inner future.
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<OwnedPermit<T>, PollSendError<T>>> {
+ self.0.poll(cx)
+ }
+
+ /// Replace the inner future.
+ fn set(&mut self, sender: Option<Sender<T>>) {
+ let inner: *mut InnerFuture<'static, T> = &mut self.0;
+ let inner: *mut InnerFuture<'_, T> = inner.cast();
+ // SAFETY: The `make_acquire_future(sender)` future must not exist after the type `T`
+ // becomes invalid, and this casts away the type-level lifetime check for that. However, the
+ // inner future is never moved out of this `PollSenderFuture<T>`, so the future will not
+ // live longer than the `PollSenderFuture<T>` lives. A `PollSenderFuture<T>` is guaranteed
+ // to not exist after the type `T` becomes invalid, because it is annotated with a `T`, so
+ // this is ok.
+ let inner = unsafe { &mut *inner };
+ inner.set(make_acquire_future(sender));
+ }
+}
+
+impl<T: Send> PollSender<T> {
/// Creates a new `PollSender`.
pub fn new(sender: Sender<T>) -> Self {
Self {
sender: Some(sender.clone()),
state: State::Idle(sender),
- acquire: ReusableBoxFuture::new(make_acquire_future(None)),
+ acquire: PollSenderFuture::new(),
}
}
@@ -97,7 +140,7 @@ impl<T: Send + 'static> PollSender<T> {
State::Idle(sender) => {
// Start trying to acquire a permit to reserve a slot for our send, and
// immediately loop back around to poll it the first time.
- self.acquire.set(make_acquire_future(Some(sender)));
+ self.acquire.set(Some(sender));
(None, State::Acquiring)
}
State::Acquiring => match self.acquire.poll(cx) {
@@ -194,7 +237,7 @@ impl<T: Send + 'static> PollSender<T> {
match self.state {
State::Idle(_) => self.state = State::Closed,
State::Acquiring => {
- self.acquire.set(make_acquire_future(None));
+ self.acquire.set(None);
self.state = State::Closed;
}
_ => {}
@@ -215,7 +258,7 @@ impl<T: Send + 'static> PollSender<T> {
// We're currently trying to reserve a slot to send into.
State::Acquiring => {
// Replacing the future drops the in-flight one.
- self.acquire.set(make_acquire_future(None));
+ self.acquire.set(None);
// If we haven't closed yet, we have to clone our stored sender since we have no way
// to get it back from the acquire future we just dropped.
@@ -255,9 +298,7 @@ impl<T> Clone for PollSender<T> {
Self {
sender,
state,
- // We don't use `make_acquire_future` here because our relaxed bounds on `T` are not
- // compatible with the transitive bounds required by `Sender<T>`.
- acquire: ReusableBoxFuture::new(async { unreachable!() }),
+ acquire: PollSenderFuture::empty(),
}
}
}