aboutsummaryrefslogtreecommitdiff
path: root/tests/rt_threaded_alt.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rt_threaded_alt.rs')
-rw-r--r--tests/rt_threaded_alt.rs717
1 files changed, 717 insertions, 0 deletions
diff --git a/tests/rt_threaded_alt.rs b/tests/rt_threaded_alt.rs
new file mode 100644
index 0000000..3e3ac07
--- /dev/null
+++ b/tests/rt_threaded_alt.rs
@@ -0,0 +1,717 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "full", not(target_os = "wasi")))]
+#![cfg(tokio_unstable)]
+
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::runtime;
+use tokio::sync::oneshot;
+use tokio_test::{assert_err, assert_ok};
+
+use futures::future::poll_fn;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::{mpsc, Arc, Mutex};
+use std::task::{Context, Poll, Waker};
+
+macro_rules! cfg_metrics {
+ ($($t:tt)*) => {
+ #[cfg(tokio_unstable)]
+ {
+ $( $t )*
+ }
+ }
+}
+
+#[test]
+fn single_thread() {
+ // No panic when starting a runtime w/ a single thread
+ let _ = runtime::Builder::new_multi_thread_alt()
+ .enable_all()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+}
+
+#[test]
+fn many_oneshot_futures() {
+ // used for notifying the main thread
+ const NUM: usize = 1_000;
+
+ for _ in 0..5 {
+ let (tx, rx) = mpsc::channel();
+
+ let rt = rt();
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ for _ in 0..NUM {
+ let cnt = cnt.clone();
+ let tx = tx.clone();
+
+ rt.spawn(async move {
+ let num = cnt.fetch_add(1, Relaxed) + 1;
+
+ if num == NUM {
+ tx.send(()).unwrap();
+ }
+ });
+ }
+
+ rx.recv().unwrap();
+
+ // Wait for the pool to shutdown
+ drop(rt);
+ }
+}
+
+#[test]
+fn spawn_two() {
+ let rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ });
+ });
+
+ assert_ok!(rx.await)
+ });
+
+ assert_eq!(out, "ZOMG");
+
+ cfg_metrics! {
+ let metrics = rt.metrics();
+ drop(rt);
+ assert_eq!(1, metrics.remote_schedule_count());
+
+ let mut local = 0;
+ for i in 0..metrics.num_workers() {
+ local += metrics.worker_local_schedule_count(i);
+ }
+
+ assert_eq!(1, local);
+ }
+}
+
+#[test]
+fn many_multishot_futures() {
+ const CHAIN: usize = 200;
+ const CYCLES: usize = 5;
+ const TRACKS: usize = 50;
+
+ for _ in 0..50 {
+ let rt = rt();
+ let mut start_txs = Vec::with_capacity(TRACKS);
+ let mut final_rxs = Vec::with_capacity(TRACKS);
+
+ for _ in 0..TRACKS {
+ let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);
+
+ for _ in 0..CHAIN {
+ let (next_tx, next_rx) = tokio::sync::mpsc::channel(10);
+
+ // Forward all the messages
+ rt.spawn(async move {
+ while let Some(v) = chain_rx.recv().await {
+ next_tx.send(v).await.unwrap();
+ }
+ });
+
+ chain_rx = next_rx;
+ }
+
+ // This final task cycles if needed
+ let (final_tx, final_rx) = tokio::sync::mpsc::channel(10);
+ let cycle_tx = start_tx.clone();
+ let mut rem = CYCLES;
+
+ rt.spawn(async move {
+ for _ in 0..CYCLES {
+ let msg = chain_rx.recv().await.unwrap();
+
+ rem -= 1;
+
+ if rem == 0 {
+ final_tx.send(msg).await.unwrap();
+ } else {
+ cycle_tx.send(msg).await.unwrap();
+ }
+ }
+ });
+
+ start_txs.push(start_tx);
+ final_rxs.push(final_rx);
+ }
+
+ {
+ rt.block_on(async move {
+ for start_tx in start_txs {
+ start_tx.send("ping").await.unwrap();
+ }
+
+ for mut final_rx in final_rxs {
+ final_rx.recv().await.unwrap();
+ }
+ });
+ }
+ }
+}
+
+#[test]
+fn lifo_slot_budget() {
+ async fn my_fn() {
+ spawn_another();
+ }
+
+ fn spawn_another() {
+ tokio::spawn(my_fn());
+ }
+
+ let rt = runtime::Builder::new_multi_thread_alt()
+ .enable_all()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let (send, recv) = oneshot::channel();
+
+ rt.spawn(async move {
+ tokio::spawn(my_fn());
+ let _ = send.send(());
+ });
+
+ let _ = rt.block_on(recv);
+}
+
+#[test]
+fn spawn_shutdown() {
+ let rt = rt();
+ let (tx, rx) = mpsc::channel();
+
+ rt.block_on(async {
+ tokio::spawn(client_server(tx.clone()));
+ });
+
+ // Use spawner
+ rt.spawn(client_server(tx));
+
+ assert_ok!(rx.recv());
+ assert_ok!(rx.recv());
+
+ drop(rt);
+ assert_err!(rx.try_recv());
+}
+
+async fn client_server(tx: mpsc::Sender<()>) {
+ let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+
+ // Get the assigned address
+ let addr = assert_ok!(server.local_addr());
+
+ // Spawn the server
+ tokio::spawn(async move {
+ // Accept a socket
+ let (mut socket, _) = server.accept().await.unwrap();
+
+ // Write some data
+ socket.write_all(b"hello").await.unwrap();
+ });
+
+ let mut client = TcpStream::connect(&addr).await.unwrap();
+
+ let mut buf = vec![];
+ client.read_to_end(&mut buf).await.unwrap();
+
+ assert_eq!(buf, b"hello");
+ tx.send(()).unwrap();
+}
+
+#[test]
+fn drop_threadpool_drops_futures() {
+ for _ in 0..1_000 {
+ let num_inc = Arc::new(AtomicUsize::new(0));
+ let num_dec = Arc::new(AtomicUsize::new(0));
+ let num_drop = Arc::new(AtomicUsize::new(0));
+
+ struct Never(Arc<AtomicUsize>);
+
+ impl Future for Never {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+ }
+
+ impl Drop for Never {
+ fn drop(&mut self) {
+ self.0.fetch_add(1, Relaxed);
+ }
+ }
+
+ let a = num_inc.clone();
+ let b = num_dec.clone();
+
+ let rt = runtime::Builder::new_multi_thread_alt()
+ .enable_all()
+ .on_thread_start(move || {
+ a.fetch_add(1, Relaxed);
+ })
+ .on_thread_stop(move || {
+ b.fetch_add(1, Relaxed);
+ })
+ .build()
+ .unwrap();
+
+ rt.spawn(Never(num_drop.clone()));
+
+ // Wait for the pool to shutdown
+ drop(rt);
+
+ // Assert that only a single thread was spawned.
+ let a = num_inc.load(Relaxed);
+ assert!(a >= 1);
+
+ // Assert that all threads shutdown
+ let b = num_dec.load(Relaxed);
+ assert_eq!(a, b);
+
+ // Assert that the future was dropped
+ let c = num_drop.load(Relaxed);
+ assert_eq!(c, 1);
+ }
+}
+
+#[test]
+fn start_stop_callbacks_called() {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ let after_start = Arc::new(AtomicUsize::new(0));
+ let before_stop = Arc::new(AtomicUsize::new(0));
+
+ let after_inner = after_start.clone();
+ let before_inner = before_stop.clone();
+ let rt = tokio::runtime::Builder::new_multi_thread_alt()
+ .enable_all()
+ .on_thread_start(move || {
+ after_inner.clone().fetch_add(1, Ordering::Relaxed);
+ })
+ .on_thread_stop(move || {
+ before_inner.clone().fetch_add(1, Ordering::Relaxed);
+ })
+ .build()
+ .unwrap();
+
+ let (tx, rx) = oneshot::channel();
+
+ rt.spawn(async move {
+ assert_ok!(tx.send(()));
+ });
+
+ assert_ok!(rt.block_on(rx));
+
+ drop(rt);
+
+ assert!(after_start.load(Ordering::Relaxed) > 0);
+ assert!(before_stop.load(Ordering::Relaxed) > 0);
+}
+
+#[test]
+fn blocking_task() {
+ // used for notifying the main thread
+ const NUM: usize = 1_000;
+
+ for _ in 0..10 {
+ let (tx, rx) = mpsc::channel();
+
+ let rt = rt();
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ // there are four workers in the pool
+ // so, if we run 4 blocking tasks, we know that handoff must have happened
+ let block = Arc::new(std::sync::Barrier::new(5));
+ for _ in 0..4 {
+ let block = block.clone();
+ rt.spawn(async move {
+ tokio::task::block_in_place(move || {
+ block.wait();
+ block.wait();
+ })
+ });
+ }
+ block.wait();
+
+ for _ in 0..NUM {
+ let cnt = cnt.clone();
+ let tx = tx.clone();
+
+ rt.spawn(async move {
+ let num = cnt.fetch_add(1, Relaxed) + 1;
+
+ if num == NUM {
+ tx.send(()).unwrap();
+ }
+ });
+ }
+
+ rx.recv().unwrap();
+
+ // Wait for the pool to shutdown
+ block.wait();
+ }
+}
+
+#[test]
+fn multi_threadpool() {
+ use tokio::sync::oneshot;
+
+ let rt1 = rt();
+ let rt2 = rt();
+
+ let (tx, rx) = oneshot::channel();
+ let (done_tx, done_rx) = mpsc::channel();
+
+ rt2.spawn(async move {
+ rx.await.unwrap();
+ done_tx.send(()).unwrap();
+ });
+
+ rt1.spawn(async move {
+ tx.send(()).unwrap();
+ });
+
+ done_rx.recv().unwrap();
+}
+
+// When `block_in_place` returns, it attempts to reclaim the yielded runtime
+// worker. In this case, the remainder of the task is on the runtime worker and
+// must take part in the cooperative task budgeting system.
+//
+// The test ensures that, when this happens, attempting to consume from a
+// channel yields occasionally even if there are values ready to receive.
+#[test]
+fn coop_and_block_in_place() {
+ let rt = tokio::runtime::Builder::new_multi_thread_alt()
+ // Setting max threads to 1 prevents another thread from claiming the
+ // runtime worker yielded as part of `block_in_place` and guarantees the
+ // same thread will reclaim the worker at the end of the
+ // `block_in_place` call.
+ .max_blocking_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
+
+ // Fill the channel
+ for _ in 0..1024 {
+ tx.send(()).await.unwrap();
+ }
+
+ drop(tx);
+
+ tokio::spawn(async move {
+ // Block in place without doing anything
+ tokio::task::block_in_place(|| {});
+
+ // Receive all the values, this should trigger a `Pending` as the
+ // coop limit will be reached.
+ poll_fn(|cx| {
+ while let Poll::Ready(v) = {
+ tokio::pin! {
+ let fut = rx.recv();
+ }
+
+ Pin::new(&mut fut).poll(cx)
+ } {
+ if v.is_none() {
+ panic!("did not yield");
+ }
+ }
+
+ Poll::Ready(())
+ })
+ .await
+ })
+ .await
+ .unwrap();
+ });
+}
+
+#[test]
+fn yield_after_block_in_place() {
+ let rt = tokio::runtime::Builder::new_multi_thread_alt()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async {
+ tokio::spawn(async move {
+ // Block in place then enter a new runtime
+ tokio::task::block_in_place(|| {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+
+ rt.block_on(async {});
+ });
+
+ // Yield, then complete
+ tokio::task::yield_now().await;
+ })
+ .await
+ .unwrap()
+ });
+}
+
+// Testing this does not panic
+#[test]
+fn max_blocking_threads() {
+ let _rt = tokio::runtime::Builder::new_multi_thread_alt()
+ .max_blocking_threads(1)
+ .build()
+ .unwrap();
+}
+
+#[test]
+#[should_panic]
+fn max_blocking_threads_set_to_zero() {
+ let _rt = tokio::runtime::Builder::new_multi_thread_alt()
+ .max_blocking_threads(0)
+ .build()
+ .unwrap();
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn hang_on_shutdown() {
+ let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
+ tokio::spawn(async move {
+ tokio::task::block_in_place(|| sync_rx.recv().ok());
+ });
+
+ tokio::spawn(async {
+ tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+ drop(sync_tx);
+ });
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+}
+
+/// Demonstrates tokio-rs/tokio#3869
+#[test]
+fn wake_during_shutdown() {
+ struct Shared {
+ waker: Option<Waker>,
+ }
+
+ struct MyFuture {
+ shared: Arc<Mutex<Shared>>,
+ put_waker: bool,
+ }
+
+ impl MyFuture {
+ fn new() -> (Self, Self) {
+ let shared = Arc::new(Mutex::new(Shared { waker: None }));
+ let f1 = MyFuture {
+ shared: shared.clone(),
+ put_waker: true,
+ };
+ let f2 = MyFuture {
+ shared,
+ put_waker: false,
+ };
+ (f1, f2)
+ }
+ }
+
+ impl Future for MyFuture {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ let me = Pin::into_inner(self);
+ let mut lock = me.shared.lock().unwrap();
+ if me.put_waker {
+ lock.waker = Some(cx.waker().clone());
+ }
+ Poll::Pending
+ }
+ }
+
+ impl Drop for MyFuture {
+ fn drop(&mut self) {
+ let mut lock = self.shared.lock().unwrap();
+ if !self.put_waker {
+ lock.waker.take().unwrap().wake();
+ }
+ drop(lock);
+ }
+ }
+
+ let rt = tokio::runtime::Builder::new_multi_thread_alt()
+ .worker_threads(1)
+ .enable_all()
+ .build()
+ .unwrap();
+
+ let (f1, f2) = MyFuture::new();
+
+ rt.spawn(f1);
+ rt.spawn(f2);
+
+ rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
+}
+
+#[should_panic]
+#[tokio::test]
+async fn test_block_in_place1() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_block_in_place2() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[should_panic]
+#[tokio::main(flavor = "current_thread")]
+#[test]
+async fn test_block_in_place3() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[tokio::main]
+#[test]
+async fn test_block_in_place4() {
+ tokio::task::block_in_place(|| {});
+}
+
+// Testing the tuning logic is tricky as it is inherently timing based, and more
+// of a heuristic than an exact behavior. This test checks that the interval
+// changes over time based on load factors. There are no assertions, completion
+// is sufficient. If there is a regression, this test will hang. In theory, we
+// could add limits, but that would be likely to fail on CI.
+#[test]
+#[cfg(not(tokio_no_tuning_tests))]
+fn test_tuning() {
+ use std::sync::atomic::AtomicBool;
+ use std::time::Duration;
+
+ let rt = runtime::Builder::new_multi_thread_alt()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) {
+ if flag.load(Relaxed) {
+ if stall {
+ std::thread::sleep(Duration::from_micros(5));
+ }
+
+ counter.fetch_add(1, Relaxed);
+ tokio::spawn(async move { iter(flag, counter, stall) });
+ }
+ }
+
+ let flag = Arc::new(AtomicBool::new(true));
+ let counter = Arc::new(AtomicUsize::new(61));
+ let interval = Arc::new(AtomicUsize::new(61));
+
+ {
+ let flag = flag.clone();
+ let counter = counter.clone();
+ rt.spawn(async move { iter(flag, counter, true) });
+ }
+
+ // Now, hammer the injection queue until the interval drops.
+ let mut n = 0;
+ loop {
+ let curr = interval.load(Relaxed);
+
+ if curr <= 8 {
+ n += 1;
+ } else {
+ n = 0;
+ }
+
+ // Make sure we get a few good rounds. Jitter in the tuning could result
+ // in one "good" value without being representative of reaching a good
+ // state.
+ if n == 3 {
+ break;
+ }
+
+ if Arc::strong_count(&interval) < 5_000 {
+ let counter = counter.clone();
+ let interval = interval.clone();
+
+ rt.spawn(async move {
+ let prev = counter.swap(0, Relaxed);
+ interval.store(prev, Relaxed);
+ });
+
+ std::thread::yield_now();
+ }
+ }
+
+ flag.store(false, Relaxed);
+
+ let w = Arc::downgrade(&interval);
+ drop(interval);
+
+ while w.strong_count() > 0 {
+ std::thread::sleep(Duration::from_micros(500));
+ }
+
+ // Now, run it again with a faster task
+ let flag = Arc::new(AtomicBool::new(true));
+ // Set it high, we know it shouldn't ever really be this high
+ let counter = Arc::new(AtomicUsize::new(10_000));
+ let interval = Arc::new(AtomicUsize::new(10_000));
+
+ {
+ let flag = flag.clone();
+ let counter = counter.clone();
+ rt.spawn(async move { iter(flag, counter, false) });
+ }
+
+ // Now, hammer the injection queue until the interval reaches the expected range.
+ let mut n = 0;
+ loop {
+ let curr = interval.load(Relaxed);
+
+ if curr <= 1_000 && curr > 32 {
+ n += 1;
+ } else {
+ n = 0;
+ }
+
+ if n == 3 {
+ break;
+ }
+
+ if Arc::strong_count(&interval) <= 5_000 {
+ let counter = counter.clone();
+ let interval = interval.clone();
+
+ rt.spawn(async move {
+ let prev = counter.swap(0, Relaxed);
+ interval.store(prev, Relaxed);
+ });
+ }
+
+ std::thread::yield_now();
+ }
+
+ flag.store(false, Relaxed);
+}
+
+fn rt() -> runtime::Runtime {
+ runtime::Builder::new_multi_thread_alt()
+ .enable_all()
+ .build()
+ .unwrap()
+}