Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/partition-store/benches/vqueue_meta_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use restate_storage_api::vqueue_table::Stage;
use restate_storage_api::vqueue_table::metadata::{
Action, MoveMetrics, Update, VQueueLink, VQueueMeta,
};
use restate_storage_api::vqueue_table::stats::WaitStats;
use restate_types::vqueues::VQueueId;

const BASE_TS_MS: u64 = 1_744_000_000_000;
Expand All @@ -39,8 +40,11 @@ fn move_metrics(
last_transition_at,
has_started,
first_runnable_at,
blocked_on_concurrency_rules_ms,
blocked_on_invoker_throttling_ms,
scheduler_wait_stats: Some(WaitStats {
blocked_on_concurrency_rules_ms,
blocked_on_invoker_throttling_ms,
..WaitStats::default()
}),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/storage-api/src/vqueue_table/entry_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::stats::EntryStatistics;
use super::{EntryKey, EntryMetadata, Stage};

#[derive(Debug, strum::Display, Clone, Copy, Eq, PartialEq, bilrost::Enumeration)]
#[strum(serialize_all = "snake_case")]
#[strum(serialize_all = "kebab-case")]
pub enum Status {
#[bilrost(0)]
Unknown,
Expand Down
132 changes: 61 additions & 71 deletions crates/storage-api/src/vqueue_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use restate_types::{LockName, LockNameRef, Scope, ServiceName};
use restate_util_string::ReString;

use super::Stage;
use super::stats::WaitStats;

#[derive(Debug, Clone, bilrost::Message)]
pub struct VQueueStatistics {
Expand Down Expand Up @@ -78,16 +79,12 @@ pub struct VQueueStatistics {
/// Note that this only tracks entries that were not killed/cancelled or failed/paused.
#[bilrost(tag(15))]
pub(crate) avg_end_to_end_duration_ms: u64,
/// Exponential moving average (EMA) of time the head item spent blocked on
/// user-defined concurrency rules before entering `Running`. Sampled on
/// every Inbox → Running transition (every run attempt, including retries).

/// Exponential moving average (EMA) of the various statistics
/// emitted by the scheduler while attempting to run items
/// from this queue.
#[bilrost(tag(16))]
pub(crate) avg_blocked_on_concurrency_rules_ms: u64,
/// Exponential moving average (EMA) of time the head item spent in
/// node-level invoker throttling before entering `Running`. Sampled on
/// every Inbox → Running transition (every run attempt, including retries).
#[bilrost(tag(17))]
pub(crate) avg_blocked_on_invoker_throttling_ms: u64,
pub(crate) avg_wait_stats: WaitStats,
}

impl VQueueStatistics {
Expand All @@ -108,8 +105,7 @@ impl VQueueStatistics {
avg_run_duration_ms: 0,
avg_suspension_duration_ms: 0,
avg_end_to_end_duration_ms: 0,
avg_blocked_on_concurrency_rules_ms: 0,
avg_blocked_on_invoker_throttling_ms: 0,
avg_wait_stats: WaitStats::default(),
}
}

Expand All @@ -133,16 +129,6 @@ impl VQueueStatistics {
self.avg_end_to_end_duration_ms = Self::ema(self.avg_end_to_end_duration_ms, latency_ms);
}

fn update_avg_blocked_on_concurrency_rules(&mut self, latency_ms: u64) {
self.avg_blocked_on_concurrency_rules_ms =
Self::ema(self.avg_blocked_on_concurrency_rules_ms, latency_ms);
}

fn update_avg_blocked_on_invoker_throttling(&mut self, latency_ms: u64) {
self.avg_blocked_on_invoker_throttling_ms =
Self::ema(self.avg_blocked_on_invoker_throttling_ms, latency_ms);
}

fn ema(previous: u64, sample_ms: u64) -> u64 {
if previous == 0 {
sample_ms
Expand Down Expand Up @@ -186,12 +172,8 @@ impl VQueueStatistics {
self.avg_end_to_end_duration_ms
}

pub const fn avg_blocked_on_concurrency_rules_ms(&self) -> u64 {
self.avg_blocked_on_concurrency_rules_ms
}

pub const fn avg_blocked_on_invoker_throttling_ms(&self) -> u64 {
self.avg_blocked_on_invoker_throttling_ms
pub const fn avg_wait_stats(&self) -> &WaitStats {
&self.avg_wait_stats
}

pub const fn last_enqueued_at(&self) -> Option<UniqueTimestamp> {
Expand Down Expand Up @@ -477,17 +459,9 @@ impl VQueueMeta {
Stage::Running => {
self.stats.num_running += 1;
self.stats.last_attempt_at = Some(now);

// EMAs for per-run-attempt blocking time. Sampled on
// every Inbox → Running transition (including retries),
// unlike `avg_queue_duration_ms` which only tracks the
// first attempt.
self.stats.update_avg_blocked_on_concurrency_rules(
metrics.blocked_on_concurrency_rules_ms as u64,
);
self.stats.update_avg_blocked_on_invoker_throttling(
metrics.blocked_on_invoker_throttling_ms as u64,
);
if let Some(wait_stats) = metrics.scheduler_wait_stats {
self.stats.avg_wait_stats.ema_apply(wait_stats);
}

if !metrics.has_started {
let first_wait_ms = now_ms.saturating_sub_ms(metrics.first_runnable_at);
Expand Down Expand Up @@ -528,16 +502,11 @@ pub struct MoveMetrics {
/// Earliest timestamp at which the entry can realistically start.
#[bilrost(tag(3), encoding(fixed))]
pub first_runnable_at: MillisSinceEpoch,
/// Milliseconds the head item spent blocked on user-defined concurrency
/// rules during this run attempt. Only populated on Inbox → Running moves;
/// zero for every other transition. Feeds `avg_blocked_on_concurrency_rules_ms`.
#[bilrost(tag(4), encoding(fixed))]
pub blocked_on_concurrency_rules_ms: u32,
/// Milliseconds the head item spent in node-level invoker throttling
/// during this run attempt. Only populated on Inbox → Running moves; zero
/// for every other transition. Feeds `avg_blocked_on_invoker_throttling_ms`.
#[bilrost(tag(5), encoding(fixed))]
pub blocked_on_invoker_throttling_ms: u32,

/// The scheduler wait stats. Only populated when this transition is driven by
/// the scheduler.
#[bilrost(tag(4))]
pub scheduler_wait_stats: Option<WaitStats>,
}

#[derive(Debug, Clone, Default, bilrost::Oneof, bilrost::Message)]
Expand Down Expand Up @@ -601,13 +570,12 @@ mod tests {
first_runnable_at_ms: u64,
has_started: bool,
) -> MoveMetrics {
metrics_with_wait(
last_transition_at_ms,
first_runnable_at_ms,
MoveMetrics {
last_transition_at: ts(last_transition_at_ms),
has_started,
0,
0,
)
first_runnable_at: MillisSinceEpoch::new(first_runnable_at_ms),
scheduler_wait_stats: None,
}
}

fn metrics_with_wait(
Expand All @@ -618,11 +586,12 @@ mod tests {
blocked_on_invoker_throttling_ms: u32,
) -> MoveMetrics {
MoveMetrics {
last_transition_at: ts(last_transition_at_ms),
has_started,
blocked_on_concurrency_rules_ms,
blocked_on_invoker_throttling_ms,
first_runnable_at: MillisSinceEpoch::new(first_runnable_at_ms),
scheduler_wait_stats: Some(WaitStats {
blocked_on_concurrency_rules_ms,
blocked_on_invoker_throttling_ms,
..WaitStats::default()
}),
..metrics(last_transition_at_ms, first_runnable_at_ms, has_started)
}
}

Expand Down Expand Up @@ -676,10 +645,10 @@ mod tests {

#[test]
fn blocking_emas_sample_every_run_attempt() {
// These two EMAs (`avg_blocked_on_concurrency_rules_ms`,
// `avg_blocked_on_invoker_throttling_ms`) are sampled on EVERY Inbox→Running
// transition, not just the first attempt like `avg_queue_duration_ms`.
// This test pins that distinction down.
// The `avg_wait_stats` EMAs are sampled on EVERY Inbox→Running
// transition that carries scheduler wait stats, not just the first
// attempt like `avg_queue_duration_ms`. This test pins that
// distinction down.
let created_at = ts(BASE_TS_MS + 1_000);
let mut meta = VQueueMeta::new(created_at, None, LimitKey::None, VQueueLink::None);

Expand Down Expand Up @@ -709,8 +678,14 @@ mod tests {
),
},
));
assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_000);
assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 500);
assert_eq!(
meta.stats.avg_wait_stats.blocked_on_concurrency_rules_ms,
1_000
);
assert_eq!(
meta.stats.avg_wait_stats.blocked_on_invoker_throttling_ms,
500
);
assert_eq!(meta.stats.avg_queue_duration_ms, 1_000);

// Yield back Running→Inbox. Neither the new EMAs nor the old
Expand All @@ -723,8 +698,14 @@ mod tests {
metrics: metrics(BASE_TS_MS + 2_000, BASE_TS_MS + 1_000, true),
},
));
assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_000);
assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 500);
assert_eq!(
meta.stats.avg_wait_stats.blocked_on_concurrency_rules_ms,
1_000
);
assert_eq!(
meta.stats.avg_wait_stats.blocked_on_invoker_throttling_ms,
500
);

// Second Inbox→Running (a retry, `has_started = true`): 2_000 ms /
// 0 ms. The new EMAs MUST continue sampling even though has_started.
Expand All @@ -737,8 +718,14 @@ mod tests {
metrics: metrics_with_wait(BASE_TS_MS + 3_000, BASE_TS_MS + 1_000, true, 2_000, 0),
},
));
assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_050);
assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 475);
assert_eq!(
meta.stats.avg_wait_stats.blocked_on_concurrency_rules_ms,
1_050
);
assert_eq!(
meta.stats.avg_wait_stats.blocked_on_invoker_throttling_ms,
475
);

// `avg_queue_duration_ms` must NOT have moved on the retry — the
// first-attempt gate (`has_started = false`) is still the existing
Expand Down Expand Up @@ -884,8 +871,11 @@ mod tests {
avg_run_duration_ms: 12,
avg_suspension_duration_ms: 13,
avg_end_to_end_duration_ms: 14,
avg_blocked_on_concurrency_rules_ms: 15,
avg_blocked_on_invoker_throttling_ms: 16,
avg_wait_stats: WaitStats {
blocked_on_concurrency_rules_ms: 15,
blocked_on_invoker_throttling_ms: 16,
..WaitStats::default()
},
},
scope: Some(Scope::try_from_static("scope-a").unwrap()),
limit_key: "tenant-1/user-1".parse::<LimitKey<ReString>>().unwrap(),
Expand Down
Loading
Loading