Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/invoker-impl/src/input_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::ops::RangeInclusive;
use std::time::Instant;

use tokio::sync::mpsc;

Expand All @@ -30,6 +31,11 @@ pub(crate) struct InvokeCommand {
// removed in v1.6
// pub(super) invocation_epoch: InvocationEpoch,
pub(super) invocation_target: InvocationTarget,
/// Timestamp when this command was enqueued. Used to compute queue duration.
/// Skipped in serde — deserialized commands get a fresh Instant, so queue
/// duration may be slightly under-reported for spilled entries. Acceptable.
#[serde(skip, default = "Instant::now")]
pub(super) enqueued_at: Instant,
}

#[derive(derive_more::Debug)]
Expand Down Expand Up @@ -118,6 +124,7 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
partition,
invocation_id,
invocation_target,
enqueued_at: Instant::now(),
})))
.map_err(|_| NotRunningError)
}
Expand Down
7 changes: 7 additions & 0 deletions crates/invoker-impl/src/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use restate_types::retries;
use restate_types::schema::invocation_target::OnMaxAttempts;
use restate_types::vqueue::VQueueId;

use crate::metric_definitions::ServiceMetrics;
use crate::quota::ConcurrencySlot;

use super::*;
Expand Down Expand Up @@ -57,6 +58,9 @@ pub(super) struct InvocationStateMachine<K: TimerKey = tokio_util::time::delay_q
/// starts and after the ISM is finally cleaned up.
#[debug(skip)]
pub(super) budget: Option<LocalMemoryPool>,

/// Cached interned metric labels. `deployment_id` is empty until `PinnedDeployment`.
pub(super) metric: ServiceMetrics,
}

/// This struct tracks which commands the invocation task generates,
Expand Down Expand Up @@ -205,6 +209,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
qid: Option<VQueueId>,
permit: Permit,
invocation_target: InvocationTarget,
metric: ServiceMetrics,
retry_iter: retries::RetryIter<'static>,
on_max_attempts: OnMaxAttempts,
concurrency_slot: ConcurrencySlot,
Expand All @@ -224,6 +229,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
requested_pause: false,
_concurrency_slot: concurrency_slot,
budget: None,
metric,
}
}

Expand Down Expand Up @@ -625,6 +631,7 @@ mod tests {
None,
Permit::new_empty(),
InvocationTarget::mock_virtual_object(),
ServiceMetrics::EMPTY,
RetryPolicy::fixed_delay(Duration::from_secs(1), Some(10)).into_iter(),
OnMaxAttempts::Kill,
ConcurrencySlot::empty(),
Expand Down
Loading
Loading