Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/bifrost/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,15 @@ impl<S: Copy> LogEntry<S> {
pub fn try_decode_arc<T: StorageDecode + StorageEncode>(
self,
) -> Option<Result<Arc<T>, StorageDecodeError>> {
self.into_record().map(|record| record.decode_arc())
self.into_record()
.map(|record| record.decode_arc().map_err(Into::into))
}

pub fn try_decode<T: StorageDecode + StorageEncode + Clone>(
self,
) -> Option<Result<T, StorageDecodeError>> {
self.into_record().map(|record| record.decode())
self.into_record()
.map(|record| record.decode().map_err(Into::into))
}

#[cfg(any(test, feature = "test-util"))]
Expand Down
2 changes: 1 addition & 1 deletion crates/platform/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum StorageEncodeError {
SizeOverflow(usize),
}

#[derive(Debug, thiserror::Error)]
#[derive(derive_more::Debug, thiserror::Error)]
pub enum StorageDecodeError {
#[error("failed reading codec: {0}")]
ReadingCodec(ReString),
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod tail;

pub use loglet::*;
pub use offset_watch::*;
pub use record::{CustomRecordEncoding, Record};
pub use record::{CustomRecordEncoding, Record, RecordDecodeError};
pub use record_cache::RecordCache;
pub use tail::*;

Expand Down
49 changes: 29 additions & 20 deletions crates/types/src/logs/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,16 @@ impl Record {
/// the value from the underlying Arc delivered from the loglet. Use this approach if you need
/// to mutate the value in-place and the cost of cloning sections is high. It's generally
/// recommended to use `decode_arc` whenever possible for large payloads.
pub fn decode<T: StorageDecode + StorageEncode + Clone>(self) -> Result<T, StorageDecodeError> {
pub fn decode<T: StorageDecode + StorageEncode + Clone>(self) -> Result<T, RecordDecodeError> {
let decoded = match self.body {
PolyBytes::Bytes(slice) => {
let mut buf = std::io::Cursor::new(slice);
StorageCodec::decode(&mut buf)?
}
PolyBytes::Both(value, _) | PolyBytes::Typed(value) => {
let target_arc: Arc<T> = value.downcast_arc().map_err(|_| {
StorageDecodeError::DecodeValue(
anyhow::anyhow!(
"Type mismatch. Original value in PolyBytes::Typed does not match requested type"
)
.into(),
)})?;
let target_arc: Arc<T> = value
.downcast_arc()
.map_err(RecordDecodeError::TypedValueMismatch)?;
// Attempts to move the inner value (T) if this Arc has exactly one strong
// reference. Otherwise, it clones the inner value.
match Arc::try_unwrap(target_arc) {
Expand All @@ -112,28 +108,41 @@ impl Record {
/// Decode the record body into an Arc<T>. This is the most efficient way to access the entry
/// if you need read-only access or if it's acceptable to selectively clone inner sections. If
/// the record is in record cache, this will avoid cloning or deserialization of the value.
pub fn decode_arc<T: StorageDecode + StorageEncode>(
self,
) -> Result<Arc<T>, StorageDecodeError> {
pub fn decode_arc<T: StorageDecode + StorageEncode>(self) -> Result<Arc<T>, RecordDecodeError> {
let decoded = match self.body {
PolyBytes::Bytes(slice) => {
let mut buf = std::io::Cursor::new(slice);
Arc::new(StorageCodec::decode(&mut buf)?)
}
PolyBytes::Typed(value) | PolyBytes::Both(value, _) => {
value.downcast_arc().map_err(|_| {
StorageDecodeError::DecodeValue(
anyhow::anyhow!(
"Type mismatch. Original value in PolyBytes::Typed does not match requested type"
)
.into(),
)})?
},
PolyBytes::Typed(value) | PolyBytes::Both(value, _) => value
.downcast_arc()
.map_err(RecordDecodeError::TypedValueMismatch)?,
};
Ok(decoded)
}
}

/// Error produced when decoding a record body into a requested type.
#[derive(derive_more::Debug, thiserror::Error)]
pub enum RecordDecodeError {
/// Decoding failed with a storage-level error; the display message is forwarded from the
/// wrapped error unchanged.
#[error(transparent)]
StorageDecodeError(#[from] StorageDecodeError),
/// The record holds a typed value that could not be downcast to the requested type. The
/// original value is carried along so the caller may try a different downcast.
#[error("Type mismatch. Original value in PolyBytes::Typed does not match requested type")]
// Debug output is overridden to a fixed string, so the payload is not formatted.
#[debug("TypedValueMismatch")]
TypedValueMismatch(Arc<dyn StorageEncode>),
}

impl From<RecordDecodeError> for StorageDecodeError {
    /// Collapses a [`RecordDecodeError`] into a [`StorageDecodeError`].
    ///
    /// A wrapped storage error is returned as-is. A typed-value mismatch drops the carried
    /// value and is reported as a `DecodeValue` error with a fixed message.
    fn from(value: RecordDecodeError) -> Self {
        match value {
            RecordDecodeError::TypedValueMismatch(_) => Self::DecodeValue(
                "Type mismatch. Original value in PolyBytes::Typed does not match requested type"
                    .into(),
            ),
            RecordDecodeError::StorageDecodeError(inner) => inner,
        }
    }
}

impl EstimatedMemorySize for Record {
#[inline]
fn estimated_memory_size(&self) -> usize {
Expand Down
1 change: 0 additions & 1 deletion crates/wal-protocol/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ where
C: Command + HasRecordKeys,
{
fn from(command: C) -> PartialEnvelope {
// let payload = payload.into();
PartialEnvelope {
kind: C::KIND,
keys: command.record_keys(),
Expand Down
1 change: 1 addition & 0 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ restate-storage-api = { workspace = true, features = ["test-util"] }
restate-test-util = { workspace = true, features = ["prost"] }
restate-types = { workspace = true, features = ["test-util"] }
restate-vqueues = { workspace = true, features = ["test-util"] }
restate-wal-protocol = { workspace = true, features = ["test-util"] }

googletest = { workspace = true }
mockall = { workspace = true }
Expand Down
146 changes: 90 additions & 56 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ mod state_machine;
pub mod types;

use std::fmt::Debug;
use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -64,7 +63,9 @@ use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStat
use restate_types::config::Configuration;
use restate_types::epoch::EpochMetadata;
use restate_types::identifiers::LeaderEpoch;
use restate_types::logs::{KeyFilter, Lsn, Record, SequenceNumber};
use restate_types::logs::{
KeyFilter, Keys, Lsn, MatchKeyQuery, Record, RecordDecodeError, SequenceNumber,
};
use restate_types::net::ingest::{
DedupSequenceNrQueryRequest, DedupSequenceNrQueryResponse, ReceivedIngestRequest,
ResponseStatus,
Expand All @@ -85,7 +86,8 @@ use restate_vqueues::{VQueuesMeta, VQueuesMetaCache};
use restate_wal_protocol::control::{
AnnounceLeaderCommand, CurrentReplicaSetConfiguration, NextReplicaSetConfiguration,
};
use restate_wal_protocol::{Command, Destination, Envelope, Header};
use restate_wal_protocol::v2::commands;
use restate_wal_protocol::{Envelope, v2};
use restate_worker_api::invoker::capacity::InvokerCapacity;
use restate_worker_api::{LeaderQueryCommand, LeaderQueryReceiver};

Expand Down Expand Up @@ -241,6 +243,7 @@ impl PartitionProcessorBuilder {
let last_applied_log_lsn_watch = watch::Sender::new(Lsn::INVALID);

Ok(PartitionProcessor {
key_filter: KeyFilter::Within(partition_store.partition_key_range().into()),
partition_id_str,
leadership_state,
state_machine,
Expand Down Expand Up @@ -319,6 +322,7 @@ pub struct PartitionProcessor<T> {

last_applied_log_lsn_watch: watch::Sender<Lsn>,
cached_epoch_metadata: Option<CachedEpochMetadata>,
key_filter: KeyFilter,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -368,8 +372,9 @@ pub enum ProcessorError {

struct LsnEnvelope {
pub lsn: Lsn,
pub keys: Keys,
pub created_at: NanosSinceEpoch,
pub envelope: Arc<Envelope>,
pub envelope: Arc<v2::Envelope<v2::Raw>>,
}

/// OrderedOperations are scheduled operations that
Expand Down Expand Up @@ -430,6 +435,33 @@ where
res
}

/// Decode record tries to decode the record first as v2 Envelope, if it failed,
/// it decodes as v1 Envelope then converts into v2.
fn decode_record(record: Record) -> Result<Arc<v2::Envelope<v2::Raw>>, StorageDecodeError> {
let envelope = match record.decode_arc::<v2::Envelope<v2::Raw>>() {
Ok(envelope) => envelope,
Err(RecordDecodeError::TypedValueMismatch(v1_envelope)) => {
let v1_envelope: Arc<Envelope> = v1_envelope
.downcast_arc()
.map_err(|_| StorageDecodeError::DecodeValue("Type mismatch. Record value in PolyBytes::Typed does not match requested type".into()))?;

let v1_envelope = match Arc::try_unwrap(v1_envelope) {
Ok(v1_envelope) => v1_envelope,
Err(arc) => arc.as_ref().clone(),
};

let envelope: v2::Envelope<v2::Raw> = v1_envelope
.try_into()
.map_err(|err: anyhow::Error| StorageDecodeError::DecodeValue(err.into()))?;

Arc::new(envelope)
}
Err(RecordDecodeError::StorageDecodeError(err)) => return Err(err),
};

Ok(envelope)
}

async fn run_inner(&mut self) -> Result<(), ProcessorError> {
let mut partition_store = self.partition_store.clone();

Expand Down Expand Up @@ -625,8 +657,9 @@ where

let record = LsnEnvelope {
lsn,
keys: record.keys().clone(),
created_at: record.created_at(),
envelope: record.decode_arc()?,
envelope: Self::decode_record(record)?,
};

let maybe_announce_leader = self.apply_record(
Expand Down Expand Up @@ -996,37 +1029,55 @@ where
action_collector: &mut ActionCollector,
vqueues_cache: &mut VQueuesMetaCache,
) -> Result<Option<Box<AnnounceLeaderCommand>>, state_machine::Error> {
trace!(lsn = %record.lsn, "Processing bifrost record for '{}': {:?}", record.envelope.command.name(), record.envelope.header);

if let Some(dedup_information) = self.is_targeted_to_me(&record.envelope.header) {
// deduplicate if deduplication information has been provided
if let Some(dedup_information) = dedup_information {
if Self::is_outdated_or_duplicate(dedup_information, transaction).await? {
debug!(
"Ignoring outdated or duplicate message: {:?}",
record.envelope.header
);
return Ok(None);
}
transaction
.put_dedup_seq_number(
dedup_information.producer_id.clone(),
&dedup_information.sequence_number,
)
.map_err(state_machine::Error::Storage)?;
}
trace!(lsn = %record.lsn, "Processing bifrost record for '{}': {:?}", record.envelope.kind(), record.envelope.header());

// todo: redesign to pass the arc (or reference) further down
let record_created_at = record.created_at;
let record_lsn = record.lsn;
let envelope = Arc::unwrap_or_clone(record.envelope);
if !self.is_targeted_to_me(&record.keys) {
self.status.num_skipped_records += 1;
trace!(
"Ignore message which is not targeted to me Partition Range: {:?} Key: {:?}, Header: {:?}",
self.key_filter,
record.keys,
record.envelope.header()
);
return Ok(None);
}

if let Command::AnnounceLeader(announce_leader) = envelope.command {
// leadership change detected, let's finish our transaction here
return Ok(Some(announce_leader));
} else if let Command::UpdatePartitionDurability(partition_durability) =
envelope.command
{
// todo(azmy): use dedup() directly without first converting to DedupInformation
let dedup_information: Option<DedupInformation> = record.envelope.dedup().clone().into();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love if we can address this one pretty soon


// deduplicate if deduplication information has been provided
if let Some(dedup_information) = dedup_information {
if Self::is_outdated_or_duplicate(&dedup_information, transaction).await? {
debug!(
"Ignoring outdated or duplicate message: {:?}",
record.envelope.header()
);
return Ok(None);
}
transaction
.put_dedup_seq_number(
dedup_information.producer_id.clone(),
&dedup_information.sequence_number,
)
.map_err(state_machine::Error::Storage)?;
}

// todo: redesign to pass the arc (or reference) further down
let record_created_at = record.created_at;
let record_lsn = record.lsn;
// note: v2 envelope is cheaply clonable since it holds either `Bytes` or
// Arc of the payload record.
let envelope = Arc::unwrap_or_clone(record.envelope);

match envelope.kind() {
v2::CommandKind::AnnounceLeader => {
let envelope = envelope.into_typed::<commands::AnnounceLeaderCommand>();
let announce_leader = envelope.into_inner()?;
return Ok(Some(Box::new(announce_leader)));
}
v2::CommandKind::UpdatePartitionDurability => {
let envelope = envelope.into_typed::<commands::UpdatePartitionDurabilityCommand>();
let partition_durability = envelope.into_inner()?;
if partition_durability.partition_id != self.partition_store.partition_id() {
self.status.num_skipped_records += 1;
trace!(
Expand All @@ -1044,10 +1095,11 @@ where
if self.trim_queue.push(&partition_durability) {
transaction.put_partition_durability(&partition_durability)?;
}
} else {
}
_ => {
self.state_machine
.apply(
envelope.command,
envelope,
record_created_at.into(),
record_lsn,
transaction,
Expand All @@ -1057,31 +1109,13 @@ where
)
.await?;
}
} else {
self.status.num_skipped_records += 1;
trace!(
"Ignore message which is not targeted to me: {:?}",
record.envelope.header
);
}

Ok(None)
}

fn is_targeted_to_me<'a>(&self, header: &'a Header) -> Option<&'a Option<DedupInformation>> {
match &header.dest {
Destination::Processor {
partition_key,
dedup,
} if self
.partition_store
.partition_key_range()
.contains(partition_key) =>
{
Some(dedup)
}
_ => None,
}
/// Returns `true` when `keys` match this processor's `key_filter`.
fn is_targeted_to_me(&self, keys: &Keys) -> bool {
keys.matches_key_query(&self.key_filter)
}

async fn is_outdated_or_duplicate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ mod tests {
AttachInvocationCommand, AttachInvocationCompletion, AttachInvocationResult,
AttachInvocationTarget, CommandType, Entry, EntryMetadata, EntryType, NotificationId,
};
use restate_wal_protocol::Command;
use restate_wal_protocol::v2::{Command, commands};
use rstest::rstest;

#[rstest]
Expand Down Expand Up @@ -83,7 +83,7 @@ mod tests {
let actions = test_env
.apply_multiple([
invoker_entry_effect(invocation_id, attach_invocation_command.clone()),
Command::InvocationResponse(InvocationResponse {
commands::InvocationResponseCommand::test_envelope(InvocationResponse {
target: JournalCompletionTarget::from_parts(invocation_id, completion_id),
result: ResponseResult::Success(success_result.clone()),
}),
Expand Down
Loading
Loading