Restore ingestion pipelining with epoch-pinned ordering #4891
9e79d76 to
7585863
| let leader_epoch = match request.target_leader_epoch { | ||
| Some(leader_epoch) => leader_epoch, | ||
| None => { | ||
| // Old clients are not sending leader epochs. |
There was a problem hiding this comment.
We will only hit this scenario if the user had the experimental feature enabled in v1.6.
There was a problem hiding this comment.
Maybe add this to the comment so that this is captured for posterity.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 75858633bf
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
tillrohrmann
left a comment
There was a problem hiding this comment.
Thanks for creating this PR @muhamadazmy. I've given it a pass and left a few comments. The main comment would be about the assumption that message appends can only fail on the pp leader because of leadership changes and if one ingest message gets rejected, then all following ones will be rejected as well. I think this assumption does not hold true at the moment (unfortunately).
|
Maybe one way to solve the problem with rejecting point-wise rpc requests is to let the error bubble up and stop the partition processor if this happens. |
|
Thank you so much @tillrohrmann for your thorough review. Great catch on the other kinds of failures. Bubbling the error up and stopping the partition processor is a good solution; we can probably give a detailed error message with some details on what to do to get out of this situation (increase the record size, maybe). Another solution is to use a different, client-generated unique session id. The PP only needs to take note and keep track of bad sessions with a very long timeout (10 min should be enough, but we can go higher). Once a message with a certain session id is rejected, the session id is blacklisted. The client should renew the session id and try again. The only problem with this is that the client will keep retrying forever; we can then introduce a proper error message and some back-off. The advantage is that we don't stop the partition processor, and the system can continue working normally. What do you think?
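A minimal sketch of the session blacklist idea described above, assuming a plain `u64` session id and a caller-supplied clock. `BadSessions` and its methods are hypothetical names for illustration only, not code from this PR; the 10-minute timeout is the value floated in the comment.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical tracker for rejected ingestion sessions (not part of the PR).
pub struct BadSessions {
    ttl: Duration,
    rejected_at: HashMap<u64, Instant>,
}

impl BadSessions {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, rejected_at: HashMap::new() }
    }

    /// Records that a request of `session_id` was rejected at `now`.
    pub fn reject(&mut self, session_id: u64, now: Instant) {
        self.rejected_at.insert(session_id, now);
    }

    /// Returns true while the session is blacklisted; expired entries are
    /// removed lazily on lookup.
    pub fn is_blacklisted(&mut self, session_id: u64, now: Instant) -> bool {
        let expired = match self.rejected_at.get(&session_id) {
            Some(at) => now.duration_since(*at) >= self.ttl,
            None => return false,
        };
        if expired {
            self.rejected_at.remove(&session_id);
        }
        !expired
    }
}
```

With `BadSessions::new(Duration::from_secs(600))`, every later request carrying a rejected session id would be refused until the client renews its id or the entry expires.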
|
Establishing a proper session would certainly also work. I am just wondering whether this wouldn't be work that's in vain once the proper stream abstraction for our internal networking lands. That's why a cheaper solution could be to check whether we can have an Alternatively, I would go with terminating the leadership once we fail any ingestion request. This is more disruptive but ensures that we uphold the underlying assumption that ingestion batches are only committed strictly in order. |
52bdf1d to
5e9208e
|
@codex review again please |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5e9208e43e
e2bc4d9 to
4321c22
|
@codex review |
|
Codex Review: Didn't find any major issues. 🎉
There was a problem hiding this comment.
Thanks a lot for re-establishing pipelining for our ingestion clients @muhamadazmy. The changes look good to me :-) I left a few minor comments which we could address before merging this PR.
| /// Unlike [`enqueue`](Self::enqueue), this does not check the record size and | ||
| /// accepts a record of any size. |
There was a problem hiding this comment.
Maybe add: "Callers have to ensure that the record is not larger than the network message size limit."
| /// Maximum allowed record size | ||
| #[builder(default=NonZeroUsize::new(DEFAULT_MAX_RECORD_SIZE).unwrap())] | ||
| pub(crate) record_size_limit: NonZeroUsize, |
There was a problem hiding this comment.
Is it required to set a default value here? The risk of DEFAULT_MAX_RECORD_SIZE is that it diverges from the network message size limit.
| impl Default for SessionOptions { | ||
| fn default() -> Self { | ||
| Self { | ||
| // The default batch size of 50KB is to avoid | ||
| // overwhelming the PP on the hot path. | ||
| batch_size: 50 * 1024, // 50 KB | ||
| batch_size: NonZeroUsize::new(50 * 1024).unwrap(), // 50 KB | ||
| swimlane: Swimlane::IngressData, | ||
| record_size_limit: NonZeroUsize::new(DEFAULT_MAX_RECORD_SIZE).unwrap(), | ||
| connection_retry_policy: RetryPolicy::exponential( | ||
| Duration::from_millis(10), | ||
| 2.0, | ||
| None, | ||
| Some(Duration::from_secs(1)), | ||
| ), | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Should we have this default and the builder's default values? Maybe it's simpler to only keep the builder's defaults so that those defaults don't diverge.
| pub(crate) record_size_limit: NonZeroUsize, | ||
| /// Connection swimlane | ||
| pub swimlane: Swimlane, | ||
| #[builder(default=Swimlane::IngressData)] |
There was a problem hiding this comment.
Same question here. Not having a default will force users to make a deliberate decision.
| .get_node_by_partition(self.partition)?; | ||
| async fn connect(&mut self) -> Option<SessionState> { | ||
| // `PartitionRouting` is the source of truth for the current leader epoch; the leadership state | ||
| // we last learned (from routing, or reported back by a `NotLeaderWithEpoch`) is used used |
There was a problem hiding this comment.
| // we last learned (from routing, or reported back by a `NotLeaderWithEpoch`) is used used | |
| // we last learned (from routing, or reported back by a `NotLeaderWithEpoch`) is used |
| let Some(permit) = connection.reserve().await else { | ||
| // restore the carry over back to original state. | ||
| // maintaining the original order. | ||
|
|
There was a problem hiding this comment.
White line seems unnecessary.
| #[strum(props(runtime = "default"))] | ||
| IngestionSession, |
There was a problem hiding this comment.
Was running the ingestion session on the pp runtime causing the shutdown problems you mentioned offline?
There was a problem hiding this comment.
Not really. The shutdown issue was that we spawned the task as a managed task without using the task centre cancellation token (since we use our own managed cancellation token).
The decision to use the default runtime is to make sure session tasks (which are lazily initialised) won't get torn down with the PP runtime when it's shutting down. This way the client can be safely shared across multiple partitions or other components.
| pub async fn forward_many_with_notification( | ||
| &mut self, | ||
| records: impl ExactSizeIterator<Item = IngestRecord>, | ||
| ) -> Result<CommitToken, Error> where { | ||
| ) -> Result<CommitToken, EnqueueError<()>> where { |
There was a problem hiding this comment.
Maybe add an explanation why at the call site of forward_many_with_notification it is ok to fail the ingestion request with the EnqueueError and we don't have to stop the leadership (bump the leader epoch).
| // so we need to put some back pressure here | ||
| // by delaying the response so they don't | ||
| // retry immediately | ||
|
|
There was a problem hiding this comment.
Unnecessary line break.
| if target_leader_epoch != leadership_state.current_leader_epoch { | ||
| reciprocal.send( | ||
| ResponseStatus::NotLeaderWithEpoch { | ||
| of: self.partition_store.partition_id(), | ||
| last_seen_leadership_state: leadership_state, | ||
| } | ||
| .into(), | ||
| ); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Should we also validate that our current leader epoch is equal to target_leader_epoch?
There was a problem hiding this comment.
Maybe first checking the leadership_state's leader epoch could save us from doing
self
.replica_set_states
.membership_state(self.partition_store.partition_id())
.current_leader();
on every call, which accesses a dash map and then clones MembershipState, in the happy case.
There was a problem hiding this comment.
the replica_set_states is updated via the AnnounceLeader message or externally via gossip. In both cases, it acts as a source of truth for the leadership state in this situation. The leader won't step down unconditionally unless it observes a change in the leadership via gossip. So IMHO it's enough to check the replica_set_state always.
As for the dashmap access: I was thinking of using the PartitionProcessorStatus object, which we explicitly update when we observe a change or when reading an announce leader message. What do you think?
There was a problem hiding this comment.
The PartitionProcessorStatus might not be the best fit, as its intended purpose is to report the partition processor's internal status to the external world in an eventually consistent manner.
There was a problem hiding this comment.
The leader won't step down unconditionally unless it observes a change in the leadership via gossip. So IMHO it's enough to check the replica_set_state always.
I second @tillrohrmann here. I think we should only check the current leadership epoch associated with this leader (its own reign). The actual barrier is that the leader believes that this message belongs to its own reign.
the replica_set_states is updated via the AnnounceLeader message or externally via gossip. In both cases, it acts as a source of truth for the leadership state in this situation.
I don't think we can refer to replica_set_states as source-of-truth in any scenario. I would consider the leadership state that we return in ResponseStatus as a mere suggestion, in fact, I would even suggest we make it optional. In my mind, a good value to use is the current leader in cached_epoch_metadata and only if (a) we have a value in it and (b) leadership epoch in cached_epoch_metadata > our current epoch. If none of this is available, use what's in gossip (and again, only if its leadership epoch is > our current). When all this fails, we return None.
The ingestion client should then always use the max value it observed from both gossip and the last suggestion it saw, if any.
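The selection rule proposed above could be sketched roughly as follows. `LeaderEpoch`, `suggest_leader_epoch`, and `next_target_epoch` are hypothetical stand-ins for illustration, not the types or functions used in the PR, and the sketch assumes epochs are totally ordered integers.

```rust
/// Hypothetical stand-in for the real leader-epoch type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct LeaderEpoch(pub u64);

/// Leader side: picks the leadership hint to attach to a rejection, or `None`.
/// Cached epoch metadata is preferred over gossip, and either is only used
/// if it is strictly newer than the leader's own epoch.
pub fn suggest_leader_epoch(
    own: LeaderEpoch,
    cached_epoch_metadata: Option<LeaderEpoch>,
    gossip: Option<LeaderEpoch>,
) -> Option<LeaderEpoch> {
    cached_epoch_metadata
        .filter(|epoch| *epoch > own)
        .or_else(|| gossip.filter(|epoch| *epoch > own))
}

/// Client side: take the max of the gossip view and the last suggestion,
/// so a stale hint can never move the target epoch backwards.
pub fn next_target_epoch(
    gossip_view: LeaderEpoch,
    last_suggestion: Option<LeaderEpoch>,
) -> LeaderEpoch {
    last_suggestion.map_or(gossip_view, |hint| hint.max(gossip_view))
}
```

Treating the hint as optional keeps the response honest: when the rejecting node knows nothing newer than its own epoch, it returns `None` instead of echoing a value the client cannot act on.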
|
Before merging let's wait for a final pass by @AhmedSoliman. |
1e76532 to
936b509
cb99169 to
1567dd9
AhmedSoliman
left a comment
There was a problem hiding this comment.
Nice work. Most comments are nits, the major one is to change the swimlane of shuffle to avoid blocking bifrost. The other major note is to avoid publishing the observed leadership change on gossip.
|
|
||
| use crate::chunks_size::ChunksSize; | ||
|
|
||
| const DEFAULT_MAX_RECORD_SIZE: usize = 32 * 1024 * 1024; // 32MB |
There was a problem hiding this comment.
| const DEFAULT_MAX_RECORD_SIZE: usize = 32 * 1024 * 1024; // 32MB | |
| const DEFAULT_MAX_RECORD_SIZE: usize = 32 * 1024 * 1024; // 32MiB |
| /// If not, the retry will fallback to 2 seconds intervals | ||
| pub connection_retry_policy: RetryPolicy, | ||
| #[builder(default=RetryPolicy::exponential( | ||
| Duration::from_millis(10), |
| Duration::from_millis(10), | ||
| 2.0, | ||
| None, | ||
| Some(Duration::from_secs(1)), |
There was a problem hiding this comment.
Our connection throttling probably won't allow such an aggressive retry, maybe set the max to 3s?
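To make the effect of the cap concrete, here is a small sketch of an exponential schedule with a maximum delay. `retry_delay` is a hypothetical re-implementation for illustration; it uses an integer factor and does not model any jitter or other behaviour the real `RetryPolicy::exponential` may have.

```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-based): `initial * factor^attempt`,
/// clamped to `max_delay`. Overflow is treated as "use the cap".
pub fn retry_delay(initial: Duration, factor: u32, max_delay: Duration, attempt: u32) -> Duration {
    let multiplier = factor.checked_pow(attempt).unwrap_or(u32::MAX);
    initial
        .checked_mul(multiplier)
        .unwrap_or(max_delay)
        .min(max_delay)
}
```

Under these assumptions, with a 10 ms initial delay and factor 2, the uncapped delay first exceeds 1 s on attempt 7 (1280 ms), so the schedule settles at one retry per second from there on; a 3 s cap is reached two attempts later.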
| // Note that this error kind is never returned | ||
| // by partition processor < v1.7. This is mainly | ||
| // defensive coding, but also allows us to switch | ||
| // to sequential mode for protocol version >= V4 | ||
| // if needed. |
| self.partition_routing | ||
| .partition_replica_set_state() | ||
| .note_observed_leader(self.partition, last_seen_leadership_state); |
There was a problem hiding this comment.
I'm not entirely sold on this. I would not attempt to manipulate gossip state via oob path. Since the peer has already learned about this information through gossip, we should wait for it to propagate (optional) and not create another source of potential poisoning of the state. What I suggest is:
- Use the new leadership state to connect in the next connect iteration (keep it in session state, and drop it if the attempt failed)
- (or) at connect time, wait until the replica set states reaches the expected leader epoch or higher.
Observed by Bassem: The incoming leadership state might have a lower epoch than what we previously attempted. This can happen if the peer has a stale gossip view. In that case, we should stick to the latest knowledge we have and try again.
There was a problem hiding this comment.
What is the downside of noting potentially observed leaders from other channels? If things are out of date, then it should be ignored, right? If it's newer than what we gathered from gossip's state, then we kind of preempt the necessity to receive this information via gossip? Does it trigger unnecessary work/updates?
There was a problem hiding this comment.
dejavu?
I see two possible downsides:
- The ambiguity of data flows and traceability. There is a comment on `note_observed_leader` asking callers to uphold a certain invariant. It becomes harder to validate this invariant when we have many paths that update it. The original design calls for the partition processor (or PPM) to be the owner of this value; if the value is misreported (i.e. a bug), it's easy to pinpoint the component that made the wrong call. A single owner means that it has control over when it should be advertised on gossip. For instance, if in the future we implement graceful drains of leaders, then a leader might start redirecting traffic to a future leader before it loses leadership.
- Poisoning. Partially related to the previous point. The source of information that we inject into gossip in this case comes from a node, and we don't make any effort to prove its claim. We imply that the response we receive upholds the same invariants the partition leader upholds when updating this value. This is implicit and can be easily broken, leading to gossip poisoning.
Since this operation is not essential to the operation of ingestion client, removing this mixup of concerns makes the data flow easier to follow and the system behavior more predictable.
There was a problem hiding this comment.
Thanks for the clarification. I wasn't fully aware of these aspects and might have misdirected Azmy in this direction. Then let's do as you suggested with respect to keeping the leader state information only for the next connection attempt instead of updating the partition replica set state.
| while let Some((batch, _)) = inflight.pop_back() { | ||
| self.carry_over.push_front(batch); | ||
| } |
There was a problem hiding this comment.
The downside here is that we might be resending batches that have been accepted already but we don't know because we are not checking the reply_rx. A nice improvement (maybe leave a todo) is to drain the reply_rx until the first failure.
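The suggested improvement could look roughly like the sketch below: drop the acknowledged prefix first, then carry over only what remains. It assumes the reply state of each in-flight batch has already been polled into a hypothetical `Reply` enum; the real code holds an `Option<ReplyRx<IngestResponse>>` per batch, and how that is polled without blocking is left open here.

```rust
use std::collections::VecDeque;

/// Hypothetical, already-polled reply state of an in-flight batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Reply {
    Acked,
    Rejected,
    Pending,
}

/// Drops the acknowledged prefix of `inflight`, then moves the rest to the
/// front of `carry_over` in their original order.
/// Returns how many batches did not need to be resent.
pub fn carry_over_unacked<B>(
    inflight: &mut VecDeque<(B, Reply)>,
    carry_over: &mut VecDeque<B>,
) -> usize {
    let mut acked = 0;
    // Stop at the first batch that is not known to be accepted.
    while matches!(inflight.front(), Some((_, Reply::Acked))) {
        inflight.pop_front();
        acked += 1;
    }
    // Same loop as in the diff: popping from the back and pushing to the
    // front keeps the oldest unacknowledged batch first.
    while let Some((batch, _)) = inflight.pop_back() {
        carry_over.push_front(batch);
    }
    acked
}
```

Everything from the first non-acknowledged batch onwards is replayed, even if a later reply happens to be positive, which keeps the replay a contiguous suffix of the pipeline.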
| } | ||
| } | ||
|
|
||
| type InflightQueue = VecDeque<(IngestionBatch, Option<ReplyRx<IngestResponse>>)>; |
There was a problem hiding this comment.
it'd be nice if this is on top of the file.
| // cause one request to be rejected while later requests | ||
| // in the pipeline are accepted, silently dropping data. | ||
| // See #4879 for details. | ||
| let _ = TaskCenter::spawn_child(TaskKind::Disposable, "reject-ingestion", async move { |
There was a problem hiding this comment.
Can you add a note to remove this hack in v1.8?
| // Older clients (if enabled) will print this error message as debug | ||
| // message so it might be a good idea to print this here also as a warn | ||
| // to make sure it's visible during a rolling upgrade. | ||
| warn!( |
There was a problem hiding this comment.
[minor] This will get spammy quite quickly since we print on every batch
There was a problem hiding this comment.
Yeah, I know. I am not sure how to improve this without too much code, especially since this should only be a transitional state. It would also be guaranteed to grab the user's attention, imho.
| .batch_size(config.worker.shuffle.request_batch_size.as_non_zero_usize()) | ||
| .connection_retry_policy(config.worker.shuffle.connection_retry_policy.clone()) | ||
| .record_size_limit(config.networking.message_size_limit()) | ||
| .swimlane(Swimlane::BifrostData) |
There was a problem hiding this comment.
I highly suggest we switch this to use Swimlane::IngressData because if we don't do that, the BifrostData shared connection will fill up its window when the leader service pushes back (the partition leader service uses push-back backpressure, while the log server service shares the swimlane and its backpressure mode is Lossy). So normal backpressure of ingestion will cause latency spikes on bifrost or, worse, get it completely stuck due to head-of-line blocking (when the pp leader service pushes back on ingestion).
There was a problem hiding this comment.
Eventually, it might make sense to split ingress from shuffle communication if we want to give priority to shuffle (some time in the future).
There was a problem hiding this comment.
Agreed. It'll be possible to somewhat avoid the head-of-line problem when we support native streams (by leveraging h2 streams on the swimlane)
43c2622 to
fab004a
Fixes #4879

Summary:
Re-enable cross-batch pipelining on the ingestion path (Kafka ingress and
shuffle forwarding to partition leaders), which was disabled by the
conservative #4810 fix that capped each partition session at one
unacknowledged batch in flight (throughput ~ batch_size / RTT).

A session may now keep multiple batches in flight while still guaranteeing
that a producer's records reach the partition log in produced order. Ordering
is preserved by pinning every in-flight request to the leader epoch the client
last observed and processing replies strictly head-first:
- The client tags each IngestRequest with `target_leader_epoch`.
- A leader rejects any request whose epoch != its own with the new
  `NotLeaderWithEpoch` response, so a leadership change rejects the entire
  in-flight pipeline atomically rather than appending a later batch ahead of
  a rejected earlier one (the out-of-order append that the dedup
  high-water-mark would silently drop, #4810).
- On any rejection/error the client carries over the head and everything
  behind it, then replays them in order against the new epoch.

Introduces protocol version V4 to distinguish pipelining-capable (V4) leaders
from sequential-only (<=V3) ones; the client falls back to one-batch-at-a-time
against <=V3 peers. A V4 leader rejects ingestion from pre-1.7 clients that
send no epoch (with back-pressure) to avoid data loss during rolling upgrades.

Protocol / types:
- Add ProtocolVersion::V4 and bump CURRENT_PROTOCOL_VERSION; route Store and
  Append encode/decode through V3|V4.
- Add `target_leader_epoch` to IngestRequest/ReceivedIngestRequest and the
  NotLeaderWithEpoch ResponseStatus variant (NotLeader retained for <=V3
  wire-compat).

Worker:
- `LeadershipState::handle_ingest_request` checks the request epoch, replies
  NotLeaderWithEpoch on follower/epoch-mismatch, rejects epoch-less old
  clients, and moves the ingestion request size/len metrics here.

Client session:
- Split into `connected_pipelining` (V4) and `connected_sequential_mode`
  (<=V3); carry_over is now a queue of formed batches replayed in order;
  `connect()` waits for and pins the observed leader epoch.

Review fixes folded in:
- Sequential mode now flushes the chunker's over-pulled record into carry_over
  on every exit path instead of dropping it (was a spurious cancellation /
  record loss).
- Keep last_seen_leader_epoch monotonic via max() on reconnect.
- Replace the obsolete single-in-flight #4810 regression tests with tests for
  the new invariant: batches pipeline before ack, and in-flight batches replay
  in order at the new epoch after a NotLeaderWithEpoch.
- Correct the now-stale ordering comments; fix message/typo cosmetics.

Adds a release note documenting the throughput improvement and the rolling-
upgrade requirement (upgrade ingestion clients alongside/after leaders).
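
The leader-side rule from the summary can be illustrated with a small sketch. `LeaderEpoch`, `Decision`, and `check_ingest` are hypothetical names, and the sketch ignores the follower case and the back-pressure delay; it only shows why a pipeline pinned to an old epoch is rejected as a whole.

```rust
/// Hypothetical stand-in for the real leader-epoch type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct LeaderEpoch(pub u64);

/// Hypothetical outcome of the leader's epoch check for one ingest request.
#[derive(Debug, PartialEq, Eq)]
pub enum Decision {
    /// Epoch matches the leader's own reign: append the batch.
    Append,
    /// Epoch mismatch: reject and report the leader's current epoch.
    NotLeaderWithEpoch { current: LeaderEpoch },
    /// Pre-1.7 client that sent no epoch: reject to avoid reordering.
    RejectOldClient,
}

/// Decides what to do with a request pinned to `target` when the leader's
/// own epoch is `own`.
pub fn check_ingest(own: LeaderEpoch, target: Option<LeaderEpoch>) -> Decision {
    match target {
        None => Decision::RejectOldClient,
        Some(epoch) if epoch == own => Decision::Append,
        Some(_) => Decision::NotLeaderWithEpoch { current: own },
    }
}
```

Because every in-flight request carries the same pinned epoch, a leader at epoch 4 gives the same answer to each request pinned to epoch 3, so no later batch can be appended ahead of a rejected earlier one.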