aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/metrics/batch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime/metrics/batch.rs')
-rw-r--r--src/runtime/metrics/batch.rs79
1 files changed, 63 insertions, 16 deletions
diff --git a/src/runtime/metrics/batch.rs b/src/runtime/metrics/batch.rs
index 4e6b28d..1d0f3de 100644
--- a/src/runtime/metrics/batch.rs
+++ b/src/runtime/metrics/batch.rs
@@ -1,8 +1,7 @@
-use crate::runtime::WorkerMetrics;
+use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};
-use std::convert::TryFrom;
use std::sync::atomic::Ordering::Relaxed;
-use std::time::Instant;
+use std::time::{Duration, Instant};
pub(crate) struct MetricsBatch {
/// Number of times the worker parked.
@@ -33,11 +32,26 @@ pub(crate) struct MetricsBatch {
/// The total busy duration in nanoseconds.
busy_duration_total: u64,
- last_resume_time: Instant,
+
+ /// Instant at which work last resumed (continued after park).
+ processing_scheduled_tasks_started_at: Instant,
+
+ /// If `Some`, tracks poll times in nanoseconds
+ poll_timer: Option<PollTimer>,
+}
+
+struct PollTimer {
+ /// Histogram of poll counts within each band.
+ poll_counts: HistogramBatch,
+
+ /// Instant when the most recent task started polling.
+ poll_started_at: Instant,
}
impl MetricsBatch {
- pub(crate) fn new() -> MetricsBatch {
+ pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
+ let now = Instant::now();
+
MetricsBatch {
park_count: 0,
noop_count: 0,
@@ -48,11 +62,19 @@ impl MetricsBatch {
local_schedule_count: 0,
overflow_count: 0,
busy_duration_total: 0,
- last_resume_time: Instant::now(),
+ processing_scheduled_tasks_started_at: now,
+ poll_timer: worker_metrics
+ .poll_count_histogram
+ .as_ref()
+ .map(|worker_poll_counts| PollTimer {
+ poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
+ poll_started_at: now,
+ }),
}
}
- pub(crate) fn submit(&mut self, worker: &WorkerMetrics) {
+ pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
+ worker.mean_poll_time.store(mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
@@ -69,6 +91,11 @@ impl MetricsBatch {
.local_schedule_count
.store(self.local_schedule_count, Relaxed);
worker.overflow_count.store(self.overflow_count, Relaxed);
+
+ if let Some(poll_timer) = &self.poll_timer {
+ let dst = worker.poll_count_histogram.as_ref().unwrap();
+ poll_timer.poll_counts.submit(dst);
+ }
}
/// The worker is about to park.
@@ -80,22 +107,38 @@ impl MetricsBatch {
} else {
self.poll_count_on_last_park = self.poll_count;
}
-
- let busy_duration = self.last_resume_time.elapsed();
- let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX);
- self.busy_duration_total += busy_duration;
}
- pub(crate) fn returned_from_park(&mut self) {
- self.last_resume_time = Instant::now();
+ /// Start processing a batch of tasks
+ pub(crate) fn start_processing_scheduled_tasks(&mut self) {
+ self.processing_scheduled_tasks_started_at = Instant::now();
}
- pub(crate) fn inc_local_schedule_count(&mut self) {
- self.local_schedule_count += 1;
+ /// Stop processing a batch of tasks
+ pub(crate) fn end_processing_scheduled_tasks(&mut self) {
+ let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
+ self.busy_duration_total += duration_as_u64(busy_duration);
}
- pub(crate) fn incr_poll_count(&mut self) {
+ /// Start polling an individual task
+ pub(crate) fn start_poll(&mut self) {
self.poll_count += 1;
+
+ if let Some(poll_timer) = &mut self.poll_timer {
+ poll_timer.poll_started_at = Instant::now();
+ }
+ }
+
+ /// Stop polling an individual task
+ pub(crate) fn end_poll(&mut self) {
+ if let Some(poll_timer) = &mut self.poll_timer {
+ let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
+ poll_timer.poll_counts.measure(elapsed, 1);
+ }
+ }
+
+ pub(crate) fn inc_local_schedule_count(&mut self) {
+ self.local_schedule_count += 1;
}
}
@@ -114,3 +157,7 @@ cfg_rt_multi_thread! {
}
}
}
+
+fn duration_as_u64(dur: Duration) -> u64 {
+ u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
+}