aboutsummaryrefslogtreecommitdiff
path: root/src/signal/unix.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/signal/unix.rs')
-rw-r--r--src/signal/unix.rs142
1 files changed, 41 insertions, 101 deletions
diff --git a/src/signal/unix.rs b/src/signal/unix.rs
index b46b15c..aaaa75e 100644
--- a/src/signal/unix.rs
+++ b/src/signal/unix.rs
@@ -5,18 +5,21 @@
#![cfg(unix)]
-use crate::io::{AsyncRead, PollEvented};
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
+use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::{channel, Receiver};
use libc::c_int;
-use mio_uds::UnixStream;
+use mio::net::UnixStream;
use std::io::{self, Error, ErrorKind, Write};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
+pub(crate) mod driver;
+use self::driver::Handle;
+
pub(crate) type OsStorage = Vec<SignalInfo>;
// Number of different unix signals
@@ -202,9 +205,9 @@ impl Default for SignalInfo {
/// The purpose of this signal handler is to primarily:
///
/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
-/// 2. Wake up driver tasks by writing a byte to a pipe
+/// 2. Wake up the driver by writing a byte to a pipe
///
-/// Those two operations shoudl both be async-signal safe.
+/// Those two operations should both be async-signal safe.
fn action(globals: Pin<&'static Globals>, signal: c_int) {
globals.record_event(signal as EventId);
@@ -219,7 +222,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) {
///
/// This will register the signal handler if it hasn't already been registered,
/// returning any error along the way if that fails.
-fn signal_enable(signal: c_int) -> io::Result<()> {
+fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> {
if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) {
return Err(Error::new(
ErrorKind::Other,
@@ -227,6 +230,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
));
}
+ // Check that we have a signal driver running
+ handle.check_inner()?;
+
let globals = globals();
let siginfo = match globals.storage().get(signal as EventId) {
Some(slot) => slot,
@@ -254,63 +260,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
}
}
-#[derive(Debug)]
-struct Driver {
- wakeup: PollEvented<UnixStream>,
-}
-
-impl Driver {
- fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // Drain the data from the pipe and maintain interest in getting more
- self.drain(cx);
- // Broadcast any signals which were received
- globals().broadcast();
-
- Poll::Pending
- }
-}
-
-impl Driver {
- fn new() -> io::Result<Driver> {
- // NB: We give each driver a "fresh" reciever file descriptor to avoid
- // the issues described in alexcrichton/tokio-process#42.
- //
- // In the past we would reuse the actual receiver file descriptor and
- // swallow any errors around double registration of the same descriptor.
- // I'm not sure if the second (failed) registration simply doesn't end up
- // receiving wake up notifications, or there could be some race condition
- // when consuming readiness events, but having distinct descriptors for
- // distinct PollEvented instances appears to mitigate this.
- //
- // Unfortunately we cannot just use a single global PollEvented instance
- // either, since we can't compare Handles or assume they will always
- // point to the exact same reactor.
- let stream = globals().receiver.try_clone()?;
- let wakeup = PollEvented::new(stream)?;
-
- Ok(Driver { wakeup })
- }
-
- /// Drain all data in the global receiver, ensuring we'll get woken up when
- /// there is a write on the other end.
- ///
- /// We do *NOT* use the existence of any read bytes as evidence a signal was
- /// received since the `pending` flags would have already been set if that
- /// was the case. See
- /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
- /// info.
- fn drain(&mut self, cx: &mut Context<'_>) {
- loop {
- match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) {
- Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"),
- Poll::Ready(Ok(_)) => {}
- Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
- Poll::Pending => break,
- }
- }
- }
-}
-
/// A stream of events for receiving a particular type of OS signal.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
@@ -376,7 +325,6 @@ impl Driver {
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Signal {
- driver: Driver,
rx: Receiver<()>,
}
@@ -403,21 +351,21 @@ pub struct Signal {
/// * If the signal is one of
/// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics)
pub fn signal(kind: SignalKind) -> io::Result<Signal> {
+ signal_with_handle(kind, Handle::current())
+}
+
+pub(crate) fn signal_with_handle(kind: SignalKind, handle: Handle) -> io::Result<Signal> {
let signal = kind.0;
// Turn the signal delivery on once we are ready for it
- signal_enable(signal)?;
-
- // Ensure there's a driver for our associated event loop processing
- // signals.
- let driver = Driver::new()?;
+ signal_enable(signal, handle)?;
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
globals().register_listener(signal as EventId, tx);
- Ok(Signal { driver, rx })
+ Ok(Signal { rx })
}
impl Signal {
@@ -449,38 +397,14 @@ impl Signal {
poll_fn(|cx| self.poll_recv(cx)).await
}
- /// Polls to receive the next signal notification event, outside of an
- /// `async` context.
- ///
- /// `None` is returned if no more events can be received by this stream.
- ///
- /// # Examples
- ///
- /// Polling from a manually implemented future
- ///
- /// ```rust,no_run
- /// use std::pin::Pin;
- /// use std::future::Future;
- /// use std::task::{Context, Poll};
- /// use tokio::signal::unix::Signal;
- ///
- /// struct MyFuture {
- /// signal: Signal,
- /// }
- ///
- /// impl Future for MyFuture {
- /// type Output = Option<()>;
- ///
- /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- /// println!("polling MyFuture");
- /// self.signal.poll_recv(cx)
- /// }
- /// }
- /// ```
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
- let _ = self.driver.poll(cx);
+ pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.rx.poll_recv(cx)
}
+
+ /// Try to receive a signal notification without blocking or registering a waker.
+ pub(crate) fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ self.rx.try_recv()
+ }
}
cfg_stream! {
@@ -493,6 +417,22 @@ cfg_stream! {
}
}
+// Work around for abstracting streams internally
+pub(crate) trait InternalStream: Unpin {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
+ fn try_recv(&mut self) -> Result<(), TryRecvError>;
+}
+
+impl InternalStream for Signal {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.poll_recv(cx)
+ }
+
+ fn try_recv(&mut self) -> Result<(), TryRecvError> {
+ self.try_recv()
+ }
+}
+
pub(crate) fn ctrl_c() -> io::Result<Signal> {
signal(SignalKind::interrupt())
}
@@ -503,11 +443,11 @@ mod tests {
#[test]
fn signal_enable_error_on_invalid_input() {
- signal_enable(-1).unwrap_err();
+ signal_enable(-1, Handle::default()).unwrap_err();
}
#[test]
fn signal_enable_error_on_forbidden_input() {
- signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err();
+ signal_enable(signal_hook_registry::FORBIDDEN[0], Handle::default()).unwrap_err();
}
}