diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 69d47a4724..bccd5b7969 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -16,7 +16,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; -use bytes::BytesMut; use futures::future::OptionFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt, stream}; @@ -33,12 +32,10 @@ use restate_limiter::RuleBook; use restate_partition_store::PartitionDb; use restate_storage_api::vqueue_table::scheduler::SchedulerDecisionsCommand; use restate_types::config::Configuration; -use restate_types::identifiers::{ - LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, -}; +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionProcessorRpcRequestId}; use restate_types::invocation::PurgeInvocationRequest; use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification}; -use restate_types::logs::Keys; +use restate_types::logs::{BodyWithKeys, Keys}; use restate_types::net::ingest::IngestRecord; use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcResponse, @@ -48,9 +45,8 @@ use restate_types::{RESTATE_VERSION_1_7_0, SemanticRestateVersion, Version, Vers use restate_vqueues::VQueueEvent; use restate_vqueues::scheduler::Decisions; use restate_vqueues::{SchedulerService, VQueuesMeta}; -use restate_wal_protocol::Command; use restate_wal_protocol::control::UpsertSchemaCommand; -use restate_wal_protocol::v1::UpsertRuleBookCommandWrapper; +use restate_wal_protocol::v2::{Command, CommandWithKeys, commands}; use restate_worker_api::invoker::InvokerHandle; use restate_worker_api::resources::ReservedResources; use restate_worker_api::{SchedulerStatusEntry, UserLimitCounterEntry}; @@ -342,7 +338,6 @@ impl LeaderState { &mut self, action_effects: impl IntoIterator, ) -> Result<(), Error> { - let mut arena = BytesMut::with_capacity(128 * 1024); for effect in action_effects { match effect { ActionEffect::Scheduler(decisions) => { @@ -362,17 +357,11 @@ impl LeaderState { .into_iter() // one command per partition key .map(|(partition_key, group)| { - let decisions = SchedulerDecisionsCommand { - qids: group.collect(), - }; - - arena.reserve(decisions.encoded_len()); - // safe to unwrap because we reserved enough space - decisions.bilrost_encode(&mut arena).unwrap(); - - ( - partition_key, - Command::VQSchedulerDecisions(arena.split().freeze()), + BodyWithKeys::new( + SchedulerDecisionsCommand { + qids: group.collect(), + }, + Keys::Single(partition_key), ) // an action goes into command, and resources are popped }) @@ -387,71 +376,64 @@ impl LeaderState { // based on configuration, whether to consider partition-local durability in // the replica-set as a sufficient source of durability, or only snapshots. self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::UpdatePartitionDurability(partition_durability), - ) + .self_propose(BodyWithKeys::new( + partition_durability, + Keys::RangeInclusive(self.partition_key_range.into()), + )) .await?; } ActionEffect::Invoker(invoker_effect) => { self.self_proposer - .self_propose( - invoker_effect.invocation_id.partition_key(), - Command::InvokerEffect(invoker_effect), - ) + .self_propose(commands::InvokerEffectCommand::from(*invoker_effect)) .await?; } ActionEffect::Shuffle(outbox_truncation) => { // todo: Until we support partition splits we need to get rid of outboxes or introduce partition // specific destination messages that are identified by a partition_id self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::TruncateOutbox(outbox_truncation.index()), - ) + .self_propose(commands::TruncateOutboxCommand { + partition_key_range: Keys::RangeInclusive( + self.partition_key_range.into(), + ), + index: outbox_truncation.index(), + }) .await?; } ActionEffect::Timer(timer) => { self.self_proposer - .self_propose(timer.invocation_id().partition_key(), Command::Timer(timer)) + .self_propose(commands::TimerCommand::from(timer)) .await?; } - ActionEffect::Cleaner(effect) => { - let (invocation_id, cmd) = match effect { - CleanerEffect::PurgeJournal(invocation_id) => ( + ActionEffect::Cleaner(effect) => match effect { + CleanerEffect::PurgeJournal(invocation_id) => { + let record = commands::PurgeJournalCommand::from(PurgeInvocationRequest { invocation_id, - Command::PurgeJournal(PurgeInvocationRequest { - invocation_id, - response_sink: None, - }), - ), - CleanerEffect::PurgeInvocation(invocation_id) => ( - invocation_id, - Command::PurgeInvocation(PurgeInvocationRequest { + response_sink: None, + }); + + self.self_proposer.self_propose(record).await?; + } + CleanerEffect::PurgeInvocation(invocation_id) => { + let record = + commands::PurgeInvocationCommand::from(PurgeInvocationRequest { invocation_id, response_sink: None, - }), - ), - }; + }); - self.self_proposer - .self_propose(invocation_id.partition_key(), cmd) - .await?; - } + self.self_proposer.self_propose(record).await?; + } + }, ActionEffect::UpsertSchema(schema) => { if SemanticRestateVersion::current() .is_equal_or_newer_than(&RESTATE_VERSION_1_7_0) { self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::UpsertSchema(UpsertSchemaCommand { - partition_key_range: Keys::RangeInclusive( - self.partition_key_range.into(), - ), - schema, - }), - ) + .self_propose(UpsertSchemaCommand { + partition_key_range: Keys::RangeInclusive( + self.partition_key_range.into(), + ), + schema, + }) .await?; } } @@ -463,21 +445,11 @@ impl LeaderState { .experimental .is_vqueues_enabled() { - let cmd = - restate_wal_protocol::control::UpsertRuleBookCommand { rule_book }; - - arena.reserve(cmd.encoded_len()); - // safe to unwrap because we reserved enough space - cmd.bilrost_encode(&mut arena).unwrap(); - self.self_proposer - .self_propose( - self.partition_key_range.start(), - Command::UpsertRuleBook(UpsertRuleBookCommandWrapper { - partition_key_range: self.partition_key_range, - command: arena.split().freeze(), - }), - ) + .self_propose(BodyWithKeys::new( + restate_wal_protocol::control::UpsertRuleBookCommand { rule_book }, + Keys::RangeInclusive(self.partition_key_range.into()), + )) .await?; } } @@ -490,14 +462,13 @@ impl LeaderState { Ok(()) } - pub async fn handle_rpc_proposal_command( + pub async fn handle_rpc_proposal_command( &mut self, request_id: PartitionProcessorRpcRequestId, reciprocal: Reciprocal< Oneshot>, >, - partition_key: PartitionKey, - cmd: Command, + cmd: impl CommandWithKeys, ) { match self.awaiting_rpc_actions.entry(request_id) { Entry::Occupied(mut o) => { @@ -511,7 +482,7 @@ impl LeaderState { } Entry::Vacant(v) => { // In this case, no one proposed this command yet, let's try to propose it - if let Err(e) = self.self_proposer.self_propose(partition_key, cmd).await { + if let Err(e) = self.self_proposer.self_propose(cmd).await { reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))); } else { v.insert(reciprocal); @@ -525,18 +496,13 @@ impl LeaderState { /// Records appended this way are never filtered by the dedup mechanism during leadership /// transitions, making this safe for fire-and-forget ingress commands (signals, invocation /// responses). - pub async fn append_and_respond_asynchronously( + pub async fn append_and_respond_asynchronously( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: impl CommandWithKeys, reciprocal: RpcReciprocal, success_response: PartitionProcessorRpcResponse, ) { - match self - .self_proposer - .append_with_notification(partition_key, cmd) - .await - { + match self.self_proposer.append_with_notification(cmd).await { Ok(commit_token) => { self.awaiting_rpc_self_propose.push(SelfAppendFuture::new( commit_token, diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index f0314035dc..e0878a7427 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use std::time::Duration; use futures::{StreamExt, TryStreamExt}; +use restate_wal_protocol::Envelope; +use restate_wal_protocol::v2::Command; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, instrument, warn}; @@ -47,10 +49,10 @@ use restate_timer::TokioClock; use restate_types::cluster::cluster_state::RunMode; use restate_types::config::Configuration; use restate_types::errors::GenericError; +use restate_types::identifiers::PartitionProcessorRpcRequestId; use restate_types::identifiers::{LeaderEpoch, PartitionId}; -use restate_types::identifiers::{PartitionKey, PartitionProcessorRpcRequestId}; use restate_types::live::LiveLoadExt; -use restate_types::logs::Keys; +use restate_types::logs::{HasRecordKeys, Keys}; use restate_types::message::MessageIndex; use restate_types::net::ingest::IngestRecord; use restate_types::net::partition_processor::{ @@ -70,7 +72,6 @@ use restate_wal_protocol::control::{ AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, VersionBarrierCommand, }; use restate_wal_protocol::timer::TimerKeyValue; -use restate_wal_protocol::{Command, Envelope}; use restate_worker_api::invoker::capacity::InvokerCapacity; use restate_worker_api::invoker::{Effect, InvokerHandle}; use restate_worker_api::{ @@ -267,14 +268,14 @@ where ) -> Result<(), Error> { let leader_epoch = leadership_info.leader_epoch; - let announce_leader = Command::AnnounceLeader(Box::new(AnnounceLeaderCommand { + let announce_leader = AnnounceLeaderCommand { node_id: my_node_id(), leader_epoch, epoch_version: Some(leadership_info.version), partition_key_range: self.partition.key_range, current_config: Some(leadership_info.current_config), next_config: leadership_info.next_config, - })); + }; let mut self_proposer = SelfProposer::new( self.partition.log_id(), @@ -282,9 +283,7 @@ where &self.bifrost, )?; - self_proposer - .self_propose(self.partition.key_range.start(), announce_leader) - .await?; + self_proposer.self_propose(announce_leader).await?; self.state = State::Candidate { leader_epoch, @@ -545,16 +544,11 @@ where }) { self_proposer - .self_propose( - self.partition.key_range.start(), - Command::VersionBarrier(VersionBarrierCommand { - version: forced_min_restate_version.clone(), - partition_key_range: Keys::RangeInclusive( - self.partition.key_range.into(), - ), - human_reason: Some("Force min Restate version".to_owned()), - }), - ) + .self_propose(VersionBarrierCommand { + version: forced_min_restate_version.clone(), + partition_key_range: Keys::RangeInclusive(self.partition.key_range.into()), + human_reason: Some("Force min Restate version".to_owned()), + }) .await?; min_restate_version = min_restate_version.max(forced_min_restate_version); @@ -565,16 +559,11 @@ where && RESTATE_VERSION_1_6_0.is_newer_than(&min_restate_version) { self_proposer - .self_propose( - self.partition.key_range.start(), - Command::VersionBarrier(VersionBarrierCommand { - version: RESTATE_VERSION_1_6_0.clone(), - partition_key_range: Keys::RangeInclusive( - self.partition.key_range.into(), - ), - human_reason: Some("Enable journal v2 by default".to_owned()), - }), - ) + .self_propose(VersionBarrierCommand { + version: RESTATE_VERSION_1_6_0.clone(), + partition_key_range: Keys::RangeInclusive(self.partition.key_range.into()), + human_reason: Some("Enable journal v2 by default".to_owned()), + }) .await?; } @@ -759,14 +748,13 @@ impl LeadershipState { } } - pub async fn handle_rpc_proposal_command( + pub async fn handle_rpc_proposal_command( &mut self, request_id: PartitionProcessorRpcRequestId, reciprocal: Reciprocal< Oneshot>, >, - partition_key: PartitionKey, - cmd: Command, + cmd: C, ) { match &mut self.state { State::Follower | State::Candidate { .. } => { @@ -777,17 +765,16 @@ impl LeadershipState { } State::Leader(leader_state) => { leader_state - .handle_rpc_proposal_command(request_id, reciprocal, partition_key, cmd) + .handle_rpc_proposal_command(request_id, reciprocal, cmd) .await; } } } /// Append a command to Bifrost without dedup information, responding on Bifrost commit. - pub async fn append_and_respond_asynchronously( + pub async fn append_and_respond_asynchronously( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, reciprocal: Reciprocal< Oneshot>, >, @@ -799,12 +786,7 @@ impl LeadershipState { )), State::Leader(leader_state) => { leader_state - .append_and_respond_asynchronously( - partition_key, - cmd, - reciprocal, - success_response, - ) + .append_and_respond_asynchronously(cmd, reciprocal, success_response) .await; } } @@ -879,7 +861,6 @@ mod tests { use crate::partition::leadership::{LeadershipState, State}; use crate::partition_processor_manager::PartitionLeaderHandlesRegistry; use crate::rule_book_cache::RuleBookCacheHandle; - use assert2::let_assert; use restate_bifrost::Bifrost; use restate_core::partitions::PartitionRouting; use restate_core::{TaskCenter, TestCoreEnv}; @@ -895,8 +876,7 @@ mod tests { use restate_types::sharding::KeyRange; use restate_types::{GenerationalNodeId, Version}; use restate_vqueues::VQueuesMetaCache; - use restate_wal_protocol::Command; - use restate_wal_protocol::Envelope; + use restate_wal_protocol::v2::{CommandKind, Envelope, Raw, commands}; use restate_worker_api::invoker::capacity::InvokerCapacity; use std::num::NonZeroUsize; use std::sync::Arc; @@ -960,9 +940,11 @@ mod tests { .await .unwrap()?; - let envelope = record.try_decode::().unwrap()?; + let envelope = record.try_decode::>().unwrap()?; - let_assert!(Command::AnnounceLeader(announce_leader) = envelope.command); + assert_eq!(CommandKind::AnnounceLeader, envelope.kind()); + let envelope = envelope.into_typed::(); + let announce_leader = envelope.into_inner().unwrap(); assert_eq!(announce_leader.node_id, NODE_ID); assert_eq!(announce_leader.leader_epoch, leader_epoch); assert_eq!(announce_leader.partition_key_range, PARTITION_KEY_RANGE); diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index 636d5e7c42..8a810f789c 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -8,16 +8,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use futures::never::Never; use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy, InputRecord}; -use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; +use restate_storage_api::deduplication_table::EpochSequenceNumber; use restate_types::{ - identifiers::PartitionKey, logs::LogId, net::ingest::IngestRecord, time::NanosSinceEpoch, + logs::{BodyWithKeys, LogId}, + net::ingest::IngestRecord, + time::NanosSinceEpoch, }; -use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; +use restate_wal_protocol::v2::{Command, CommandWithKeys, Dedup, Envelope, Raw}; use crate::partition::leadership::Error; @@ -34,7 +34,7 @@ static BIFROST_APPENDER_TASK: &str = "bifrost-appender"; pub struct SelfProposer { epoch_sequence_number: EpochSequenceNumber, - bifrost_appender: restate_bifrost::AppenderHandle, + bifrost_appender: restate_bifrost::AppenderHandle>, } impl SelfProposer { @@ -72,33 +72,29 @@ impl SelfProposer { /// /// Note that self_propose_many will return an error if the number of commands is greater than /// the internal channel's max capacity. - pub async fn self_propose_many( - &mut self, - cmds: impl ExactSizeIterator, - ) -> Result<(), Error> { + pub async fn self_propose_many(&mut self, records: I) -> Result<(), Error> + where + I: ExactSizeIterator, + C: Command, + T: CommandWithKeys, + { // allocate a sequence number range for the batch let leader_epoch = self.epoch_sequence_number.leader_epoch; let start_seq = self.epoch_sequence_number.sequence_number; - let end_seq = start_seq + cmds.len() as u64; + let end_seq = start_seq + records.len() as u64; - let envelopes = cmds.enumerate().map(|(idx, (partition_key, cmd))| { - let esn = EpochSequenceNumber { - leader_epoch, - sequence_number: start_seq + idx as u64, - }; - let header = Header { - dest: Destination::Processor { - partition_key, - dedup: Some(DedupInformation::self_proposal(esn)), - }, - source: Source::Processor { - partition_id: None, - partition_key: Some(partition_key), + let envelopes = records.enumerate().map(|(idx, command)| { + let keys = command.keys(); + let envelope = Envelope::new( + Dedup::SelfProposal { leader_epoch, + seq: start_seq + idx as u64, }, - }; - Arc::new(Envelope::new(header, cmd)) + command.inner(), + ) + .into_raw(); + BodyWithKeys::new(envelope, keys) }); // Only blocks if background append is pushing back (queue full) @@ -117,21 +113,24 @@ impl SelfProposer { Ok(()) } - /// Self-propose a single command to Bifrost, attaching ESN-based dedup information. - pub async fn self_propose( + pub async fn self_propose( &mut self, - partition_key: PartitionKey, - cmd: Command, + command: impl CommandWithKeys, ) -> Result<(), Error> { - let envelope = Envelope::new(self.create_self_propose_header(partition_key), cmd); + let esn = self.next_esn(); + let dedup = Dedup::SelfProposal { + leader_epoch: esn.leader_epoch, + seq: esn.sequence_number, + }; + + let keys = command.keys(); + let envelope = Envelope::new(dedup, command.inner()); - // Only blocks if background append is pushing back (queue full) self.bifrost_appender .sender() - .enqueue(Arc::new(envelope)) + .enqueue(BodyWithKeys::new(envelope.into_raw(), keys)) .await .map_err(|e| Error::SelfProposer(e.to_string()))?; - Ok(()) } @@ -140,28 +139,17 @@ impl SelfProposer { /// Unlike [`Self::self_propose`], this does not attach an epoch sequence number. Records /// appended this way are never filtered by the dedup mechanism during leadership transitions, /// which makes them safe for fire-and-forget ingress commands (signals, invocation responses). - pub async fn append_with_notification( + pub async fn append_with_notification( &mut self, - partition_key: PartitionKey, - cmd: Command, + command: impl CommandWithKeys, ) -> Result { - let header = Header { - dest: Destination::Processor { - partition_key, - dedup: None, - }, - source: Source::Processor { - partition_id: None, - partition_key: Some(partition_key), - leader_epoch: self.epoch_sequence_number.leader_epoch, - }, - }; - let envelope = Envelope::new(header, cmd); + let keys = command.keys(); + let envelope = Envelope::new(Dedup::None, command.inner()).into_raw(); let commit_token = self .bifrost_appender .sender() - .enqueue_with_notification(Arc::new(envelope)) + .enqueue_with_notification(BodyWithKeys::new(envelope, keys)) .await .map_err(|e| Error::SelfProposer(e.to_string()))?; @@ -213,21 +201,10 @@ impl SelfProposer { .map_err(|e| Error::SelfProposer(e.to_string())) } - fn create_self_propose_header(&mut self, partition_key: PartitionKey) -> Header { + fn next_esn(&mut self) -> EpochSequenceNumber { let esn = self.epoch_sequence_number; self.epoch_sequence_number = self.epoch_sequence_number.next(); - - Header { - dest: Destination::Processor { - partition_key, - dedup: Some(DedupInformation::self_proposal(esn)), - }, - source: Source::Processor { - partition_id: None, - partition_key: Some(partition_key), - leader_epoch: self.epoch_sequence_number.leader_epoch, - }, - } + esn } /// Waits for self proposer to fail. This method will only complete with an error if the self diff --git a/crates/worker/src/partition/rpc/append_invocation.rs b/crates/worker/src/partition/rpc/append_invocation.rs index 50e8fa0360..c213a8c293 100644 --- a/crates/worker/src/partition/rpc/append_invocation.rs +++ b/crates/worker/src/partition/rpc/append_invocation.rs @@ -8,12 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::WithPartitionKey; use restate_types::invocation; use restate_types::invocation::{ ServiceInvocation, ServiceInvocationResponseSink, SubmitNotificationSink, }; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -55,15 +56,13 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler } }; - let partition_key = service_invocation.partition_key(); - let cmd = Command::Invoke(Box::new(service_invocation)); + let record = commands::InvokeCommand::from(service_invocation); match append_invocation_reply_on { AppendInvocationReplyOn::Appended => { self.proposer .append_and_respond_asynchronously( - partition_key, - cmd, + record, replier, PartitionProcessorRpcResponse::Appended, ) @@ -71,7 +70,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler } AppendInvocationReplyOn::Submitted | AppendInvocationReplyOn::Output => { self.proposer - .handle_rpc_proposal_command(partition_key, cmd, request_id, replier) + .handle_rpc_proposal_command(record, request_id, replier) .await; } } @@ -87,7 +86,6 @@ mod tests { use crate::partition::rpc::MockActuator; use futures::FutureExt; use googletest::prelude::*; - use restate_test_util::let_assert; use std::future::ready; use test_log::test; @@ -95,20 +93,21 @@ mod tests { async fn reply_on_appended() { let mut proposer = MockActuator::new(); proposer - .expect_append_and_respond_asynchronously::() - .return_once_st(|_, cmd, _, _| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_append_and_respond_asynchronously::() + .return_once_st(|cmd, _, _| { + let service_invocation: ServiceInvocation = cmd.into(); + assert_that!( service_invocation, - points_to(all!( + all!( field!(ServiceInvocation.response_sink, none()), field!(ServiceInvocation.submit_notification_sink, none()), - )) + ) ); ready(()).boxed() }); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, _rx) = Reciprocal::mock(); @@ -130,21 +129,22 @@ mod tests { let request_id = PartitionProcessorRpcRequestId::new(); let mut proposer = MockActuator::new(); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(|_, cmd, req_id, _| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_handle_rpc_proposal_command::() + .return_once_st(|cmd, req_id, _| { + let service_invocation: ServiceInvocation = cmd.into(); + assert_that!( service_invocation, - points_to(all!( + all!( field!(ServiceInvocation.response_sink, none()), field!( ServiceInvocation.submit_notification_sink, some(eq(SubmitNotificationSink::Ingress { request_id: req_id })) ), - )) + ) ); ready(()).boxed() }); @@ -168,15 +168,16 @@ mod tests { let request_id = PartitionProcessorRpcRequestId::new(); let mut proposer = MockActuator::new(); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(|_, cmd, req_id, _| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_handle_rpc_proposal_command::() + .return_once_st(|cmd, req_id, _| { + let service_invocation: ServiceInvocation= cmd.into(); + assert_that!( service_invocation, - points_to(all!( + all!( field!( ServiceInvocation.response_sink, some(eq(ServiceInvocationResponseSink::Ingress { @@ -184,7 +185,7 @@ mod tests { })) ), field!(ServiceInvocation.submit_notification_sink, none()), - )) + ) ); ready(()).boxed() }); diff --git a/crates/worker/src/partition/rpc/append_invocation_response.rs b/crates/worker/src/partition/rpc/append_invocation_response.rs index 7f6842a644..657000a5fe 100644 --- a/crates/worker/src/partition/rpc/append_invocation_response.rs +++ b/crates/worker/src/partition/rpc/append_invocation_response.rs @@ -8,11 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::WithPartitionKey; use restate_types::invocation::InvocationResponse; use restate_types::net::partition_processor::PartitionProcessorRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) invocation_response: InvocationResponse, @@ -33,8 +33,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .append_and_respond_asynchronously( - invocation_response.partition_key(), - Command::InvocationResponse(invocation_response), + commands::InvocationResponseCommand::from(invocation_response), replier, PartitionProcessorRpcResponse::Appended, ) diff --git a/crates/worker/src/partition/rpc/append_signal.rs b/crates/worker/src/partition/rpc/append_signal.rs index d8f08d0df6..7ad400673a 100644 --- a/crates/worker/src/partition/rpc/append_signal.rs +++ b/crates/worker/src/partition/rpc/append_signal.rs @@ -8,12 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::NotifySignalRequest; use restate_types::journal_v2::Signal; use restate_types::net::partition_processor::PartitionProcessorRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) invocation_id: InvocationId, @@ -36,8 +37,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .append_and_respond_asynchronously( - invocation_id.partition_key(), - Command::NotifySignal(NotifySignalRequest { + commands::NotifySignalCommand::from(NotifySignalRequest { invocation_id, signal, }), diff --git a/crates/worker/src/partition/rpc/cancel_invocation.rs b/crates/worker/src/partition/rpc/cancel_invocation.rs index d7829db624..49bd51eae1 100644 --- a/crates/worker/src/partition/rpc/cancel_invocation.rs +++ b/crates/worker/src/partition/rpc/cancel_invocation.rs @@ -8,14 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, InvocationTermination, TerminationFlavor, }; use restate_types::net::partition_processor::CancelInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -38,8 +39,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::TerminateInvocation(InvocationTermination { + commands::TerminateInvocationCommand::from(InvocationTermination { invocation_id, flavor: TerminationFlavor::Cancel, response_sink: Some(InvocationMutationResponseSink::Ingress( diff --git a/crates/worker/src/partition/rpc/get_invocation_output.rs b/crates/worker/src/partition/rpc/get_invocation_output.rs index f864239a6a..8e59b20fb6 100644 --- a/crates/worker/src/partition/rpc/get_invocation_output.rs +++ b/crates/worker/src/partition/rpc/get_invocation_output.rs @@ -8,10 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; use restate_storage_api::StorageError; use restate_storage_api::invocation_status_table::{InvocationStatus, ReadInvocationStatusTable}; -use restate_types::identifiers::WithPartitionKey; use restate_types::invocation; use restate_types::invocation::client::{InvocationOutput, InvocationOutputResponse}; use restate_types::invocation::{ @@ -20,7 +18,9 @@ use restate_types::invocation::{ use restate_types::net::partition_processor::{ GetInvocationOutputResponseMode, PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -95,8 +95,7 @@ where self.proposer .handle_rpc_proposal_command( - invocation_query.partition_key(), - Command::AttachInvocation(AttachInvocationRequest { + commands::AttachInvocationCommand::from(AttachInvocationRequest { invocation_query, block_on_inflight: true, response_sink: ServiceInvocationResponseSink::Ingress { request_id }, diff --git a/crates/worker/src/partition/rpc/kill_invocation.rs b/crates/worker/src/partition/rpc/kill_invocation.rs index 4401199918..9c7a3f1a62 100644 --- a/crates/worker/src/partition/rpc/kill_invocation.rs +++ b/crates/worker/src/partition/rpc/kill_invocation.rs @@ -8,14 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, InvocationTermination, TerminationFlavor, }; use restate_types::net::partition_processor::KillInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -38,8 +39,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::TerminateInvocation(InvocationTermination { + commands::TerminateInvocationCommand::from(InvocationTermination { invocation_id, flavor: TerminationFlavor::Kill, response_sink: Some(InvocationMutationResponseSink::Ingress( diff --git a/crates/worker/src/partition/rpc/mod.rs b/crates/worker/src/partition/rpc/mod.rs index 31e1e9cfc7..874fd02930 100644 --- a/crates/worker/src/partition/rpc/mod.rs +++ b/crates/worker/src/partition/rpc/mod.rs @@ -27,26 +27,24 @@ use restate_core::network::{Oneshot, Reciprocal, TransportConnect}; use restate_storage_api::invocation_status_table::ReadInvocationStatusTable; use restate_storage_api::journal_table as journal_table_v1; use restate_storage_api::journal_table_v2::ReadJournalTable; -use restate_types::identifiers::{ - InvocationId, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, -}; +use restate_types::identifiers::{InvocationId, PartitionId, PartitionProcessorRpcRequestId}; use restate_types::invocation::InvocationRequest; +use restate_types::logs::HasRecordKeys; use restate_types::net::partition_processor::{ AppendInvocationReplyOn, PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, }; use restate_types::schema::deployment::DeploymentResolver; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::Command; use restate_worker_api::invoker::InvokerHandle; use crate::partition::leadership::LeadershipState; #[cfg_attr(test, mockall::automock)] pub(super) trait Actuator { - fn handle_rpc_proposal_command( + fn handle_rpc_proposal_command( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, request_id: PartitionProcessorRpcRequestId, replier: Replier, ) -> impl Future; @@ -57,11 +55,11 @@ pub(super) trait Actuator { /// transitions, making this safe for fire-and-forget ingress commands (signals, invocation /// responses). fn append_and_respond_asynchronously< + C: Command + HasRecordKeys, O: 'static + Into + Send + Sync, >( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, replier: Replier, success_response: O, ) -> impl Future; @@ -79,16 +77,17 @@ impl Actuator for LeadershipState where T: TransportConnect, { - async fn append_and_respond_asynchronously>( + async fn append_and_respond_asynchronously< + C: Command + HasRecordKeys, + O: Into, + >( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, replier: Replier, on_proposed_response: O, ) { LeadershipState::append_and_respond_asynchronously( self, - partition_key, cmd, replier.0, on_proposed_response.into(), @@ -96,21 +95,13 @@ where .await } - async fn handle_rpc_proposal_command( + async fn handle_rpc_proposal_command( &mut self, - partition_key: PartitionKey, - cmd: Command, + cmd: C, request_id: PartitionProcessorRpcRequestId, replier: Replier, ) { - LeadershipState::handle_rpc_proposal_command( - self, - request_id, - replier.0, - partition_key, - cmd, - ) - .await + LeadershipState::handle_rpc_proposal_command(self, request_id, replier.0, cmd).await } fn notify_invoker_to_retry_now(&mut self, invocation_id: InvocationId) { diff --git a/crates/worker/src/partition/rpc/purge_invocation.rs b/crates/worker/src/partition/rpc/purge_invocation.rs index 291c268c3d..e1ff77358d 100644 --- a/crates/worker/src/partition/rpc/purge_invocation.rs +++ b/crates/worker/src/partition/rpc/purge_invocation.rs @@ -8,13 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, PurgeInvocationRequest, }; use restate_types::net::partition_processor::PurgeInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -37,8 +38,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::PurgeInvocation(PurgeInvocationRequest { + commands::PurgeInvocationCommand::from(PurgeInvocationRequest { invocation_id, response_sink: Some(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id }, diff --git a/crates/worker/src/partition/rpc/purge_journal.rs b/crates/worker/src/partition/rpc/purge_journal.rs index 7630c28038..25b6337657 100644 --- a/crates/worker/src/partition/rpc/purge_journal.rs +++ b/crates/worker/src/partition/rpc/purge_journal.rs @@ -8,13 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, PurgeInvocationRequest, }; use restate_types::net::partition_processor::PurgeInvocationRpcResponse; -use restate_wal_protocol::Command; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -37,8 +38,7 @@ impl<'a, TActuator: Actuator, TSchemas, TStorage> RpcHandler ) -> Result<(), Self::Error> { self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::PurgeJournal(PurgeInvocationRequest { + commands::PurgeJournalCommand::from(PurgeInvocationRequest { invocation_id, response_sink: Some(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id }, diff --git a/crates/worker/src/partition/rpc/restart_as_new_invocation.rs b/crates/worker/src/partition/rpc/restart_as_new_invocation.rs index b4054152a4..b593175c84 100644 --- a/crates/worker/src/partition/rpc/restart_as_new_invocation.rs +++ b/crates/worker/src/partition/rpc/restart_as_new_invocation.rs @@ -8,9 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; use assert2::let_assert; use opentelemetry::trace::Span; + use restate_service_protocol::codec::ProtobufRawEntryCodec as OldProtocolEntryCodec; use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec; use restate_storage_api::invocation_status_table::{InvocationStatus, ReadInvocationStatusTable}; @@ -28,6 +28,9 @@ use restate_types::journal_v2::{CommandMetadata, EntryMetadata, EntryType}; use restate_types::net::partition_processor::RestartAsNewInvocationRpcResponse; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::{invocation, journal_v2}; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -240,14 +243,13 @@ where ); // Propose the usual Invoke command - let cmd = Command::Invoke(Box::new(service_invocation)); + let record = commands::InvokeCommand::from(service_invocation); // Propose and done // This path should be no longer needed once we switch to the journal v2 by default. self.proposer .append_and_respond_asynchronously( - invocation_id.partition_key(), - cmd, + record, replier, RestartAsNewInvocationRpcResponse::Ok { new_invocation_id }, ) @@ -372,7 +374,7 @@ where } // Pass the ball to the state machine, the PP will reply to the RPC request. - let cmd = Command::RestartAsNewInvocation(RestartAsNewInvocationRequest { + let record = commands::RestartAsNewInvocationCommand::from(RestartAsNewInvocationRequest { invocation_id, new_invocation_id, copy_prefix_up_to_index_included, @@ -382,7 +384,7 @@ where )), }); self.proposer - .handle_rpc_proposal_command(invocation_id.partition_key(), cmd, request_id, replier) + .handle_rpc_proposal_command(record, request_id, replier) .await; Ok(()) @@ -394,7 +396,6 @@ mod tests { use std::collections::HashMap; use std::future::ready; - use assert2::let_assert; use bytes::Bytes; use futures::{FutureExt, Stream, StreamExt, stream}; use googletest::prelude::*; @@ -728,12 +729,12 @@ mod tests { let headers_clone = vec![Header::new("key", "value")]; let payload_clone = payload.clone(); proposer - .expect_append_and_respond_asynchronously::() - .return_once_st(move |_, cmd, _, response| { - let_assert!(Command::Invoke(service_invocation) = cmd); + .expect_append_and_respond_asynchronously::() + .return_once_st(move |cmd, _, response| { + let service_invocation: ServiceInvocation = cmd.into(); assert_that!( service_invocation, - points_to(all!( + all!( field!(ServiceInvocation.invocation_id, not(eq(old_invocation_id))), field!(ServiceInvocation.argument, eq(payload_clone)), field!(ServiceInvocation.headers, eq(headers_clone)), @@ -743,7 +744,7 @@ mod tests { ), field!(ServiceInvocation.response_sink, none()), field!(ServiceInvocation.submit_notification_sink, none()), - )) + ) ); assert_that!( response, @@ -754,7 +755,7 @@ mod tests { ready(()).boxed() }); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_with_input_v1( @@ -796,10 +797,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); // Completed with no pinned deployment triggers v1 workaround @@ -851,10 +852,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let status = InvocationStatus::Completed(CompletedInvocation { @@ -898,10 +899,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let status = InvocationStatus::Completed(CompletedInvocation { @@ -952,10 +953,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); // Completed with journal length>0 but no v1 input present in storage @@ -1008,21 +1009,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(0), - patch_deployment_id: none() - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(0), + patch_deployment_id: none() + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1070,21 +1070,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest =cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(0), - patch_deployment_id: none() - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(0), + patch_deployment_id: none() + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1115,10 +1114,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, Default::default()); @@ -1152,10 +1151,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_with_input_v1( @@ -1211,10 +1210,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, status); @@ -1260,10 +1259,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, status); @@ -1344,21 +1343,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(1), - patch_deployment_id: none() - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(1), + patch_deployment_id: none() + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1408,21 +1406,20 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, _, _| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, _, _| { + let request: RestartAsNewInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::RestartAsNewInvocation(pat!( - RestartAsNewInvocationRequest { - copy_prefix_up_to_index_included: eq(1), - patch_deployment_id: some(eq(latest_id)) - } - ))) + request, + pat!(RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(1), + patch_deployment_id: some(eq(latest_id)) + }) ); ready(()).boxed() }); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1466,10 +1463,10 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1517,7 +1514,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1558,7 +1555,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1599,7 +1596,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let (tx, rx) = Reciprocal::mock(); @@ -1638,10 +1635,10 @@ mod tests { .expect_partition_id() .return_const(PartitionId::from(0)); proposer - .expect_append_and_respond_asynchronously::() + .expect_append_and_respond_asynchronously::() .never(); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage::new_without_journal(invocation_id, Default::default()); diff --git a/crates/worker/src/partition/rpc/resume_invocation.rs b/crates/worker/src/partition/rpc/resume_invocation.rs index f0110c6552..73ea32695f 100644 --- a/crates/worker/src/partition/rpc/resume_invocation.rs +++ b/crates/worker/src/partition/rpc/resume_invocation.rs @@ -8,17 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::*; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, ReadInvocationStatusTable, }; -use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::identifiers::InvocationId; use restate_types::invocation::client::PatchDeploymentId; use restate_types::invocation::{ IngressInvocationResponseSink, InvocationMutationResponseSink, ResumeInvocationRequest, }; use restate_types::net::partition_processor::ResumeInvocationRpcResponse; use restate_types::schema::deployment::DeploymentResolver; +use restate_wal_protocol::v2::commands; + +use super::*; pub(super) struct Request { pub(super) request_id: PartitionProcessorRpcRequestId, @@ -127,8 +129,7 @@ where // We need to propose the message, PP will deal with invoking this back self.proposer .handle_rpc_proposal_command( - invocation_id.partition_key(), - Command::ResumeInvocation(ResumeInvocationRequest { + commands::ResumeInvocationCommand::from(ResumeInvocationRequest { invocation_id, update_pinned_deployment_id, response_sink: Some(InvocationMutationResponseSink::Ingress( @@ -165,7 +166,6 @@ mod tests { use super::*; use crate::partition::rpc::MockActuator; - use assert2::let_assert; use futures::FutureExt; use googletest::prelude::*; use restate_storage_api::invocation_status_table::{ @@ -251,9 +251,9 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, request_id, replier| { - let_assert!(Command::ResumeInvocation(resume_invocation_request) = cmd); + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, request_id, replier| { + let resume_invocation_request: ResumeInvocationRequest = cmd.into(); assert_that!( resume_invocation_request, all!( @@ -301,7 +301,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage { @@ -335,7 +335,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage { @@ -381,7 +381,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage { @@ -434,7 +434,7 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let metadata = InFlightInvocationMetadata { @@ -510,17 +510,18 @@ mod tests { let mut proposer = MockActuator::new(); proposer.expect_is_leader().return_const(true); proposer - .expect_handle_rpc_proposal_command::() - .return_once_st(move |_, cmd, request_id, replier| { + .expect_handle_rpc_proposal_command::() + .return_once_st(move |cmd, request_id, replier| { + let resume_invocation_request: ResumeInvocationRequest = cmd.into(); assert_that!( - cmd, - pat!(Command::ResumeInvocation(pat!(ResumeInvocationRequest { + resume_invocation_request, + pat!(ResumeInvocationRequest { invocation_id: eq(invocation_id), update_pinned_deployment_id: some(eq(expected_deployment_id)), response_sink: some(eq(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id } ))) - }))) + }) ); replier.send(ResumeInvocationRpcResponse::Ok); ready(()).boxed() @@ -579,7 +580,7 @@ mod tests { .expect_partition_id() .return_const(PartitionId::from(0)); proposer - .expect_handle_rpc_proposal_command::() + .expect_handle_rpc_proposal_command::() .never(); let mut storage = MockStorage {