diff options
Diffstat (limited to 'src/runtime/metrics/batch.rs')
-rw-r--r-- | src/runtime/metrics/batch.rs | 79 |
1 files changed, 63 insertions, 16 deletions
diff --git a/src/runtime/metrics/batch.rs b/src/runtime/metrics/batch.rs index 4e6b28d..1d0f3de 100644 --- a/src/runtime/metrics/batch.rs +++ b/src/runtime/metrics/batch.rs @@ -1,8 +1,7 @@ -use crate::runtime::WorkerMetrics; +use crate::runtime::metrics::{HistogramBatch, WorkerMetrics}; -use std::convert::TryFrom; use std::sync::atomic::Ordering::Relaxed; -use std::time::Instant; +use std::time::{Duration, Instant}; pub(crate) struct MetricsBatch { /// Number of times the worker parked. @@ -33,11 +32,26 @@ pub(crate) struct MetricsBatch { /// The total busy duration in nanoseconds. busy_duration_total: u64, - last_resume_time: Instant, + + /// Instant at which work last resumed (continued after park). + processing_scheduled_tasks_started_at: Instant, + + /// If `Some`, tracks poll times in nanoseconds + poll_timer: Option<PollTimer>, +} + +struct PollTimer { + /// Histogram of poll counts within each band. + poll_counts: HistogramBatch, + + /// Instant when the most recent task started polling. + poll_started_at: Instant, } impl MetricsBatch { - pub(crate) fn new() -> MetricsBatch { + pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch { + let now = Instant::now(); + MetricsBatch { park_count: 0, noop_count: 0, @@ -48,11 +62,19 @@ impl MetricsBatch { local_schedule_count: 0, overflow_count: 0, busy_duration_total: 0, - last_resume_time: Instant::now(), + processing_scheduled_tasks_started_at: now, + poll_timer: worker_metrics + .poll_count_histogram + .as_ref() + .map(|worker_poll_counts| PollTimer { + poll_counts: HistogramBatch::from_histogram(worker_poll_counts), + poll_started_at: now, + }), } } - pub(crate) fn submit(&mut self, worker: &WorkerMetrics) { + pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { + worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); @@ -69,6 +91,11 @@ impl MetricsBatch { .local_schedule_count .store(self.local_schedule_count, Relaxed); worker.overflow_count.store(self.overflow_count, Relaxed); + + if let Some(poll_timer) = &self.poll_timer { + let dst = worker.poll_count_histogram.as_ref().unwrap(); + poll_timer.poll_counts.submit(dst); + } } /// The worker is about to park. @@ -80,22 +107,38 @@ impl MetricsBatch { } else { self.poll_count_on_last_park = self.poll_count; } - - let busy_duration = self.last_resume_time.elapsed(); - let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX); - self.busy_duration_total += busy_duration; } - pub(crate) fn returned_from_park(&mut self) { - self.last_resume_time = Instant::now(); + /// Start processing a batch of tasks + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.processing_scheduled_tasks_started_at = Instant::now(); } - pub(crate) fn inc_local_schedule_count(&mut self) { - self.local_schedule_count += 1; + /// Stop processing a batch of tasks + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + let busy_duration = self.processing_scheduled_tasks_started_at.elapsed(); + self.busy_duration_total += duration_as_u64(busy_duration); } - pub(crate) fn incr_poll_count(&mut self) { + /// Start polling an individual task + pub(crate) fn start_poll(&mut self) { self.poll_count += 1; + + if let Some(poll_timer) = &mut self.poll_timer { + poll_timer.poll_started_at = Instant::now(); + } + } + + /// Stop polling an individual task + pub(crate) fn end_poll(&mut self) { + if let Some(poll_timer) = &mut self.poll_timer { + let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); + poll_timer.poll_counts.measure(elapsed, 1); + } + } + + pub(crate) fn inc_local_schedule_count(&mut self) { + self.local_schedule_count += 1; } } @@ -114,3 +157,7 @@ cfg_rt_multi_thread! { } } } + +fn duration_as_u64(dur: Duration) -> u64 { + u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX) +} |