aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs2
-rw-r--r--src/net/tcp/socket.rs121
-rw-r--r--src/net/unix/ucred.rs75
-rw-r--r--src/runtime/blocking/pool.rs2
-rw-r--r--src/runtime/blocking/task.rs3
-rw-r--r--src/runtime/handle.rs16
-rw-r--r--src/runtime/mod.rs7
-rw-r--r--src/task/blocking.rs1
-rw-r--r--src/task/local.rs2
-rw-r--r--src/task/spawn.rs1
-rw-r--r--src/util/trace.rs48
11 files changed, 233 insertions, 45 deletions
diff --git a/src/lib.rs b/src/lib.rs
index b14ae72..229f050 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-#![doc(html_root_url = "https://docs.rs/tokio/0.3.2")]
+#![doc(html_root_url = "https://docs.rs/tokio/0.3.3")]
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs
index 5b0f802..9e3ca99 100644
--- a/src/net/tcp/socket.rs
+++ b/src/net/tcp/socket.rs
@@ -183,6 +183,127 @@ impl TcpSocket {
self.inner.set_reuseaddr(reuseaddr)
}
+ /// Retrieves the value set for `SO_REUSEADDR` on this socket
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpSocket;
+ ///
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let addr = "127.0.0.1:8080".parse().unwrap();
+ ///
+ /// let socket = TcpSocket::new_v4()?;
+ /// socket.set_reuseaddr(true)?;
+ /// assert!(socket.reuseaddr().unwrap());
+ /// socket.bind(addr)?;
+ ///
+ /// let listener = socket.listen(1024)?;
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn reuseaddr(&self) -> io::Result<bool> {
+ self.inner.get_reuseaddr()
+ }
+
+ /// Allow the socket to bind to an in-use port. Only available for unix systems
+ /// (excluding Solaris & Illumos).
+ ///
+ /// Behavior is platform specific. Refer to the target platform's
+ /// documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpSocket;
+ ///
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let addr = "127.0.0.1:8080".parse().unwrap();
+ ///
+ /// let socket = TcpSocket::new_v4()?;
+ /// socket.set_reuseport(true)?;
+ /// socket.bind(addr)?;
+ ///
+ /// let listener = socket.listen(1024)?;
+ /// Ok(())
+ /// }
+ /// ```
+ #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
+ #[cfg_attr(
+ docsrs,
+ doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
+ )]
+ pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
+ self.inner.set_reuseport(reuseport)
+ }
+
+ /// Allow the socket to bind to an in-use port. Only available for unix systems
+ /// (excluding Solaris & Illumos).
+ ///
+ /// Behavior is platform specific. Refer to the target platform's
+ /// documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpSocket;
+ ///
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let addr = "127.0.0.1:8080".parse().unwrap();
+ ///
+ /// let socket = TcpSocket::new_v4()?;
+ /// socket.set_reuseport(true)?;
+ /// assert!(socket.reuseport().unwrap());
+ /// socket.bind(addr)?;
+ ///
+ /// let listener = socket.listen(1024)?;
+ /// Ok(())
+ /// }
+ /// ```
+ #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
+ #[cfg_attr(
+ docsrs,
+ doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
+ )]
+ pub fn reuseport(&self) -> io::Result<bool> {
+ self.inner.get_reuseport()
+ }
+
+ /// Get the local address of this socket.
+ ///
+ /// Will fail on windows if called before `bind`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpSocket;
+ ///
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let addr = "127.0.0.1:8080".parse().unwrap();
+ ///
+ /// let socket = TcpSocket::new_v4()?;
+ /// socket.bind(addr)?;
+ /// assert_eq!(socket.local_addr().unwrap().to_string(), "127.0.0.1:8080");
+ /// let listener = socket.listen(1024)?;
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.get_localaddr()
+ }
+
/// Bind the socket to the given address.
///
/// This calls the `bind(2)` operating-system function. Behavior is
diff --git a/src/net/unix/ucred.rs b/src/net/unix/ucred.rs
index ef214a7..7d73ee0 100644
--- a/src/net/unix/ucred.rs
+++ b/src/net/unix/ucred.rs
@@ -1,8 +1,10 @@
-use libc::{gid_t, uid_t};
+use libc::{gid_t, pid_t, uid_t};
/// Credentials of a process
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct UCred {
+ /// PID (process ID) of the process
+ pid: Option<pid_t>,
/// UID (user ID) of the process
uid: uid_t,
/// GID (group ID) of the process
@@ -19,6 +21,13 @@ impl UCred {
pub fn gid(&self) -> gid_t {
self.gid
}
+
+ /// Gets PID (process ID) of the process.
+ ///
+ /// This is only implemented under linux, android, IOS and MacOS
+ pub fn pid(&self) -> Option<pid_t> {
+ self.pid
+ }
}
#[cfg(any(target_os = "linux", target_os = "android"))]
@@ -26,12 +35,13 @@ pub(crate) use self::impl_linux::get_peer_cred;
#[cfg(any(
target_os = "dragonfly",
- target_os = "macos",
- target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
+pub(crate) use self::impl_bsd::get_peer_cred;
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) use self::impl_macos::get_peer_cred;
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
@@ -77,6 +87,7 @@ pub(crate) mod impl_linux {
Ok(super::UCred {
uid: ucred.uid,
gid: ucred.gid,
+ pid: Some(ucred.pid),
})
} else {
Err(io::Error::last_os_error())
@@ -87,13 +98,11 @@ pub(crate) mod impl_linux {
#[cfg(any(
target_os = "dragonfly",
- target_os = "macos",
- target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
-pub(crate) mod impl_macos {
+pub(crate) mod impl_bsd {
use crate::net::unix::UnixStream;
use libc::getpeereid;
@@ -114,6 +123,54 @@ pub(crate) mod impl_macos {
Ok(super::UCred {
uid: uid.assume_init(),
gid: gid.assume_init(),
+ pid: None,
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+pub(crate) mod impl_macos {
+ use crate::net::unix::UnixStream;
+
+ use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL};
+ use std::io;
+ use std::mem::size_of;
+ use std::mem::MaybeUninit;
+ use std::os::unix::io::AsRawFd;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut uid = MaybeUninit::uninit();
+ let mut gid = MaybeUninit::uninit();
+ let mut pid: MaybeUninit<pid_t> = MaybeUninit::uninit();
+ let mut pid_size: MaybeUninit<u32> = MaybeUninit::new(size_of::<pid_t>() as u32);
+
+ if getsockopt(
+ raw_fd,
+ SOL_LOCAL,
+ LOCAL_PEEREPID,
+ pid.as_mut_ptr() as *mut c_void,
+ pid_size.as_mut_ptr(),
+ ) != 0
+ {
+ return Err(io::Error::last_os_error());
+ }
+
+ assert!(pid_size.assume_init() == (size_of::<pid_t>() as u32));
+
+ let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr());
+
+ if ret == 0 {
+ Ok(super::UCred {
+ uid: uid.assume_init(),
+ gid: gid.assume_init(),
+ pid: Some(pid.assume_init()),
})
} else {
Err(io::Error::last_os_error())
@@ -154,7 +211,11 @@ pub(crate) mod impl_solaris {
ucred_free(cred);
- Ok(super::UCred { uid, gid })
+ Ok(super::UCred {
+ uid,
+ gid,
+ pid: None,
+ })
} else {
Err(io::Error::last_os_error())
}
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 2967a10..6b9fb1b 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -70,6 +70,7 @@ const KEEP_ALIVE: Duration = Duration::from_secs(10);
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
{
let rt = context::current().expect("not currently running on the Tokio runtime.");
rt.spawn_blocking(func)
@@ -79,6 +80,7 @@ where
pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
{
let rt = context::current().expect("not currently running on the Tokio runtime.");
diff --git a/src/runtime/blocking/task.rs b/src/runtime/blocking/task.rs
index a521af4..ee2d8d6 100644
--- a/src/runtime/blocking/task.rs
+++ b/src/runtime/blocking/task.rs
@@ -19,7 +19,8 @@ impl<T> Unpin for BlockingTask<T> {}
impl<T, R> Future for BlockingTask<T>
where
- T: FnOnce() -> R,
+ T: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
{
type Output = R;
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index b1e8d8f..72b9c06 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -39,13 +39,27 @@ impl Handle {
// context::enter(self.clone(), f)
// }
- /// Run the provided function on an executor dedicated to blocking operations.
+ /// Run the provided function on an executor dedicated to blocking
+ /// operations.
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
{
#[cfg(feature = "tracing")]
let func = {
+ #[cfg(tokio_track_caller)]
+ let location = std::panic::Location::caller();
+ #[cfg(tokio_track_caller)]
+ let span = tracing::trace_span!(
+ target: "tokio::task",
+ "task",
+ kind = %"blocking",
+ function = %std::any::type_name::<F>(),
+ spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
+ );
+ #[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index be4aa38..f85344d 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -357,11 +357,14 @@ cfg_rt! {
/// });
/// # }
/// ```
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
+ #[cfg(feature = "tracing")]
+ let future = crate::util::trace::task(future, "task");
match &self.kind {
#[cfg(feature = "rt-multi-thread")]
Kind::ThreadPool(exec) => exec.spawn(future),
@@ -385,9 +388,11 @@ cfg_rt! {
/// println!("now running on a worker thread");
/// });
/// # }
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
@@ -415,7 +420,7 @@ cfg_rt! {
///
/// # Panics
///
- /// This function panics if the provided future panics, or if not called within an
+ /// This function panics if the provided future panics, or if called within an
/// asynchronous execution context.
///
/// # Examples
diff --git a/src/task/blocking.rs b/src/task/blocking.rs
index fc6632b..36bc457 100644
--- a/src/task/blocking.rs
+++ b/src/task/blocking.rs
@@ -104,6 +104,7 @@ cfg_rt_multi_thread! {
/// # Ok(())
/// # }
/// ```
+#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
diff --git a/src/task/local.rs b/src/task/local.rs
index 5896126..566b2f2 100644
--- a/src/task/local.rs
+++ b/src/task/local.rs
@@ -190,6 +190,7 @@ cfg_rt! {
/// }).await;
/// }
/// ```
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
@@ -273,6 +274,7 @@ impl LocalSet {
/// }
/// ```
/// [`spawn_local`]: fn@spawn_local
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index 77acb57..a060852 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -122,6 +122,7 @@ cfg_rt! {
/// ```text
/// error[E0391]: cycle detected when processing `main`
/// ```
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
diff --git a/src/util/trace.rs b/src/util/trace.rs
index 18956a3..96a9db9 100644
--- a/src/util/trace.rs
+++ b/src/util/trace.rs
@@ -1,47 +1,27 @@
cfg_trace! {
cfg_rt! {
- use std::future::Future;
- use std::pin::Pin;
- use std::task::{Context, Poll};
- use pin_project_lite::pin_project;
-
- use tracing::Span;
-
- pin_project! {
- /// A future that has been instrumented with a `tracing` span.
- #[derive(Debug, Clone)]
- pub(crate) struct Instrumented<T> {
- #[pin]
- inner: T,
- span: Span,
- }
- }
-
- impl<T: Future> Future for Instrumented<T> {
- type Output = T::Output;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let this = self.project();
- let _enter = this.span.enter();
- this.inner.poll(cx)
- }
- }
-
- impl<T> Instrumented<T> {
- pub(crate) fn new(inner: T, span: Span) -> Self {
- Self { inner, span }
- }
- }
+ pub(crate) use tracing::instrument::Instrumented;
#[inline]
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn task<F>(task: F, kind: &'static str) -> Instrumented<F> {
+ use tracing::instrument::Instrument;
+ #[cfg(tokio_track_caller)]
+ let location = std::panic::Location::caller();
+ #[cfg(tokio_track_caller)]
+ let span = tracing::trace_span!(
+ target: "tokio::task",
+ "task",
+ %kind,
+ spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
+ );
+ #[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
%kind,
- future = %std::any::type_name::<F>(),
);
- Instrumented::new(task, span)
+ task.instrument(span)
}
}
}