diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/net/tcp/socket.rs | 121 | ||||
-rw-r--r-- | src/net/unix/ucred.rs | 75 | ||||
-rw-r--r-- | src/runtime/blocking/pool.rs | 2 | ||||
-rw-r--r-- | src/runtime/blocking/task.rs | 3 | ||||
-rw-r--r-- | src/runtime/handle.rs | 16 | ||||
-rw-r--r-- | src/runtime/mod.rs | 7 | ||||
-rw-r--r-- | src/task/blocking.rs | 1 | ||||
-rw-r--r-- | src/task/local.rs | 2 | ||||
-rw-r--r-- | src/task/spawn.rs | 1 | ||||
-rw-r--r-- | src/util/trace.rs | 48 |
11 files changed, 233 insertions, 45 deletions
@@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tokio/0.3.2")] +#![doc(html_root_url = "https://docs.rs/tokio/0.3.3")] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs index 5b0f802..9e3ca99 100644 --- a/src/net/tcp/socket.rs +++ b/src/net/tcp/socket.rs @@ -183,6 +183,127 @@ impl TcpSocket { self.inner.set_reuseaddr(reuseaddr) } + /// Retrieves the value set for `SO_REUSEADDR` on this socket + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpSocket; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let addr = "127.0.0.1:8080".parse().unwrap(); + /// + /// let socket = TcpSocket::new_v4()?; + /// socket.set_reuseaddr(true)?; + /// assert!(socket.reuseaddr().unwrap()); + /// socket.bind(addr)?; + /// + /// let listener = socket.listen(1024)?; + /// Ok(()) + /// } + /// ``` + pub fn reuseaddr(&self) -> io::Result<bool> { + self.inner.get_reuseaddr() + } + + /// Allow the socket to bind to an in-use port. Only available for unix systems + /// (excluding Solaris & Illumos). + /// + /// Behavior is platform specific. Refer to the target platform's + /// documentation for more details. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpSocket; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let addr = "127.0.0.1:8080".parse().unwrap(); + /// + /// let socket = TcpSocket::new_v4()?; + /// socket.set_reuseport(true)?; + /// socket.bind(addr)?; + /// + /// let listener = socket.listen(1024)?; + /// Ok(()) + /// } + /// ``` + #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] + #[cfg_attr( + docsrs, + doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))) + )] + pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> { + self.inner.set_reuseport(reuseport) + } + + /// Allow the socket to bind to an in-use port. Only available for unix systems + /// (excluding Solaris & Illumos). + /// + /// Behavior is platform specific. Refer to the target platform's + /// documentation for more details. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpSocket; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let addr = "127.0.0.1:8080".parse().unwrap(); + /// + /// let socket = TcpSocket::new_v4()?; + /// socket.set_reuseport(true)?; + /// assert!(socket.reuseport().unwrap()); + /// socket.bind(addr)?; + /// + /// let listener = socket.listen(1024)?; + /// Ok(()) + /// } + /// ``` + #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] + #[cfg_attr( + docsrs, + doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))) + )] + pub fn reuseport(&self) -> io::Result<bool> { + self.inner.get_reuseport() + } + + /// Get the local address of this socket. + /// + /// Will fail on windows if called before `bind`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpSocket; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let addr = "127.0.0.1:8080".parse().unwrap(); + /// + /// let socket = TcpSocket::new_v4()?; + /// socket.bind(addr)?; + /// assert_eq!(socket.local_addr().unwrap().to_string(), "127.0.0.1:8080"); + /// let listener = socket.listen(1024)?; + /// Ok(()) + /// } + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_localaddr() + } + /// Bind the socket to the given address. /// /// This calls the `bind(2)` operating-system function. Behavior is diff --git a/src/net/unix/ucred.rs b/src/net/unix/ucred.rs index ef214a7..7d73ee0 100644 --- a/src/net/unix/ucred.rs +++ b/src/net/unix/ucred.rs @@ -1,8 +1,10 @@ -use libc::{gid_t, uid_t}; +use libc::{gid_t, pid_t, uid_t}; /// Credentials of a process #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub struct UCred { + /// PID (process ID) of the process + pid: Option<pid_t>, /// UID (user ID) of the process uid: uid_t, /// GID (group ID) of the process @@ -19,6 +21,13 @@ impl UCred { pub fn gid(&self) -> gid_t { self.gid } + + /// Gets PID (process ID) of the process. + /// + /// This is only implemented under linux, android, IOS and MacOS + pub fn pid(&self) -> Option<pid_t> { + self.pid + } } #[cfg(any(target_os = "linux", target_os = "android"))] @@ -26,12 +35,13 @@ pub(crate) use self::impl_linux::get_peer_cred; #[cfg(any( target_os = "dragonfly", - target_os = "macos", - target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] +pub(crate) use self::impl_bsd::get_peer_cred; + +#[cfg(any(target_os = "macos", target_os = "ios"))] pub(crate) use self::impl_macos::get_peer_cred; #[cfg(any(target_os = "solaris", target_os = "illumos"))] @@ -77,6 +87,7 @@ pub(crate) mod impl_linux { Ok(super::UCred { uid: ucred.uid, gid: ucred.gid, + pid: Some(ucred.pid), }) } else { Err(io::Error::last_os_error()) @@ -87,13 +98,11 @@ pub(crate) mod impl_linux { #[cfg(any( target_os = "dragonfly", - target_os = "macos", - target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] -pub(crate) mod impl_macos { +pub(crate) mod impl_bsd { use crate::net::unix::UnixStream; use libc::getpeereid; @@ -114,6 +123,54 @@ pub(crate) mod impl_macos { Ok(super::UCred { uid: uid.assume_init(), gid: gid.assume_init(), + pid: None, + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub(crate) mod impl_macos { + use crate::net::unix::UnixStream; + + use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL}; + use std::io; + use std::mem::size_of; + use std::mem::MaybeUninit; + use std::os::unix::io::AsRawFd; + + pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut uid = MaybeUninit::uninit(); + let mut gid = MaybeUninit::uninit(); + let mut pid: MaybeUninit<pid_t> = MaybeUninit::uninit(); + let mut pid_size: MaybeUninit<u32> = MaybeUninit::new(size_of::<pid_t>() as u32); + + if getsockopt( + raw_fd, + SOL_LOCAL, + LOCAL_PEEREPID, + pid.as_mut_ptr() as *mut c_void, + pid_size.as_mut_ptr(), + ) != 0 + { + return Err(io::Error::last_os_error()); + } + + assert!(pid_size.assume_init() == (size_of::<pid_t>() as u32)); + + let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr()); + + if ret == 0 { + Ok(super::UCred { + uid: uid.assume_init(), + gid: gid.assume_init(), + pid: Some(pid.assume_init()), }) } else { Err(io::Error::last_os_error()) @@ -154,7 +211,11 @@ pub(crate) mod impl_solaris { ucred_free(cred); - Ok(super::UCred { uid, gid }) + Ok(super::UCred { + uid, + gid, + pid: None, + }) } else { Err(io::Error::last_os_error()) } diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 2967a10..6b9fb1b 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -70,6 +70,7 @@ const KEEP_ALIVE: Duration = Duration::from_secs(10); pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { let rt = context::current().expect("not currently running on the Tokio runtime."); rt.spawn_blocking(func) @@ -79,6 +80,7 @@ where pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { let rt = context::current().expect("not currently running on the Tokio runtime."); diff --git a/src/runtime/blocking/task.rs b/src/runtime/blocking/task.rs index a521af4..ee2d8d6 100644 --- a/src/runtime/blocking/task.rs +++ b/src/runtime/blocking/task.rs @@ -19,7 +19,8 @@ impl<T> Unpin for BlockingTask<T> {} impl<T, R> Future for BlockingTask<T> where - T: FnOnce() -> R, + T: FnOnce() -> R + Send + 'static, + R: Send + 'static, { type Output = R; diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index b1e8d8f..72b9c06 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -39,13 +39,27 @@ impl Handle { // context::enter(self.clone(), f) // } - /// Run the provided function on an executor dedicated to blocking operations. + /// Run the provided function on an executor dedicated to blocking + /// operations. + #[cfg_attr(tokio_track_caller, track_caller)] pub(crate) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { #[cfg(feature = "tracing")] let func = { + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + #[cfg(tokio_track_caller)] + let span = tracing::trace_span!( + target: "tokio::task", + "task", + kind = %"blocking", + function = %std::any::type_name::<F>(), + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index be4aa38..f85344d 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -357,11 +357,14 @@ cfg_rt! { /// }); /// # } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static, { + #[cfg(feature = "tracing")] + let future = crate::util::trace::task(future, "task"); match &self.kind { #[cfg(feature = "rt-multi-thread")] Kind::ThreadPool(exec) => exec.spawn(future), @@ -385,9 +388,11 @@ cfg_rt! { /// println!("now running on a worker thread"); /// }); /// # } + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { self.handle.spawn_blocking(func) } @@ -415,7 +420,7 @@ cfg_rt! { /// /// # Panics /// - /// This function panics if the provided future panics, or if not called within an + /// This function panics if the provided future panics, or if called within an /// asynchronous execution context. /// /// # Examples diff --git a/src/task/blocking.rs b/src/task/blocking.rs index fc6632b..36bc457 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -104,6 +104,7 @@ cfg_rt_multi_thread! { /// # Ok(()) /// # } /// ``` +#[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, diff --git a/src/task/local.rs b/src/task/local.rs index 5896126..566b2f2 100644 --- a/src/task/local.rs +++ b/src/task/local.rs @@ -190,6 +190,7 @@ cfg_rt! { /// }).await; /// } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> where F: Future + 'static, @@ -273,6 +274,7 @@ impl LocalSet { /// } /// ``` /// [`spawn_local`]: fn@spawn_local + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, diff --git a/src/task/spawn.rs b/src/task/spawn.rs index 77acb57..a060852 100644 --- a/src/task/spawn.rs +++ b/src/task/spawn.rs @@ -122,6 +122,7 @@ cfg_rt! { /// ```text /// error[E0391]: cycle detected when processing `main` /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn<T>(task: T) -> JoinHandle<T::Output> where T: Future + Send + 'static, diff --git a/src/util/trace.rs b/src/util/trace.rs index 18956a3..96a9db9 100644 --- a/src/util/trace.rs +++ b/src/util/trace.rs @@ -1,47 +1,27 @@ cfg_trace! { cfg_rt! { - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - use pin_project_lite::pin_project; - - use tracing::Span; - - pin_project! { - /// A future that has been instrumented with a `tracing` span. - #[derive(Debug, Clone)] - pub(crate) struct Instrumented<T> { - #[pin] - inner: T, - span: Span, - } - } - - impl<T: Future> Future for Instrumented<T> { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let this = self.project(); - let _enter = this.span.enter(); - this.inner.poll(cx) - } - } - - impl<T> Instrumented<T> { - pub(crate) fn new(inner: T, span: Span) -> Self { - Self { inner, span } - } - } + pub(crate) use tracing::instrument::Instrumented; #[inline] + #[cfg_attr(tokio_track_caller, track_caller)] pub(crate) fn task<F>(task: F, kind: &'static str) -> Instrumented<F> { + use tracing::instrument::Instrument; + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + #[cfg(tokio_track_caller)] + let span = tracing::trace_span!( + target: "tokio::task", + "task", + %kind, + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", %kind, - future = %std::any::type_name::<F>(), ); - Instrumented::new(task, span) + task.instrument(span) } } } |