Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 51 additions & 85 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};

use bytes::BytesMut;
use futures::future::OptionFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, stream};
Expand All @@ -33,12 +32,10 @@ use restate_limiter::RuleBook;
use restate_partition_store::PartitionDb;
use restate_storage_api::vqueue_table::scheduler::SchedulerDecisionsCommand;
use restate_types::config::Configuration;
use restate_types::identifiers::{
LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey,
};
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionProcessorRpcRequestId};
use restate_types::invocation::PurgeInvocationRequest;
use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification};
use restate_types::logs::Keys;
use restate_types::logs::{BodyWithKeys, Keys};
use restate_types::net::ingest::IngestRecord;
use restate_types::net::partition_processor::{
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
Expand All @@ -48,9 +45,8 @@ use restate_types::{RESTATE_VERSION_1_7_0, SemanticRestateVersion, Version, Vers
use restate_vqueues::VQueueEvent;
use restate_vqueues::scheduler::Decisions;
use restate_vqueues::{SchedulerService, VQueuesMeta};
use restate_wal_protocol::Command;
use restate_wal_protocol::control::UpsertSchemaCommand;
use restate_wal_protocol::v1::UpsertRuleBookCommandWrapper;
use restate_wal_protocol::v2::{Command, CommandWithKeys, commands};
use restate_worker_api::invoker::InvokerHandle;
use restate_worker_api::resources::ReservedResources;
use restate_worker_api::{SchedulerStatusEntry, UserLimitCounterEntry};
Expand Down Expand Up @@ -342,7 +338,6 @@ impl LeaderState {
&mut self,
action_effects: impl IntoIterator<Item = ActionEffect>,
) -> Result<(), Error> {
let mut arena = BytesMut::with_capacity(128 * 1024);
for effect in action_effects {
match effect {
ActionEffect::Scheduler(decisions) => {
Expand All @@ -362,17 +357,11 @@ impl LeaderState {
.into_iter()
// one command per partition key
.map(|(partition_key, group)| {
let decisions = SchedulerDecisionsCommand {
qids: group.collect(),
};

arena.reserve(decisions.encoded_len());
// safe to unwrap because we reserved enough space
decisions.bilrost_encode(&mut arena).unwrap();

(
partition_key,
Command::VQSchedulerDecisions(arena.split().freeze()),
BodyWithKeys::new(
SchedulerDecisionsCommand {
qids: group.collect(),
},
Keys::Single(partition_key),
)
// an action goes into command, and resources are popped
})
Expand All @@ -387,71 +376,64 @@ impl LeaderState {
// based on configuration, whether to consider partition-local durability in
// the replica-set as a sufficient source of durability, or only snapshots.
self.self_proposer
.self_propose(
self.partition_key_range.start(),
Command::UpdatePartitionDurability(partition_durability),
)
.self_propose(BodyWithKeys::new(
partition_durability,
Keys::RangeInclusive(self.partition_key_range.into()),
))
.await?;
}
ActionEffect::Invoker(invoker_effect) => {
self.self_proposer
.self_propose(
invoker_effect.invocation_id.partition_key(),
Command::InvokerEffect(invoker_effect),
)
.self_propose(commands::InvokerEffectCommand::from(*invoker_effect))
.await?;
}
ActionEffect::Shuffle(outbox_truncation) => {
// todo: Until we support partition splits we need to get rid of outboxes or introduce partition
// specific destination messages that are identified by a partition_id
self.self_proposer
.self_propose(
self.partition_key_range.start(),
Command::TruncateOutbox(outbox_truncation.index()),
)
.self_propose(commands::TruncateOutboxCommand {
partition_key_range: Keys::RangeInclusive(
self.partition_key_range.into(),
),
index: outbox_truncation.index(),
})
.await?;
}
ActionEffect::Timer(timer) => {
self.self_proposer
.self_propose(timer.invocation_id().partition_key(), Command::Timer(timer))
.self_propose(commands::TimerCommand::from(timer))
.await?;
}
ActionEffect::Cleaner(effect) => {
let (invocation_id, cmd) = match effect {
CleanerEffect::PurgeJournal(invocation_id) => (
ActionEffect::Cleaner(effect) => match effect {
CleanerEffect::PurgeJournal(invocation_id) => {
let record = commands::PurgeJournalCommand::from(PurgeInvocationRequest {
invocation_id,
Command::PurgeJournal(PurgeInvocationRequest {
invocation_id,
response_sink: None,
}),
),
CleanerEffect::PurgeInvocation(invocation_id) => (
invocation_id,
Command::PurgeInvocation(PurgeInvocationRequest {
response_sink: None,
});

self.self_proposer.self_propose(record).await?;
}
CleanerEffect::PurgeInvocation(invocation_id) => {
let record =
commands::PurgeInvocationCommand::from(PurgeInvocationRequest {
invocation_id,
response_sink: None,
}),
),
};
});

self.self_proposer
.self_propose(invocation_id.partition_key(), cmd)
.await?;
}
self.self_proposer.self_propose(record).await?;
}
},
ActionEffect::UpsertSchema(schema) => {
if SemanticRestateVersion::current()
.is_equal_or_newer_than(&RESTATE_VERSION_1_7_0)
{
self.self_proposer
.self_propose(
self.partition_key_range.start(),
Command::UpsertSchema(UpsertSchemaCommand {
partition_key_range: Keys::RangeInclusive(
self.partition_key_range.into(),
),
schema,
}),
)
.self_propose(UpsertSchemaCommand {
partition_key_range: Keys::RangeInclusive(
self.partition_key_range.into(),
),
schema,
})
.await?;
}
}
Expand All @@ -463,21 +445,11 @@ impl LeaderState {
.experimental
.is_vqueues_enabled()
{
let cmd =
restate_wal_protocol::control::UpsertRuleBookCommand { rule_book };

arena.reserve(cmd.encoded_len());
// safe to unwrap because we reserved enough space
cmd.bilrost_encode(&mut arena).unwrap();

self.self_proposer
.self_propose(
self.partition_key_range.start(),
Command::UpsertRuleBook(UpsertRuleBookCommandWrapper {
partition_key_range: self.partition_key_range,
command: arena.split().freeze(),
}),
)
.self_propose(BodyWithKeys::new(
restate_wal_protocol::control::UpsertRuleBookCommand { rule_book },
Keys::RangeInclusive(self.partition_key_range.into()),
))
.await?;
}
}
Expand All @@ -490,14 +462,13 @@ impl LeaderState {
Ok(())
}

pub async fn handle_rpc_proposal_command(
pub async fn handle_rpc_proposal_command<C: Command>(
&mut self,
request_id: PartitionProcessorRpcRequestId,
reciprocal: Reciprocal<
Oneshot<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
>,
partition_key: PartitionKey,
cmd: Command,
cmd: impl CommandWithKeys<C>,
) {
match self.awaiting_rpc_actions.entry(request_id) {
Entry::Occupied(mut o) => {
Expand All @@ -511,7 +482,7 @@ impl LeaderState {
}
Entry::Vacant(v) => {
// In this case, no one proposed this command yet, let's try to propose it
if let Err(e) = self.self_proposer.self_propose(partition_key, cmd).await {
if let Err(e) = self.self_proposer.self_propose(cmd).await {
reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string())));
} else {
v.insert(reciprocal);
Expand All @@ -525,18 +496,13 @@ impl LeaderState {
/// Records appended this way are never filtered by the dedup mechanism during leadership
/// transitions, making this safe for fire-and-forget ingress commands (signals, invocation
/// responses).
pub async fn append_and_respond_asynchronously(
pub async fn append_and_respond_asynchronously<C: Command>(
&mut self,
partition_key: PartitionKey,
cmd: Command,
cmd: impl CommandWithKeys<C>,
reciprocal: RpcReciprocal,
success_response: PartitionProcessorRpcResponse,
) {
match self
.self_proposer
.append_with_notification(partition_key, cmd)
.await
{
match self.self_proposer.append_with_notification(cmd).await {
Ok(commit_token) => {
self.awaiting_rpc_self_propose.push(SelfAppendFuture::new(
commit_token,
Expand Down
Loading
Loading