diff --git a/crates/bifrost/src/record.rs b/crates/bifrost/src/record.rs index a3181658dd..da06d62209 100644 --- a/crates/bifrost/src/record.rs +++ b/crates/bifrost/src/record.rs @@ -171,13 +171,15 @@ impl LogEntry { pub fn try_decode_arc( self, ) -> Option, StorageDecodeError>> { - self.into_record().map(|record| record.decode_arc()) + self.into_record() + .map(|record| record.decode_arc().map_err(Into::into)) } pub fn try_decode( self, ) -> Option> { - self.into_record().map(|record| record.decode()) + self.into_record() + .map(|record| record.decode().map_err(Into::into)) } #[cfg(any(test, feature = "test-util"))] diff --git a/crates/platform/src/storage.rs b/crates/platform/src/storage.rs index d269948759..bd827ac8eb 100644 --- a/crates/platform/src/storage.rs +++ b/crates/platform/src/storage.rs @@ -23,7 +23,7 @@ pub enum StorageEncodeError { SizeOverflow(usize), } -#[derive(Debug, thiserror::Error)] +#[derive(derive_more::Debug, thiserror::Error)] pub enum StorageDecodeError { #[error("failed reading codec: {0}")] ReadingCodec(ReString), diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index e50684b63c..c9def78080 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -28,7 +28,7 @@ mod tail; pub use loglet::*; pub use offset_watch::*; -pub use record::{CustomRecordEncoding, Record}; +pub use record::{CustomRecordEncoding, Record, RecordDecodeError}; pub use record_cache::RecordCache; pub use tail::*; diff --git a/crates/types/src/logs/record.rs b/crates/types/src/logs/record.rs index 59229621e4..a48ae8153e 100644 --- a/crates/types/src/logs/record.rs +++ b/crates/types/src/logs/record.rs @@ -84,20 +84,16 @@ impl Record { /// the value from the underlying Arc delivered from the loglet. Use this approach if you need /// to mutate the value in-place and the cost of cloning sections is high. It's generally /// recommended to use `decode_arc` whenever possible for large payloads. - pub fn decode(self) -> Result { + pub fn decode(self) -> Result { let decoded = match self.body { PolyBytes::Bytes(slice) => { let mut buf = std::io::Cursor::new(slice); StorageCodec::decode(&mut buf)? } PolyBytes::Both(value, _) | PolyBytes::Typed(value) => { - let target_arc: Arc = value.downcast_arc().map_err(|_| { - StorageDecodeError::DecodeValue( - anyhow::anyhow!( - "Type mismatch. Original value in PolyBytes::Typed does not match requested type" - ) - .into(), - )})?; + let target_arc: Arc = value + .downcast_arc() + .map_err(RecordDecodeError::TypedValueMismatch)?; // Attempts to move the inner value (T) if this Arc has exactly one strong // reference. Otherwise, it clones the inner value. match Arc::try_unwrap(target_arc) { @@ -112,28 +108,41 @@ impl Record { /// Decode the record body into an Arc. This is the most efficient way to access the entry /// if you need read-only access or if it's acceptable to selectively clone inner sections. If /// the record is in record cache, this will avoid cloning or deserialization of the value. - pub fn decode_arc( - self, - ) -> Result, StorageDecodeError> { + pub fn decode_arc(self) -> Result, RecordDecodeError> { let decoded = match self.body { PolyBytes::Bytes(slice) => { let mut buf = std::io::Cursor::new(slice); Arc::new(StorageCodec::decode(&mut buf)?) } - PolyBytes::Typed(value) | PolyBytes::Both(value, _) => { - value.downcast_arc().map_err(|_| { - StorageDecodeError::DecodeValue( - anyhow::anyhow!( - "Type mismatch. Original value in PolyBytes::Typed does not match requested type" - ) - .into(), - )})? - }, + PolyBytes::Typed(value) | PolyBytes::Both(value, _) => value + .downcast_arc() + .map_err(RecordDecodeError::TypedValueMismatch)?, }; Ok(decoded) } } +#[derive(derive_more::Debug, thiserror::Error)] +pub enum RecordDecodeError { + #[error(transparent)] + StorageDecodeError(#[from] StorageDecodeError), + #[error("Type mismatch. Original value in PolyBytes::Typed does not match requested type")] + #[debug("TypedValueMismatch")] + TypedValueMismatch(Arc), +} + +impl From for StorageDecodeError { + fn from(value: RecordDecodeError) -> Self { + match value { + RecordDecodeError::StorageDecodeError(err) => err, + RecordDecodeError::TypedValueMismatch(_) => StorageDecodeError::DecodeValue( + "Type mismatch. Original value in PolyBytes::Typed does not match requested type" + .into(), + ), + } + } +} + impl EstimatedMemorySize for Record { #[inline] fn estimated_memory_size(&self) -> usize { diff --git a/crates/wal-protocol/src/v2.rs b/crates/wal-protocol/src/v2.rs index 1900683192..0e35476920 100644 --- a/crates/wal-protocol/src/v2.rs +++ b/crates/wal-protocol/src/v2.rs @@ -375,7 +375,6 @@ where C: Command + HasRecordKeys, { fn from(command: C) -> PartialEnvelope { - // let payload = payload.into(); PartialEnvelope { kind: C::KIND, keys: command.record_keys(), diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index e425675b5f..b2315a880d 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -84,6 +84,7 @@ restate-storage-api = { workspace = true, features = ["test-util"] } restate-test-util = { workspace = true, features = ["prost"] } restate-types = { workspace = true, features = ["test-util"] } restate-vqueues = { workspace = true, features = ["test-util"] } +restate-wal-protocol = { workspace = true, features = ["test-util"] } googletest = { workspace = true } mockall = { workspace = true } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 2ddfcdbf33..9f0486055f 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -28,7 +28,6 @@ mod state_machine; pub mod types; use std::fmt::Debug; -use std::ops::RangeBounds; use std::sync::Arc; use std::time::Duration; @@ -64,7 +63,9 @@ use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStat use restate_types::config::Configuration; use restate_types::epoch::EpochMetadata; use restate_types::identifiers::LeaderEpoch; -use restate_types::logs::{KeyFilter, Lsn, Record, SequenceNumber}; +use restate_types::logs::{ + KeyFilter, Keys, Lsn, MatchKeyQuery, Record, RecordDecodeError, SequenceNumber, +}; use restate_types::net::ingest::{ DedupSequenceNrQueryRequest, DedupSequenceNrQueryResponse, ReceivedIngestRequest, ResponseStatus, @@ -85,7 +86,8 @@ use restate_vqueues::{VQueuesMeta, VQueuesMetaCache}; use restate_wal_protocol::control::{ AnnounceLeaderCommand, CurrentReplicaSetConfiguration, NextReplicaSetConfiguration, }; -use restate_wal_protocol::{Command, Destination, Envelope, Header}; +use restate_wal_protocol::v2::commands; +use restate_wal_protocol::{Envelope, v2}; use restate_worker_api::invoker::capacity::InvokerCapacity; use restate_worker_api::{LeaderQueryCommand, LeaderQueryReceiver}; @@ -241,6 +243,7 @@ impl PartitionProcessorBuilder { let last_applied_log_lsn_watch = watch::Sender::new(Lsn::INVALID); Ok(PartitionProcessor { + key_filter: KeyFilter::Within(partition_store.partition_key_range().into()), partition_id_str, leadership_state, state_machine, @@ -319,6 +322,7 @@ pub struct PartitionProcessor { last_applied_log_lsn_watch: watch::Sender, cached_epoch_metadata: Option, + key_filter: KeyFilter, } #[derive(Debug, thiserror::Error)] @@ -368,8 +372,9 @@ pub enum ProcessorError { struct LsnEnvelope { pub lsn: Lsn, + pub keys: Keys, pub created_at: NanosSinceEpoch, - pub envelope: Arc, + pub envelope: Arc>, } /// OrderedOperations are scheduled operations that @@ -430,6 +435,33 @@ where res } + /// Decode record tries to decode the record first as v2 Envelope, if it failed, + /// it decodes as v1 Envelope then converts into v2. + fn decode_record(record: Record) -> Result>, StorageDecodeError> { + let envelope = match record.decode_arc::>() { + Ok(envelope) => envelope, + Err(RecordDecodeError::TypedValueMismatch(v1_envelope)) => { + let v1_envelope: Arc = v1_envelope + .downcast_arc() + .map_err(|_| StorageDecodeError::DecodeValue("Type mismatch. Record value in PolyBytes::Typed does not match requested type".into()))?; + + let v1_envelope = match Arc::try_unwrap(v1_envelope) { + Ok(v1_envelope) => v1_envelope, + Err(arc) => arc.as_ref().clone(), + }; + + let envelope: v2::Envelope = v1_envelope + .try_into() + .map_err(|err: anyhow::Error| StorageDecodeError::DecodeValue(err.into()))?; + + Arc::new(envelope) + } + Err(RecordDecodeError::StorageDecodeError(err)) => return Err(err), + }; + + Ok(envelope) + } + async fn run_inner(&mut self) -> Result<(), ProcessorError> { let mut partition_store = self.partition_store.clone(); @@ -625,8 +657,9 @@ where let record = LsnEnvelope { lsn, + keys: record.keys().clone(), created_at: record.created_at(), - envelope: record.decode_arc()?, + envelope: Self::decode_record(record)?, }; let maybe_announce_leader = self.apply_record( @@ -996,37 +1029,55 @@ where action_collector: &mut ActionCollector, vqueues_cache: &mut VQueuesMetaCache, ) -> Result>, state_machine::Error> { - trace!(lsn = %record.lsn, "Processing bifrost record for '{}': {:?}", record.envelope.command.name(), record.envelope.header); - - if let Some(dedup_information) = self.is_targeted_to_me(&record.envelope.header) { - // deduplicate if deduplication information has been provided - if let Some(dedup_information) = dedup_information { - if Self::is_outdated_or_duplicate(dedup_information, transaction).await? { - debug!( - "Ignoring outdated or duplicate message: {:?}", - record.envelope.header - ); - return Ok(None); - } - transaction - .put_dedup_seq_number( - dedup_information.producer_id.clone(), - &dedup_information.sequence_number, - ) - .map_err(state_machine::Error::Storage)?; - } + trace!(lsn = %record.lsn, "Processing bifrost record for '{}': {:?}", record.envelope.kind(), record.envelope.header()); - // todo: redesign to pass the arc (or reference) further down - let record_created_at = record.created_at; - let record_lsn = record.lsn; - let envelope = Arc::unwrap_or_clone(record.envelope); + if !self.is_targeted_to_me(&record.keys) { + self.status.num_skipped_records += 1; + trace!( + "Ignore message which is not targeted to me Partition Range: {:?} Key: {:?}, Header: {:?}", + self.key_filter, + record.keys, + record.envelope.header() + ); + return Ok(None); + } - if let Command::AnnounceLeader(announce_leader) = envelope.command { - // leadership change detected, let's finish our transaction here - return Ok(Some(announce_leader)); - } else if let Command::UpdatePartitionDurability(partition_durability) = - envelope.command - { + // todo(azmy): use dedup() directly without first converting to DedupInformation + let dedup_information: Option = record.envelope.dedup().clone().into(); + + // deduplicate if deduplication information has been provided + if let Some(dedup_information) = dedup_information { + if Self::is_outdated_or_duplicate(&dedup_information, transaction).await? { + debug!( + "Ignoring outdated or duplicate message: {:?}", + record.envelope.header() + ); + return Ok(None); + } + transaction + .put_dedup_seq_number( + dedup_information.producer_id.clone(), + &dedup_information.sequence_number, + ) + .map_err(state_machine::Error::Storage)?; + } + + // todo: redesign to pass the arc (or reference) further down + let record_created_at = record.created_at; + let record_lsn = record.lsn; + // note: v2 envelope is cheaply clonable since it holds either `Bytes` or + // Arc of the payload record. + let envelope = Arc::unwrap_or_clone(record.envelope); + + match envelope.kind() { + v2::CommandKind::AnnounceLeader => { + let envelope = envelope.into_typed::(); + let announce_leader = envelope.into_inner()?; + return Ok(Some(Box::new(announce_leader))); + } + v2::CommandKind::UpdatePartitionDurability => { + let envelope = envelope.into_typed::(); + let partition_durability = envelope.into_inner()?; if partition_durability.partition_id != self.partition_store.partition_id() { self.status.num_skipped_records += 1; trace!( @@ -1044,10 +1095,11 @@ where if self.trim_queue.push(&partition_durability) { transaction.put_partition_durability(&partition_durability)?; } - } else { + } + _ => { self.state_machine .apply( - envelope.command, + envelope, record_created_at.into(), record_lsn, transaction, @@ -1057,31 +1109,13 @@ where ) .await?; } - } else { - self.status.num_skipped_records += 1; - trace!( - "Ignore message which is not targeted to me: {:?}", - record.envelope.header - ); } Ok(None) } - fn is_targeted_to_me<'a>(&self, header: &'a Header) -> Option<&'a Option> { - match &header.dest { - Destination::Processor { - partition_key, - dedup, - } if self - .partition_store - .partition_key_range() - .contains(partition_key) => - { - Some(dedup) - } - _ => None, - } + fn is_targeted_to_me(&self, keys: &Keys) -> bool { + keys.matches_key_query(&self.key_filter) } async fn is_outdated_or_duplicate( diff --git a/crates/worker/src/partition/state_machine/entries/attach_invocation_command.rs b/crates/worker/src/partition/state_machine/entries/attach_invocation_command.rs index 5126e83555..61ff50da31 100644 --- a/crates/worker/src/partition/state_machine/entries/attach_invocation_command.rs +++ b/crates/worker/src/partition/state_machine/entries/attach_invocation_command.rs @@ -55,7 +55,7 @@ mod tests { AttachInvocationCommand, AttachInvocationCompletion, AttachInvocationResult, AttachInvocationTarget, CommandType, Entry, EntryMetadata, EntryType, NotificationId, }; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use rstest::rstest; #[rstest] @@ -83,7 +83,7 @@ mod tests { let actions = test_env .apply_multiple([ invoker_entry_effect(invocation_id, attach_invocation_command.clone()), - Command::InvocationResponse(InvocationResponse { + commands::InvocationResponseCommand::test_envelope(InvocationResponse { target: JournalCompletionTarget::from_parts(invocation_id, completion_id), result: ResponseResult::Success(success_result.clone()), }), diff --git a/crates/worker/src/partition/state_machine/entries/call_commands.rs b/crates/worker/src/partition/state_machine/entries/call_commands.rs index 6d4683370e..e98677ca53 100644 --- a/crates/worker/src/partition/state_machine/entries/call_commands.rs +++ b/crates/worker/src/partition/state_machine/entries/call_commands.rs @@ -165,7 +165,7 @@ mod tests { CommandType, Entry, EntryMetadata, EntryType, NotificationId, OneWayCallCommand, }; use restate_types::time::MillisSinceEpoch; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use rstest::rstest; use std::time::{Duration, SystemTime}; @@ -195,7 +195,7 @@ mod tests { let actions = test_env .apply_multiple([ invoker_entry_effect(invocation_id, call_command.clone()), - Command::InvocationResponse(InvocationResponse { + commands::InvocationResponseCommand::test_envelope(InvocationResponse { target: JournalCompletionTarget::from_parts( invocation_id, result_completion_id, diff --git a/crates/worker/src/partition/state_machine/entries/get_invocation_output_command.rs b/crates/worker/src/partition/state_machine/entries/get_invocation_output_command.rs index e79808960c..9cd1433b6a 100644 --- a/crates/worker/src/partition/state_machine/entries/get_invocation_output_command.rs +++ b/crates/worker/src/partition/state_machine/entries/get_invocation_output_command.rs @@ -57,7 +57,7 @@ mod tests { GetInvocationOutputCommand, GetInvocationOutputCompletion, GetInvocationOutputResult, NotificationId, }; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use rstest::rstest; #[rstest] @@ -90,12 +90,14 @@ mod tests { completion_id, }; let response_command = if complete_using_notify_get_invocation_output { - Command::NotifyGetInvocationOutputResponse(GetInvocationOutputResponse { - target: JournalCompletionTarget::from_parts(invocation_id, completion_id), - result: expected_get_invocation_result.clone(), - }) + commands::NotifyGetInvocationOutputResponseCommand::test_envelope( + GetInvocationOutputResponse { + target: JournalCompletionTarget::from_parts(invocation_id, completion_id), + result: expected_get_invocation_result.clone(), + }, + ) } else { - Command::InvocationResponse(InvocationResponse { + commands::InvocationResponseCommand::test_envelope(InvocationResponse { target: JournalCompletionTarget::from_parts(invocation_id, completion_id), result: if complete_with_not_ready { ResponseResult::Failure(NOT_READY_INVOCATION_ERROR) diff --git a/crates/worker/src/partition/state_machine/entries/mod.rs b/crates/worker/src/partition/state_machine/entries/mod.rs index 7af7c3c1ed..83414a922a 100644 --- a/crates/worker/src/partition/state_machine/entries/mod.rs +++ b/crates/worker/src/partition/state_machine/entries/mod.rs @@ -432,7 +432,7 @@ mod tests { }; use restate_types::journal_v2::{CallCommand, CallRequest}; use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion}; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; #[restate_core::test] async fn update_journal_and_commands_length() { @@ -484,13 +484,15 @@ mod tests { ); let _ = test_env - .apply(Command::InvocationResponse(InvocationResponse { - target: JournalCompletionTarget { - caller_id: invocation_id, - caller_completion_id: result_completion_id, + .apply(commands::InvocationResponseCommand::test_envelope( + InvocationResponse { + target: JournalCompletionTarget { + caller_id: invocation_id, + caller_completion_id: result_completion_id, + }, + result: ResponseResult::Success(success_result.clone()), }, - result: ResponseResult::Success(success_result.clone()), - })) + )) .await; assert_that!( test_env diff --git a/crates/worker/src/partition/state_machine/entries/notification.rs b/crates/worker/src/partition/state_machine/entries/notification.rs index 94416e21fd..eae64e6520 100644 --- a/crates/worker/src/partition/state_machine/entries/notification.rs +++ b/crates/worker/src/partition/state_machine/entries/notification.rs @@ -100,8 +100,8 @@ mod tests { }; use restate_types::time::MillisSinceEpoch; use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion}; - use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyValue; + use restate_wal_protocol::v2::{Command, commands}; use rstest::rstest; use std::time::Duration; @@ -118,10 +118,12 @@ mod tests { // Send signal notification let signal = Signal::new(SignalId::for_index(17), signal_result); let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: signal.clone(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: signal.clone(), + }, + )) .await; assert_that!( actions, @@ -164,10 +166,12 @@ mod tests { // Send signal notification before pinned deployment let signal = Signal::new(SignalId::for_index(17), SignalResult::Void); let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: signal.clone(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: signal.clone(), + }, + )) .await; assert_that!( actions, @@ -233,11 +237,9 @@ mod tests { // Send a completion notification for a command (e.g., Sleep) with completion_id = 1 let completion_id = 1; let _ = test_env - .apply(Command::Timer(TimerKeyValue::complete_journal_entry( - wake_up_time, - invocation_id, - completion_id, - ))) + .apply(commands::TimerCommand::test_envelope( + TimerKeyValue::complete_journal_entry(wake_up_time, invocation_id, completion_id), + )) .await; // The invocation should remain paused @@ -280,11 +282,13 @@ mod tests { // Apply the cancel signal notification let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Cancel, - response_sink: None, - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id, + flavor: TerminationFlavor::Cancel, + response_sink: None, + }, + )) .await; // The invocation should be resumed (invoke action dispatched) diff --git a/crates/worker/src/partition/state_machine/lifecycle/cancel.rs b/crates/worker/src/partition/state_machine/lifecycle/cancel.rs index 61e1c88ce8..31b22dc910 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/cancel.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/cancel.rs @@ -144,7 +144,7 @@ mod tests { use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::time::MillisSinceEpoch; use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion}; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use restate_worker_api::invoker::Effect; #[restate_core::test] @@ -155,11 +155,13 @@ mod tests { // Send signal notification let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Cancel, - response_sink: None, - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id, + flavor: TerminationFlavor::Cancel, + response_sink: None, + }, + )) .await; assert_that!( actions, @@ -181,10 +183,12 @@ mod tests { // Send signal notification let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: CANCEL_SIGNAL.try_into().unwrap(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: CANCEL_SIGNAL.try_into().unwrap(), + }, + )) .await; assert_that!( actions, @@ -206,10 +210,12 @@ mod tests { // Send signal notification before pinning the deployment let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: CANCEL_SIGNAL.try_into().unwrap(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: CANCEL_SIGNAL.try_into().unwrap(), + }, + )) .await; assert_that!( actions, @@ -222,13 +228,13 @@ mod tests { // Now pin to protocol v4, this should apply the cancel notification let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { deployment_id: DeploymentId::default(), service_protocol_version: ServiceProtocolVersion::V4, }), - }))) + })) .await; assert_that!( actions, @@ -250,10 +256,12 @@ mod tests { // Send signal notification before pinning the deployment let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: CANCEL_SIGNAL.try_into().unwrap(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: CANCEL_SIGNAL.try_into().unwrap(), + }, + )) .await; assert_that!( actions, @@ -301,12 +309,12 @@ mod tests { let rpc_id = PartitionProcessorRpcRequestId::new(); let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, execution_time: Some(MillisSinceEpoch::MAX), response_sink: Some(ServiceInvocationResponseSink::ingress(rpc_id)), ..ServiceInvocation::mock() - }))) + })) .await; // assert that scheduled invocation is in invocation_status @@ -317,10 +325,12 @@ mod tests { assert!(let InvocationStatus::Scheduled(_) = current_invocation_status); let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: CANCEL_SIGNAL.try_into().unwrap(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: CANCEL_SIGNAL.try_into().unwrap(), + }, + )) .await; assert_that!( actions, @@ -395,22 +405,22 @@ mod tests { let caller_id = InvocationId::mock_random(); let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id: inboxed_id, invocation_target: inboxed_target, response_sink: Some(ServiceInvocationResponseSink::PartitionProcessor( JournalCompletionTarget::from_parts(caller_id, 0), )), ..ServiceInvocation::mock() - }))) + })) .await; let current_invocation_status = test_env @@ -422,10 +432,12 @@ mod tests { assert!(let InvocationStatus::Inboxed(_) = current_invocation_status); let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id: inboxed_id, - signal: CANCEL_SIGNAL.try_into().unwrap(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id: inboxed_id, + signal: CANCEL_SIGNAL.try_into().unwrap(), + }, + )) .await; let current_invocation_status = test_env diff --git a/crates/worker/src/partition/state_machine/lifecycle/event.rs b/crates/worker/src/partition/state_machine/lifecycle/event.rs index e3633b48a7..35b56ff40a 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/event.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/event.rs @@ -86,7 +86,7 @@ mod tests { use googletest::prelude::*; use restate_types::journal_events::raw::RawEvent; use restate_types::journal_events::{Event, TransientErrorEvent}; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use restate_worker_api::invoker::Effect; #[restate_core::test] @@ -106,13 +106,13 @@ mod tests { }; let _ = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEvent { event: RawEvent::from(Event::TransientError(transient_error_event.clone())) .clone(), }, - }))) + })) .await; assert_that!( diff --git a/crates/worker/src/partition/state_machine/lifecycle/manual_resume.rs b/crates/worker/src/partition/state_machine/lifecycle/manual_resume.rs index 6c73fb9d71..dc03096888 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/manual_resume.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/manual_resume.rs @@ -108,7 +108,7 @@ mod tests { use restate_types::invocation::{IngressInvocationResponseSink, ResumeInvocationRequest}; use restate_types::journal_v2::{NotificationId, SleepCommand}; use restate_types::service_protocol::ServiceProtocolVersion; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use restate_worker_api::invoker::Effect; use std::time::{Duration, SystemTime}; @@ -131,13 +131,15 @@ mod tests { // Now on manual resume, we should resume the suspended invocation let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::ResumeInvocation(ResumeInvocationRequest { - invocation_id, - update_pinned_deployment_id: None, - response_sink: Some(InvocationMutationResponseSink::Ingress( - IngressInvocationResponseSink { request_id }, - )), - })) + .apply(commands::ResumeInvocationCommand::test_envelope( + ResumeInvocationRequest { + invocation_id, + update_pinned_deployment_id: None, + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) .await; assert_that!( actions, @@ -168,13 +170,13 @@ mod tests { let initial_deployment_id = DeploymentId::new(); // Pin deployment let _ = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { deployment_id: initial_deployment_id, service_protocol_version: ServiceProtocolVersion::V5, }), - }))) + })) .await; // Mock paused test_env @@ -193,13 +195,15 @@ mod tests { let request_id = PartitionProcessorRpcRequestId::new(); let new_deployment_id = DeploymentId::new(); let actions = test_env - .apply(Command::ResumeInvocation(ResumeInvocationRequest { - invocation_id, - update_pinned_deployment_id: Some(new_deployment_id), - response_sink: Some(InvocationMutationResponseSink::Ingress( - IngressInvocationResponseSink { request_id }, - )), - })) + .apply(commands::ResumeInvocationCommand::test_envelope( + ResumeInvocationRequest { + invocation_id, + update_pinned_deployment_id: Some(new_deployment_id), + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) .await; assert_that!( actions, @@ -260,13 +264,15 @@ mod tests { // Now on manual resume, we should resume the suspended invocation let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::ResumeInvocation(ResumeInvocationRequest { - invocation_id, - update_pinned_deployment_id: None, - response_sink: Some(InvocationMutationResponseSink::Ingress( - IngressInvocationResponseSink { request_id }, - )), - })) + .apply(commands::ResumeInvocationCommand::test_envelope( + ResumeInvocationRequest { + invocation_id, + update_pinned_deployment_id: None, + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) .await; assert_that!( actions, diff --git a/crates/worker/src/partition/state_machine/lifecycle/notify_invocation_response.rs b/crates/worker/src/partition/state_machine/lifecycle/notify_invocation_response.rs index 5e9cea47dd..cd7be14372 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/notify_invocation_response.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/notify_invocation_response.rs @@ -171,7 +171,7 @@ mod tests { use restate_types::journal_v2::{ CallCommand, CallInvocationIdCompletion, CallRequest, Entry, EntryType, NotificationId, }; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; #[restate_core::test] async fn reply_to_call_with_failure_and_metadata() { @@ -197,7 +197,7 @@ mod tests { let actions = test_env .apply_multiple([ fixtures::invoker_entry_effect(invocation_id, call_command.clone()), - Command::InvocationResponse(InvocationResponse { + commands::InvocationResponseCommand::test_envelope(InvocationResponse { target: JournalCompletionTarget::from_parts( invocation_id, result_completion_id, diff --git a/crates/worker/src/partition/state_machine/lifecycle/paused.rs b/crates/worker/src/partition/state_machine/lifecycle/paused.rs index dbf9d8bc61..6353c96921 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/paused.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/paused.rs @@ -126,7 +126,7 @@ mod tests { InFlightInvocationMetadata, InvocationStatusDiscriminants, ReadInvocationStatusTable, }; use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent}; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; use crate::partition::types::InvokerEffectKind; @@ -151,14 +151,14 @@ mod tests { // Check we just pause let _ = test_env - .apply(Command::InvokerEffect(Box::new( + .apply(commands::InvokerEffectCommand::test_envelope( restate_worker_api::invoker::Effect { invocation_id, kind: InvokerEffectKind::Paused { paused_event: paused_event.clone().into(), }, }, - ))) + )) .await; assert_that!( test_env @@ -202,12 +202,12 @@ mod tests { // Check we just pause let _ = test_env - .apply(Command::InvokerEffect(Box::new( + .apply(commands::InvokerEffectCommand::test_envelope( restate_worker_api::invoker::Effect { invocation_id, kind: InvokerEffectKind::Paused { paused_event }, }, - ))) + )) .await; assert_that!( test_env diff --git a/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs b/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs index 74dd043f0f..2c0e3f15b0 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs @@ -111,7 +111,7 @@ mod tests { }; use restate_types::journal_v2::{CommandType, OutputCommand, OutputResult}; use restate_types::service_protocol::ServiceProtocolVersion; - use restate_wal_protocol::Command; + use restate_wal_protocol::v2::{Command, commands}; use std::time::Duration; #[restate_core::test] @@ -127,27 +127,28 @@ mod tests { let response_bytes = Bytes::from_static(b"123"); // Create and complete a fresh invocation - let actions = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id, - invocation_target: invocation_target.clone(), - response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id }), - idempotency_key: Some(idempotency_key.clone()), - completion_retention_duration: completion_retention, - journal_retention_duration: journal_retention, - ..ServiceInvocation::mock() - })), - pinned_deployment(invocation_id, ServiceProtocolVersion::V5), - invoker_entry_effect( - invocation_id, - OutputCommand { - result: OutputResult::Success(response_bytes.clone()), - name: Default::default(), - }, - ), - invoker_end_effect(invocation_id), - ])) - .await; + let actions = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id }), + idempotency_key: Some(idempotency_key.clone()), + completion_retention_duration: completion_retention, + journal_retention_duration: journal_retention, + ..ServiceInvocation::mock() + }), + pinned_deployment(invocation_id, ServiceProtocolVersion::V5), + invoker_entry_effect( + invocation_id, + OutputCommand { + result: OutputResult::Success(response_bytes.clone()), + name: Default::default(), + }, + ), + invoker_end_effect(invocation_id), + ]) + .await; // Assert response assert_that!( @@ -185,22 +186,24 @@ mod tests { // Now let's purge the journal test_env - .apply(Command::PurgeJournal(PurgeInvocationRequest { - invocation_id, - response_sink: None, - })) + .apply(commands::PurgeJournalCommand::test_envelope( + PurgeInvocationRequest { + invocation_id, + response_sink: None, + }, + )) .await; // At this point we should still be able to de-duplicate the invocation let request_id = PartitionProcessorRpcRequestId::default(); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id }), idempotency_key: Some(idempotency_key), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -229,10 +232,12 @@ mod tests { // Now purge completely test_env - .apply(Command::PurgeInvocation(PurgeInvocationRequest { - invocation_id, - response_sink: None, - })) + .apply(commands::PurgeInvocationCommand::test_envelope( + PurgeInvocationRequest { + invocation_id, + response_sink: None, + }, + )) .await; // Nothing should be left diff --git a/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs b/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs index 8bf019bd0c..d619a5efb3 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs @@ -352,8 +352,8 @@ mod tests { }; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::time::MillisSinceEpoch; - use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyValue; + use restate_wal_protocol::v2::{Command, commands}; use std::time::Duration; #[restate_core::test] @@ -363,25 +363,26 @@ mod tests { // Start and complete an invocation with only the input entry let invocation_target = InvocationTarget::mock_virtual_object(); let original_invocation_id = InvocationId::generate(&invocation_target, None); - let _ = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id: original_invocation_id, - invocation_target: invocation_target.clone(), - completion_retention_duration: Duration::from_secs(120), - journal_retention_duration: Duration::ZERO, - ..ServiceInvocation::mock() - })), - fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), - fixtures::invoker_entry_effect( - original_invocation_id, - OutputCommand { - result: OutputResult::Success(Default::default()), - name: Default::default(), - }, - ), - fixtures::invoker_end_effect(original_invocation_id), - ])) - .await; + let _ = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::ZERO, + ..ServiceInvocation::mock() + }), + fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), + fixtures::invoker_entry_effect( + original_invocation_id, + OutputCommand { + result: OutputResult::Success(Default::default()), + name: Default::default(), + }, + ), + fixtures::invoker_end_effect(original_invocation_id), + ]) + .await; // Sanity check, journal should not be available assert_that!( @@ -401,7 +402,7 @@ mod tests { let new_id = InvocationId::mock_generate(&InvocationTarget::mock_virtual_object()); let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::RestartAsNewInvocation( + .apply(commands::RestartAsNewInvocationCommand::test_envelope( RestartAsNewInvocationRequest { invocation_id: original_invocation_id, new_invocation_id: new_id, @@ -438,27 +439,28 @@ mod tests { // Start invocation, then kill it let invocation_target = InvocationTarget::mock_virtual_object(); let original_invocation_id = InvocationId::generate(&invocation_target, None); - let _ = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id: original_invocation_id, - invocation_target: invocation_target.clone(), - completion_retention_duration: Duration::from_secs(120), - journal_retention_duration: Duration::from_secs(120), - ..ServiceInvocation::mock() - })), - Command::TerminateInvocation(InvocationTermination { - invocation_id: original_invocation_id, - flavor: TerminationFlavor::Kill, - response_sink: None, - }), - ])) - .await; + let _ = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::from_secs(120), + ..ServiceInvocation::mock() + }), + commands::TerminateInvocationCommand::test_envelope(InvocationTermination { + invocation_id: original_invocation_id, + flavor: TerminationFlavor::Kill, + response_sink: None, + }), + ]) + .await; // Restart as new with copy_prefix_up_to_index_included = 0 let new_id = InvocationId::mock_generate(&invocation_target); let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::RestartAsNewInvocation( + .apply(commands::RestartAsNewInvocationCommand::test_envelope( RestartAsNewInvocationRequest { invocation_id: original_invocation_id, new_invocation_id: new_id, @@ -508,31 +510,32 @@ mod tests { // Start and complete an invocation with only the input entry let invocation_target = InvocationTarget::mock_virtual_object(); let original_invocation_id = InvocationId::generate(&invocation_target, None); - let _ = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id: original_invocation_id, - invocation_target: invocation_target.clone(), - completion_retention_duration: Duration::from_secs(120), - journal_retention_duration: Duration::from_secs(120), - ..ServiceInvocation::mock() - })), - fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), - fixtures::invoker_entry_effect( - original_invocation_id, - OutputCommand { - result: OutputResult::Success(Default::default()), - name: Default::default(), - }, - ), - fixtures::invoker_end_effect(original_invocation_id), - ])) - .await; + let _ = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::from_secs(120), + ..ServiceInvocation::mock() + }), + fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), + fixtures::invoker_entry_effect( + original_invocation_id, + OutputCommand { + result: OutputResult::Success(Default::default()), + name: Default::default(), + }, + ), + fixtures::invoker_end_effect(original_invocation_id), + ]) + .await; // Restart as new with copy_prefix_up_to_index_included = 0 let new_id = InvocationId::mock_generate(&invocation_target); let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::RestartAsNewInvocation( + .apply(commands::RestartAsNewInvocationCommand::test_envelope( RestartAsNewInvocationRequest { invocation_id: original_invocation_id, new_invocation_id: new_id, @@ -583,25 +586,26 @@ mod tests { // Mock the first invocation for the same target that we will restart let original_invocation_id = InvocationId::generate(&invocation_target, None); - let _ = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id: original_invocation_id, - invocation_target: invocation_target.clone(), - completion_retention_duration: Duration::from_secs(120), - journal_retention_duration: Duration::from_secs(120), - ..ServiceInvocation::mock() - })), - fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), - fixtures::invoker_entry_effect( - original_invocation_id, - OutputCommand { - result: OutputResult::Success(Default::default()), - name: Default::default(), - }, - ), - fixtures::invoker_end_effect(original_invocation_id), - ])) - .await; + let _ = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::from_secs(120), + ..ServiceInvocation::mock() + }), + fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), + fixtures::invoker_entry_effect( + original_invocation_id, + OutputCommand { + result: OutputResult::Success(Default::default()), + name: Default::default(), + }, + ), + fixtures::invoker_end_effect(original_invocation_id), + ]) + .await; // Now before restarting the invocation, lock the VO let locker_id = fixtures::mock_start_invocation_with_invocation_target( @@ -613,7 +617,7 @@ mod tests { // Now restart original into a new invocation while VO is locked by locker_id let new_id = InvocationId::mock_generate(&invocation_target); let _ = test_env - .apply(Command::RestartAsNewInvocation( + .apply(commands::RestartAsNewInvocationCommand::test_envelope( RestartAsNewInvocationRequest { invocation_id: original_invocation_id, new_invocation_id: new_id, @@ -657,53 +661,54 @@ mod tests { // Build journal: input(0 already) + command(1) + signal() + completion(2) let wake_up_time = MillisSinceEpoch::now(); let completion_id = 1u32; - let _ = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id: original_invocation_id, - invocation_target: invocation_target.clone(), - completion_retention_duration: Duration::from_secs(120), - journal_retention_duration: Duration::from_secs(120), - ..ServiceInvocation::mock() - })), - fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), - fixtures::invoker_entry_effect( - original_invocation_id, - SleepCommand { + let _ = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::from_secs(120), + ..ServiceInvocation::mock() + }), + fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), + fixtures::invoker_entry_effect( + original_invocation_id, + SleepCommand { + wake_up_time, + name: Default::default(), + completion_id, + }, + ), + commands::NotifySignalCommand::test_envelope(NotifySignalRequest { + invocation_id: original_invocation_id, + signal: Signal::new( + SignalId::for_index(1), + SignalResult::Success(Default::default()), + ), + }), + fixtures::invoker_entry_effect( + original_invocation_id, + SleepCommand { + wake_up_time, + name: Default::default(), + completion_id: completion_id + 1, + }, + ), + commands::TimerCommand::test_envelope(TimerKeyValue::complete_journal_entry( wake_up_time, - name: Default::default(), + original_invocation_id, completion_id, - }, - ), - Command::NotifySignal(NotifySignalRequest { - invocation_id: original_invocation_id, - signal: Signal::new( - SignalId::for_index(1), - SignalResult::Success(Default::default()), + )), + fixtures::invoker_entry_effect( + original_invocation_id, + OutputCommand { + result: OutputResult::Success(Default::default()), + name: Default::default(), + }, ), - }), - fixtures::invoker_entry_effect( - original_invocation_id, - SleepCommand { - wake_up_time, - name: Default::default(), - completion_id: completion_id + 1, - }, - ), - Command::Timer(TimerKeyValue::complete_journal_entry( - wake_up_time, - original_invocation_id, - completion_id, - )), - fixtures::invoker_entry_effect( - original_invocation_id, - OutputCommand { - result: OutputResult::Success(Default::default()), - name: Default::default(), - }, - ), - fixtures::invoker_end_effect(original_invocation_id), - ])) - .await; + fixtures::invoker_end_effect(original_invocation_id), + ]) + .await; // Capture original random seed to compare later let original_status = test_env @@ -721,7 +726,7 @@ mod tests { ); let new_deployment_id = DeploymentId::new(); let _ = test_env - .apply(Command::RestartAsNewInvocation( + .apply(commands::RestartAsNewInvocationCommand::test_envelope( RestartAsNewInvocationRequest { invocation_id: original_invocation_id, new_invocation_id, @@ -773,26 +778,27 @@ mod tests { // Start and complete an invocation with the journal retained let invocation_target = InvocationTarget::mock_virtual_object(); let original_invocation_id = InvocationId::generate(&invocation_target, None); - let _ = Box::pin(test_env.apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { - invocation_id: original_invocation_id, - invocation_target: invocation_target.clone(), - completion_retention_duration: Duration::from_secs(120), - journal_retention_duration: Duration::from_secs(120), - ..ServiceInvocation::mock() - })), - fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), - // Complete with an output so the journal has at least input+output - fixtures::invoker_entry_effect( - original_invocation_id, - OutputCommand { - result: OutputResult::Success(Default::default()), - name: Default::default(), - }, - ), - fixtures::invoker_end_effect(original_invocation_id), - ])) - .await; + let _ = test_env + .apply_multiple([ + commands::InvokeCommand::test_envelope(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::from_secs(120), + ..ServiceInvocation::mock() + }), + fixtures::pinned_deployment(original_invocation_id, ServiceProtocolVersion::V6), + // Complete with an output so the journal has at least input+output + fixtures::invoker_entry_effect( + original_invocation_id, + OutputCommand { + result: OutputResult::Success(Default::default()), + name: Default::default(), + }, + ), + fixtures::invoker_end_effect(original_invocation_id), + ]) + .await; // Fetch completed status to get the exact journal length let original_status = test_env @@ -809,7 +815,7 @@ mod tests { let new_invocation_id = InvocationId::mock_generate(&invocation_target); let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::RestartAsNewInvocation( + .apply(commands::RestartAsNewInvocationCommand::test_envelope( RestartAsNewInvocationRequest { invocation_id: original_invocation_id, new_invocation_id, diff --git a/crates/worker/src/partition/state_machine/lifecycle/suspend.rs b/crates/worker/src/partition/state_machine/lifecycle/suspend.rs index 14ecf6f46f..50933ada5c 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/suspend.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/suspend.rs @@ -178,8 +178,8 @@ mod tests { }; use restate_types::time::MillisSinceEpoch; use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion}; - use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyValue; + use restate_wal_protocol::v2::{Command, commands}; use std::time::{Duration, SystemTime}; use tracing::info; @@ -212,7 +212,9 @@ mod tests { })) ); - let actions = test_env.apply(Command::Timer(timer_key_value)).await; + let actions = test_env + .apply(commands::TimerCommand::test_envelope(timer_key_value)) + .await; assert_that!( actions, contains(matchers::actions::invoke_for_id(invocation_id)) @@ -252,7 +254,7 @@ mod tests { let actions = test_env .apply_multiple([ invoker_entry_effect(invocation_id, sleep_command.clone()), - Command::Timer(timer_key_value.clone()), + commands::TimerCommand::test_envelope(timer_key_value.clone()), ]) .await; assert_that!( @@ -335,10 +337,12 @@ mod tests { result: SignalResult::Void, }; let actions = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: signal.clone(), - })) + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: signal.clone(), + }, + )) .await; assert_that!( actions, @@ -403,13 +407,15 @@ mod tests { ) -> Self { let actions = self .test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id: self.invocation_id, - signal: Signal { - id: signal_id, - result: signal_result, + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id: self.invocation_id, + signal: Signal { + id: signal_id, + result: signal_result, + }, }, - })) + )) .await; self.log_invocation_status().await; @@ -436,13 +442,15 @@ mod tests { ) -> Self { let actions = self .test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id: self.invocation_id, - signal: Signal { - id: signal_id, - result: signal_result, + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id: self.invocation_id, + signal: Signal { + id: signal_id, + result: signal_result, + }, }, - })) + )) .await; self.log_invocation_status().await; @@ -704,13 +712,15 @@ mod tests { // this by making the invoker re-suspend on a notification that's // already available, so resolve() short-circuits. let _ = test_env - .apply(Command::NotifySignal(NotifySignalRequest { - invocation_id, - signal: Signal { - id: A, - result: SUCCESS, + .apply(commands::NotifySignalCommand::test_envelope( + NotifySignalRequest { + invocation_id, + signal: Signal { + id: A, + result: SUCCESS, + }, }, - })) + )) .await; assert_that!( test_env diff --git a/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs b/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs index 439310e326..4148e95992 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs @@ -52,8 +52,8 @@ mod tests { use restate_types::identifiers::PartitionKey; use restate_types::logs::Keys; use restate_types::sharding::KeyRange; - use restate_wal_protocol::Command; use restate_wal_protocol::control::VersionBarrierCommand; + use restate_wal_protocol::v2::{Command, commands}; use crate::partition::state_machine::StateMachine; use crate::partition::state_machine::tests::TestEnv; @@ -83,11 +83,15 @@ mod tests { ); let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { - version: SemanticRestateVersion::parse("99.0.0").unwrap(), - human_reason: Some("testing".to_string()), - partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), - })) + .apply_fallible(commands::VersionBarrierCommand::test_envelope( + VersionBarrierCommand { + version: SemanticRestateVersion::parse("99.0.0").unwrap(), + human_reason: Some("testing".to_string()), + partition_key_range: Keys::RangeInclusive( + PartitionKey::MIN..=PartitionKey::MAX, + ), + }, + )) .await; assert_that!( @@ -120,11 +124,15 @@ mod tests { let mut test_env = TestEnv::create_with_state_machine(state_machine).await; let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { - version: SemanticRestateVersion::current().clone(), - human_reason: Some("testing".to_string()), - partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), - })) + .apply_fallible(commands::VersionBarrierCommand::test_envelope( + VersionBarrierCommand { + version: SemanticRestateVersion::current().clone(), + human_reason: Some("testing".to_string()), + partition_key_range: Keys::RangeInclusive( + PartitionKey::MIN..=PartitionKey::MAX, + ), + }, + )) .await; assert_that!(result, ok(empty())); @@ -135,11 +143,15 @@ mod tests { } // re-apply the same version, no-op let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { - version: SemanticRestateVersion::current().clone(), - human_reason: Some("testing".to_string()), - partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), - })) + .apply_fallible(commands::VersionBarrierCommand::test_envelope( + VersionBarrierCommand { + version: SemanticRestateVersion::current().clone(), + human_reason: Some("testing".to_string()), + partition_key_range: Keys::RangeInclusive( + PartitionKey::MIN..=PartitionKey::MAX, + ), + }, + )) .await; assert_that!(result, ok(empty())); @@ -150,11 +162,15 @@ mod tests { // apply an older version, success but without effect. let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { - version: SemanticRestateVersion::parse("0.1.0").unwrap(), - human_reason: Some("testing".to_string()), - partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), - })) + .apply_fallible(commands::VersionBarrierCommand::test_envelope( + VersionBarrierCommand { + version: SemanticRestateVersion::parse("0.1.0").unwrap(), + human_reason: Some("testing".to_string()), + partition_key_range: Keys::RangeInclusive( + PartitionKey::MIN..=PartitionKey::MAX, + ), + }, + )) .await; assert_that!(result, ok(empty())); diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index bf38aba4df..33d92296d4 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -14,6 +14,8 @@ mod lifecycle; mod utils; pub use actions::{Action, ActionCollector}; +use restate_wal_protocol::v2::{CommandKind, commands}; +use restate_worker_api::invoker::Effect; use std::collections::HashSet; use std::fmt; @@ -59,7 +61,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::state_table::{ReadStateTable, WriteStateTable}; use restate_storage_api::timer_table::TimerKey; use restate_storage_api::timer_table::{Timer, WriteTimerTable}; -use restate_storage_api::vqueue_table::scheduler::{self, SchedulerDecisions}; +use restate_storage_api::vqueue_table::scheduler; use restate_storage_api::vqueue_table::{self, EntryKey, Stage}; use restate_storage_api::vqueue_table::{EntryStatusHeader, ReadVQueueTable, WriteVQueueTable}; use restate_storage_api::{Result as StorageResult, journal_table}; @@ -85,9 +87,9 @@ use restate_types::invocation::{ AttachInvocationRequest, IngressInvocationResponseSink, InvocationInput, InvocationMutationResponseSink, InvocationQuery, InvocationResponse, InvocationTarget, InvocationTargetType, InvocationTermination, JournalCompletionTarget, NotifySignalRequest, - ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, - Source, SubmitNotificationSink, TerminationFlavor, VirtualObjectHandlerType, - WorkflowHandlerType, + PurgeInvocationRequest, ResponseResult, RestartAsNewInvocationRequest, ResumeInvocationRequest, + ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, Source, + SubmitNotificationSink, TerminationFlavor, VirtualObjectHandlerType, WorkflowHandlerType, }; use restate_types::journal::Completion; use restate_types::journal::CompletionResult; @@ -110,7 +112,7 @@ use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::sharding::KeyRange; use restate_types::state_mut::ExternalStateMutation; use restate_types::state_mut::StateMutationVersion; -use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader}; +use restate_types::storage::{StorageDecodeError, StoredRawEntry, StoredRawEntryHeader}; use restate_types::time::MillisSinceEpoch; use restate_types::vqueues::{self, EntryId, VQueueId}; use restate_types::{RESTATE_VERSION_1_6_0, journal_v2}; @@ -118,9 +120,9 @@ use restate_types::{RestateVersion, SemanticRestateVersion}; use restate_types::{Versioned, journal::*}; use restate_util_string::ReString; use restate_vqueues::{VQueue, VQueueHandle, VQueuesMetaCache}; -use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyDisplay; use restate_wal_protocol::timer::TimerKeyValue; +use restate_wal_protocol::v2; use self::utils::SpanExt; use crate::metric_definitions::{ @@ -128,7 +130,7 @@ use crate::metric_definitions::{ USAGE_LEADER_JOURNAL_ENTRY_COUNT, }; use crate::partition::state_machine::lifecycle::OnCancelCommand; -use crate::partition::types::{InvokerEffect, InvokerEffectKind, OutboxMessageExt}; +use crate::partition::types::{InvokerEffectKind, OutboxMessageExt}; trait StateMachineFeatures { /// Write to journal v2 instead of journal v1 by default. This is a preparational step for @@ -210,6 +212,10 @@ pub enum Error { "error when trying to apply invocation response with completion id {0}, because no command was found for given completion id" )] MissingCommandForInvocationResponse(CompletionId), + #[error("failed to decode envelope(v2): {0}")] + EnvelopeDecoding(#[from] StorageDecodeError), + #[error("Bifrost envelope has unknown command kind")] + UnknownCommandKind, } #[macro_export] @@ -298,7 +304,7 @@ impl StateMachine { #[allow(clippy::too_many_arguments)] pub async fn apply( &mut self, - command: Command, + envelope: v2::Envelope, record_created_at: MillisSinceEpoch, record_lsn: Lsn, transaction: &mut TransactionType, @@ -306,11 +312,11 @@ impl StateMachine { vqueues_cache: &mut VQueuesMetaCache, is_leader: bool, ) -> Result<(), Error> { - let span = utils::state_machine_apply_command_span(is_leader, &command); + let span = utils::state_machine_apply_command_span(is_leader, envelope.kind()); async { let start = Instant::now(); // Apply the command - let command_type = command.name(); + let record_kind: &'static str = envelope.kind().into(); let res = StateMachineApplyContext { storage: transaction, record_created_at, @@ -327,9 +333,9 @@ impl StateMachine { partition_key_range: self.partition_key_range, is_leader, } - .on_apply(command) + .on_apply(envelope) .await; - histogram!(PARTITION_APPLY_COMMAND, "command" => command_type, LEADER_LABEL => if is_leader { LEADER_LABEL_LEADER } else { LEADER_LABEL_FOLLOWER }).record(start.elapsed()); + histogram!(PARTITION_APPLY_COMMAND, "command" => record_kind, LEADER_LABEL => if is_leader { LEADER_LABEL_LEADER } else { LEADER_LABEL_FOLLOWER }).record(start.elapsed()); res } .instrument(span) @@ -459,7 +465,7 @@ impl StateMachineApplyContext<'_, S> { .push(Action::AbortInvocation { invocation_id }); } - async fn on_apply(&mut self, command: Command) -> Result<(), Error> + async fn on_apply(&mut self, envelope: v2::Envelope) -> Result<(), Error> where S: ReadPromiseTable + WritePromiseTable @@ -482,11 +488,17 @@ impl StateMachineApplyContext<'_, S> { + journal_table_v2::ReadJournalTable + WriteJournalEventsTable, { - match command { - Command::VQSchedulerDecisions(encoded_cmd) => { - let command = SchedulerDecisions::bilrost_decode(encoded_cmd) - .map_err(StorageError::BilrostDecode)?; - for (qid, actions) in &command.qids { + match envelope.kind() { + CommandKind::Unknown => Err(Error::UnknownCommandKind), + CommandKind::AnnounceLeader | CommandKind::UpdatePartitionDurability => { + // no-op + Ok(()) + } + CommandKind::VQSchedulerDecisions => { + let scheduler_decisions = envelope + .into_typed::() + .into_inner()?; + for (qid, actions) in &scheduler_decisions.qids { for action in actions { match action { scheduler::SchedulerAction::Unknown => { @@ -546,15 +558,10 @@ impl StateMachineApplyContext<'_, S> { Ok(()) } - Command::UpdatePartitionDurability(_) => { - // no-op :-) - // - // This is a partition-level command that doesn't impact the state machine. - // Handling of this command should have happened without entering the state machine - // on_apply() method. - Ok(()) - } - Command::VersionBarrier(barrier) => { + CommandKind::VersionBarrier => { + let barrier = envelope + .into_typed::() + .into_inner()?; // We have versions in play: // - Our binary's version (this process) // - `min_restate_version` coming from the FSM @@ -597,10 +604,17 @@ impl StateMachineApplyContext<'_, S> { }) } } - Command::Invoke(service_invocation) => { - self.on_service_invocation(service_invocation).await - } - Command::InvocationResponse(InvocationResponse { target, result }) => { + CommandKind::Invoke => { + let service_invocation = envelope + .into_typed::() + .into_inner()?; + self.on_service_invocation(service_invocation.into()).await + } + CommandKind::InvocationResponse => { + let InvocationResponse { target, result } = envelope + .into_typed::() + .into_inner()? + .into(); let status = self.get_invocation_status(&target.caller_id).await?; if should_use_journal_table_v2(&status) { @@ -622,16 +636,33 @@ impl StateMachineApplyContext<'_, S> { self.handle_completion(target.caller_id, status, completion) .await } - Command::ProxyThrough(service_invocation) => { - self.handle_outgoing_message(OutboxMessage::ServiceInvocation(service_invocation))?; + CommandKind::ProxyThrough => { + let inner = envelope + .into_typed::() + .into_inner()?; + self.handle_outgoing_message(OutboxMessage::ServiceInvocation(Box::new( + inner.invocation.into(), + )))?; Ok(()) } - Command::AttachInvocation(attach_invocation_request) => { - self.handle_attach_invocation_request(attach_invocation_request) - .await + CommandKind::AttachInvocation => { + let inner = envelope + .into_typed::() + .into_inner()?; + self.handle_attach_invocation_request(inner.into()).await + } + CommandKind::InvokerEffect => { + let inner = envelope + .into_typed::() + .into_inner()?; + self.try_invoker_effect(inner.into()).await } - Command::InvokerEffect(effect) => self.try_invoker_effect(effect).await, - Command::TruncateOutbox(index) => { + CommandKind::TruncateOutbox => { + let index = envelope + .into_typed::() + .into_inner()? + .index; + self.do_truncate_outbox(RangeInclusive::new( (*self.outbox_head_seq_number).unwrap_or(index), index, @@ -640,11 +671,25 @@ impl StateMachineApplyContext<'_, S> { *self.outbox_head_seq_number = Some(index + 1); Ok(()) } - Command::Timer(timer) => self.on_timer(timer).await, - Command::TerminateInvocation(invocation_termination) => { - self.on_terminate_invocation(invocation_termination).await + CommandKind::Timer => { + let inner = envelope + .into_typed::() + .into_inner()?; + self.on_timer(inner.into()).await + } + CommandKind::TerminateInvocation => { + let inner = envelope + .into_typed::() + .into_inner()?; + + self.on_terminate_invocation(inner.into()).await } - Command::PurgeInvocation(purge_invocation_request) => { + CommandKind::PurgeInvocation => { + let purge_invocation_request: PurgeInvocationRequest = envelope + .into_typed::() + .into_inner()? + .into(); + lifecycle::OnPurgeCommand { invocation_id: purge_invocation_request.invocation_id, response_sink: purge_invocation_request.response_sink, @@ -653,7 +698,12 @@ impl StateMachineApplyContext<'_, S> { .await?; Ok(()) } - Command::PurgeJournal(purge_invocation_request) => { + CommandKind::PurgeJournal => { + let purge_invocation_request: PurgeInvocationRequest = envelope + .into_typed::() + .into_inner()? + .into(); + lifecycle::OnPurgeJournalCommand { invocation_id: purge_invocation_request.invocation_id, response_sink: purge_invocation_request.response_sink, @@ -662,7 +712,12 @@ impl StateMachineApplyContext<'_, S> { .await?; Ok(()) } - Command::ResumeInvocation(resume_invocation_request) => { + CommandKind::ResumeInvocation => { + let resume_invocation_request: ResumeInvocationRequest = envelope + .into_typed::() + .into_inner()? + .into(); + lifecycle::OnManualResumeCommand { invocation_id: resume_invocation_request.invocation_id, update_pinned_deployment_id: resume_invocation_request @@ -673,7 +728,12 @@ impl StateMachineApplyContext<'_, S> { .await?; Ok(()) } - Command::RestartAsNewInvocation(restart_as_new_invocation_request) => { + CommandKind::RestartAsNewInvocation => { + let restart_as_new_invocation_request: RestartAsNewInvocationRequest = envelope + .into_typed::() + .into_inner()? + .into(); + lifecycle::OnRestartAsNewInvocationCommand { invocation_id: restart_as_new_invocation_request.invocation_id, new_invocation_id: restart_as_new_invocation_request.new_invocation_id, @@ -686,16 +746,25 @@ impl StateMachineApplyContext<'_, S> { .await?; Ok(()) } - Command::PatchState(mutation) => self.handle_external_state_mutation(mutation).await, - Command::AnnounceLeader(_) => { - // no-op :-) - Ok(()) - } - Command::ScheduleTimer(timer) => { - self.register_timer(timer, Default::default())?; + CommandKind::PatchState => { + let inner = envelope + .into_typed::() + .into_inner()?; + self.handle_external_state_mutation(inner.into()).await + } + CommandKind::ScheduleTimer => { + let inner = envelope + .into_typed::() + .into_inner()?; + self.register_timer(inner.into(), Default::default())?; Ok(()) } - Command::NotifySignal(notify_signal_request) => { + CommandKind::NotifySignal => { + let notify_signal_request: NotifySignalRequest = envelope + .into_typed::() + .into_inner()? + .into(); + lifecycle::OnNotifySignalCommand { invocation_id: notify_signal_request.invocation_id, invocation_status: self @@ -707,13 +776,21 @@ impl StateMachineApplyContext<'_, S> { .await?; Ok(()) } - Command::NotifyGetInvocationOutputResponse(get_invocation_output_response) => { - lifecycle::OnNotifyGetInvocationOutputResponse(get_invocation_output_response) + CommandKind::NotifyGetInvocationOutputResponse => { + let inner = envelope + .into_typed::() + .into_inner()?; + + lifecycle::OnNotifyGetInvocationOutputResponse(inner.into()) .apply(self) .await?; Ok(()) } - Command::UpsertSchema(upsert) => { + CommandKind::UpsertSchema => { + let upsert = envelope + .into_typed::() + .into_inner()?; + trace!( "Upsert schema record to version '{}'", upsert.schema.version() @@ -731,9 +808,12 @@ impl StateMachineApplyContext<'_, S> { Ok(()) } - Command::UpsertRuleBook(upsert) => { - let new_book = RuleBook::bilrost_decode(upsert.rule_book.as_ref()) - .map_err(StorageError::BilrostDecode)?; + CommandKind::UpsertRuleBook => { + let upsert = envelope + .into_typed::() + .into_inner()?; + + let new_book = upsert.rule_book; let current_version = self.rule_book.version(); @@ -765,10 +845,6 @@ impl StateMachineApplyContext<'_, S> { } } - // Single Arc allocation per UpsertRuleBook apply; cheap - // clones for the in-memory field and the cache notify. - let new_book = Arc::new(new_book); - // Push the freshly-applied book into the node-level // cache so other PP-leaders on this node see the new // version on their next watch tick — without waiting @@ -785,7 +861,7 @@ impl StateMachineApplyContext<'_, S> { async fn on_service_invocation( &mut self, - service_invocation: Box, + service_invocation: ServiceInvocation, ) -> Result<(), Error> where S: WriteOutboxTable @@ -840,7 +916,7 @@ impl StateMachineApplyContext<'_, S> { let submit_notification_sink = service_invocation.submit_notification_sink.take(); let pre_flight_invocation_metadata = PreFlightInvocationMetadata::from_service_invocation( self.record_created_at, - *service_invocation, + service_invocation, ); self.on_pre_flight_invocation( @@ -1097,8 +1173,8 @@ impl StateMachineApplyContext<'_, S> { /// Returns the invocation in case the invocation is not a duplicate async fn handle_duplicated_requests( &mut self, - mut service_invocation: Box, - ) -> Result>, Error> + mut service_invocation: ServiceInvocation, + ) -> Result, Error> where S: ReadInvocationStatusTable + WriteInvocationStatusTable @@ -2417,7 +2493,7 @@ impl StateMachineApplyContext<'_, S> { // ServiceInvocations scheduled with a timer are always owned by the same partition processor // where the invocation should be executed - self.on_service_invocation(service_invocation).await + self.on_service_invocation(*service_invocation).await } Timer::CleanInvocationStatus(invocation_id) => { lifecycle::OnPurgeCommand { @@ -2490,7 +2566,7 @@ impl StateMachineApplyContext<'_, S> { ) } - async fn try_invoker_effect(&mut self, invoker_effect: InvokerEffect) -> Result<(), Error> + async fn try_invoker_effect(&mut self, invoker_effect: Effect) -> Result<(), Error> where S: ReadInvocationStatusTable + WriteInvocationStatusTable @@ -2522,7 +2598,7 @@ impl StateMachineApplyContext<'_, S> { async fn on_invoker_effect( &mut self, - effect: InvokerEffect, + effect: Effect, invocation_status: InvocationStatus, ) -> Result<(), Error> where diff --git a/crates/worker/src/partition/state_machine/tests/delayed_send.rs b/crates/worker/src/partition/state_machine/tests/delayed_send.rs index d01393ee1a..eff727cc0b 100644 --- a/crates/worker/src/partition/state_machine/tests/delayed_send.rs +++ b/crates/worker/src/partition/state_machine/tests/delayed_send.rs @@ -36,7 +36,7 @@ async fn run_send_with_delay(min_restate_version: SemanticRestateVersion) { let wake_up_time = MillisSinceEpoch::from(SystemTime::now() + Duration::from_secs(60)); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: None, @@ -44,7 +44,7 @@ async fn run_send_with_delay(min_restate_version: SemanticRestateVersion) { // Doesn't matter the execution time here, just needs to be filled execution_time: Some(wake_up_time), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -61,10 +61,9 @@ async fn run_send_with_delay(min_restate_version: SemanticRestateVersion) { // Now fire the timer let actions = test_env - .apply(Command::Timer(TimerKeyValue::neo_invoke( - wake_up_time, - invocation_id, - ))) + .apply(commands::TimerCommand::test_envelope( + TimerKeyValue::neo_invoke(wake_up_time, invocation_id), + )) .await; assert_that!( @@ -98,7 +97,7 @@ async fn send_with_delay_where_experimental_feature_journal_table_v2_is_enabled_ let wake_up_time = MillisSinceEpoch::from(SystemTime::now() + Duration::from_secs(60)); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: None, @@ -107,7 +106,7 @@ async fn send_with_delay_where_experimental_feature_journal_table_v2_is_enabled_ // Doesn't matter the execution time here, just needs to be filled execution_time: Some(wake_up_time), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -127,10 +126,9 @@ async fn send_with_delay_where_experimental_feature_journal_table_v2_is_enabled_ // Now fire the timer let actions = test_env - .apply(Command::Timer(TimerKeyValue::neo_invoke( - wake_up_time, - invocation_id, - ))) + .apply(commands::TimerCommand::test_envelope( + TimerKeyValue::neo_invoke(wake_up_time, invocation_id), + )) .await; assert_that!( @@ -171,7 +169,7 @@ async fn send_with_delay_to_locked_virtual_object() { let wake_up_time = MillisSinceEpoch::from(SystemTime::now() + Duration::from_secs(60)); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: None, @@ -179,7 +177,7 @@ async fn send_with_delay_to_locked_virtual_object() { // Doesn't matter the execution time here, just needs to be filled execution_time: Some(wake_up_time), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -205,10 +203,9 @@ async fn send_with_delay_to_locked_virtual_object() { // Now fire the timer let actions = test_env - .apply(Command::Timer(TimerKeyValue::neo_invoke( - wake_up_time, - invocation_id, - ))) + .apply(commands::TimerCommand::test_envelope( + TimerKeyValue::neo_invoke(wake_up_time, invocation_id), + )) .await; assert_that!( @@ -256,7 +253,7 @@ async fn send_with_delay_and_idempotency_key() { )); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key.clone()), @@ -268,7 +265,7 @@ async fn send_with_delay_and_idempotency_key() { execution_time, source: Source::Ingress(request_id_1), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -286,7 +283,7 @@ async fn send_with_delay_and_idempotency_key() { // Send another invocation which reattaches to the original one let request_id_2 = PartitionProcessorRpcRequestId::default(); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key), @@ -298,7 +295,7 @@ async fn send_with_delay_and_idempotency_key() { execution_time: execution_time.map(|m| m + Duration::from_secs(10)), source: Source::Ingress(request_id_2), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, diff --git a/crates/worker/src/partition/state_machine/tests/fixtures.rs b/crates/worker/src/partition/state_machine/tests/fixtures.rs index 777e84d578..3c3fb2c846 100644 --- a/crates/worker/src/partition/state_machine/tests/fixtures.rs +++ b/crates/worker/src/partition/state_machine/tests/fixtures.rs @@ -25,7 +25,8 @@ use restate_types::journal::enriched::{ }; use restate_types::journal_v2::{Entry, UnresolvedFuture}; use restate_types::service_protocol::ServiceProtocolVersion; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2; +use restate_wal_protocol::v2::{Command, commands}; use restate_worker_api::invoker::Effect; use crate::partition::state_machine::Action; @@ -76,58 +77,61 @@ pub fn incomplete_invoke_entry(invocation_id: InvocationId) -> JournalEntry { )) } -pub fn invoker_entry_effect(invocation_id: InvocationId, entry: impl Into) -> Command { +pub fn invoker_entry_effect( + invocation_id: InvocationId, + entry: impl Into, +) -> v2::Envelope { invoker_entry_effect_for_epoch(invocation_id, entry) } pub fn invoker_entry_effect_for_epoch( invocation_id: InvocationId, entry: impl Into, -) -> Command { - Command::InvokerEffect(Box::new(Effect { +) -> v2::Envelope { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::journal_entry( entry.into().encode::(), None, ), - })) + }) } -pub fn invoker_end_effect(invocation_id: InvocationId) -> Command { +pub fn invoker_end_effect(invocation_id: InvocationId) -> v2::Envelope { invoker_end_effect_for_epoch(invocation_id) } -pub fn invoker_end_effect_for_epoch(invocation_id: InvocationId) -> Command { - Command::InvokerEffect(Box::new(Effect { +pub fn invoker_end_effect_for_epoch(invocation_id: InvocationId) -> v2::Envelope { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })) + }) } pub fn pinned_deployment( invocation_id: InvocationId, service_protocol_version: ServiceProtocolVersion, -) -> Command { - Command::InvokerEffect(Box::new(Effect { +) -> v2::Envelope { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { deployment_id: DeploymentId::default(), service_protocol_version, }), - })) + }) } pub fn invoker_suspended( invocation_id: InvocationId, future: impl Into, -) -> Command { +) -> v2::Envelope { let future = future.into(); - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::SuspendedV3 { awaiting_on: future, }, - })) + }) } pub async fn mock_start_invocation_with_service_id( @@ -148,11 +152,13 @@ pub async fn mock_start_invocation_with_invocation_target( let invocation_id = InvocationId::mock_generate(&invocation_target); let actions = state_machine - .apply(Command::Invoke(Box::new(ServiceInvocation::initialize( - invocation_id, - invocation_target.clone(), - Source::Ingress(PartitionProcessorRpcRequestId::new()), - )))) + .apply(commands::InvokeCommand::test_envelope( + ServiceInvocation::initialize( + invocation_id, + invocation_target.clone(), + Source::Ingress(PartitionProcessorRpcRequestId::new()), + ), + )) .await; assert_that!( diff --git a/crates/worker/src/partition/state_machine/tests/idempotency.rs b/crates/worker/src/partition/state_machine/tests/idempotency.rs index b260fdaf04..909b9e979b 100644 --- a/crates/worker/src/partition/state_machine/tests/idempotency.rs +++ b/crates/worker/src/partition/state_machine/tests/idempotency.rs @@ -36,14 +36,14 @@ async fn start_and_complete_idempotent_invocation() { // Send fresh invocation with idempotency key let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id }), idempotency_key: Some(idempotency_key), completion_retention_duration: retention, ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -56,7 +56,7 @@ async fn start_and_complete_idempotent_invocation() { let response_bytes = Bytes::from_static(b"123"); let actions = test_env .apply_multiple([ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -64,11 +64,11 @@ async fn start_and_complete_idempotent_invocation() { EntryResult::Success(response_bytes.clone()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })), + }), ]) .await; @@ -135,13 +135,13 @@ async fn complete_already_completed_invocation() { // Send a request, should be completed immediately with result let request_id = PartitionProcessorRpcRequestId::default(); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id }), idempotency_key: Some(idempotency_key), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -171,7 +171,7 @@ async fn attach_with_service_invocation_command_while_executing() { // Send fresh invocation with idempotency key let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { @@ -180,7 +180,7 @@ async fn attach_with_service_invocation_command_while_executing() { idempotency_key: Some(idempotency_key.clone()), completion_retention_duration: retention, ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -191,7 +191,7 @@ async fn attach_with_service_invocation_command_while_executing() { // Latch to existing invocation let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { @@ -199,7 +199,7 @@ async fn attach_with_service_invocation_command_while_executing() { }), idempotency_key: Some(idempotency_key), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!(actions, not(contains(pat!(Action::IngressResponse { .. })))); @@ -207,7 +207,7 @@ async fn attach_with_service_invocation_command_while_executing() { let response_bytes = Bytes::from_static(b"123"); let actions = test_env .apply_multiple([ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -215,11 +215,11 @@ async fn attach_with_service_invocation_command_while_executing() { EntryResult::Success(response_bytes.clone()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })), + }), ]) .await; @@ -271,7 +271,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) // Send fresh invocation with idempotency key let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { @@ -281,7 +281,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) completion_retention_duration: retention, source: Source::Ingress(request_id_1), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -292,7 +292,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) // Latch to existing invocation, but with a send call let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key.clone()), @@ -302,7 +302,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) }), source: Source::Ingress(request_id_2), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -320,7 +320,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) let response_bytes = Bytes::from_static(b"123"); let actions = test_env .apply_multiple([ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -328,11 +328,11 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) EntryResult::Success(response_bytes.clone()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })), + }), ]) .await; @@ -394,7 +394,7 @@ async fn attach_inboxed_with_send_service_invocation() { let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_id = InvocationId::generate(&invocation_target, Some(&idempotency_key)); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key.clone()), @@ -403,7 +403,7 @@ async fn attach_inboxed_with_send_service_invocation() { request_id: request_id_1, }), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -437,7 +437,7 @@ async fn attach_inboxed_with_send_service_invocation() { // Now send the request that should get the submit notification let idempotency_key = ByteString::from_static("my-idempotency-key"); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), idempotency_key: Some(idempotency_key.clone()), @@ -446,7 +446,7 @@ async fn attach_inboxed_with_send_service_invocation() { request_id: request_id_2, }), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -479,7 +479,7 @@ async fn attach_command() { // Send fresh invocation with idempotency key let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { @@ -488,7 +488,7 @@ async fn attach_command() { idempotency_key: Some(idempotency_key.clone()), completion_retention_duration: completion_retention, ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -499,13 +499,15 @@ async fn attach_command() { // Latch to existing invocation, but with a send call let actions = test_env - .apply(Command::AttachInvocation(AttachInvocationRequest { - invocation_query: InvocationQuery::Invocation(invocation_id), - block_on_inflight: true, - response_sink: ServiceInvocationResponseSink::Ingress { - request_id: request_id_2, + .apply(commands::AttachInvocationCommand::test_envelope( + AttachInvocationRequest { + invocation_query: InvocationQuery::Invocation(invocation_id), + block_on_inflight: true, + response_sink: ServiceInvocationResponseSink::Ingress { + request_id: request_id_2, + }, }, - })) + )) .await; assert_that!( actions, @@ -516,7 +518,7 @@ async fn attach_command() { let response_bytes = Bytes::from_static(b"123"); let actions = test_env .apply_multiple([ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -524,11 +526,11 @@ async fn attach_command() { EntryResult::Success(response_bytes.clone()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })), + }), ]) .await; @@ -568,7 +570,7 @@ async fn attach_command_without_blocking_inflight() { // Send fresh invocation with idempotency key let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { @@ -577,7 +579,7 @@ async fn attach_command_without_blocking_inflight() { idempotency_key: Some(idempotency_key.clone()), completion_retention_duration: completion_retention, ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -589,13 +591,15 @@ async fn attach_command_without_blocking_inflight() { // Latch to existing invocation without blocking on inflight invocation let caller_invocation_id = InvocationId::mock_random(); let actions = test_env - .apply(Command::AttachInvocation(AttachInvocationRequest { - invocation_query: InvocationQuery::Invocation(invocation_id), - block_on_inflight: false, - response_sink: ServiceInvocationResponseSink::PartitionProcessor( - JournalCompletionTarget::from_parts(caller_invocation_id, 1), - ), - })) + .apply(commands::AttachInvocationCommand::test_envelope( + AttachInvocationRequest { + invocation_query: InvocationQuery::Invocation(invocation_id), + block_on_inflight: false, + response_sink: ServiceInvocationResponseSink::PartitionProcessor( + JournalCompletionTarget::from_parts(caller_invocation_id, 1), + ), + }, + )) .await; assert_that!( actions, @@ -635,10 +639,12 @@ async fn purge_completed_idempotent_invocation() { // Send purge command let _ = test_env - .apply(Command::PurgeInvocation(PurgeInvocationRequest { - invocation_id, - response_sink: None, - })) + .apply(commands::PurgeInvocationCommand::test_envelope( + PurgeInvocationRequest { + invocation_id, + response_sink: None, + }, + )) .await; assert_that!( test_env diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index 364a3a5954..a1aa6ff9b9 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -56,22 +56,22 @@ async fn run_kill_inboxed_invocation( let caller_id = InvocationId::mock_random(); let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id: inboxed_id, invocation_target: inboxed_target, response_sink: Some(ServiceInvocationResponseSink::PartitionProcessor( JournalCompletionTarget::from_parts(caller_id, 0), )), ..ServiceInvocation::mock() - }))) + })) .await; let current_invocation_status = test_env @@ -84,13 +84,15 @@ async fn run_kill_inboxed_invocation( let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id: inboxed_id, - flavor: TerminationFlavor::Kill, - response_sink: Some(InvocationMutationResponseSink::Ingress( - IngressInvocationResponseSink { request_id }, - )), - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id: inboxed_id, + flavor: TerminationFlavor::Kill, + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) .await; let current_invocation_status = test_env @@ -163,12 +165,12 @@ async fn terminate_scheduled_invocation( let rpc_id = PartitionProcessorRpcRequestId::new(); let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, execution_time: Some(MillisSinceEpoch::MAX), response_sink: Some(ServiceInvocationResponseSink::ingress(rpc_id)), ..ServiceInvocation::mock() - }))) + })) .await; // assert that inboxed invocation is in invocation_status @@ -179,11 +181,13 @@ async fn terminate_scheduled_invocation( assert!(let InvocationStatus::Scheduled(_) = current_invocation_status); let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: termination_flavor, - response_sink: None, - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id, + flavor: termination_flavor, + response_sink: None, + }, + )) .await; assert_that!( actions, @@ -223,20 +227,20 @@ async fn kill_call_tree() -> anyhow::Result<()> { let enqueued_invocation_id_on_same_target = InvocationId::mock_generate(&invocation_target); let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; // Let's enqueue an invocation afterward let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id: enqueued_invocation_id_on_same_target, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; // Let's add some journal entries @@ -263,11 +267,13 @@ async fn kill_call_tree() -> anyhow::Result<()> { // Now let's send the termination command let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Kill, - response_sink: None, - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id, + flavor: TerminationFlavor::Kill, + response_sink: None, + }, + )) .await; assert_that!( @@ -360,18 +366,18 @@ async fn cancel_invoked_invocation() -> Result<(), Error> { let _ = test_env .apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { + commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { deployment_id: Default::default(), service_protocol_version: ServiceProtocolVersion::V3, }), - })), + }), ]) .await; @@ -416,11 +422,13 @@ async fn cancel_invoked_invocation() -> Result<(), Error> { tx.commit().await?; let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Cancel, - response_sink: None, - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id, + flavor: TerminationFlavor::Cancel, + response_sink: None, + }, + )) .await; // Invocation shouldn't be gone @@ -482,18 +490,18 @@ async fn cancel_suspended_invocation() -> Result<(), Error> { let _ = test_env .apply_multiple([ - Command::Invoke(Box::new(ServiceInvocation { + commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { deployment_id: Default::default(), service_protocol_version: ServiceProtocolVersion::V3, }), - })), + }), ]) .await; @@ -554,13 +562,15 @@ async fn cancel_suspended_invocation() -> Result<(), Error> { let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Cancel, - response_sink: Some(InvocationMutationResponseSink::Ingress( - IngressInvocationResponseSink { request_id }, - )), - })) + .apply(commands::TerminateInvocationCommand::test_envelope( + InvocationTermination { + invocation_id, + flavor: TerminationFlavor::Cancel, + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) .await; // Invocation shouldn't be gone @@ -622,11 +632,11 @@ async fn cancel_invocation_entry_referring_to_previous_entry() { let callee_2 = InvocationId::mock_random(); let _ = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; // Add call and one way call journal entry @@ -652,7 +662,7 @@ async fn cancel_invocation_entry_referring_to_previous_entry() { // Now create cancel invocation entry let actions = test_env .apply_multiple(vec![ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 3, @@ -660,8 +670,8 @@ async fn cancel_invocation_entry_referring_to_previous_entry() { CancelInvocationTarget::InvocationId(callee_1.to_string().into()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 4, @@ -669,7 +679,7 @@ async fn cancel_invocation_entry_referring_to_previous_entry() { CancelInvocationTarget::CallEntryIndex(2), )), }, - })), + }), ]) .await; diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 8744c59f73..8446b6bec0 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -62,9 +62,10 @@ use restate_types::journal::{CompleteAwakeableEntry, EntryResult, InvokeRequest} use restate_types::journal::{Entry, EntryType}; use restate_types::journal_events::Event; use restate_types::journal_v2::raw::TryFromEntry; -use restate_types::logs::SequenceNumber; +use restate_types::logs::{Keys, SequenceNumber}; use restate_types::partitions::Partition; use restate_types::state_mut::ExternalStateMutation; +use restate_wal_protocol::v2::Command; use restate_worker_api::invoker::{Effect, EffectKind, YieldReason}; use std::collections::{HashMap, HashSet}; use test_log::test; @@ -144,13 +145,13 @@ impl TestEnv { } } - pub async fn apply(&mut self, command: Command) -> Vec { + pub async fn apply(&mut self, envelope: v2::Envelope) -> Vec { let mut transaction = self.storage.transaction(); let mut action_collector = ActionCollector::default(); let mut vqueues = VQueuesMetaCache::new_empty(1024); self.state_machine .apply( - command, + envelope, MillisSinceEpoch::now(), Lsn::OLDEST, &mut transaction, @@ -166,13 +167,16 @@ impl TestEnv { action_collector } - pub async fn apply_fallible(&mut self, command: Command) -> Result, Error> { + pub async fn apply_fallible( + &mut self, + envelope: v2::Envelope, + ) -> Result, Error> { let mut transaction = self.storage.transaction(); let mut action_collector = ActionCollector::default(); let mut vqueues = VQueuesMetaCache::new_empty(1024); self.state_machine .apply( - command, + envelope, MillisSinceEpoch::now(), Lsn::OLDEST, &mut transaction, @@ -189,11 +193,11 @@ impl TestEnv { pub async fn apply_multiple( &mut self, - commands: impl IntoIterator, + envelopes: impl IntoIterator>, ) -> Vec { let mut actions = vec![]; - for command in commands { - actions.append(&mut self.apply(command).await) + for envelop in envelopes { + actions.append(&mut self.apply(envelop).await) } actions } @@ -375,11 +379,14 @@ async fn awakeable_completion_received_before_entry() -> TestResult { let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; // Send completion first + let _ = test_env - .apply(Command::InvocationResponse(InvocationResponse { - target: JournalCompletionTarget::from_parts(invocation_id, 1), - result: ResponseResult::Success(Bytes::default()), - })) + .apply(commands::InvocationResponseCommand::test_envelope( + InvocationResponse { + target: JournalCompletionTarget::from_parts(invocation_id, 1), + result: ResponseResult::Success(Bytes::default()), + }, + )) .await; // A couple of notes here: @@ -397,13 +404,13 @@ async fn awakeable_completion_received_before_entry() -> TestResult { // * If the awakeable entry has not been received yet, when receiving it the completion will be sent through. let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, entry: ProtobufRawEntryCodec::serialize_enriched(Entry::awakeable(None)), }, - }))) + })) .await; // At this point we expect the completion to be forwarded to the invoker @@ -433,10 +440,12 @@ async fn awakeable_completion_received_before_entry() -> TestResult { // If we try to send the completion again, it should not be forwarded! let actions = test_env - .apply(Command::InvocationResponse(InvocationResponse { - target: JournalCompletionTarget::from_parts(invocation_id, 1), - result: ResponseResult::Success(Bytes::default()), - })) + .apply(commands::InvocationResponseCommand::test_envelope( + InvocationResponse { + target: JournalCompletionTarget::from_parts(invocation_id, 1), + result: ResponseResult::Success(Bytes::default()), + }, + )) .await; assert_that!( actions, @@ -447,12 +456,12 @@ async fn awakeable_completion_received_before_entry() -> TestResult { ); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::Suspended { waiting_for_completed_entries: HashSet::from([1]), }, - }))) + })) .await; assert_that!( @@ -482,13 +491,13 @@ async fn complete_awakeable_with_success() { )); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: EffectKind::JournalEntry { entry_index: 1, entry, }, - }))) + })) .await; assert_that!( @@ -527,13 +536,13 @@ async fn complete_awakeable_with_failure() { )); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: EffectKind::JournalEntry { entry_index: 1, entry, }, - }))) + })) .await; assert_that!( @@ -566,7 +575,7 @@ async fn invoke_with_headers() -> TestResult { fixtures::mock_start_invocation_with_service_id(&mut test_env, service_id.clone()).await; let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -582,7 +591,7 @@ async fn invoke_with_headers() -> TestResult { None, )), }, - }))) + })) .await; assert_that!( @@ -636,27 +645,31 @@ async fn mutate_state() -> anyhow::Result<()> { ); test_env - .apply(Command::PatchState(ExternalStateMutation { - service_id: keyed_service_id.clone(), - version: None, - state: first_state_mutation, - })) + .apply(commands::PatchStateCommand::test_envelope( + ExternalStateMutation { + service_id: keyed_service_id.clone(), + version: None, + state: first_state_mutation, + }, + )) .await; test_env - .apply(Command::PatchState(ExternalStateMutation { - service_id: keyed_service_id.clone(), - version: None, - state: second_state_mutation.clone(), - })) + .apply(commands::PatchStateCommand::test_envelope( + ExternalStateMutation { + service_id: keyed_service_id.clone(), + version: None, + state: second_state_mutation.clone(), + }, + )) .await; // terminating the ongoing invocation should trigger popping from the inbox until the // next invocation is found test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - }))) + })) .await; let all_states: HashMap<_, _> = test_env @@ -687,13 +700,13 @@ async fn clear_all_user_states() -> anyhow::Result<()> { fixtures::mock_start_invocation_with_service_id(&mut test_env, service_id.clone()).await; test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, entry: ProtobufRawEntryCodec::serialize_enriched(Entry::clear_all_state()), }, - }))) + })) .await; let states: Vec> = test_env @@ -722,13 +735,13 @@ async fn get_state_keys() -> TestResult { txn.commit().await.unwrap(); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, entry: ProtobufRawEntryCodec::serialize_enriched(Entry::get_state_keys(None)), }, - }))) + })) .await; // At this point we expect the completion to be forwarded to the invoker @@ -763,7 +776,7 @@ async fn get_invocation_id_entry() { let actions = test_env .apply_multiple(vec![ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 3, @@ -771,8 +784,8 @@ async fn get_invocation_id_entry() { Entry::get_call_invocation_id(1, None), ), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 4, @@ -780,7 +793,7 @@ async fn get_invocation_id_entry() { Entry::get_call_invocation_id(2, None), ), }, - })), + }), ]) .await; @@ -831,7 +844,7 @@ async fn attach_invocation_entry() { let callee_invocation_id = InvocationId::mock_random(); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: EffectKind::JournalEntry { entry_index: 1, @@ -844,7 +857,7 @@ async fn attach_invocation_entry() { }, )), }, - }))) + })) .await; assert_that!( actions, @@ -875,7 +888,7 @@ async fn get_invocation_output_entry() { let callee_invocation_id = InvocationId::mock_random(); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: EffectKind::JournalEntry { entry_index: 1, @@ -888,7 +901,7 @@ async fn get_invocation_output_entry() { }, )), }, - }))) + })) .await; assert_that!( @@ -911,10 +924,12 @@ async fn get_invocation_output_entry() { // Let's try to complete it with not ready, this should forward empty let actions = test_env - .apply(Command::InvocationResponse(InvocationResponse { - target: JournalCompletionTarget::from_parts(invocation_id, 1), - result: NOT_READY_INVOCATION_ERROR.into(), - })) + .apply(commands::InvocationResponseCommand::test_envelope( + InvocationResponse { + target: JournalCompletionTarget::from_parts(invocation_id, 1), + result: NOT_READY_INVOCATION_ERROR.into(), + }, + )) .await; assert_that!( actions, @@ -935,7 +950,7 @@ async fn send_ingress_response_to_multiple_targets() -> TestResult { let request_id_3 = PartitionProcessorRpcRequestId::default(); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id: request_id_1, }), @@ -944,7 +959,7 @@ async fn send_ingress_response_to_multiple_targets() -> TestResult { invocation_target.clone(), Source::Ingress(request_id_1), ) - }))) + })) .await; assert_that!( actions, @@ -972,7 +987,7 @@ async fn send_ingress_response_to_multiple_targets() -> TestResult { // Now let's send the output entry let response_bytes = Bytes::from_static(b"123"); let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -980,17 +995,17 @@ async fn send_ingress_response_to_multiple_targets() -> TestResult { EntryResult::Success(response_bytes.clone()), )), }, - }))) + })) .await; // No ingress response is expected at this point because the invocation did not end yet assert_that!(actions, not(contains(pat!(Action::IngressResponse { .. })))); // Send the End Effect let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - }))) + })) .await; // At this point we expect the completion to be forwarded to the invoker assert_that!( @@ -1031,7 +1046,14 @@ async fn truncate_outbox_from_empty() -> Result<(), Error> { let mut test_env = TestEnv::create().await; - let _ = test_env.apply(Command::TruncateOutbox(outbox_index)).await; + let _ = test_env + .apply(commands::TruncateOutboxCommand::test_envelope( + commands::TruncateOutboxCommand { + index: outbox_index, + partition_key_range: Keys::None, + }, + )) + .await; assert_that!(test_env.storage.get_outbox_message(0).await?, none()); @@ -1065,7 +1087,12 @@ async fn truncate_outbox_with_gap() -> Result<(), Error> { .await; test_env - .apply(Command::TruncateOutbox(outbox_tail_index)) + .apply(commands::TruncateOutboxCommand::test_envelope( + commands::TruncateOutboxCommand { + index: outbox_tail_index, + partition_key_range: Keys::None, + }, + )) .await; assert_that!(test_env.storage.get_outbox_message(3).await?, none()); @@ -1092,11 +1119,11 @@ async fn consecutive_exclusive_handler_invocations_will_use_inbox() -> TestResul // Let's start the first invocation let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id: first_invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -1112,11 +1139,11 @@ async fn consecutive_exclusive_handler_invocations_will_use_inbox() -> TestResul // Let's start the second invocation let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id: second_invocation_id, invocation_target: invocation_target.clone(), ..ServiceInvocation::mock() - }))) + })) .await; // This should have not been invoked, but it should rather be in the inbox @@ -1148,10 +1175,10 @@ async fn consecutive_exclusive_handler_invocations_will_use_inbox() -> TestResul // Send the End Effect to terminate the first invocation let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id: first_invocation_id, kind: InvokerEffectKind::End, - }))) + })) .await; // At this point we expect the invoke for the second, and also the lock updated assert_that!( @@ -1167,10 +1194,10 @@ async fn consecutive_exclusive_handler_invocations_will_use_inbox() -> TestResul ); let _ = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id: second_invocation_id, kind: InvokerEffectKind::End, - }))) + })) .await; // After the second was completed too, the inbox is empty and the service is unlocked @@ -1207,10 +1234,12 @@ async fn deduplicate_requests_with_same_pp_rpc_request_id() -> TestResult { si.response_sink = Some(ServiceInvocationResponseSink::Ingress { request_id }); si.submit_notification_sink = Some(SubmitNotificationSink::Ingress { request_id }); si.source = Source::Ingress(request_id); - Box::new(si) + si }; let actions = test_env - .apply(Command::Invoke(service_invocation.clone())) + .apply(commands::InvokeCommand::test_envelope( + service_invocation.clone(), + )) .await; assert_that!( actions, @@ -1227,7 +1256,9 @@ async fn deduplicate_requests_with_same_pp_rpc_request_id() -> TestResult { // Applying this again won't generate Invoke action, // but will return same submit notification. - let actions = test_env.apply(Command::Invoke(service_invocation)).await; + let actions = test_env + .apply(commands::InvokeCommand::test_envelope(service_invocation)) + .await; assert_that!( actions, all!( @@ -1253,14 +1284,14 @@ async fn yield_effect_resumes_invocation() { // Apply a Yield effect — the invocation should be immediately re-invoked let actions = test_env - .apply(Command::InvokerEffect(Box::new(Effect { + .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: EffectKind::Yield(YieldReason::ExhaustedMemoryBudget { needed_memory: restate_memory::NonZeroByteCount::new( NonZeroUsize::new(32768).unwrap(), ), }), - }))) + })) .await; // Yield should produce an Action::Invoke to re-schedule the invocation diff --git a/crates/worker/src/partition/state_machine/tests/workflow.rs b/crates/worker/src/partition/state_machine/tests/workflow.rs index 209bd8dfb0..7d5531d747 100644 --- a/crates/worker/src/partition/state_machine/tests/workflow.rs +++ b/crates/worker/src/partition/state_machine/tests/workflow.rs @@ -31,7 +31,7 @@ async fn start_workflow_method() { // Send fresh invocation let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), completion_retention_duration: Duration::from_secs(60), @@ -39,7 +39,7 @@ async fn start_workflow_method() { request_id: request_id_1, }), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -60,14 +60,14 @@ async fn start_workflow_method() { // Sending another invocation won't re-execute let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id: request_id_2, }), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -90,7 +90,7 @@ async fn start_workflow_method() { let response_bytes = Bytes::from_static(b"123"); let actions = test_env .apply_multiple([ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -98,11 +98,11 @@ async fn start_workflow_method() { EntryResult::Success(response_bytes.clone()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })), + }), ]) .await; @@ -146,14 +146,14 @@ async fn start_workflow_method() { // Sending a new request will not be completed because we don't support attach semantics let request_id_3 = PartitionProcessorRpcRequestId::default(); let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), response_sink: Some(ServiceInvocationResponseSink::Ingress { request_id: request_id_3, }), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -180,7 +180,7 @@ async fn attach_by_workflow_key() { // Send fresh invocation let actions = test_env - .apply(Command::Invoke(Box::new(ServiceInvocation { + .apply(commands::InvokeCommand::test_envelope(ServiceInvocation { invocation_id, invocation_target: invocation_target.clone(), completion_retention_duration: Duration::from_secs(60), @@ -188,7 +188,7 @@ async fn attach_by_workflow_key() { request_id: request_id_1, }), ..ServiceInvocation::mock() - }))) + })) .await; assert_that!( actions, @@ -199,15 +199,17 @@ async fn attach_by_workflow_key() { // Sending another invocation won't re-execute let actions = test_env - .apply(Command::AttachInvocation(AttachInvocationRequest { - invocation_query: InvocationQuery::Workflow( - invocation_target.as_keyed_service_id().unwrap(), - ), - block_on_inflight: true, - response_sink: ServiceInvocationResponseSink::Ingress { - request_id: request_id_2, + .apply(commands::AttachInvocationCommand::test_envelope( + AttachInvocationRequest { + invocation_query: InvocationQuery::Workflow( + invocation_target.as_keyed_service_id().unwrap(), + ), + block_on_inflight: true, + response_sink: ServiceInvocationResponseSink::Ingress { + request_id: request_id_2, + }, }, - })) + )) .await; assert_that!( actions, @@ -223,7 +225,7 @@ async fn attach_by_workflow_key() { let response_bytes = Bytes::from_static(b"123"); let actions = test_env .apply_multiple([ - Command::InvokerEffect(Box::new(Effect { + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::JournalEntry { entry_index: 1, @@ -231,11 +233,11 @@ async fn attach_by_workflow_key() { EntryResult::Success(response_bytes.clone()), )), }, - })), - Command::InvokerEffect(Box::new(Effect { + }), + commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, kind: InvokerEffectKind::End, - })), + }), ]) .await; @@ -277,15 +279,17 @@ async fn attach_by_workflow_key() { // Sending another attach will be completed immediately let actions = test_env - .apply(Command::AttachInvocation(AttachInvocationRequest { - invocation_query: InvocationQuery::Workflow( - invocation_target.as_keyed_service_id().unwrap(), - ), - block_on_inflight: true, - response_sink: ServiceInvocationResponseSink::Ingress { - request_id: request_id_3, + .apply(commands::AttachInvocationCommand::test_envelope( + AttachInvocationRequest { + invocation_query: InvocationQuery::Workflow( + invocation_target.as_keyed_service_id().unwrap(), + ), + block_on_inflight: true, + response_sink: ServiceInvocationResponseSink::Ingress { + request_id: request_id_3, + }, }, - })) + )) .await; assert_that!( actions, @@ -323,12 +327,14 @@ async fn purge_completed_workflow() { let request_id = PartitionProcessorRpcRequestId::new(); let actions = test_env - .apply(Command::PurgeInvocation(PurgeInvocationRequest { - invocation_id, - response_sink: Some(InvocationMutationResponseSink::Ingress( - IngressInvocationResponseSink { request_id }, - )), - })) + .apply(commands::PurgeInvocationCommand::test_envelope( + PurgeInvocationRequest { + invocation_id, + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) .await; assert_that!( actions, diff --git a/crates/worker/src/partition/state_machine/utils.rs b/crates/worker/src/partition/state_machine/utils.rs index 5bd51ca41c..baaf12d8f3 100644 --- a/crates/worker/src/partition/state_machine/utils.rs +++ b/crates/worker/src/partition/state_machine/utils.rs @@ -8,10 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use tracing::{Level, Span, debug_span, event_enabled, trace_span}; +use tracing::{Span, debug_span, trace_span}; use restate_types::{identifiers::InvocationId, invocation::InvocationTarget}; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::CommandKind; pub(super) trait SpanExt { fn record_invocation_id(&self, id: &InvocationId); @@ -33,31 +33,26 @@ impl SpanExt for tracing::Span { } } -pub(super) fn state_machine_apply_command_span(is_leader: bool, cmd: &Command) -> Span { - let span = if is_leader { +pub(super) fn state_machine_apply_command_span(is_leader: bool, record_kind: CommandKind) -> Span { + if is_leader { debug_span!( "apply_command", - otel.name = format!("apply-command: {}", cmd.name()), + otel.name = format!("apply-command: {}", record_kind), restate.invocation.id = tracing::field::Empty, restate.invocation.target = tracing::field::Empty, rpc.service = tracing::field::Empty, rpc.method = tracing::field::Empty, - restate.state_machine.command = tracing::field::debug(cmd), + restate.state_machine.command = tracing::field::display(record_kind), ) } else { trace_span!( "apply_command", - otel.name = format!("apply-command: {}", cmd.name()), + otel.name = format!("apply-command: {}", record_kind), restate.invocation.id = tracing::field::Empty, restate.invocation.target = tracing::field::Empty, rpc.service = tracing::field::Empty, rpc.method = tracing::field::Empty, - restate.state_machine.command = tracing::field::debug(cmd), + restate.state_machine.command = tracing::field::display(record_kind), ) - }; - if event_enabled!(Level::TRACE) { - span.record("restate.state_machine.command", tracing::field::debug(cmd)); } - - span } diff --git a/tools/pp-bench/src/command_gen.rs b/tools/pp-bench/src/command_gen.rs index fdaea47fdf..a0f31944ef 100644 --- a/tools/pp-bench/src/command_gen.rs +++ b/tools/pp-bench/src/command_gen.rs @@ -43,7 +43,7 @@ use rand::{Rng, RngCore, SeedableRng}; use restate_cli_util::{c_println, c_success}; use restate_serde_util::ByteCount; use restate_types::identifiers::{ - InvocationId, PartitionId, PartitionProcessorRpcRequestId, ServiceId, WithPartitionKey, + InvocationId, PartitionId, PartitionProcessorRpcRequestId, ServiceId, }; use restate_types::invocation::{ InvocationTarget, ServiceInvocation, Source, VirtualObjectHandlerType, @@ -51,7 +51,7 @@ use restate_types::invocation::{ use restate_types::logs::Lsn; use restate_types::state_mut::ExternalStateMutation; use restate_types::storage::StorageCodec; -use restate_wal_protocol::{Command, Destination, Envelope, Header}; +use restate_wal_protocol::v2::{self, CommandKind, Dedup, Envelope, Raw, commands}; use crate::{GenerateOpts, InspectOpts, WorkloadSpec, WorkloadType}; @@ -164,7 +164,7 @@ impl FileHeader { /// Result of loading commands from a file — carries optional real LSNs /// and metadata from the file header. pub struct LoadedWorkload { - pub commands: Vec, + pub envelopes: Vec>, pub lsns: Option>, pub start_lsn: Option, pub partition_id: Option, @@ -175,14 +175,14 @@ pub struct LoadedWorkload { // Command generation // --------------------------------------------------------------------------- -fn generate_one(rng: &mut StdRng, spec: &WorkloadSpec) -> Command { +fn generate_one(rng: &mut StdRng, spec: &WorkloadSpec) -> Envelope { match spec.workload { WorkloadType::PatchState => generate_patch_state(rng, spec), WorkloadType::Invoke => generate_invoke(rng, spec), } } -fn generate_patch_state(rng: &mut StdRng, spec: &WorkloadSpec) -> Command { +fn generate_patch_state(rng: &mut StdRng, spec: &WorkloadSpec) -> Envelope { let key_idx = rng.random_range(0..spec.num_keys); let service_id = ServiceId::new(None, "bench.PatchTarget", format!("key-{key_idx}")); @@ -194,14 +194,18 @@ fn generate_patch_state(rng: &mut StdRng, spec: &WorkloadSpec) -> Command { state.insert(Bytes::from(format!("state-{i}")), Bytes::from(value)); } - Command::PatchState(ExternalStateMutation { - service_id, - version: None, - state, - }) + Envelope::new( + Dedup::None, + commands::PatchStateCommand::from(ExternalStateMutation { + service_id, + version: None, + state, + }), + ) + .into_raw() } -fn generate_invoke(rng: &mut StdRng, spec: &WorkloadSpec) -> Command { +fn generate_invoke(rng: &mut StdRng, spec: &WorkloadSpec) -> Envelope { let key_idx = rng.random_range(0..spec.num_keys); let target = InvocationTarget::virtual_object( "bench.InvokeTarget", @@ -220,29 +224,7 @@ fn generate_invoke(rng: &mut StdRng, spec: &WorkloadSpec) -> Command { rng.fill_bytes(&mut arg_buf); invocation.argument = Bytes::from(arg_buf); - Command::Invoke(Box::new(invocation)) -} - -// --------------------------------------------------------------------------- -// Wrapping commands in Envelopes for serialization -// --------------------------------------------------------------------------- - -fn wrap_in_envelope(command: Command) -> Envelope { - let partition_key = match &command { - Command::PatchState(mutation) => mutation.service_id.partition_key(), - Command::Invoke(invoke) => invoke.partition_key(), - _ => 0, - }; - Envelope::new( - Header { - source: restate_wal_protocol::Source::Ingress {}, - dest: Destination::Processor { - partition_key, - dedup: None, - }, - }, - command, - ) + Envelope::new(Dedup::None, commands::InvokeCommand::from(invocation)).into_raw() } // --------------------------------------------------------------------------- @@ -252,7 +234,7 @@ fn wrap_in_envelope(command: Command) -> Envelope { /// Encode and write a single envelope record, optionally with an LSN prefix. pub fn write_record( writer: &mut W, - envelope: &Envelope, + envelope: &Envelope, lsn: Option, scratch: &mut BytesMut, ) -> anyhow::Result<()> { @@ -270,7 +252,7 @@ pub fn write_record( fn read_record( reader: &mut R, has_real_lsns: bool, -) -> anyhow::Result<(Option, Envelope)> { +) -> anyhow::Result<(Option, Envelope)> { let lsn = if has_real_lsns { let mut buf8 = [0u8; 8]; reader.read_exact(&mut buf8)?; @@ -287,7 +269,7 @@ fn read_record( reader.read_exact(&mut data)?; let mut cursor = &data[..]; - let envelope: Envelope = StorageCodec::decode(&mut cursor) + let envelope: Envelope = StorageCodec::decode(&mut cursor) .map_err(|e| anyhow::anyhow!("Failed to decode envelope: {e}"))?; Ok((lsn, envelope)) @@ -319,8 +301,7 @@ pub fn generate_command_file(opts: &GenerateOpts) -> anyhow::Result<()> { let mut rng = StdRng::seed_from_u64(opts.spec.seed); let mut scratch = BytesMut::with_capacity(4096); for _ in 0..opts.num_commands { - let cmd = generate_one(&mut rng, &opts.spec); - let envelope = wrap_in_envelope(cmd); + let envelope = generate_one(&mut rng, &opts.spec); write_record(&mut writer, &envelope, None, &mut scratch)?; } writer.flush()?; @@ -353,7 +334,7 @@ pub fn load_commands_from_file(path: &Path) -> anyhow::Result { let has_real_lsns = header.has_real_lsns(); let num_commands = header.num_commands as usize; - let mut commands = Vec::with_capacity(num_commands); + let mut envelopes = Vec::with_capacity(num_commands); let mut lsns = if has_real_lsns { Some(Vec::with_capacity(num_commands)) } else { @@ -362,7 +343,9 @@ pub fn load_commands_from_file(path: &Path) -> anyhow::Result { for _ in 0..num_commands { let (lsn, envelope) = read_record(&mut reader, has_real_lsns)?; - commands.push(envelope.command); + + envelopes.push(envelope); + if let (Some(lsn_vec), Some(lsn)) = (&mut lsns, lsn) { lsn_vec.push(lsn); } @@ -380,7 +363,7 @@ pub fn load_commands_from_file(path: &Path) -> anyhow::Result { }; Ok(LoadedWorkload { - commands, + envelopes, lsns, start_lsn, partition_id, @@ -389,7 +372,7 @@ pub fn load_commands_from_file(path: &Path) -> anyhow::Result { } /// Generate commands in-memory (no file I/O). -pub fn generate_commands_inline(spec: &WorkloadSpec, count: u64) -> Vec { +pub fn generate_commands_inline(spec: &WorkloadSpec, count: u64) -> Vec> { let mut rng = StdRng::seed_from_u64(spec.seed); (0..count).map(|_| generate_one(&mut rng, spec)).collect() } @@ -436,7 +419,7 @@ impl ExtractedFileWriter { }) } - pub fn write_envelope(&mut self, lsn: Lsn, envelope: &Envelope) -> anyhow::Result<()> { + pub fn write_envelope(&mut self, lsn: Lsn, envelope: &Envelope) -> anyhow::Result<()> { write_record(&mut self.writer, envelope, Some(lsn), &mut self.scratch)?; let lsn_val = u64::from(lsn); if self.first_lsn.is_none() { @@ -484,9 +467,13 @@ fn workload_source_name(tag: u8) -> &'static str { } } -fn describe_command(cmd: &Command) -> String { - match cmd { - Command::PatchState(m) => { +fn describe_command(envelope: Envelope) -> String { + match envelope.kind() { + CommandKind::PatchState => { + let m = envelope + .into_typed::() + .into_inner() + .unwrap(); format!( "PatchState {{ service={}/{}, keys={} }}", m.service_id.service_name, @@ -494,14 +481,18 @@ fn describe_command(cmd: &Command) -> String { m.state.len(), ) } - Command::Invoke(inv) => { + CommandKind::Invoke => { + let inv = envelope + .into_typed::() + .into_inner() + .unwrap(); format!( "Invoke {{ target={}, arg_len={} }}", inv.invocation_target, inv.argument.len(), ) } - other => other.name().to_string(), + other => other.to_string(), } } @@ -564,12 +555,12 @@ pub fn inspect_command_file(opts: &InspectOpts) -> anyhow::Result<()> { if i < to_show { let mut cursor = &data[..]; - let envelope: Envelope = StorageCodec::decode(&mut cursor) + let envelope: Envelope = StorageCodec::decode(&mut cursor) .map_err(|e| anyhow::anyhow!("Failed to decode command {i}: {e}"))?; let lsn_prefix = lsn.map(|l| format!("lsn={l} ")).unwrap_or_default(); c_println!( " [{i}] {lsn_prefix}{} ({}B)", - describe_command(&envelope.command), + describe_command(envelope), len ); } diff --git a/tools/pp-bench/src/command_pool.rs b/tools/pp-bench/src/command_pool.rs index fb7660aff4..6d920ba62c 100644 --- a/tools/pp-bench/src/command_pool.rs +++ b/tools/pp-bench/src/command_pool.rs @@ -11,50 +11,50 @@ //! Command pool: loads pre-generated commands and serves them to the benchmark loop. use restate_types::logs::Lsn; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::{Envelope, Raw}; /// Pre-loaded pool of commands that the benchmark loop draws from. /// Cycles through the pool when the benchmark needs more commands than are stored. pub struct CommandPool { - commands: Vec, + envelopes: Vec>, lsns: Option>, index: usize, } impl CommandPool { /// Create a pool from commands without LSNs (generated workloads). - pub fn new(commands: Vec) -> Self { + pub fn new(envelopes: Vec>) -> Self { assert!( - !commands.is_empty(), + !envelopes.is_empty(), "CommandPool must have at least one command" ); Self { - commands, + envelopes, lsns: None, index: 0, } } /// Create a pool from commands with associated LSNs (extracted workloads). - pub fn with_lsns(commands: Vec, lsns: Vec) -> Self { + pub fn with_lsns(envelopes: Vec>, lsns: Vec) -> Self { assert!( - !commands.is_empty(), + !envelopes.is_empty(), "CommandPool must have at least one command" ); - assert_eq!(commands.len(), lsns.len(), "commands and LSNs must match"); + assert_eq!(envelopes.len(), lsns.len(), "commands and LSNs must match"); Self { - commands, + envelopes, lsns: Some(lsns), index: 0, } } /// Take the next command (and optional LSN) from the pool, cycling when exhausted. - pub fn next_command(&mut self) -> (Command, Option) { - let cmd = self.commands[self.index].clone(); + pub fn next_envelope(&mut self) -> (Envelope, Option) { + let envelope = self.envelopes[self.index].clone(); let lsn = self.lsns.as_ref().map(|l| l[self.index]); - self.index = (self.index + 1) % self.commands.len(); - (cmd, lsn) + self.index = (self.index + 1) % self.envelopes.len(); + (envelope, lsn) } /// Whether this pool carries real LSNs from an extracted workload. @@ -64,10 +64,10 @@ impl CommandPool { /// Number of unique commands in the pool. pub fn len(&self) -> usize { - self.commands.len() + self.envelopes.len() } pub fn is_empty(&self) -> bool { - self.commands.is_empty() + self.envelopes.is_empty() } } diff --git a/tools/pp-bench/src/extract.rs b/tools/pp-bench/src/extract.rs index fff23e9128..4ebaf20cfd 100644 --- a/tools/pp-bench/src/extract.rs +++ b/tools/pp-bench/src/extract.rs @@ -33,7 +33,7 @@ use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{LogId, LogletId, LogletOffset, Lsn, SequenceNumber}; use restate_types::storage::StorageCodec; -use restate_wal_protocol::Envelope; +use restate_wal_protocol::v2::{Envelope, Raw}; use crate::ExtractOpts; use crate::command_gen::ExtractedFileWriter; @@ -343,7 +343,7 @@ fn extract_log_records( .map_err(|e| anyhow::anyhow!("Failed to encode record body at {offset}: {e}"))?; let mut cursor = std::io::Cursor::new(body_bytes.as_ref()); - let envelope: Envelope = match StorageCodec::decode(&mut cursor) { + let envelope: Envelope = match StorageCodec::decode(&mut cursor) { Ok(e) => e, Err(e) => { c_warn!("Skipping record at offset {offset}: envelope decode error: {e}"); diff --git a/tools/pp-bench/src/workload.rs b/tools/pp-bench/src/workload.rs index ee0e8f597c..5586b1b921 100644 --- a/tools/pp-bench/src/workload.rs +++ b/tools/pp-bench/src/workload.rs @@ -66,12 +66,17 @@ pub async fn run( // ----------------------------------------------------------------------- let mut pool = if let Some(ref path) = opts.command_file { c_println!("Loading commands from {}", path.display()); + let start_time = MillisSinceEpoch::now(); let loaded = command_gen::load_commands_from_file(path)?; - c_println!("Command pool: {} commands loaded", loaded.commands.len()); + c_println!( + "Command pool: {} commands loaded (took: {}ms)", + loaded.envelopes.len(), + start_time.elapsed().as_millis() + ); if let Some(ref lsns) = loaded.lsns { - CommandPool::with_lsns(loaded.commands, lsns.clone()) + CommandPool::with_lsns(loaded.envelopes, lsns.clone()) } else { - CommandPool::new(loaded.commands) + CommandPool::new(loaded.envelopes) } } else { let total_needed = opts.warmup + opts.num_commands; @@ -143,7 +148,7 @@ pub async fn run( let mut txn = partition_store.transaction(); let batch_cmds = (batch_size as u64).min(warmup - cmds_applied); for _ in 0..batch_cmds { - let (cmd, _real_lsn) = pool.next_command(); + let (cmd, _real_lsn) = pool.next_envelope(); state_machine .apply( cmd, @@ -181,7 +186,7 @@ pub async fn run( let mut txn = partition_store.transaction(); for _ in 0..batch_cmds { - let (cmd, _real_lsn) = pool.next_command(); + let (cmd, _real_lsn) = pool.next_envelope(); state_machine .apply( cmd,