diff options
Diffstat (limited to 'src/signal/unix.rs')
-rw-r--r-- | src/signal/unix.rs | 142 |
1 files changed, 41 insertions, 101 deletions
diff --git a/src/signal/unix.rs b/src/signal/unix.rs index b46b15c..aaaa75e 100644 --- a/src/signal/unix.rs +++ b/src/signal/unix.rs @@ -5,18 +5,21 @@ #![cfg(unix)] -use crate::io::{AsyncRead, PollEvented}; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; +use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::{channel, Receiver}; use libc::c_int; -use mio_uds::UnixStream; +use mio::net::UnixStream; use std::io::{self, Error, ErrorKind, Write}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; +pub(crate) mod driver; +use self::driver::Handle; + pub(crate) type OsStorage = Vec<SignalInfo>; // Number of different unix signals @@ -202,9 +205,9 @@ impl Default for SignalInfo { /// The purpose of this signal handler is to primarily: /// /// 1. Flag that our specific signal was received (e.g. store an atomic flag) -/// 2. Wake up driver tasks by writing a byte to a pipe +/// 2. Wake up the driver by writing a byte to a pipe /// -/// Those two operations shoudl both be async-signal safe. +/// Those two operations should both be async-signal safe. fn action(globals: Pin<&'static Globals>, signal: c_int) { globals.record_event(signal as EventId); @@ -219,7 +222,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) { /// /// This will register the signal handler if it hasn't already been registered, /// returning any error along the way if that fails. -fn signal_enable(signal: c_int) -> io::Result<()> { +fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> { if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { return Err(Error::new( ErrorKind::Other, @@ -227,6 +230,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> { )); } + // Check that we have a signal driver running + handle.check_inner()?; + let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, @@ -254,63 +260,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } } -#[derive(Debug)] -struct Driver { - wakeup: PollEvented<UnixStream>, -} - -impl Driver { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // Drain the data from the pipe and maintain interest in getting more - self.drain(cx); - // Broadcast any signals which were received - globals().broadcast(); - - Poll::Pending - } -} - -impl Driver { - fn new() -> io::Result<Driver> { - // NB: We give each driver a "fresh" reciever file descriptor to avoid - // the issues described in alexcrichton/tokio-process#42. - // - // In the past we would reuse the actual receiver file descriptor and - // swallow any errors around double registration of the same descriptor. - // I'm not sure if the second (failed) registration simply doesn't end up - // receiving wake up notifications, or there could be some race condition - // when consuming readiness events, but having distinct descriptors for - // distinct PollEvented instances appears to mitigate this. - // - // Unfortunately we cannot just use a single global PollEvented instance - // either, since we can't compare Handles or assume they will always - // point to the exact same reactor. - let stream = globals().receiver.try_clone()?; - let wakeup = PollEvented::new(stream)?; - - Ok(Driver { wakeup }) - } - - /// Drain all data in the global receiver, ensuring we'll get woken up when - /// there is a write on the other end. - /// - /// We do *NOT* use the existence of any read bytes as evidence a signal was - /// received since the `pending` flags would have already been set if that - /// was the case. See - /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more - /// info. - fn drain(&mut self, cx: &mut Context<'_>) { - loop { - match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) { - Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"), - Poll::Ready(Ok(_)) => {} - Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e), - Poll::Pending => break, - } - } - } -} - /// A stream of events for receiving a particular type of OS signal. /// /// In general signal handling on Unix is a pretty tricky topic, and this @@ -376,7 +325,6 @@ impl Driver { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Signal { - driver: Driver, rx: Receiver<()>, } @@ -403,21 +351,21 @@ pub struct Signal { /// * If the signal is one of /// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) pub fn signal(kind: SignalKind) -> io::Result<Signal> { + signal_with_handle(kind, Handle::current()) +} + +pub(crate) fn signal_with_handle(kind: SignalKind, handle: Handle) -> io::Result<Signal> { let signal = kind.0; // Turn the signal delivery on once we are ready for it - signal_enable(signal)?; - - // Ensure there's a driver for our associated event loop processing - // signals. - let driver = Driver::new()?; + signal_enable(signal, handle)?; // One wakeup in a queue is enough, no need for us to buffer up any // more. let (tx, rx) = channel(1); globals().register_listener(signal as EventId, tx); - Ok(Signal { driver, rx }) + Ok(Signal { rx }) } impl Signal { @@ -449,38 +397,14 @@ impl Signal { poll_fn(|cx| self.poll_recv(cx)).await } - /// Polls to receive the next signal notification event, outside of an - /// `async` context. - /// - /// `None` is returned if no more events can be received by this stream. - /// - /// # Examples - /// - /// Polling from a manually implemented future - /// - /// ```rust,no_run - /// use std::pin::Pin; - /// use std::future::Future; - /// use std::task::{Context, Poll}; - /// use tokio::signal::unix::Signal; - /// - /// struct MyFuture { - /// signal: Signal, - /// } - /// - /// impl Future for MyFuture { - /// type Output = Option<()>; - /// - /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - /// println!("polling MyFuture"); - /// self.signal.poll_recv(cx) - /// } - /// } - /// ``` - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { - let _ = self.driver.poll(cx); + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { self.rx.poll_recv(cx) } + + /// Try to receive a signal notification without blocking or registering a waker. + pub(crate) fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.rx.try_recv() + } } cfg_stream! { @@ -493,6 +417,22 @@ cfg_stream! { } } +// Work around for abstracting streams internally +pub(crate) trait InternalStream: Unpin { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; + fn try_recv(&mut self) -> Result<(), TryRecvError>; +} + +impl InternalStream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.poll_recv(cx) + } + + fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.try_recv() + } +} + pub(crate) fn ctrl_c() -> io::Result<Signal> { signal(SignalKind::interrupt()) } @@ -503,11 +443,11 @@ mod tests { #[test] fn signal_enable_error_on_invalid_input() { - signal_enable(-1).unwrap_err(); + signal_enable(-1, Handle::default()).unwrap_err(); } #[test] fn signal_enable_error_on_forbidden_input() { - signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err(); + signal_enable(signal_hook_registry::FORBIDDEN[0], Handle::default()).unwrap_err(); } } |