worker: decode partition records as v2 envelopes with v1 fallback#4696
Conversation
983f7e3 to
4a99ecf
Compare
5c3811d to
e49a760
Compare
9bac5c5 to
c83fbaf
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c83fbaf695
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| /// partition key range | ||
| #[bilrost(tag(4))] | ||
| pub partition_key_range: Keys, |
There was a problem hiding this comment.
Preserve decoding of old durability records
When a new processor replays or follows a WAL segment containing Command::UpdatePartitionDurability written by a pre-change node, the flexbuffers/serde V1 payload will not contain this newly added partition_key_range field. Because the field is non-optional and has no #[serde(default)], v1::Envelope::decode fails before the V1-to-V2 fallback can convert the record, so rolling upgrades or replay of untrimmed old durability records can stop the partition processor. Please add a backward-compatible default/compat path for this field.
Useful? React with 👍 / 👎.
b3e0c8a to
c528d5a
Compare
| UnsupportedCodecKind(StorageCodecKind), | ||
| #[error("Type mismatch. Original value in PolyBytes::Typed({}) does not match requested type", _0.type_name())] | ||
| #[debug("TypedValueMismatch({})", _0.type_name())] | ||
| TypedValueMismatch(Arc<dyn StorageEncode>), |
There was a problem hiding this comment.
Not a huge fan of passing the entire value into the error. Is this really necessary? I suppose one can formulate the error message and return it in a ReString instead?
There was a problem hiding this comment.
It's not only for the name. this is actually used to fallback to v1 decoding in some cases.
The problem here is because of record cache, we might try to decode v1 envelope as v2 where the PolyBytes are typed. To make this work, I instead pass the inner value to be handled here https://github.com/restatedev/restate/pull/4696/changes#diff-09e9d092973d97a3155dc5ca9f928dee76e6b85bbea659659c3083da645f7d09R458
f781f06 to
3b4acac
Compare
d882faa to
c1d0d27
Compare
| keys: payload.record_keys(), | ||
| payload, | ||
| fn from(command: C) -> PartialEnvelope { | ||
| // let payload = payload.into(); |
| let record_lsn = record.lsn; | ||
| let envelope = Arc::unwrap_or_clone(record.envelope); | ||
| // todo(azmy): use dedup() directly without first converting to DedupInformation | ||
| let dedup_information: Option<DedupInformation> = record.envelope.dedup().clone().into(); |
There was a problem hiding this comment.
I'd love if we can address this one pretty soon
AhmedSoliman
left a comment
There was a problem hiding this comment.
Thank you for working on this 🚢
## Summary Switch the partition processor's read path to v2 envelopes. Records are decoded as `v2::Envelope<Raw>` first; if the record on disk is still flexbuffers-encoded v1, we downcast through the new `StorageDecodeError::TypedValueMismatch` variant (which now carries the original Arc) and convert via the v1→v2 compatibility shim. The decode path no longer relies on `Header::dest` to decide whether a record targets this partition. Instead, the processor stores a `KeyFilter` derived from its partition key range and checks each record's `Keys` via `MatchKeyQuery`. Deduplication still runs, but now reads `Dedup` off the v2 header (converted to the existing `DedupInformation` for the dedup table — TODO left to consume Dedup directly). `apply_record` is rewritten around `RecordKind`: AnnounceLeader and UpdatePartitionDurability are handled inline; everything else is forwarded to the state machine as a typed v2 envelope. Tests under state_machine/ and pp-bench are updated to construct records via `records::<Kind>::new_test(...)` instead of the v1 `Command::<Variant>(...)` enum. Writers still emit v1 envelopes; only reads go through v2.
worker: decode partition records as v2 envelopes with v1 fallback
Summary
Switch the partition processor's read path to v2 envelopes. Records
are decoded as
v2::Envelope<Raw>first; if the record on disk isstill flexbuffers-encoded v1, we downcast through the new
StorageDecodeError::TypedValueMismatchvariant (which now carriesthe original Arc) and convert via the v1→v2 compatibility shim.
The decode path no longer relies on
Header::destto decide whethera record targets this partition. Instead, the processor stores a
KeyFilterderived from its partition key range and checks eachrecord's
KeysviaMatchKeyQuery. Deduplication still runs, butnow reads
Dedupoff the v2 header (converted to the existingDedupInformationfor the dedup table — TODO left to consume Dedupdirectly).
apply_recordis rewritten aroundRecordKind: AnnounceLeader andUpdatePartitionDurability are handled inline; everything else is
forwarded to the state machine as a typed v2 envelope.
Tests under state_machine/ and pp-bench are updated to construct
records via
records::<Kind>::new_test(...)instead of the v1Command::<Variant>(...)enum.Writers still emit v1 envelopes; only reads go through v2.
Stack created with Sapling. Best reviewed with ReviewStack.