From 6ba0bafc2333d81e1eebe00b900b6b1260cf4259 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Fri, 5 Jun 2026 21:56:08 +0100 Subject: [PATCH] [VQueues] Track per-entry wait stats and error counts Reworks how scheduler wait-time accounting is persisted, replacing the two ad-hoc EMA fields with the full `WaitStats` breakdown: - `VQueueStatistics` now keeps an EMA over all seven `WaitStats` buckets (`avg_wait_stats`), applied only on scheduler-driven moves so zero samples from non-scheduler transitions no longer dilute the averages. - `EntryStatistics` records `latest_attempt_wait_stats` and a saturating `total_wait_stats` across all attempts, plus a new `num_errors` counter: yields caused by transient errors now count as errors instead of yields. - `MoveMetrics` carries `Option<WaitStats>` instead of two loose `u32`s. - `WaitStats` switches to fixed encoding and gains `ema_apply` / saturating `Add`. Surfaced in SQL: `sys_vqueues.num_errors`, and `sys_vqueue_meta.avg_blocked_on_lock` / `avg_blocked_on_invoker_concurrency`. Entry `Status` display is now kebab-case (`backing-off`) to match `sys_invocation` conventions. Note: breaks the persisted codec for vqueue entries, meta, and pending merge operands (tag renumbering/type changes). Acceptable while vqueues remain experimental and fresh-cluster only. 
--- .../benches/vqueue_meta_merge.rs | 8 +- .../src/vqueue_table/entry_status.rs | 2 +- .../storage-api/src/vqueue_table/metadata.rs | 132 ++++++++---------- crates/storage-api/src/vqueue_table/stats.rs | 103 ++++++++++++-- .../src/vqueue_meta/row.rs | 16 ++- .../src/vqueue_meta/schema.rs | 8 ++ .../src/vqueues/row.rs | 7 + .../src/vqueues/schema.rs | 3 + .../src/vqueues/tests.rs | 2 +- crates/vqueues/src/cache.rs | 3 +- crates/vqueues/src/lib.rs | 57 ++++---- crates/vqueues/src/scheduler/drr.rs | 2 +- crates/vqueues/src/scheduler/queue.rs | 2 + .../worker/src/partition/state_machine/mod.rs | 6 +- 14 files changed, 225 insertions(+), 126 deletions(-) diff --git a/crates/partition-store/benches/vqueue_meta_merge.rs b/crates/partition-store/benches/vqueue_meta_merge.rs index 32f6a7e520..5b0da0ceae 100644 --- a/crates/partition-store/benches/vqueue_meta_merge.rs +++ b/crates/partition-store/benches/vqueue_meta_merge.rs @@ -19,6 +19,7 @@ use restate_storage_api::vqueue_table::Stage; use restate_storage_api::vqueue_table::metadata::{ Action, MoveMetrics, Update, VQueueLink, VQueueMeta, }; +use restate_storage_api::vqueue_table::stats::WaitStats; use restate_types::vqueues::VQueueId; const BASE_TS_MS: u64 = 1_744_000_000_000; @@ -39,8 +40,11 @@ fn move_metrics( last_transition_at, has_started, first_runnable_at, - blocked_on_concurrency_rules_ms, - blocked_on_invoker_throttling_ms, + scheduler_wait_stats: Some(WaitStats { + blocked_on_concurrency_rules_ms, + blocked_on_invoker_throttling_ms, + ..WaitStats::default() + }), } } diff --git a/crates/storage-api/src/vqueue_table/entry_status.rs b/crates/storage-api/src/vqueue_table/entry_status.rs index 2800b0d61c..a6a22e331b 100644 --- a/crates/storage-api/src/vqueue_table/entry_status.rs +++ b/crates/storage-api/src/vqueue_table/entry_status.rs @@ -15,7 +15,7 @@ use super::stats::EntryStatistics; use super::{EntryKey, EntryMetadata, Stage}; #[derive(Debug, strum::Display, Clone, Copy, Eq, PartialEq, bilrost::Enumeration)] 
-#[strum(serialize_all = "snake_case")] +#[strum(serialize_all = "kebab-case")] pub enum Status { #[bilrost(0)] Unknown, diff --git a/crates/storage-api/src/vqueue_table/metadata.rs b/crates/storage-api/src/vqueue_table/metadata.rs index f7d3eb6976..5cfd2f4a9f 100644 --- a/crates/storage-api/src/vqueue_table/metadata.rs +++ b/crates/storage-api/src/vqueue_table/metadata.rs @@ -15,6 +15,7 @@ use restate_types::{LockName, LockNameRef, Scope, ServiceName}; use restate_util_string::ReString; use super::Stage; +use super::stats::WaitStats; #[derive(Debug, Clone, bilrost::Message)] pub struct VQueueStatistics { @@ -78,16 +79,12 @@ pub struct VQueueStatistics { /// Note that this only tracks entries that were not killed/cancelled or failed/paused. #[bilrost(tag(15))] pub(crate) avg_end_to_end_duration_ms: u64, - /// Exponential moving average (EMA) of time the head item spent blocked on - /// user-defined concurrency rules before entering `Running`. Sampled on - /// every Inbox → Running transition (every run attempt, including retries). + + /// Exponential moving average (EMA) of the various statistics + /// emitted by the scheduler while attempting to run items + /// from this queue. #[bilrost(tag(16))] - pub(crate) avg_blocked_on_concurrency_rules_ms: u64, - /// Exponential moving average (EMA) of time the head item spent in - /// node-level invoker throttling before entering `Running`. Sampled on - /// every Inbox → Running transition (every run attempt, including retries). 
- #[bilrost(tag(17))] - pub(crate) avg_blocked_on_invoker_throttling_ms: u64, + pub(crate) avg_wait_stats: WaitStats, } impl VQueueStatistics { @@ -108,8 +105,7 @@ impl VQueueStatistics { avg_run_duration_ms: 0, avg_suspension_duration_ms: 0, avg_end_to_end_duration_ms: 0, - avg_blocked_on_concurrency_rules_ms: 0, - avg_blocked_on_invoker_throttling_ms: 0, + avg_wait_stats: WaitStats::default(), } } @@ -133,16 +129,6 @@ impl VQueueStatistics { self.avg_end_to_end_duration_ms = Self::ema(self.avg_end_to_end_duration_ms, latency_ms); } - fn update_avg_blocked_on_concurrency_rules(&mut self, latency_ms: u64) { - self.avg_blocked_on_concurrency_rules_ms = - Self::ema(self.avg_blocked_on_concurrency_rules_ms, latency_ms); - } - - fn update_avg_blocked_on_invoker_throttling(&mut self, latency_ms: u64) { - self.avg_blocked_on_invoker_throttling_ms = - Self::ema(self.avg_blocked_on_invoker_throttling_ms, latency_ms); - } - fn ema(previous: u64, sample_ms: u64) -> u64 { if previous == 0 { sample_ms @@ -186,12 +172,8 @@ impl VQueueStatistics { self.avg_end_to_end_duration_ms } - pub const fn avg_blocked_on_concurrency_rules_ms(&self) -> u64 { - self.avg_blocked_on_concurrency_rules_ms - } - - pub const fn avg_blocked_on_invoker_throttling_ms(&self) -> u64 { - self.avg_blocked_on_invoker_throttling_ms + pub const fn avg_wait_stats(&self) -> &WaitStats { + &self.avg_wait_stats } pub const fn last_enqueued_at(&self) -> Option { @@ -477,17 +459,9 @@ impl VQueueMeta { Stage::Running => { self.stats.num_running += 1; self.stats.last_attempt_at = Some(now); - - // EMAs for per-run-attempt blocking time. Sampled on - // every Inbox → Running transition (including retries), - // unlike `avg_queue_duration_ms` which only tracks the - // first attempt. 
- self.stats.update_avg_blocked_on_concurrency_rules( - metrics.blocked_on_concurrency_rules_ms as u64, - ); - self.stats.update_avg_blocked_on_invoker_throttling( - metrics.blocked_on_invoker_throttling_ms as u64, - ); + if let Some(wait_stats) = metrics.scheduler_wait_stats { + self.stats.avg_wait_stats.ema_apply(wait_stats); + } if !metrics.has_started { let first_wait_ms = now_ms.saturating_sub_ms(metrics.first_runnable_at); @@ -528,16 +502,11 @@ pub struct MoveMetrics { /// Earliest timestamp at which the entry can realistically start. #[bilrost(tag(3), encoding(fixed))] pub first_runnable_at: MillisSinceEpoch, - /// Milliseconds the head item spent blocked on user-defined concurrency - /// rules during this run attempt. Only populated on Inbox → Running moves; - /// zero for every other transition. Feeds `avg_blocked_on_concurrency_rules_ms`. - #[bilrost(tag(4), encoding(fixed))] - pub blocked_on_concurrency_rules_ms: u32, - /// Milliseconds the head item spent in node-level invoker throttling - /// during this run attempt. Only populated on Inbox → Running moves; zero - /// for every other transition. Feeds `avg_blocked_on_invoker_throttling_ms`. - #[bilrost(tag(5), encoding(fixed))] - pub blocked_on_invoker_throttling_ms: u32, + + /// The scheduler wait stats. Only populated when this transition is driven by + /// the scheduler. 
+ #[bilrost(tag(4))] + pub scheduler_wait_stats: Option, } #[derive(Debug, Clone, Default, bilrost::Oneof, bilrost::Message)] @@ -601,13 +570,12 @@ mod tests { first_runnable_at_ms: u64, has_started: bool, ) -> MoveMetrics { - metrics_with_wait( - last_transition_at_ms, - first_runnable_at_ms, + MoveMetrics { + last_transition_at: ts(last_transition_at_ms), has_started, - 0, - 0, - ) + first_runnable_at: MillisSinceEpoch::new(first_runnable_at_ms), + scheduler_wait_stats: None, + } } fn metrics_with_wait( @@ -618,11 +586,12 @@ mod tests { blocked_on_invoker_throttling_ms: u32, ) -> MoveMetrics { MoveMetrics { - last_transition_at: ts(last_transition_at_ms), - has_started, - blocked_on_concurrency_rules_ms, - blocked_on_invoker_throttling_ms, - first_runnable_at: MillisSinceEpoch::new(first_runnable_at_ms), + scheduler_wait_stats: Some(WaitStats { + blocked_on_concurrency_rules_ms, + blocked_on_invoker_throttling_ms, + ..WaitStats::default() + }), + ..metrics(last_transition_at_ms, first_runnable_at_ms, has_started) } } @@ -676,10 +645,10 @@ mod tests { #[test] fn blocking_emas_sample_every_run_attempt() { - // These two EMAs (`avg_blocked_on_concurrency_rules_ms`, - // `avg_blocked_on_invoker_throttling_ms`) are sampled on EVERY Inbox→Running - // transition, not just the first attempt like `avg_queue_duration_ms`. - // This test pins that distinction down. + // The `avg_wait_stats` EMAs are sampled on EVERY Inbox→Running + // transition that carries scheduler wait stats, not just the first + // attempt like `avg_queue_duration_ms`. This test pins that + // distinction down. 
let created_at = ts(BASE_TS_MS + 1_000); let mut meta = VQueueMeta::new(created_at, None, LimitKey::None, VQueueLink::None); @@ -709,8 +678,14 @@ mod tests { ), }, )); - assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_000); - assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 500); + assert_eq!( + meta.stats.avg_wait_stats.blocked_on_concurrency_rules_ms, + 1_000 + ); + assert_eq!( + meta.stats.avg_wait_stats.blocked_on_invoker_throttling_ms, + 500 + ); assert_eq!(meta.stats.avg_queue_duration_ms, 1_000); // Yield back Running→Inbox. Neither the new EMAs nor the old @@ -723,8 +698,14 @@ mod tests { metrics: metrics(BASE_TS_MS + 2_000, BASE_TS_MS + 1_000, true), }, )); - assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_000); - assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 500); + assert_eq!( + meta.stats.avg_wait_stats.blocked_on_concurrency_rules_ms, + 1_000 + ); + assert_eq!( + meta.stats.avg_wait_stats.blocked_on_invoker_throttling_ms, + 500 + ); // Second Inbox→Running (a retry, `has_started = true`): 2_000 ms / // 0 ms. The new EMAs MUST continue sampling even though has_started. 
@@ -737,8 +718,14 @@ mod tests { metrics: metrics_with_wait(BASE_TS_MS + 3_000, BASE_TS_MS + 1_000, true, 2_000, 0), }, )); - assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_050); - assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 475); + assert_eq!( + meta.stats.avg_wait_stats.blocked_on_concurrency_rules_ms, + 1_050 + ); + assert_eq!( + meta.stats.avg_wait_stats.blocked_on_invoker_throttling_ms, + 475 + ); // `avg_queue_duration_ms` must NOT have moved on the retry — the // first-attempt gate (`has_started = false`) is still the existing @@ -884,8 +871,11 @@ mod tests { avg_run_duration_ms: 12, avg_suspension_duration_ms: 13, avg_end_to_end_duration_ms: 14, - avg_blocked_on_concurrency_rules_ms: 15, - avg_blocked_on_invoker_throttling_ms: 16, + avg_wait_stats: WaitStats { + blocked_on_concurrency_rules_ms: 15, + blocked_on_invoker_throttling_ms: 16, + ..WaitStats::default() + }, }, scope: Some(Scope::try_from_static("scope-a").unwrap()), limit_key: "tenant-1/user-1".parse::>().unwrap(), diff --git a/crates/storage-api/src/vqueue_table/stats.rs b/crates/storage-api/src/vqueue_table/stats.rs index 315495f136..8d4381a994 100644 --- a/crates/storage-api/src/vqueue_table/stats.rs +++ b/crates/storage-api/src/vqueue_table/stats.rs @@ -8,37 +8,103 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::ops::Add; + use restate_clock::time::MillisSinceEpoch; use restate_clock::{RoughTimestamp, UniqueTimestamp}; /// Some stats that are collected from the scheduler. Emitted along the item /// at decision time. -#[derive(Debug, Clone, Default, bilrost::Message)] +/// +/// Encoding is fixed which mimics the simplicity and efficiency of a Vec<(StatKey, Value)> +/// on the wire. 
+#[derive(Debug, Clone, Copy, Default, bilrost::Message)] pub struct WaitStats { /// Total milliseconds the item spent waiting on global invoker capacity - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] pub blocked_on_invoker_concurrency_ms: u32, /// Total milliseconds the item was blocked on user-defined per-vqueue /// throttling rules. - #[bilrost(tag(2))] + #[bilrost(tag(2), encoding(fixed))] pub blocked_on_throttling_rules_ms: u32, /// Total milliseconds the item was blocked on node-level invoker throttling. - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] pub blocked_on_invoker_throttling_ms: u32, /// Total milliseconds the item spent waiting on invoker memory pool - #[bilrost(tag(4))] + #[bilrost(tag(4), encoding(fixed))] pub blocked_on_invoker_memory_ms: u32, /// Total milliseconds the item spent waiting on user-defined concurrency limits - #[bilrost(tag(5))] + #[bilrost(tag(5), encoding(fixed))] pub blocked_on_concurrency_rules_ms: u32, /// Total milliseconds the item spent waiting to acquire a virtual object lock - #[bilrost(tag(6))] + #[bilrost(tag(6), encoding(fixed))] pub blocked_on_lock_ms: u32, /// Total milliseconds the item spent blocked on deployment concurrency capacity - #[bilrost(tag(8))] + #[bilrost(tag(8), encoding(fixed))] pub blocked_on_deployment_concurrency_ms: u32, } +macro_rules! ema_merge { + ($this:ident, $other:ident, $( $attr:ident, )+) => { + $( + $this.$attr = if $this.$attr == 0 { + $other.$attr + } else { + // Exponential moving average with alpha=0.05. + (($this.$attr as f64 * 0.95) + ($other.$attr as f64 * 0.05)).ceil() as u32 + }; + )+ + }; +} + +impl WaitStats { + /// Merge the input WaitStats into self. Self holds EMA values and `other` + /// is a data point appended to it. 
+ pub fn ema_apply(&mut self, other: WaitStats) { + ema_merge!( + self, + other, + blocked_on_invoker_concurrency_ms, + blocked_on_throttling_rules_ms, + blocked_on_invoker_throttling_ms, + blocked_on_invoker_memory_ms, + blocked_on_concurrency_rules_ms, + blocked_on_lock_ms, + blocked_on_deployment_concurrency_ms, + ); + } +} + +impl Add for WaitStats { + type Output = WaitStats; + + fn add(self, rhs: WaitStats) -> Self::Output { + Self::Output { + blocked_on_deployment_concurrency_ms: self + .blocked_on_deployment_concurrency_ms + .saturating_add(rhs.blocked_on_deployment_concurrency_ms), + blocked_on_invoker_concurrency_ms: self + .blocked_on_invoker_concurrency_ms + .saturating_add(rhs.blocked_on_invoker_concurrency_ms), + blocked_on_throttling_rules_ms: self + .blocked_on_throttling_rules_ms + .saturating_add(rhs.blocked_on_throttling_rules_ms), + blocked_on_invoker_throttling_ms: self + .blocked_on_invoker_throttling_ms + .saturating_add(rhs.blocked_on_invoker_throttling_ms), + blocked_on_invoker_memory_ms: self + .blocked_on_invoker_memory_ms + .saturating_add(rhs.blocked_on_invoker_memory_ms), + blocked_on_concurrency_rules_ms: self + .blocked_on_concurrency_rules_ms + .saturating_add(rhs.blocked_on_concurrency_rules_ms), + blocked_on_lock_ms: self + .blocked_on_lock_ms + .saturating_add(rhs.blocked_on_lock_ms), + } + } +} + #[derive(Debug, Clone, bilrost::Message)] pub struct EntryStatistics { /// Creation timestamp of the entry. 
@@ -59,11 +125,14 @@ pub struct EntryStatistics { pub num_suspensions: u32, #[bilrost(tag(6), encoding(fixed))] pub num_yields: u32, - /// Timestamp of the first attempt to run this entry + /// Number of times this entry has yielded due to an error #[bilrost(tag(7), encoding(fixed))] + pub num_errors: u32, + /// Timestamp of the first attempt to run this entry + #[bilrost(tag(8), encoding(fixed))] pub first_attempt_at: Option, /// Timestamp of the last attempt to run this entry - #[bilrost(tag(8), encoding(fixed))] + #[bilrost(tag(9), encoding(fixed))] pub latest_attempt_at: Option, /// Earliest timestamp at which the first run can realistically start. /// @@ -72,8 +141,17 @@ pub struct EntryStatistics { /// /// We clamp to `created_at` when `original_run_at` is in the past to avoid /// inflating the first-attempt wait time. - #[bilrost(tag(9), encoding(fixed))] + #[bilrost(tag(10), encoding(fixed))] pub first_runnable_at: MillisSinceEpoch, + + /// Stats emitted by the scheduler of the last run attempt + #[bilrost(tag(11))] + pub latest_attempt_wait_stats: WaitStats, + + // # Cumulative Stats Over All Attempts + /// Will be saturating to the max possible values (u32, 49-ish days) + #[bilrost(tag(12))] + pub total_wait_stats: WaitStats, } impl EntryStatistics { @@ -89,9 +167,12 @@ impl EntryStatistics { num_paused: 0, num_suspensions: 0, num_yields: 0, + num_errors: 0, first_attempt_at: None, latest_attempt_at: None, first_runnable_at, + latest_attempt_wait_stats: WaitStats::default(), + total_wait_stats: WaitStats::default(), } } } diff --git a/crates/storage-query-datafusion/src/vqueue_meta/row.rs b/crates/storage-query-datafusion/src/vqueue_meta/row.rs index 1c13291d24..a1ea337c8e 100644 --- a/crates/storage-query-datafusion/src/vqueue_meta/row.rs +++ b/crates/storage-query-datafusion/src/vqueue_meta/row.rs @@ -102,15 +102,27 @@ pub(crate) fn append_vqueues_meta_row( row.avg_end_to_end_duration(meta.stats.avg_end_to_end_duration_ms() as i64); } + if 
row.is_avg_blocked_on_lock_defined() { + row.avg_blocked_on_lock(meta.stats.avg_wait_stats().blocked_on_lock_ms as i64); + } + if row.is_avg_blocked_on_concurrency_rules_defined() { row.avg_blocked_on_concurrency_rules( - meta.stats.avg_blocked_on_concurrency_rules_ms() as i64 + meta.stats.avg_wait_stats().blocked_on_concurrency_rules_ms as i64, + ); + } + + if row.is_avg_blocked_on_invoker_concurrency_defined() { + row.avg_blocked_on_invoker_concurrency( + meta.stats + .avg_wait_stats() + .blocked_on_invoker_concurrency_ms as i64, ); } if row.is_avg_blocked_on_invoker_throttling_defined() { row.avg_blocked_on_invoker_throttling( - meta.stats.avg_blocked_on_invoker_throttling_ms() as i64 + meta.stats.avg_wait_stats().blocked_on_invoker_throttling_ms as i64, ); } diff --git a/crates/storage-query-datafusion/src/vqueue_meta/schema.rs b/crates/storage-query-datafusion/src/vqueue_meta/schema.rs index b9ceb65e89..117e94e187 100644 --- a/crates/storage-query-datafusion/src/vqueue_meta/schema.rs +++ b/crates/storage-query-datafusion/src/vqueue_meta/schema.rs @@ -81,11 +81,19 @@ define_table!(sys_vqueue_meta( /// Inbox → Running transition (every run attempt, including retries). avg_blocked_on_concurrency_rules: DataType::Duration, + /// Exponential moving average (EMA) of time the head item spent blocked on + /// node-level invoker concurrency tokens before entering Running. + avg_blocked_on_invoker_concurrency: DataType::Duration, + /// Exponential moving average (EMA) of time the head item spent blocked on /// node-level invoker throttling before entering Running. Sampled on every /// Inbox → Running transition (every run attempt, including retries). avg_blocked_on_invoker_throttling: DataType::Duration, + /// Exponential moving average (EMA) of time the head item spent blocked on + /// a virtual object lock before entering Running. + avg_blocked_on_lock: DataType::Duration, + /// The number of entries that are in the inbox. 
The inbox is the priority /// queue that the scheduler uses to choose which entries to run next. num_inbox: DataType::UInt64, diff --git a/crates/storage-query-datafusion/src/vqueues/row.rs b/crates/storage-query-datafusion/src/vqueues/row.rs index 8b357e9295..e37739c900 100644 --- a/crates/storage-query-datafusion/src/vqueues/row.rs +++ b/crates/storage-query-datafusion/src/vqueues/row.rs @@ -67,12 +67,19 @@ pub(crate) fn append_vqueues_row<'a>( if row.is_num_attempts_defined() { row.num_attempts(entry.stats.num_attempts); } + + if row.is_num_errors_defined() { + row.num_errors(entry.stats.num_errors); + } + if row.is_num_pauses_defined() { row.num_pauses(entry.stats.num_paused); } + if row.is_num_suspensions_defined() { row.num_suspensions(entry.stats.num_suspensions); } + if row.is_num_yields_defined() { row.num_yields(entry.stats.num_yields); } diff --git a/crates/storage-query-datafusion/src/vqueues/schema.rs b/crates/storage-query-datafusion/src/vqueues/schema.rs index 1de73173ec..e8eacee26f 100644 --- a/crates/storage-query-datafusion/src/vqueues/schema.rs +++ b/crates/storage-query-datafusion/src/vqueues/schema.rs @@ -61,6 +61,9 @@ define_table!(sys_vqueues( /// Number of times this entry has been moved to the run queue. num_attempts: DataType::UInt32, + /// Number of times this entry has yielded execution due to transient errors. + num_errors: DataType::UInt32, + /// Number of times this entry has been moved to the paused stage. 
num_pauses: DataType::UInt32, diff --git a/crates/storage-query-datafusion/src/vqueues/tests.rs b/crates/storage-query-datafusion/src/vqueues/tests.rs index 54fc22a451..14d97b8432 100644 --- a/crates/storage-query-datafusion/src/vqueues/tests.rs +++ b/crates/storage-query-datafusion/src/vqueues/tests.rs @@ -97,7 +97,7 @@ async fn get_vqueue_entry_value_fields() { 0, { "stage" => StringArray: eq("inbox"), - "status" => StringArray: eq("backing_off"), + "status" => StringArray: eq("backing-off"), "num_attempts" => UInt32Array: eq(3), "num_pauses" => UInt32Array: eq(2), "num_suspensions" => UInt32Array: eq(4), diff --git a/crates/vqueues/src/cache.rs b/crates/vqueues/src/cache.rs index fcb8139300..7d14c703bc 100644 --- a/crates/vqueues/src/cache.rs +++ b/crates/vqueues/src/cache.rs @@ -289,9 +289,8 @@ mod tests { let metrics = MoveMetrics { last_transition_at: at, has_started: false, - blocked_on_concurrency_rules_ms: 0, - blocked_on_invoker_throttling_ms: 0, first_runnable_at: at.to_unix_millis(), + scheduler_wait_stats: None, }; meta.apply_update(&Update::new( at, diff --git a/crates/vqueues/src/lib.rs b/crates/vqueues/src/lib.rs index ca5a27ea61..4b8465d5d2 100644 --- a/crates/vqueues/src/lib.rs +++ b/crates/vqueues/src/lib.rs @@ -13,6 +13,7 @@ mod metric_definitions; pub mod scheduler; mod util; +use std::ops::Add; use std::time::Duration; // Re-exports @@ -323,7 +324,7 @@ where &mut self, at: UniqueTimestamp, header: &impl EntryStatusHeader, - wait_stats: &WaitStats, + wait_stats: WaitStats, ) -> EntryKey { let vqueue_id = header.vqueue_id(); let partition_key = vqueue_id.partition_key(); @@ -651,15 +652,13 @@ where // boosting). If that's the case, we mutate the entry key to reflect that. 
let modified_key = header.entry_key().set_run_at(run_at); - let stats = Self::mark_yield(at, header.stats()); - - let (status, metadata) = match reason { + let (status, metadata, is_error) = match reason { YieldReason::Unknown | YieldReason::InvokerLoadShedding | YieldReason::PartitionLeaderChange => { // The reason could be coming from a future version of restate. // It's okay to have an unknown reason, we'll continue to yield as usual. - (Status::Yielded, header.metadata().clone()) + (Status::Yielded, header.metadata().clone(), false) } YieldReason::ExhaustedMemoryBudget { needed_memory } => { let current_metadata = header.metadata().clone(); @@ -669,6 +668,7 @@ where needed_memory: Some(needed_memory), ..current_metadata }, + false, ) } YieldReason::TransientError { @@ -683,10 +683,13 @@ where retry_count_since_last_stored_command, ..current_metadata }, + true, ) } }; + let stats = Self::mark_yield(at, header.stats().clone(), is_error); + debug!( entry = %header.display_entry_id(), qid = %header.vqueue_id(), @@ -888,7 +891,7 @@ where &mut self, at: UniqueTimestamp, header: &impl EntryStatusHeader, - wait_stats: &WaitStats, + wait_stats: WaitStats, status: Status, ) { let vqueue_id = header.vqueue_id(); @@ -1079,27 +1082,13 @@ where #[inline] fn build_move_metrics( stats: &EntryStatistics, - wait_stats: Option<&WaitStats>, + scheduler_wait_stats: Option, ) -> metadata::MoveMetrics { - // The two `*_ms` fields below are only meaningful on Inbox → Running - // moves (where the scheduler actually observed the head waiting). For - // every other transition we pass `None` and leave them zero — the - // receiving EMA code only reads them inside the `Stage::Running` arm, - // so the zero samples are never consumed. 
- let (concurrency_rules_ms, blocked_on_invoker_throttling_ms) = wait_stats - .map(|w| { - ( - w.blocked_on_concurrency_rules_ms, - w.blocked_on_invoker_throttling_ms, - ) - }) - .unwrap_or_default(); metadata::MoveMetrics { last_transition_at: stats.transitioned_at, has_started: stats.num_attempts > 0, first_runnable_at: stats.first_runnable_at, - blocked_on_concurrency_rules_ms: concurrency_rules_ms, - blocked_on_invoker_throttling_ms, + scheduler_wait_stats, } } @@ -1115,17 +1104,15 @@ where fn mark_run_attempt( at: UniqueTimestamp, stats: &EntryStatistics, - // `WaitStats` is consumed at the vqueue level (EMAs in - // `VQueueStatistics`) via `build_move_metrics`; it is intentionally not - // accumulated per-entry to avoid persisting stale blocking signal that - // decays in relevance as the system evolves. - _wait_stats: &WaitStats, + wait_stats: WaitStats, ) -> EntryStatistics { EntryStatistics { num_attempts: stats.num_attempts.saturating_add(1), first_attempt_at: stats.first_attempt_at.or(Some(at)), latest_attempt_at: Some(at), transitioned_at: at, + latest_attempt_wait_stats: wait_stats, + total_wait_stats: stats.total_wait_stats.add(wait_stats), ..stats.clone() } } @@ -1149,11 +1136,17 @@ where } #[inline] - fn mark_yield(at: UniqueTimestamp, stats: &EntryStatistics) -> EntryStatistics { - EntryStatistics { - num_yields: stats.num_yields.saturating_add(1), - transitioned_at: at, - ..stats.clone() + fn mark_yield( + at: UniqueTimestamp, + mut stats: EntryStatistics, + is_error: bool, + ) -> EntryStatistics { + stats.transitioned_at = at; + if is_error { + stats.num_errors = stats.num_errors.saturating_add(1); + } else { + stats.num_yields = stats.num_yields.saturating_add(1); } + stats } } diff --git a/crates/vqueues/src/scheduler/drr.rs b/crates/vqueues/src/scheduler/drr.rs index 39abf7f633..fc8e39e0ca 100644 --- a/crates/vqueues/src/scheduler/drr.rs +++ b/crates/vqueues/src/scheduler/drr.rs @@ -561,7 +561,7 @@ mod tests { .await .expect("vqueue should be 
created"); - vqueue.run_entry(at, &header, &WaitStats::default()) + vqueue.run_entry(at, &header, WaitStats::default()) } async fn create_resource_manager_with_throttling( diff --git a/crates/vqueues/src/scheduler/queue.rs b/crates/vqueues/src/scheduler/queue.rs index 34c2932cee..8118273b7e 100644 --- a/crates/vqueues/src/scheduler/queue.rs +++ b/crates/vqueues/src/scheduler/queue.rs @@ -253,6 +253,8 @@ pub enum QueueItem<'a> { None, } +// One `Stage` per `Queue`; boxing the head would add an allocation on the hot path. +#[allow(clippy::large_enum_variant)] #[derive(derive_more::Debug)] enum Stage { /// Brand-new queue; running items still need to be drained first. diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 82d54f0f6c..8055374207 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -590,7 +590,7 @@ impl StateMachineApplyContext<'_, S> { .into()); } scheduler::SchedulerAction::Run(run_action) => { - self.attempt_to_run(qid, &run_action.key, &run_action.wait_stats) + self.attempt_to_run(qid, &run_action.key, run_action.wait_stats) .await?; } scheduler::SchedulerAction::Yield(yield_action) => { @@ -3168,7 +3168,7 @@ impl StateMachineApplyContext<'_, S> { &mut self, qid: &VQueueId, entry_key: &EntryKey, - wait_stats: &vqueue_table::stats::WaitStats, + wait_stats: vqueue_table::stats::WaitStats, ) -> Result<(), Error> where S: WriteInboxTable @@ -3268,7 +3268,7 @@ impl StateMachineApplyContext<'_, S> { &mut self, qid: &VQueueId, entry_key: &EntryKey, - wait_stats: &vqueue_table::stats::WaitStats, + wait_stats: vqueue_table::stats::WaitStats, ) -> Result<(), Error> where S: WriteInboxTable