diff options
Diffstat (limited to 'src/sync/mpsc.rs')
-rw-r--r-- | src/sync/mpsc.rs | 59 |
1 files changed, 50 insertions, 9 deletions
diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs index 55ed5c4..fd48c72 100644 --- a/src/sync/mpsc.rs +++ b/src/sync/mpsc.rs @@ -44,7 +44,7 @@ enum State<T> { pub struct PollSender<T> { sender: Option<Sender<T>>, state: State<T>, - acquire: ReusableBoxFuture<'static, Result<OwnedPermit<T>, PollSendError<T>>>, + acquire: PollSenderFuture<T>, } // Creates a future for acquiring a permit from the underlying channel. This is used to ensure @@ -64,13 +64,56 @@ async fn make_acquire_future<T>( } } -impl<T: Send + 'static> PollSender<T> { +type InnerFuture<'a, T> = ReusableBoxFuture<'a, Result<OwnedPermit<T>, PollSendError<T>>>; + +#[derive(Debug)] +// TODO: This should be replace with a type_alias_impl_trait to eliminate `'static` and all the transmutes +struct PollSenderFuture<T>(InnerFuture<'static, T>); + +impl<T> PollSenderFuture<T> { + /// Create with an empty inner future with no `Send` bound. + fn empty() -> Self { + // We don't use `make_acquire_future` here because our relaxed bounds on `T` are not + // compatible with the transitive bounds required by `Sender<T>`. + Self(ReusableBoxFuture::new(async { unreachable!() })) + } +} + +impl<T: Send> PollSenderFuture<T> { + /// Create with an empty inner future. + fn new() -> Self { + let v = InnerFuture::new(make_acquire_future(None)); + // This is safe because `make_acquire_future(None)` is actually `'static` + Self(unsafe { mem::transmute::<InnerFuture<'_, T>, InnerFuture<'static, T>>(v) }) + } + + /// Poll the inner future. + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<OwnedPermit<T>, PollSendError<T>>> { + self.0.poll(cx) + } + + /// Replace the inner future. + fn set(&mut self, sender: Option<Sender<T>>) { + let inner: *mut InnerFuture<'static, T> = &mut self.0; + let inner: *mut InnerFuture<'_, T> = inner.cast(); + // SAFETY: The `make_acquire_future(sender)` future must not exist after the type `T` + // becomes invalid, and this casts away the type-level lifetime check for that. However, the + // inner future is never moved out of this `PollSenderFuture<T>`, so the future will not + // live longer than the `PollSenderFuture<T>` lives. A `PollSenderFuture<T>` is guaranteed + // to not exist after the type `T` becomes invalid, because it is annotated with a `T`, so + // this is ok. + let inner = unsafe { &mut *inner }; + inner.set(make_acquire_future(sender)); + } +} + +impl<T: Send> PollSender<T> { /// Creates a new `PollSender`. pub fn new(sender: Sender<T>) -> Self { Self { sender: Some(sender.clone()), state: State::Idle(sender), - acquire: ReusableBoxFuture::new(make_acquire_future(None)), + acquire: PollSenderFuture::new(), } } @@ -97,7 +140,7 @@ impl<T: Send + 'static> PollSender<T> { State::Idle(sender) => { // Start trying to acquire a permit to reserve a slot for our send, and // immediately loop back around to poll it the first time. - self.acquire.set(make_acquire_future(Some(sender))); + self.acquire.set(Some(sender)); (None, State::Acquiring) } State::Acquiring => match self.acquire.poll(cx) { @@ -194,7 +237,7 @@ impl<T: Send + 'static> PollSender<T> { match self.state { State::Idle(_) => self.state = State::Closed, State::Acquiring => { - self.acquire.set(make_acquire_future(None)); + self.acquire.set(None); self.state = State::Closed; } _ => {} @@ -215,7 +258,7 @@ impl<T: Send + 'static> PollSender<T> { // We're currently trying to reserve a slot to send into. State::Acquiring => { // Replacing the future drops the in-flight one. - self.acquire.set(make_acquire_future(None)); + self.acquire.set(None); // If we haven't closed yet, we have to clone our stored sender since we have no way // to get it back from the acquire future we just dropped. @@ -255,9 +298,7 @@ impl<T> Clone for PollSender<T> { Self { sender, state, - // We don't use `make_acquire_future` here because our relaxed bounds on `T` are not - // compatible with the transitive bounds required by `Sender<T>`. - acquire: ReusableBoxFuture::new(async { unreachable!() }), + acquire: PollSenderFuture::empty(), } } } |