From 8032ac85d43e826658e78acd2562374ff0627ce2 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Tue, 12 May 2026 18:18:38 +0200 Subject: [PATCH] worker: write v2 envelopes with typed commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the worker's write path from the v1 Envelope/Command enum to v2 typed commands and Envelope. SelfProposer::{self_propose, self_propose_many, append_with_notification} are now generic over `C: Command + HasRecordKeys` and derive the partition key from the command via `record_keys()` — the explicit `partition_key` parameter threaded through LeadershipState, the Actuator trait, and every RPC handler is gone. All worker RPC paths (append_invocation, append_invocation_response, append_signal, cancel/kill/purge_invocation, purge_journal, resume_invocation, restart_as_new_invocation, get_invocation_output) and all leader-side ActionEffect proposals (Invoker, Shuffle, Timer, Cleaner, UpsertSchema, UpsertRuleBook, scheduler decisions, version barriers, AnnounceLeader) now construct typed commands::*Command values directly, removing the Command::* enum wrapping and the BytesMut arena previously used to pre-encode scheduler decisions. With v2 envelopes now being written, drop the `try_v1_first` branch in PartitionProcessor::decode_record — v2 is the fast path and v1 stays as the TypedValueMismatch fallback for replay of older records. Tests under leadership and rpc are updated to mock the new generic Actuator signatures and decode via Envelope::into_typed::(). --- .../src/partition/leadership/leader_state.rs | 136 ++++++---------- crates/worker/src/partition/leadership/mod.rs | 74 ++++----- .../src/partition/leadership/self_proposer.rs | 103 +++++------- .../src/partition/rpc/append_invocation.rs | 53 ++++--- .../rpc/append_invocation_response.rs | 9 +- .../worker/src/partition/rpc/append_signal.rs | 10 +- .../src/partition/rpc/cancel_invocation.rs | 10 +- .../partition/rpc/get_invocation_output.rs | 9 +- .../src/partition/rpc/kill_invocation.rs | 10 +- crates/worker/src/partition/rpc/mod.rs | 39 ++--- .../src/partition/rpc/purge_invocation.rs | 10 +- .../worker/src/partition/rpc/purge_journal.rs | 10 +- .../rpc/restart_as_new_invocation.rs | 149 +++++++++--------- .../src/partition/rpc/resume_invocation.rs | 37 ++--- 14 files changed, 286 insertions(+), 373 deletions(-) diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 69d47a4724..bccd5b7969 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -16,7 +16,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; -use bytes::BytesMut; use futures::future::OptionFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt, stream}; @@ -33,12 +32,10 @@ use restate_limiter::RuleBook; use restate_partition_store::PartitionDb; use restate_storage_api::vqueue_table::scheduler::SchedulerDecisionsCommand; use restate_types::config::Configuration; -use restate_types::identifiers::{ - LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, -}; +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionProcessorRpcRequestId}; use restate_types::invocation::PurgeInvocationRequest; use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification}; -use restate_types::logs::Keys; +use restate_types::logs::{BodyWithKeys, Keys}; use restate_types::net::ingest::IngestRecord; use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcResponse, @@ -48,9 +45,8 @@ use restate_types::{RESTATE_VERSION_1_7_0, SemanticRestateVersion, Version, Vers use restate_vqueues::VQueueEvent; use restate_vqueues::scheduler::Decisions; use restate_vqueues::{SchedulerService, VQueuesMeta}; -use restate_wal_protocol::Command; use restate_wal_protocol::control::UpsertSchemaCommand; -use restate_wal_protocol::v1::UpsertRuleBookCommandWrapper; +use restate_wal_protocol::v2::{Command, CommandWithKeys, commands}; use restate_worker_api::invoker::InvokerHandle; use restate_worker_api::resources::ReservedResources; use restate_worker_api::{SchedulerStatusEntry, UserLimitCounterEntry}; @@ -342,7 +338,6 @@ impl LeaderState { &mut self, action_effects: impl IntoIterator, ) -> Result<(), Error> { - let mut arena = BytesMut::with_capacity(128 * 1024); for effect in action_effects { match effect { ActionEffect::Scheduler(decisions) => { @@ -362,17 +357,11 @@ impl LeaderState { .into_iter() // one command per partition key .map(|(partition_key, group)| { - let decisions = SchedulerDecisionsCommand { - qids: group.collect(), - }; - - arena.reserve(decisions.encoded_len()); - // safe to unwrap because we reserved enough space - decisions.bilrost_encode(&mut arena).unwrap(); - - ( - partition_key, - Command::VQSchedulerDecisions(arena.split().freeze()), + BodyWithKeys::new( + SchedulerDecisionsCommand { + qids: group.collect(), + }, + Keys::Single(partition_key), ) // an action goes into command, and resources are popped }) @@ -387,71 +376,64 @@ impl LeaderState { // based on configuration, whether to consider partition-local durability in // the replica-set as a sufficient source of durability, or only snapshots. self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::UpdatePartitionDurability(partition_durability), - ) + .self_propose(BodyWithKeys::new( + partition_durability, + Keys::RangeInclusive(self.partition_key_range.into()), + )) .await?; } ActionEffect::Invoker(invoker_effect) => { self.self_proposer - .self_propose( - invoker_effect.invocation_id.partition_key(), - Command::InvokerEffect(invoker_effect), - ) + .self_propose(commands::InvokerEffectCommand::from(*invoker_effect)) .await?; } ActionEffect::Shuffle(outbox_truncation) => { // todo: Until we support partition splits we need to get rid of outboxes or introduce partition // specific destination messages that are identified by a partition_id self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::TruncateOutbox(outbox_truncation.index()), - ) + .self_propose(commands::TruncateOutboxCommand { + partition_key_range: Keys::RangeInclusive( + self.partition_key_range.into(), + ), + index: outbox_truncation.index(), + }) .await?; } ActionEffect::Timer(timer) => { self.self_proposer - .self_propose(timer.invocation_id().partition_key(), Command::Timer(timer)) + .self_propose(commands::TimerCommand::from(timer)) .await?; } - ActionEffect::Cleaner(effect) => { - let (invocation_id, cmd) = match effect { - CleanerEffect::PurgeJournal(invocation_id) => ( + ActionEffect::Cleaner(effect) => match effect { + CleanerEffect::PurgeJournal(invocation_id) => { + let record = commands::PurgeJournalCommand::from(PurgeInvocationRequest { invocation_id, - Command::PurgeJournal(PurgeInvocationRequest { - invocation_id, - response_sink: None, - }), - ), - CleanerEffect::PurgeInvocation(invocation_id) => ( - invocation_id, - Command::PurgeInvocation(PurgeInvocationRequest { + response_sink: None, + }); + + self.self_proposer.self_propose(record).await?; + } + CleanerEffect::PurgeInvocation(invocation_id) => { + let record = + commands::PurgeInvocationCommand::from(PurgeInvocationRequest { invocation_id, response_sink: None, - }), - ), - }; + }); - self.self_proposer - .self_propose(invocation_id.partition_key(), cmd) - .await?; - } + self.self_proposer.self_propose(record).await?; + } + }, ActionEffect::UpsertSchema(schema) => { if SemanticRestateVersion::current() .is_equal_or_newer_than(&RESTATE_VERSION_1_7_0) { self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::UpsertSchema(UpsertSchemaCommand { - partition_key_range: Keys::RangeInclusive( - self.partition_key_range.into(), - ), - schema, - }), - ) + .self_propose(UpsertSchemaCommand { + partition_key_range: Keys::RangeInclusive( + self.partition_key_range.into(), + ), + schema, + }) .await?; } } @@ -463,21 +445,11 @@ impl LeaderState { .experimental .is_vqueues_enabled() { - let cmd = - restate_wal_protocol::control::UpsertRuleBookCommand { rule_book }; - - arena.reserve(cmd.encoded_len()); - // safe to unwrap because we reserved enough space - cmd.bilrost_encode(&mut arena).unwrap(); - self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::UpsertRuleBook(UpsertRuleBookCommandWrapper { - partition_key_range: self.partition_key_range, - command: arena.split().freeze(), - }), - ) + .self_propose(BodyWithKeys::new( + restate_wal_protocol::control::UpsertRuleBookCommand { rule_book }, + Keys::RangeInclusive(self.partition_key_range.into()), + )) .await?; } } @@ -490,14 +462,13 @@ impl LeaderState { Ok(()) } - pub async fn handle_rpc_proposal_command( + pub async fn handle_rpc_proposal_command( &mut self, request_id: PartitionProcessorRpcRequestId, reciprocal: Reciprocal< Oneshot>, >, - partition_key: PartitionKey, - cmd: Command, + cmd: impl CommandWithKeys, ) { match self.awaiting_rpc_actions.entry(request_id) { Entry::Occupied(mut o) => { @@ -511,7 +482,7 @@ impl LeaderState { } Entry::Vacant(v) => { // In this case, no one proposed this command yet, let's try to propose it - if let Err(e) = self.self_proposer.self_propose(partition_key, cmd).await { + if let Err(e) = self.self_proposer.self_propose(cmd).await { reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))); } else { v.insert(reciprocal); @@ -525,18 +496,13 @@ impl LeaderState { /// Records appended this way are never filtered by the dedup mechanism during leadership /// transitions, making this safe for fire-and-forget ingress commands (signals, invocation /// responses). - pub async fn append_and_respond_asynchronously( + pub async fn append_and_respond_asynchronously( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: impl CommandWithKeys, reciprocal: RpcReciprocal, success_response: PartitionProcessorRpcResponse, ) { - match self - .self_proposer - .append_with_notification(partition_key, cmd) - .await - { + match self.self_proposer.append_with_notification(cmd).await { Ok(commit_token) => { self.awaiting_rpc_self_propose.push(SelfAppendFuture::new( commit_token, diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index f0314035dc..e0878a7427 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use std::time::Duration; use futures::{StreamExt, TryStreamExt}; +use restate_wal_protocol::Envelope; +use restate_wal_protocol::v2::Command; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, instrument, warn}; @@ -47,10 +49,10 @@ use restate_timer::TokioClock; use restate_types::cluster::cluster_state::RunMode; use restate_types::config::Configuration; use restate_types::errors::GenericError; +use restate_types::identifiers::PartitionProcessorRpcRequestId; use restate_types::identifiers::{LeaderEpoch, PartitionId}; -use restate_types::identifiers::{PartitionKey, PartitionProcessorRpcRequestId}; use restate_types::live::LiveLoadExt; -use restate_types::logs::Keys; +use restate_types::logs::{HasRecordKeys, Keys}; use restate_types::message::MessageIndex; use restate_types::net::ingest::IngestRecord; use restate_types::net::partition_processor::{ @@ -70,7 +72,6 @@ use restate_wal_protocol::control::{ AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, VersionBarrierCommand, }; use restate_wal_protocol::timer::TimerKeyValue; -use restate_wal_protocol::{Command, Envelope}; use restate_worker_api::invoker::capacity::InvokerCapacity; use restate_worker_api::invoker::{Effect, InvokerHandle}; use restate_worker_api::{ @@ -267,14 +268,14 @@ where ) -> Result<(), Error> { let leader_epoch = leadership_info.leader_epoch; - let announce_leader = Command::AnnounceLeader(Box::new(AnnounceLeaderCommand { + let announce_leader = AnnounceLeaderCommand { node_id: my_node_id(), leader_epoch, epoch_version: Some(leadership_info.version), partition_key_range: self.partition.key_range, current_config: Some(leadership_info.current_config), next_config: leadership_info.next_config, - })); + }; let mut self_proposer = SelfProposer::new( self.partition.log_id(), @@ -282,9 +283,7 @@ where &self.bifrost, )?; - self_proposer - .self_propose(self.partition.key_range.start(), announce_leader) - .await?; + self_proposer.self_propose(announce_leader).await?; self.state = State::Candidate { leader_epoch, @@ -545,16 +544,11 @@ where }) { self_proposer - .self_propose( - self.partition.key_range.start(), - Command::VersionBarrier(VersionBarrierCommand { - version: forced_min_restate_version.clone(), - partition_key_range: Keys::RangeInclusive( - self.partition.key_range.into(), - ), - human_reason: Some("Force min Restate version".to_owned()), - }), - ) + .self_propose(VersionBarrierCommand { + version: forced_min_restate_version.clone(), + partition_key_range: Keys::RangeInclusive(self.partition.key_range.into()), + human_reason: Some("Force min Restate version".to_owned()), + }) .await?; min_restate_version = min_restate_version.max(forced_min_restate_version); @@ -565,16 +559,11 @@ where && RESTATE_VERSION_1_6_0.is_newer_than(&min_restate_version) { self_proposer - .self_propose( - self.partition.key_range.start(), - Command::VersionBarrier(VersionBarrierCommand { - version: RESTATE_VERSION_1_6_0.clone(), - partition_key_range: Keys::RangeInclusive( - self.partition.key_range.into(), - ), - human_reason: Some("Enable journal v2 by default".to_owned()), - }), - ) + .self_propose(VersionBarrierCommand { + version: RESTATE_VERSION_1_6_0.clone(), + partition_key_range: Keys::RangeInclusive(self.partition.key_range.into()), + human_reason: Some("Enable journal v2 by default".to_owned()), + }) .await?; } @@ -759,14 +748,13 @@ impl LeadershipState { } } - pub async fn handle_rpc_proposal_command( + pub async fn handle_rpc_proposal_command( &mut self, request_id: PartitionProcessorRpcRequestId, reciprocal: Reciprocal< Oneshot>, >, - partition_key: PartitionKey, - cmd: Command, + cmd: C, ) { match &mut self.state { State::Follower | State::Candidate { .. } => { @@ -777,17 +765,16 @@ impl LeadershipState { } State::Leader(leader_state) => { leader_state - .handle_rpc_proposal_command(request_id, reciprocal, partition_key, cmd) + .handle_rpc_proposal_command(request_id, reciprocal, cmd) .await; } } } /// Append a command to Bifrost without dedup information, responding on Bifrost commit. - pub async fn append_and_respond_asynchronously( + pub async fn append_and_respond_asynchronously( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, reciprocal: Reciprocal< Oneshot>, >, @@ -799,12 +786,7 @@ impl LeadershipState { )), State::Leader(leader_state) => { leader_state - .append_and_respond_asynchronously( - partition_key, - cmd, - reciprocal, - success_response, - ) + .append_and_respond_asynchronously(cmd, reciprocal, success_response) .await; } } @@ -879,7 +861,6 @@ mod tests { use crate::partition::leadership::{LeadershipState, State}; use crate::partition_processor_manager::PartitionLeaderHandlesRegistry; use crate::rule_book_cache::RuleBookCacheHandle; - use assert2::let_assert; use restate_bifrost::Bifrost; use restate_core::partitions::PartitionRouting; use restate_core::{TaskCenter, TestCoreEnv}; @@ -895,8 +876,7 @@ mod tests { use restate_types::sharding::KeyRange; use restate_types::{GenerationalNodeId, Version}; use restate_vqueues::VQueuesMetaCache; - use restate_wal_protocol::Command; - use restate_wal_protocol::Envelope; + use restate_wal_protocol::v2::{CommandKind, Envelope, Raw, commands}; use restate_worker_api::invoker::capacity::InvokerCapacity; use std::num::NonZeroUsize; use std::sync::Arc; @@ -960,9 +940,11 @@ mod tests { .await .unwrap()?; - let envelope = record.try_decode::().unwrap()?; + let envelope = record.try_decode::>().unwrap()?; - let_assert!(Command::AnnounceLeader(announce_leader) = envelope.command); + assert_eq!(CommandKind::AnnounceLeader, envelope.kind()); + let envelope = envelope.into_typed::(); + let announce_leader = envelope.into_inner().unwrap(); assert_eq!(announce_leader.node_id, NODE_ID); assert_eq!(announce_leader.leader_epoch, leader_epoch); assert_eq!(announce_leader.partition_key_range, PARTITION_KEY_RANGE); diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index 636d5e7c42..8a810f789c 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -8,16 +8,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use futures::never::Never; use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy, InputRecord}; -use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; +use restate_storage_api::deduplication_table::EpochSequenceNumber; use restate_types::{ - identifiers::PartitionKey, logs::LogId, net::ingest::IngestRecord, time::NanosSinceEpoch, + logs::{BodyWithKeys, LogId}, + net::ingest::IngestRecord, + time::NanosSinceEpoch, }; -use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; +use restate_wal_protocol::v2::{Command, CommandWithKeys, Dedup, Envelope, Raw}; use crate::partition::leadership::Error; @@ -34,7 +34,7 @@ static BIFROST_APPENDER_TASK: &str = "bifrost-appender"; pub struct SelfProposer { epoch_sequence_number: EpochSequenceNumber, - bifrost_appender: restate_bifrost::AppenderHandle, + bifrost_appender: restate_bifrost::AppenderHandle>, } impl SelfProposer { @@ -72,33 +72,29 @@ impl SelfProposer { /// /// Note that self_propose_many will return an error if the number of commands is greater than /// the internal channel's max capacity. - pub async fn self_propose_many( - &mut self, - cmds: impl ExactSizeIterator, - ) -> Result<(), Error> { + pub async fn self_propose_many(&mut self, records: I) -> Result<(), Error> + where + I: ExactSizeIterator, + C: Command, + T: CommandWithKeys, + { // allocate a sequence number range for the batch let leader_epoch = self.epoch_sequence_number.leader_epoch; let start_seq = self.epoch_sequence_number.sequence_number; - let end_seq = start_seq + cmds.len() as u64; + let end_seq = start_seq + records.len() as u64; - let envelopes = cmds.enumerate().map(|(idx, (partition_key, cmd))| { - let esn = EpochSequenceNumber { - leader_epoch, - sequence_number: start_seq + idx as u64, - }; - let header = Header { - dest: Destination::Processor { - partition_key, - dedup: Some(DedupInformation::self_proposal(esn)), - }, - source: Source::Processor { - partition_id: None, - partition_key: Some(partition_key), + let envelopes = records.enumerate().map(|(idx, command)| { + let keys = command.keys(); + let envelope = Envelope::new( + Dedup::SelfProposal { leader_epoch, + seq: start_seq + idx as u64, }, - }; - Arc::new(Envelope::new(header, cmd)) + command.inner(), + ) + .into_raw(); + BodyWithKeys::new(envelope, keys) }); // Only blocks if background append is pushing back (queue full) @@ -117,21 +113,24 @@ impl SelfProposer { Ok(()) } - /// Self-propose a single command to Bifrost, attaching ESN-based dedup information. - pub async fn self_propose( + pub async fn self_propose( &mut self, - partition_key: PartitionKey, - cmd: Command, + command: impl CommandWithKeys, ) -> Result<(), Error> { - let envelope = Envelope::new(self.create_self_propose_header(partition_key), cmd); + let esn = self.next_esn(); + let dedup = Dedup::SelfProposal { + leader_epoch: esn.leader_epoch, + seq: esn.sequence_number, + }; + + let keys = command.keys(); + let envelope = Envelope::new(dedup, command.inner()); - // Only blocks if background append is pushing back (queue full) self.bifrost_appender .sender() - .enqueue(Arc::new(envelope)) + .enqueue(BodyWithKeys::new(envelope.into_raw(), keys)) .await .map_err(|e| Error::SelfProposer(e.to_string()))?; - Ok(()) } @@ -140,28 +139,17 @@ impl SelfProposer { /// Unlike [`Self::self_propose`], this does not attach an epoch sequence number. Records /// appended this way are never filtered by the dedup mechanism during leadership transitions, /// which makes them safe for fire-and-forget ingress commands (signals, invocation responses). - pub async fn append_with_notification( + pub async fn append_with_notification( &mut self, - partition_key: PartitionKey, - cmd: Command, + command: impl CommandWithKeys, ) -> Result { - let header = Header { - dest: Destination::Processor { - partition_key, - dedup: None, - }, - source: Source::Processor { - partition_id: None, - partition_key: Some(partition_key), - leader_epoch: self.epoch_sequence_number.leader_epoch, - }, - }; - let envelope = Envelope::new(header, cmd); + let keys = command.keys(); + let envelope = Envelope::new(Dedup::None, command.inner()).into_raw(); let commit_token = self .bifrost_appender .sender() - .enqueue_with_notification(Arc::new(envelope)) + .enqueue_with_notification(BodyWithKeys::new(envelope, keys)) .await .map_err(|e| Error::SelfProposer(e.to_string()))?; @@ -213,21 +201,10 @@ impl SelfProposer { .map_err(|e| Error::SelfProposer(e.to_string())) } - fn create_self_propose_header(&mut self, partition_key: PartitionKey) -> Header { + fn next_esn(&mut self) -> EpochSequenceNumber { let esn = self.epoch_sequence_number; self.epoch_sequence_number = self.epoch_sequence_number.next(); - - Header { - dest: Destination::Processor { - partition_key, - dedup: Some(DedupInformation::self_proposal(esn)), - }, - source: Source::Processor { - partition_id: None, - partition_key: Some(partition_key), - leader_epoch: self.epoch_sequence_number.leader_epoch, - }, - } + esn } /// Waits for self proposer to fail. This method will only complete with an error if the self diff --git a/crates/worker/src/partition/rpc/append_invocation.rs b/crates/worker/src/partition/rpc/append_invocation.rs index 50e8fa0360..c213a8c293 100644 --- a/crates/worker/src/partition/rpc/append_invocation.rs +++ b/crates/worker/src/partition/rpc/append_invocation.rs @@ -8,12 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::WithPartitionKey; use restate_types::invocation; use restate_types::invocation::{ ServiceInvocation, ServiceInvocationResponseSink, SubmitNotificationSink, }; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -55,15 +56,13 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler } }; - let partition_key = service_invocation.partition_key(); - let cmd = Command::Invoke(Box::new(service_invocation)); + let record = commands::InvokeCommand::from(service_invocation); match append_invocation_reply_on { AppendInvocationReplyOn::Appended => { self.proposer .append_and_respond_asynchronously( - partition_key, - cmd, + record, replier, PartitionProcessorRpcResponse::Appended, ) @@ -71,7 +70,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler } AppendInvocationReplyOn::Submitted | AppendInvocationReplyOn::Output => { self.proposer - .handle_rpc_proposal_command(partition_key, cmd, request_id, replier) + .handle_rpc_proposal_command(record, request_id, replier) .await; } } @@ -87,7 +86,6 @@ mod tests { use crate::partition::rpc::MockActuator; use futures::FutureExt; use googletest::prelude::*; - use restate_test_util::let_assert; use std::future::ready; use test_log::test; @@ -95,20 +93,21 @@ mod tests { async fn reply_on_appended() { let mut proposer = MockActuator::new(); proposer - .expect_append_and_respond_asynchronously::() - .return_once_st(|_, cmd, _, _| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_append_and_respond_asynchronously::() + .return_once_st(|cmd, _, _| { + let service_invocation: ServiceInvocation = cmd.into(); + assert_that!( service_invocation, - points_to(all!( + all!( field!(ServiceInvocation.response_sink, none()), field!(ServiceInvocation.submit_notification_sink, none()), - )) + ) ); ready(()).boxed() }); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, _rx) = Reciprocal::mock(); @@ -130,21 +129,22 @@ mod tests { let request_id = PartitionProcessorRpcRequestId::new(); let mut proposer = MockActuator::new(); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(|_, cmd, req_id, _| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_handle_rpc_proposal_command::() + .return_once_st(|cmd, req_id, _| { + let service_invocation: ServiceInvocation = cmd.into(); + assert_that!( service_invocation, - points_to(all!( + all!( field!(ServiceInvocation.response_sink, none()), field!( ServiceInvocation.submit_notification_sink, some(eq(SubmitNotificationSink::Ingress { request_id: req_id })) ), - )) + ) ); ready(()).boxed() }); @@ -168,15 +168,16 @@ mod tests { let request_id = PartitionProcessorRpcRequestId::new(); let mut proposer = MockActuator::new(); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(|_, cmd, req_id, _| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_handle_rpc_proposal_command::() + .return_once_st(|cmd, req_id, _| { + let service_invocation: ServiceInvocation= cmd.into(); + assert_that!( service_invocation, - points_to(all!( + all!( field!( ServiceInvocation.response_sink, some(eq(ServiceInvocationResponseSink::Ingress { @@ -184,7 +185,7 @@ mod tests { })) ), field!(ServiceInvocation.submit_notification_sink, none()), - )) + ) ); ready(()).boxed() }); diff --git a/crates/worker/src/partition/rpc/append_invocation_response.rs b/crates/worker/src/partition/rpc/append_invocation_response.rs index 7f6842a644..657000a5fe 100644 --- a/crates/worker/src/partition/rpc/append_invocation_response.rs +++ b/crates/worker/src/partition/rpc/append_invocation_response.rs @@ -8,11 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::WithPartitionKey; use restate_types::invocation::InvocationResponse; use restate_types::net::partition_processor::PartitionProcessorRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) invocation_response: InvocationResponse, @@ -33,8 +33,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .append_and_respond_asynchronously( - invocation_response.partition_key(), - Command::InvocationResponse(invocation_response), + commands::InvocationResponseCommand::from(invocation_response), replier, PartitionProcessorRpcResponse::Appended, ) diff --git a/crates/worker/src/partition/rpc/append_signal.rs b/crates/worker/src/partition/rpc/append_signal.rs index d8f08d0df6..7ad400673a 100644 --- a/crates/worker/src/partition/rpc/append_signal.rs +++ b/crates/worker/src/partition/rpc/append_signal.rs @@ -8,12 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::NotifySignalRequest; use restate_types::journal_v2::Signal; use restate_types::net::partition_processor::PartitionProcessorRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) invocation_id: InvocationId, @@ -36,8 +37,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .append_and_respond_asynchronously( - invocation_id.partition_key(), - Command::NotifySignal(NotifySignalRequest { + commands::NotifySignalCommand::from(NotifySignalRequest { invocation_id, signal, }), diff --git a/crates/worker/src/partition/rpc/cancel_invocation.rs b/crates/worker/src/partition/rpc/cancel_invocation.rs index d7829db624..49bd51eae1 100644 --- a/crates/worker/src/partition/rpc/cancel_invocation.rs +++ b/crates/worker/src/partition/rpc/cancel_invocation.rs @@ -8,14 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, InvocationTermination, TerminationFlavor, }; use restate_types::net::partition_processor::CancelInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -38,8 +39,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::TerminateInvocation(InvocationTermination { + commands::TerminateInvocationCommand::from(InvocationTermination { invocation_id, flavor: TerminationFlavor::Cancel, response_sink: Some(InvocationMutationResponseSink::Ingress( diff --git a/crates/worker/src/partition/rpc/get_invocation_output.rs b/crates/worker/src/partition/rpc/get_invocation_output.rs index f864239a6a..8e59b20fb6 100644 --- a/crates/worker/src/partition/rpc/get_invocation_output.rs +++ b/crates/worker/src/partition/rpc/get_invocation_output.rs @@ -8,10 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; use restate_storage_api::StorageError; use restate_storage_api::invocation_status_table::{InvocationStatus, ReadInvocationStatusTable}; -use restate_types::identifiers::WithPartitionKey; use restate_types::invocation; use restate_types::invocation::client::{InvocationOutput, InvocationOutputResponse}; use restate_types::invocation::{ @@ -20,7 +18,9 @@ use restate_types::invocation::{ use restate_types::net::partition_processor::{ GetInvocationOutputResponseMode, PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -95,8 +95,7 @@ where self.proposer .handle_rpc_proposal_command( - invocation_query.partition_key(), - Command::AttachInvocation(AttachInvocationRequest { + commands::AttachInvocationCommand::from(AttachInvocationRequest { invocation_query, block_on_inflight: true, response_sink: ServiceInvocationResponseSink::Ingress { request_id }, diff --git a/crates/worker/src/partition/rpc/kill_invocation.rs b/crates/worker/src/partition/rpc/kill_invocation.rs index 4401199918..9c7a3f1a62 100644 --- a/crates/worker/src/partition/rpc/kill_invocation.rs +++ b/crates/worker/src/partition/rpc/kill_invocation.rs @@ -8,14 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, InvocationTermination, TerminationFlavor, }; use restate_types::net::partition_processor::KillInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -38,8 +39,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::TerminateInvocation(InvocationTermination { + commands::TerminateInvocationCommand::from(InvocationTermination { invocation_id, flavor: TerminationFlavor::Kill, response_sink: Some(InvocationMutationResponseSink::Ingress( diff --git a/crates/worker/src/partition/rpc/mod.rs b/crates/worker/src/partition/rpc/mod.rs index 31e1e9cfc7..874fd02930 100644 --- a/crates/worker/src/partition/rpc/mod.rs +++ b/crates/worker/src/partition/rpc/mod.rs @@ -27,26 +27,24 @@ use restate_core::network::{Oneshot, Reciprocal, TransportConnect}; use restate_storage_api::invocation_status_table::ReadInvocationStatusTable; use restate_storage_api::journal_table as journal_table_v1; use restate_storage_api::journal_table_v2::ReadJournalTable; -use restate_types::identifiers::{ - InvocationId, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, -}; +use restate_types::identifiers::{InvocationId, PartitionId, PartitionProcessorRpcRequestId}; use restate_types::invocation::InvocationRequest; +use restate_types::logs::HasRecordKeys; use restate_types::net::partition_processor::{ AppendInvocationReplyOn, PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, }; use restate_types::schema::deployment::DeploymentResolver; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::Command; use restate_worker_api::invoker::InvokerHandle; use crate::partition::leadership::LeadershipState; #[cfg_attr(test, mockall::automock)] pub(super) trait Actuator { - fn handle_rpc_proposal_command( + fn handle_rpc_proposal_command( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, request_id: PartitionProcessorRpcRequestId, replier: Replier, ) -> impl Future; @@ -57,11 +55,11 @@ pub(super) trait Actuator { /// transitions, making this safe for fire-and-forget ingress commands (signals, invocation /// responses). fn append_and_respond_asynchronously< + C: Command + HasRecordKeys, O: 'static + Into + Send + Sync, >( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, replier: Replier, success_response: O, ) -> impl Future; @@ -79,16 +77,17 @@ impl Actuator for LeadershipState where T: TransportConnect, { - async fn append_and_respond_asynchronously>( + async fn append_and_respond_asynchronously< + C: Command + HasRecordKeys, + O: Into, + >( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, replier: Replier, on_proposed_response: O, ) { LeadershipState::append_and_respond_asynchronously( self, - partition_key, cmd, replier.0, on_proposed_response.into(), @@ -96,21 +95,13 @@ where .await } - async fn handle_rpc_proposal_command( + async fn handle_rpc_proposal_command( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, request_id: PartitionProcessorRpcRequestId, replier: Replier, ) { - LeadershipState::handle_rpc_proposal_command( - self, - request_id, - replier.0, - partition_key, - cmd, - ) - .await + LeadershipState::handle_rpc_proposal_command(self, request_id, replier.0, cmd).await } fn notify_invoker_to_retry_now(&mut self, invocation_id: InvocationId) { diff --git a/crates/worker/src/partition/rpc/purge_invocation.rs b/crates/worker/src/partition/rpc/purge_invocation.rs index 291c268c3d..e1ff77358d 100644 --- a/crates/worker/src/partition/rpc/purge_invocation.rs +++ b/crates/worker/src/partition/rpc/purge_invocation.rs @@ -8,13 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, PurgeInvocationRequest, }; use restate_types::net::partition_processor::PurgeInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -37,8 +38,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::PurgeInvocation(PurgeInvocationRequest { + commands::PurgeInvocationCommand::from(PurgeInvocationRequest { invocation_id, response_sink: Some(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id }, diff --git a/crates/worker/src/partition/rpc/purge_journal.rs b/crates/worker/src/partition/rpc/purge_journal.rs index 7630c28038..25b6337657 100644 --- a/crates/worker/src/partition/rpc/purge_journal.rs +++ b/crates/worker/src/partition/rpc/purge_journal.rs @@ -8,13 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, PurgeInvocationRequest, }; use restate_types::net::partition_processor::PurgeInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -37,8 +38,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::PurgeJournal(PurgeInvocationRequest { + commands::PurgeJournalCommand::from(PurgeInvocationRequest { invocation_id, response_sink: Some(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id }, diff --git a/crates/worker/src/partition/rpc/restart_as_new_invocation.rs b/crates/worker/src/partition/rpc/restart_as_new_invocation.rs index b4054152a4..b593175c84 100644 --- a/crates/worker/src/partition/rpc/restart_as_new_invocation.rs +++ b/crates/worker/src/partition/rpc/restart_as_new_invocation.rs @@ -8,9 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; use assert2::let_assert; use opentelemetry::trace::Span; + use restate_service_protocol::codec::ProtobufRawEntryCodec as OldProtocolEntryCodec; use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec; use restate_storage_api::invocation_status_table::{InvocationStatus, ReadInvocationStatusTable}; @@ -28,6 +28,9 @@ use restate_types::journal_v2::{CommandMetadata, EntryMetadata, EntryType}; use restate_types::net::partition_processor::RestartAsNewInvocationRpcResponse; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::{invocation, journal_v2}; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -240,14 +243,13 @@ where ); // Propose the usual Invoke command - let cmd = Command::Invoke(Box::new(service_invocation)); + let record = commands::InvokeCommand::from(service_invocation); // Propose and done // This path should be no longer needed once we switch to the journal v2 by default. self.proposer .append_and_respond_asynchronously( - invocation_id.partition_key(), - cmd, + record, replier, RestartAsNewInvocationRpcResponse::Ok { new_invocation_id }, ) @@ -372,7 +374,7 @@ where } // Pass the ball to the state machine, the PP will reply to the RPC request. - let cmd = Command::RestartAsNewInvocation(RestartAsNewInvocationRequest { + let record = commands::RestartAsNewInvocationCommand::from(RestartAsNewInvocationRequest { invocation_id, new_invocation_id, copy_prefix_up_to_index_included, @@ -382,7 +384,7 @@ where )), }); self.proposer - .handle_rpc_proposal_command(invocation_id.partition_key(), cmd, request_id, replier) + .handle_rpc_proposal_command(record, request_id, replier) .await; Ok(()) @@ -394,7 +396,6 @@ mod tests { use std::collections::HashMap; use std::future::ready; - use assert2::let_assert; use bytes::Bytes; use futures::{FutureExt, Stream, StreamExt, stream}; use googletest::prelude::*; @@ -728,12 +729,12 @@ mod tests { let headers_clone = vec![Header::new("key", "value")]; let payload_clone = payload.clone(); proposer - .expect_append_and_respond_asynchronously::() - .return_once_st(move |_, cmd, _, response| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_append_and_respond_asynchronously::() + .return_once_st(move |cmd, _, response| { + let service_invocation: ServiceInvocation = cmd.into(); assert_that!( service_invocation, - points_to(all!( + all!( field!(ServiceInvocation.invocation_id, not(eq(old_invocation_id))), field!(ServiceInvocation.argument, eq(payload_clone)), field!(ServiceInvocation.headers, eq(headers_clone)), @@ -743,7 +744,7 @@ mod tests { ), field!(ServiceInvocation.response_sink, none()), field!(ServiceInvocation.submit_notification_sink, none()), - )) + ) ); assert_that!( response, @@ -754,7 +755,7 @@ mod tests { ready(()).boxed() }); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_with_input_v1( @@ -796,10 +797,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); // Completed with no pinned deployment triggers v1 workaround @@ -851,10 +852,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let status = InvocationStatus::Completed(CompletedInvocation { @@ -898,10 +899,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let status = InvocationStatus::Completed(CompletedInvocation { @@ -952,10 +953,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); // Completed with journal length>0 but no v1 input present in storage @@ -1008,21 +1009,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(0), - patch_deployment_id: none() - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(0), + patch_deployment_id: none() + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1070,21 +1070,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest =cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(0), - patch_deployment_id: none() - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(0), + patch_deployment_id: none() + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1115,10 +1114,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, Default::default()); @@ -1152,10 +1151,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_with_input_v1( @@ -1211,10 +1210,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, status); @@ -1260,10 +1259,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, status); @@ -1344,21 +1343,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(1), - patch_deployment_id: none() - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(1), + patch_deployment_id: none() + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1408,21 +1406,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(1), - patch_deployment_id: some(eq(latest_id)) - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(1), + patch_deployment_id: some(eq(latest_id)) + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1466,10 +1463,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1517,7 +1514,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1558,7 +1555,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1599,7 +1596,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1638,10 +1635,10 @@ mod tests { .expect_partition_id() .return_const(PartitionId::from(0)); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, Default::default()); diff --git a/crates/worker/src/partition/rpc/resume_invocation.rs b/crates/worker/src/partition/rpc/resume_invocation.rs index f0110c6552..73ea32695f 100644 --- a/crates/worker/src/partition/rpc/resume_invocation.rs +++ b/crates/worker/src/partition/rpc/resume_invocation.rs @@ -8,17 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, ReadInvocationStatusTable, }; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::client::PatchDeploymentId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, ResumeInvocationRequest, }; use restate_types::net::partition_processor::ResumeInvocationRpcResponse; use restate_types::schema::deployment::DeploymentResolver; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -127,8 +129,7 @@ where // We need to propose the message, PP will deal with invoking this back self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::ResumeInvocation(ResumeInvocationRequest { + commands::ResumeInvocationCommand::from(ResumeInvocationRequest { invocation_id, update_pinned_deployment_id, response_sink: Some(InvocationMutationResponseSink::Ingress( @@ -165,7 +166,6 @@ mod tests { use super::*; use crate::partition::rpc::MockActuator; - use assert2::let_assert; use futures::FutureExt; use googletest::prelude::*; use restate_storage_api::invocation_status_table::{ @@ -251,9 +251,9 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, request_id, replier| { - let_assert!(Command::ResumeInvocation(resume_invocation_request) = cmd); + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, request_id, replier| { + let resume_invocation_request: ResumeInvocationRequest = cmd.into(); assert_that!( resume_invocation_request, all!( @@ -301,7 +301,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage { @@ -335,7 +335,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage { @@ -381,7 +381,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage { @@ -434,7 +434,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let metadata = InFlightInvocationMetadata { @@ -510,17 +510,18 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, request_id, replier| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, request_id, replier| { + let resume_invocation_request: ResumeInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::ResumeInvocation(pat!(ResumeInvocationRequest { + resume_invocation_request, + pat!(ResumeInvocationRequest { invocation_id: eq(invocation_id), update_pinned_deployment_id: some(eq(expected_deployment_id)), response_sink: some(eq(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id } ))) - }))) + }) ); replier.send(ResumeInvocationRpcResponse::Ok); ready(()).boxed() @@ -579,7 +580,7 @@ mod tests { .expect_partition_id() .return_const(PartitionId::from(0)); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage {