diff --git a/Cargo.lock b/Cargo.lock index 0e56abcdcf..76db9cf2df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7658,6 +7658,7 @@ version = "1.7.0-rc.2" dependencies = [ "bytes", "dashmap", + "derive_builder", "futures", "googletest", "pin-project", diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs index c719495e43..bbc713f02a 100644 --- a/crates/bifrost/src/background_appender.rs +++ b/crates/bifrost/src/background_appender.rs @@ -357,6 +357,26 @@ impl LogSender { Ok(()) } + /// Waits for capacity on the channel and returns an error if the appender is + /// draining or drained. + /// + /// Unlike [`enqueue`](Self::enqueue), this does not check the record size and + /// accepts a record of any size. + /// + /// Callers have to ensure that record is not larger than the network message size limit. + pub async fn enqueue_unchecked(&mut self, record: A) -> Result<(), EnqueueError> + where + A: Into>, + { + let Ok(permit) = self.tx.reserve().await else { + return Err(EnqueueError::Closed(record)); + }; + let record = record.into().into_record().ensure_encoded(&mut self.arena); + permit.send(AppendOperation::Enqueue(record)); + + Ok(()) + } + /// Attempt to enqueue a record to the appender. Returns immediately if the /// appender is pushing back or if the appender is draining or drained. /// diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index f0fa7c0e16..ff2a705553 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -67,6 +67,18 @@ pub enum EnqueueError { }, } +impl EnqueueError { + pub fn drop_payload(self) -> EnqueueError<()> { + match self { + Self::Closed(_) => EnqueueError::Closed(()), + Self::Full(_) => EnqueueError::Full(()), + Self::RecordTooLarge { record_size, limit } => { + EnqueueError::RecordTooLarge { record_size, limit } + } + } + } +} + #[derive(Clone, Debug, thiserror::Error)] pub enum AdminError { #[error("log {0} is permanently sealed")] diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index 677626f085..45ab9d1734 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -29,7 +29,7 @@ pub use appender::Appender; pub use background_appender::{AppenderHandle, BackgroundAppender, CommitToken, LogSender}; pub use bifrost::{Bifrost, ErrorRecoveryStrategy}; pub use bifrost_admin::{BifrostAdmin, MaybeSealedSegment}; -pub use error::{Error, Result}; +pub use error::{EnqueueError, Error, Result}; pub use read_stream::LogReadStream; pub use read_stream_registry::ActiveReadStreamRegistry; pub use record::{InputRecord, LogEntry, MaybeRecord, RecordKind}; diff --git a/crates/core/src/partitions.rs b/crates/core/src/partitions.rs index 87ebc7e587..039d1ebdd9 100644 --- a/crates/core/src/partitions.rs +++ b/crates/core/src/partitions.rs @@ -118,4 +118,8 @@ impl PartitionRouting { .expect("partition replica set states are never dropped while in use"); *state } + + pub fn partition_replica_set_state(&self) -> &PartitionReplicaSetStates { + &self.partition_replica_set_states + } } diff --git a/crates/core/src/task_center/task_kind.rs b/crates/core/src/task_center/task_kind.rs index 4888f73929..06299a5c41 100644 --- a/crates/core/src/task_center/task_kind.rs +++ b/crates/core/src/task_center/task_kind.rs @@ -158,6 +158,8 @@ pub enum TaskKind { LogStoreWriter, // - Datafusion DfScanner, + #[strum(props(runtime = "default"))] + IngestionSession, } impl TaskKind { diff --git a/crates/ingestion-client/Cargo.toml b/crates/ingestion-client/Cargo.toml index e2cf7af7b4..3eed368ccd 100644 --- a/crates/ingestion-client/Cargo.toml +++ b/crates/ingestion-client/Cargo.toml @@ -7,6 +7,9 @@ rust-version.workspace = true license.workspace = true publish = false +[features] +test-util = [] + [dependencies] restate-core = { workspace = true } @@ -15,6 +18,7 @@ restate-workspace-hack = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } +derive_builder = { workspace = true } futures = { workspace = true } pin-project = { workspace = true } thiserror = { workspace = true } diff --git a/crates/ingestion-client/src/chunks_size.rs b/crates/ingestion-client/src/chunks_size.rs index 80021f18b1..220e87be04 100644 --- a/crates/ingestion-client/src/chunks_size.rs +++ b/crates/ingestion-client/src/chunks_size.rs @@ -30,6 +30,10 @@ impl ChunksSize where F: Fn(&S::Item) -> usize, { + pub fn new(stream: S, max_size: usize, size_fn: F) -> Self { + Self::with_buffered(stream, max_size, size_fn, Vec::default()) + } + /// Creates a chunker, pre-seeding the accumulation buffer with `buffered` items so they lead the /// next emitted chunk. Pass an empty `Vec` to start fresh. Seeding is used to carry a previously /// over-pulled item across a recreated `ChunksSize` (e.g. after a reconnect) without losing it or diff --git a/crates/ingestion-client/src/client.rs b/crates/ingestion-client/src/client.rs index d91f392716..10378a8501 100644 --- a/crates/ingestion-client/src/client.rs +++ b/crates/ingestion-client/src/client.rs @@ -39,6 +39,8 @@ pub enum IngestionError { Closed(&'static str), #[error(transparent)] PartitionTableError(#[from] PartitionTableError), + #[error("Record of size {size} bytes exceeds maximum allowed size of {limit} bytes")] + RecordMaxSizeExceeded { size: usize, limit: usize }, } /// High-level ingestion entry point that allocates permits and hands out session handles per partition. @@ -78,7 +80,7 @@ impl IngestionClient { partition_table: Live, partition_routing: PartitionRouting, memory_budget: NonZeroUsize, - opts: Option, + opts: SessionOptions, ) -> Self { Self { manager: SessionManager::new(networking, partition_routing, opts), @@ -117,6 +119,13 @@ where ) -> IngestFuture { let record = record.into().into_record(&mut self.arena); + if record.estimate_size() > self.manager.options().record_size_limit.get() { + return IngestFuture::error(IngestionError::RecordMaxSizeExceeded { + size: record.estimate_size(), + limit: self.manager.options().record_size_limit.get(), + }); + } + let budget = record.estimate_size().min(self.memory_budget.get()); let partition_id = match self @@ -280,7 +289,7 @@ mod test { }; use restate_types::{ - Version, + GenerationalNodeId, Version, identifiers::{LeaderEpoch, PartitionId}, net::{ self, RpcRequest, @@ -300,6 +309,20 @@ mod test { ) -> ( ServiceStream, IngestionClient, + ) { + let (incoming, client, _states, _my_node_id) = init_env_with_states(batch_size).await; + (incoming, client) + } + + /// Like [`init_env`] but also returns the partition replica-set states (so tests can simulate + /// leadership changes via `note_observed_leader`) and the node id acting as leader. + async fn init_env_with_states( + batch_size: usize, + ) -> ( + ServiceStream, + IngestionClient, + PartitionReplicaSetStates, + GenerationalNodeId, ) { let mut builder = TestCoreEnvBuilder::with_incoming_only_connector() .add_mock_nodes_config() @@ -327,20 +350,20 @@ mod test { let incoming = svc.start(); + let my_node_id = builder.my_node_id; let env = builder.build().await; let client = IngestionClient::new( env.networking, env.metadata.updateable_partition_table(), - PartitionRouting::new(partition_replica_set_states, TaskCenter::current()), + PartitionRouting::new(partition_replica_set_states.clone(), TaskCenter::current()), NonZeroUsize::new(10 * 1024 * 1024).unwrap(), // 10MB - SessionOptions { - batch_size, - ..Default::default() - } - .into(), + SessionOptions::builder() + .batch_size(NonZeroUsize::new(batch_size).unwrap()) + .build() + .unwrap(), ); - (incoming, client) + (incoming, client, partition_replica_set_states, my_node_id) } async fn must_next( @@ -469,101 +492,145 @@ mod test { ); } - // Regression test for record loss caused by out-of-order commits (issue #4810). - // - // The partition processor checks leadership *per* ingest RPC, so on a single connection a - // leadership transition can return `NotLeader` for an earlier batch (which is therefore not - // appended) while a later batch is appended and acked. The dedup mechanism uses a monotonic - // high-water-mark per producer, so once the later (higher sequence number) records are applied - // the earlier ones get permanently dropped as "outdated". This is only possible if the session - // keeps more than one batch in flight per partition. This test pins that the session does not - // pipeline a second batch before the head batch has been acknowledged. + // Pipelining is restored (#4879): on a V4 connection the session keeps multiple batches in + // flight at once. Ordering no longer relies on an at-most-one-in-flight cap — see the invariant + // documented in `connected_pipelining`. This pins that a second batch reaches the wire before + // the head batch is acknowledged, and that every batch carries the leader epoch. #[test(restate_core::test(start_paused = true))] - async fn does_not_pipeline_unacked_batches() { + async fn pipelines_multiple_unacked_batches() { + let mut buf = BytesMut::new(); let (mut incoming, mut client) = init_env(1024).await; - // Ingest the head record; the session sends it as its own batch because it is the only - // record available at the time the batch is formed. - let _c0 = client.ingest(0, InputRecord::from_str("r0")).await.unwrap(); + // Ingest the head record; it is sent as its own batch (the only record available when the + // batch is formed). + let c0 = client.ingest(0, InputRecord::from_str("r0")).await.unwrap(); let head = must_next(&mut incoming).await; let (head_rx, head_body) = head.split(); - assert_that!(head_body.records, len(eq(1))); - - // Ingest a second record to the same partition while the head batch is still unacked. - let _c1 = client.ingest(0, InputRecord::from_str("r1")).await.unwrap(); - - // Let the session task run to completion. Under `start_paused`, once all tasks are parked - // the runtime auto-advances time to fire this sleep, so afterwards the session has done all - // the work it could: if it were going to pipeline the second batch, it would have by now. - tokio::time::sleep(Duration::from_secs(1)).await; + assert_that!( + head_body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("r0").into_record(&mut buf))) + ) + ); + assert_eq!(head_body.target_leader_epoch, Some(LeaderEpoch::INITIAL)); - // The session must not have put a second batch on the wire before the head was acked. - let mut next = std::pin::pin!(must_next(&mut incoming)); - assert!( - (&mut next).now_or_never().is_none(), - "session pipelined a second batch before the head batch was acknowledged" + // Ingest a second record while the head is still unacked. With pipelining the session puts + // it on the wire without waiting for the head's acknowledgement. + let c1 = client.ingest(0, InputRecord::from_str("r1")).await.unwrap(); + let tail = must_next(&mut incoming).await; + let (tail_rx, tail_body) = tail.split(); + assert_that!( + tail_body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("r1").into_record(&mut buf))) + ) ); + assert_eq!(tail_body.target_leader_epoch, Some(LeaderEpoch::INITIAL)); - // Acking the head unblocks the next batch, which now carries the second record in order. + // Acknowledge both in order; both records commit. head_rx.send(ResponseStatus::Ack.into()); - let tail = next.await; - let (tail_rx, tail_body) = tail.split(); - assert_that!(tail_body.records, len(eq(1))); tail_rx.send(ResponseStatus::Ack.into()); + c0.await.expect("r0 commits"); + c1.await.expect("r1 commits"); } - // Regression test for the cross-reconnect variant of #4810 (found reviewing PR #4880). - // - // `ChunksSize` must read one record past a full batch to detect the cap boundary, so it buffers - // that record internally. On a reconnect the buffered record must not become a *second* in-flight - // batch (which `replay()` would pipeline, recreating the out-of-order append and record loss). - // It is carried over into the next batch and sent only after the head is acknowledged, and it is - // never failed/cancelled (the Kafka ingress and shuffle have no cheap retry). + // Regression test for #4810 under restored pipelining (#4879): a leadership transition must not + // reorder records. With two batches in flight, a `NotLeaderWithEpoch` on the head causes the + // session to carry over *all* in-flight batches and replay them — in produced order — against + // the new epoch. Nothing is failed/cancelled (the Kafka ingress and shuffle have no cheap + // retry), so out-of-order appends that the dedup high-water-mark would silently drop cannot + // happen. #[test(restate_core::test(start_paused = true))] - async fn reconnect_carries_over_buffered_records() { - // Cap fits exactly one record, so the second record overflows and is buffered by the chunker. + async fn leadership_change_replays_inflight_in_order() { + // Cap fits exactly one record, so r0 and r1 form two separate batches. let mut buf = BytesMut::new(); let one_record = InputRecord::from_str("r0") .into_record(&mut buf) .estimate_size(); - let (mut incoming, mut client) = init_env(one_record).await; + let (mut incoming, mut client, states, my_node_id) = init_env_with_states(one_record).await; - // Queue both records before the session forms a batch: `ingest().await` does not yield to the - // session task, so both are in the channel when the chunker first runs (during the sleep). + // Queue both records before the session forms a batch: `ingest().await` does not yield to + // the session task, so both are in the channel when the chunker first runs. let c0 = client.ingest(0, InputRecord::from_str("r0")).await.unwrap(); let c1 = client.ingest(0, InputRecord::from_str("r1")).await.unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; - // The chunker emits B1=[r0] and buffers r1. - let head = must_next(&mut incoming).await; - let (head_rx, head_body) = head.split(); - assert_that!(head_body.records, len(eq(1))); + // Both batches are pipelined: B1=[r0] then B2=[r1], each at the initial epoch. + let b1 = must_next(&mut incoming).await; + let (b1_rx, b1_body) = b1.split(); + assert_that!( + b1_body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("r0").into_record(&mut buf))) + ) + ); + assert_eq!(b1_body.target_leader_epoch, Some(LeaderEpoch::INITIAL)); + + let b2 = must_next(&mut incoming).await; + let (_b2_rx, b2_body) = b2.split(); + assert_that!( + b2_body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("r1").into_record(&mut buf))) + ) + ); - // A NotLeader on the head triggers reconnect + replay. The buffered r1 must be carried over, - // not turned into a second in-flight batch. - head_rx.send(ResponseStatus::NotLeader { of: 0.into() }.into()); + // A new leader wins the election. Make routing observe the new epoch so the session can + // reconnect, then reject the head with the new epoch. + let new_epoch = LeaderEpoch::INITIAL.next(); + let leadership_state = LeadershipState { + current_leader: my_node_id, + current_leader_epoch: new_epoch, + }; + states.note_observed_leader(PartitionId::from(0), leadership_state); + + b1_rx.send( + ResponseStatus::NotLeaderWithEpoch { + of: 0.into(), + last_seen_leadership_state: leadership_state, + } + .into(), + ); + // Dropping `_b2_rx` here would not matter: replies are consumed head-first, so the session + // never observes B2's outcome before reconnecting. tokio::time::sleep(Duration::from_secs(1)).await; - // Replay re-sends only the head (r0); r1 is buffered, not in flight. - let head = must_next(&mut incoming).await; - let (head_rx, head_body) = head.split(); - assert_that!(head_body.records, len(eq(1))); + // Replay re-sends B1 then B2 in produced order, now targeting the new epoch. + let b1 = must_next(&mut incoming).await; + let (b1_rx, b1_body) = b1.split(); + assert_that!( + b1_body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("r0").into_record(&mut buf))) + ) + ); + assert_eq!(b1_body.target_leader_epoch, Some(new_epoch)); - let mut next = std::pin::pin!(must_next(&mut incoming)); - assert!( - (&mut next).now_or_never().is_none(), - "session put a second batch in flight after reconnect" + let b2 = must_next(&mut incoming).await; + let (b2_rx, b2_body) = b2.split(); + assert_that!( + b2_body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("r1").into_record(&mut buf))) + ) ); + assert_eq!(b2_body.target_leader_epoch, Some(new_epoch)); - // Acking the head releases the carried-over r1 as the next batch. - head_rx.send(ResponseStatus::Ack.into()); - let tail = next.await; - let (tail_rx, tail_body) = tail.split(); - assert_that!(tail_body.records, len(eq(1))); - tail_rx.send(ResponseStatus::Ack.into()); + b1_rx.send(ResponseStatus::Ack.into()); + b2_rx.send(ResponseStatus::Ack.into()); - // Nothing was failed/cancelled: both records eventually commit. + // Nothing was failed/cancelled: both records eventually commit, in order. c0.await.expect("r0 commits"); c1.await.expect("r1 commits"); } + + // Note: the ≤V3 sequential fallback (`connected_sequential_mode`) cannot be exercised here + // because the in-process test connection always negotiates `CURRENT_PROTOCOL_VERSION` (V4). Its + // carry-over/remainder handling mirrors the pipelining path covered above. } diff --git a/crates/ingestion-client/src/lib.rs b/crates/ingestion-client/src/lib.rs index fb7a19193d..811ae98076 100644 --- a/crates/ingestion-client/src/lib.rs +++ b/crates/ingestion-client/src/lib.rs @@ -13,4 +13,6 @@ mod client; mod session; pub use client::{IngestFuture, IngestionClient, IngestionError}; -pub use session::{CancelledError, RecordCommit, SessionClosed, SessionOptions}; +pub use session::{ + CancelledError, RecordCommit, SessionClosed, SessionOptions, SessionOptionsBuilder, +}; diff --git a/crates/ingestion-client/src/session.rs b/crates/ingestion-client/src/session.rs index 2f06a3cd12..5fc77beb9b 100644 --- a/crates/ingestion-client/src/session.rs +++ b/crates/ingestion-client/src/session.rs @@ -8,14 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::{collections::VecDeque, sync::Arc, time::Duration}; +use std::{collections::VecDeque, num::NonZeroUsize, sync::Arc, time::Duration}; use dashmap::DashMap; use futures::{FutureExt, StreamExt, future::OptionFuture, ready}; use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace}; +use tracing::{debug, instrument, trace, warn}; use restate_core::{ TaskCenter, TaskKind, @@ -26,13 +26,22 @@ use restate_core::{ partitions::PartitionRouting, }; use restate_types::{ - identifiers::PartitionId, - net::ingest::{IngestRecord, IngestRequest, IngestResponse, ResponseStatus}, + identifiers::{LeaderEpoch, PartitionId}, + net::{ + ProtocolVersion, + ingest::{IngestRecord, IngestRequest, IngestResponse, ResponseStatus}, + }, + partitions::state::LeadershipState, retries::RetryPolicy, }; use crate::chunks_size::ChunksSize; +#[cfg(any(test, feature = "test-util"))] +const DEFAULT_MAX_RECORD_SIZE: usize = 32 * 1024 * 1024; // 32MiB + +type InflightQueue = VecDeque<(IngestionBatch, Option>)>; + /// Error returned when attempting to use a session that has already been closed. #[derive(Clone, Copy, Debug, thiserror::Error)] #[error("Partition session is closed")] @@ -123,8 +132,6 @@ impl RecordCommitResolver { struct IngestionBatch { records: Arc<[IngestRecord]>, resolvers: Vec, - - reply_rx: Option>, } impl IngestionBatch { @@ -132,11 +139,7 @@ impl IngestionBatch { let (resolvers, records): (Vec<_>, Vec<_>) = batch.into_iter().unzip(); let records: Arc<[IngestRecord]> = Arc::from(records); - Self { - records, - resolvers, - reply_rx: None, - } + Self { records, resolvers } } /// Marks every tracked record in the batch as committed. @@ -152,25 +155,45 @@ impl IngestionBatch { } /// Tunable parameters for batching and networking behaviour of partition sessions. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, derive_builder::Builder)] +#[builder(pattern = "owned")] pub struct SessionOptions { /// Maximum batch size in `bytes` - pub batch_size: usize, + #[builder(default=NonZeroUsize::new(50 * 1024).unwrap())] + pub(crate) batch_size: NonZeroUsize, /// Connection retry policy /// Retry policy must be infinite (retries forever) /// If not, the retry will fallback to 2 seconds intervals - pub connection_retry_policy: RetryPolicy, + #[builder(default=RetryPolicy::exponential( + Duration::from_millis(250), + 2.0, + None, + Some(Duration::from_secs(3)), + ))] + pub(crate) connection_retry_policy: RetryPolicy, + /// Maximum allowed record size + #[cfg_attr(any(test, feature = "test-util"), builder(default=NonZeroUsize::new(DEFAULT_MAX_RECORD_SIZE).unwrap()))] + pub(crate) record_size_limit: NonZeroUsize, /// Connection swimlane - pub swimlane: Swimlane, + #[cfg_attr(any(test, feature = "test-util"), builder(default=Swimlane::General))] + pub(crate) swimlane: Swimlane, +} + +impl SessionOptions { + pub fn builder() -> SessionOptionsBuilder { + SessionOptionsBuilder::default() + } } +#[cfg(any(test, feature = "test-util"))] impl Default for SessionOptions { fn default() -> Self { Self { // The default batch size of 50KB is to avoid // overwhelming the PP on the hot path. - batch_size: 50 * 1024, // 50 KB + batch_size: NonZeroUsize::new(50 * 1024).unwrap(), // 50 KB swimlane: Swimlane::IngressData, + record_size_limit: NonZeroUsize::new(DEFAULT_MAX_RECORD_SIZE).unwrap(), connection_retry_policy: RetryPolicy::exponential( Duration::from_millis(10), 2.0, @@ -205,7 +228,8 @@ impl SessionHandle { enum SessionState { Connecting, - Connected { connection: Connection }, + ConnectedSequentialMode { connection: Connection }, + ConnectedPipeliningMode { connection: Connection }, Shutdown, } @@ -218,11 +242,12 @@ pub struct PartitionSession { opts: SessionOptions, rx: UnboundedReceiverStream<(RecordCommitResolver, IngestRecord)>, tx: mpsc::UnboundedSender<(RecordCommitResolver, IngestRecord)>, - inflight: VecDeque, - // Records pulled from `rx` while detecting a batch boundary but not yet sent. Carried across - // reconnects and fed back into the chunker so they lead the next batch, instead of becoming a - // second in-flight batch (which would let `replay()` append records out of order). See #4810. - carry_over: Vec<(RecordCommitResolver, IngestRecord)>, + // Batches that were unacked (or never sent) when the previous connection ended, kept in + // produced order. On the next connection they are re-sent ahead of any newly chunked batch so + // records reach the partition log in the order they were produced. Includes the record the + // chunker over-pulled to detect a batch boundary, wrapped as its own trailing batch. + carry_over: VecDeque, + target_leader_epoch: LeaderEpoch, } impl PartitionSession { @@ -242,10 +267,10 @@ impl PartitionSession { partition_routing, networking, opts, - inflight: Default::default(), rx, tx, - carry_over: Vec::new(), + carry_over: VecDeque::default(), + target_leader_epoch: LeaderEpoch::INITIAL, } } @@ -269,26 +294,25 @@ where { /// Runs the session state machine until shut down, reacting to cancellation and connection errors. pub async fn start(self, cancellation: CancellationToken) { + let partition_id = self.partition; debug!( - partition_id = %self.partition, + partition_id = %partition_id, "Starting ingestion partition session", ); cancellation.run_until_cancelled(self.run_inner()).await; + debug!("Ingestion session for partition {partition_id} stopped"); } /// Runs the session state machine until shut down, reacting to cancellation and connection errors. async fn run_inner(mut self) { let mut state = SessionState::Connecting; - debug!( - partition_id = %self.partition, - "Starting ingestion partition session", - ); loop { state = match state { SessionState::Connecting => { - let mut retry = self.opts.connection_retry_policy.iter(); + let retry_policy = self.opts.connection_retry_policy.clone(); + let mut retry = retry_policy.iter(); loop { match self.connect().await { Some(state) => break state, @@ -296,13 +320,20 @@ where // retry // this assumes that retry policy is infinite. If it's not it falls back // to a fixed 2 seconds sleep between retries - tokio::time::sleep(retry.next().unwrap_or(Duration::from_secs(2))) + tokio::time::sleep(retry.next().unwrap_or(Duration::from_secs(3))) .await; } } } } - SessionState::Connected { connection } => self.connected(connection).await, + SessionState::ConnectedSequentialMode { connection } => { + self.connected_sequential_mode(connection).await; + SessionState::Connecting + } + SessionState::ConnectedPipeliningMode { connection } => { + self.connected_pipelining(connection).await; + SessionState::Connecting + } SessionState::Shutdown => { self.rx.close(); break; @@ -311,11 +342,27 @@ where } } - async fn connect(&self) -> Option { + async fn connect(&mut self) -> Option { + // Block until routing has observed a leader epoch at least as new as the one we're + // targeting. `target_leader_epoch` starts at `INITIAL` and is bumped whenever a + // `NotLeaderWithEpoch` response tells us a newer leader has won (see + // `note_leadership_state`). Waiting on it here ensures that after a leadership change we + // don't reconnect to the stale leader; instead we hold off until the cluster has + // independently propagated the new leadership state into routing. + let leader_epoch = self + .partition_routing + .wait_for_leader_epoch(self.partition, self.target_leader_epoch) + .await + .current_leader_epoch; + + // Resolve the node separately: routing returns the current leader if known, otherwise it + // falls back to a live replica-set member. let node_id = self .partition_routing .get_node_by_partition(self.partition)?; + self.target_leader_epoch = leader_epoch; + let result = self .networking .get_connection(node_id, self.opts.swimlane) @@ -323,114 +370,273 @@ where match result { Ok(connection) => { - debug!("Connection established to node {node_id}"); - Some(SessionState::Connected { connection }) + debug!("Connection established to node {}", node_id); + if connection.protocol_version() <= ProtocolVersion::V3 { + Some(SessionState::ConnectedSequentialMode { connection }) + } else { + Some(SessionState::ConnectedPipeliningMode { connection }) + } } Err(ConnectError::Shutdown(_)) => Some(SessionState::Shutdown), Err(err) => { - debug!("Failed to connect to node {node_id}: {err}"); + debug!("Failed to connect to node {}: {err}", node_id); None } } } - /// Re-sends all inflight batches after a connection is restored. - async fn replay(&mut self, connection: &Connection) -> Result<(), ConnectionClosed> { - // todo(azmy): to avoid all the inflight batches again and waste traffic - // maybe test the connection first by sending an empty batch and wait for response - // before proceeding? + #[instrument( + level="debug", + skip_all, + name="connected", + fields( + mode="sequential", + partition=%self.partition, + leader_epoch=%self.target_leader_epoch, + ) + )] + async fn connected_sequential_mode(&mut self, connection: Connection) { + // replay carry over one by one + debug!("Carry-over batches: {}", self.carry_over.len()); - let total = self.inflight.iter().fold(0, |v, i| v + i.len()); - trace!( - partition = %self.partition, - batches = self.inflight.len(), - records = total, - "Replaying inflight records after connection was restored" - ); + while let Some(batch) = self.carry_over.front() { + let records = Arc::clone(&batch.records); - for batch in self.inflight.iter_mut() { let Some(permit) = connection.reserve().await else { - return Err(ConnectionClosed); + return; }; - // resend batch + trace!("Sending ingest batch, len: {}", records.len()); + // Nodes running protocol version V3 doesn't support + // target_leader_epoch. We mainly sending this to be + // able to use sequential mode against nodes with >= V4 + // if needed let reply_rx = permit .send_rpc( - IngestRequest::from(Arc::clone(&batch.records)), + IngestRequest { + records, + target_leader_epoch: Some(self.target_leader_epoch), + }, Some(self.partition.into()), ) .expect("encoding version to match"); - batch.reply_rx = Some(reply_rx); + + match reply_rx.await.map(|r| r.status) { + Ok(ResponseStatus::Ack) => { + // remove committed batch from carry_over + let batch = self.carry_over.pop_front().unwrap(); + batch.committed(); + } + Ok(ResponseStatus::NotLeaderWithEpoch { + last_seen_leadership_state, + .. + }) => { + // Note that this error kind is never returned + // by partition processor < v1.7. This is mainly + // defensive coding, but also allows us to switch + // to sequential mode for protocol version >= V4 + // if needed. + self.target_leader_epoch + .note_leadership_state(&last_seen_leadership_state); + + return; + } + Ok(ResponseStatus::Internal { msg }) => { + warn!("Ingestion internal error from {}: {msg}", connection.peer()); + return; + } + Ok(response) => { + // Handle any other response code as a connection loss + // and retry all inflight batches. + debug!( + "Ingestion response from {}: {:?}", + connection.peer(), + response + ); + + return; + } + Err(err) => { + // we can assume that for any error + // we need to retry all the inflight batches. + // special case for load shedding we could + // throttle the stream a little bit then + // speed up over a period of time. + debug!("Ingestion error from {}: {}", connection.peer(), err); + return; + } + } } - Ok(()) - } + debug_assert!(self.carry_over.is_empty()); + + let mut chunked = ChunksSize::new(&mut self.rx, self.opts.batch_size.get(), |(_, item)| { + item.estimate_size() + }); - async fn connected(&mut self, connection: Connection) -> SessionState { - if self.replay(&connection).await.is_err() { - return SessionState::Connecting; + while let Some(batch) = chunked.next().await { + let batch = IngestionBatch::new(batch); + let records = Arc::clone(&batch.records); + + let Some(permit) = connection.reserve().await else { + self.carry_over.push_back(batch); + break; + }; + + trace!("Sending ingest batch, len: {}", records.len()); + let reply_rx = permit + .send_rpc( + IngestRequest { + records, + target_leader_epoch: Some(self.target_leader_epoch), + }, + Some(self.partition.into()), + ) + .expect("encoding version to match"); + + match reply_rx.await.map(|r| r.status) { + Ok(ResponseStatus::Ack) => { + batch.committed(); + } + Ok(ResponseStatus::NotLeaderWithEpoch { + last_seen_leadership_state, + .. + }) => { + // Note that this error kind is never returned + // by partition processor < v1.7. This is mainly + // defensive coding, but also allows us to switch + // to sequential mode for protocol version >= V4 + // if needed. + self.target_leader_epoch + .note_leadership_state(&last_seen_leadership_state); + + self.carry_over.push_back(batch); + break; + } + Ok(ResponseStatus::Internal { msg }) => { + warn!("Ingestion internal error from {}: {msg}", connection.peer()); + self.carry_over.push_back(batch); + break; + } + Ok(response) => { + // Handle any other response code as a connection loss + // and retry all inflight batches. + debug!( + "Ingestion response from {}: {:?}", + connection.peer(), + response + ); + self.carry_over.push_back(batch); + break; + } + Err(err) => { + // we can assume that for any error + // we need to retry all the inflight batches. + // special case for load shedding we could + // throttle the stream a little bit then + // speed up over a period of time. + debug!("Ingestion error from {}: {}", connection.peer(), err); + self.carry_over.push_back(batch); + break; + } + } } - // Seed the chunker with any records over-pulled but not sent on the previous connection so - // they lead the next batch (in produced order) rather than being lost or reordered. - let mut chunked = ChunksSize::with_buffered( - &mut self.rx, - self.opts.batch_size, - |(_, item)| item.estimate_size(), - std::mem::take(&mut self.carry_over), - ); + // Carry over the record the chunker over-pulled to detect the batch boundary so it leads + // the next connection's first batch instead of being dropped (which would surface to the + // caller as a spurious cancellation). Mirrors the pipelining path. + let remainder = chunked.into_remainder(); + if !remainder.is_empty() { + self.carry_over.push_back(IngestionBatch::new(remainder)); + } + } + + #[instrument( + level="debug", + skip_all, + name="connected", + fields( + mode="pipelining", + partition=%self.partition, + leader_epoch=%self.target_leader_epoch, + ) + )] + async fn connected_pipelining(&mut self, connection: Connection) { + debug!("Carry-over batches: {}", self.carry_over.len(),); + + let Ok(mut inflight) = self.replay(&connection).await else { + return; + }; - let state = loop { - debug_assert!( - self.inflight.len() <= 1, - "ingestion session must keep at most one batch in flight" - ); - - // Only pull a new batch when nothing is in flight. Keeping at most one unacked batch - // per partition guarantees that records reach the partition log in the order they were - // produced: a trailing batch can never be appended before an earlier one that is being - // retried (e.g. after a `NotLeader` response during a leadership transition). Out-of-order - // appends would otherwise be silently dropped by the producer dedup high-water-mark. See - // https://github.com/restatedev/restate/issues/4810. - let can_send_next = self.inflight.is_empty(); - let head: OptionFuture<_> = self - .inflight + debug_assert!(self.carry_over.is_empty()); + + // Carry-over batches were already re-sent in order by `replay()`; new batches are chunked + // fresh from the stream. + let mut chunked = ChunksSize::new(&mut self.rx, self.opts.batch_size.get(), |(_, item)| { + item.estimate_size() + }); + + loop { + // Multiple batches may be in flight at once. Ordering is preserved without an + // at-most-one-in-flight cap because: the connection is FIFO, the leader appends each + // batch sequentially in arrival order, and every in-flight batch carries the same + // `target_leader_epoch` so a leadership change rejects the whole pipeline atomically + // (rather than rejecting an earlier batch while appending a later one, which the dedup + // high-water-mark would then silently drop — see #4810). Replies are consumed strictly + // head-first, so on any rejection the head and everything behind it are carried over and + // replayed in produced order. + let head: OptionFuture<_> = inflight .front_mut() - .and_then(|batch| batch.reply_rx.as_mut()) + .and_then(|(_, reply_rx)| reply_rx.as_mut()) .into(); tokio::select! { - _ = connection.closed() => { - break SessionState::Connecting; - } - Some(batch) = chunked.next(), if can_send_next => { + Some(batch) = chunked.next() => { let batch = IngestionBatch::new(batch); let records = Arc::clone(&batch.records); - self.inflight.push_back(batch); + let batch = inflight.push_back_mut((batch, None)); let Some(permit) = connection.reserve().await else { - break SessionState::Connecting; + break; }; trace!("Sending ingest batch, len: {}", records.len()); let reply_rx = permit - .send_rpc(IngestRequest::from(records), Some(self.partition.into())) - .expect("encoding version to match"); - - self.inflight.back_mut().expect("to exist").reply_rx = Some(reply_rx); + .send_rpc( + IngestRequest{ + records, + target_leader_epoch: Some(self.target_leader_epoch) + }, + Some(self.partition.into()) + ).expect("encoding version to match"); + + batch.1 = Some(reply_rx); } Some(result) = head => { match result.map(|r|r.status) { Ok(ResponseStatus::Ack) => { - let batch = self.inflight.pop_front().expect("not empty"); + let batch = inflight.pop_front().expect("not empty").0; batch.committed(); } + Ok(ResponseStatus::NotLeaderWithEpoch{ + last_seen_leadership_state, + .. + })=> { + self.target_leader_epoch + .note_leadership_state(&last_seen_leadership_state); + + break; + } + Ok(ResponseStatus::Internal{msg}) => { + warn!("Ingestion internal error from {}: {msg}", connection.peer()); + break; + } Ok(response) => { // Handle any other response code as a connection loss // and retry all inflight batches. debug!("Ingestion response from {}: {:?}", connection.peer(), response); - break SessionState::Connecting; + break; } Err(err) => { // we can assume that for any error @@ -438,24 +644,78 @@ where // special case for load shedding we could // throttle the stream a little bit then // speed up over a period of time. - debug!("Ingestion error from {}: {}", connection.peer(), err); - break SessionState::Connecting; + break; } } } } - }; + } - // state == Connecting - assert!(matches!(state, SessionState::Connecting)); + for batch in inflight.into_iter().map(|(batch, _)| batch) { + self.carry_over.push_back(batch); + } + + let remainder = chunked.into_remainder(); + if !remainder.is_empty() { + self.carry_over.push_back(IngestionBatch::new(remainder)); + } + } + + /// Re-sends all inflight batches after a connection is restored. + async fn replay(&mut self, connection: &Connection) -> Result { + debug!( + partition = %self.partition, + batches = self.carry_over.len(), + records = self.carry_over.iter().map(|i| i.len()).sum::(), + "Replaying inflight records after connection was restored" + ); - // Carry the chunker's buffered-but-unsent records over to the next connection instead of - // turning them into a second in-flight batch: `replay()` only resends `inflight`, so keeping - // these here preserves the at-most-one-batch-in-flight invariant while not losing them. - self.carry_over = chunked.into_remainder(); + let mut inflight = InflightQueue::new(); - state + while let Some(batch) = self.carry_over.front() { + let Some(permit) = connection.reserve().await else { + // restore the carry over back to original state. + // maintaining the original order. + // todo(azmy): Avoid resending All the batches by trying to drain + // the reply_rx channel until the first error. + while let Some((batch, _)) = inflight.pop_back() { + self.carry_over.push_front(batch); + } + + return Err(ConnectionClosed); + }; + + // resend batch + let reply_rx = permit + .send_rpc( + IngestRequest { + records: Arc::clone(&batch.records), + target_leader_epoch: Some(self.target_leader_epoch), + }, + Some(self.partition.into()), + ) + .expect("encoding version to match"); + + let batch = self.carry_over.pop_front().unwrap(); + inflight.push_back((batch, Some(reply_rx))); + } + + Ok(inflight) + } +} + +trait LeaderEpochExt { + /// Monotonically bumps the targeted leader epoch to the one carried by a learned leadership + /// state, ignoring stale (older) states. Never lowers the epoch. + fn note_leadership_state(&mut self, state: &LeadershipState); +} + +impl LeaderEpochExt for LeaderEpoch { + fn note_leadership_state(&mut self, state: &LeadershipState) { + if *self < state.current_leader_epoch { + *self = state.current_leader_epoch + } } } @@ -498,13 +758,10 @@ where let handle = session.handle(); let cancellation = self.cancellation.child_token(); - let _ = TaskCenter::spawn( - TaskKind::Background, - "ingestion-partition-session", - async move { - session.start(cancellation).await; - Ok(()) - }, + let _ = TaskCenter::spawn_unmanaged( + TaskKind::IngestionSession, + format!("ingestion-partition-session-p{id}"), + session.start(cancellation), ); handle @@ -514,6 +771,7 @@ where impl Drop for SessionManagerInner { fn drop(&mut self) { self.cancellation.cancel(); + trace!("Session manager cancelled"); } } @@ -528,12 +786,12 @@ impl SessionManager { pub fn new( networking: Networking, partition_routing: PartitionRouting, - opts: Option, + opts: SessionOptions, ) -> Self { let inner = SessionManagerInner { networking, partition_routing, - opts: opts.unwrap_or_default(), + opts, handles: Default::default(), cancellation: CancellationToken::new(), }; @@ -550,6 +808,10 @@ impl SessionManager { pub fn networking(&self) -> &Networking { &self.inner.networking } + + pub fn options(&self) -> &SessionOptions { + &self.inner.opts + } } impl SessionManager diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index e273e79660..7d51cce86b 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -44,14 +44,6 @@ use crate::Error; use crate::builder::EnvelopeBuilder; use crate::metric_definitions::{KAFKA_INGRESS_CONSUMER_LAG, KAFKA_INGRESS_REQUESTS}; -impl From for Error { - fn from(value: IngestionError) -> Self { - match value { - IngestionError::Closed(reason) => Self::IngestionClosed(reason.into()), - IngestionError::PartitionTableError(err) => Self::PartitionTableError(err), - } - } -} type MessageConsumer = StreamConsumer>; #[derive(Clone)] @@ -510,7 +502,7 @@ where biased; Some(committed) = Self::head_committed(&mut inflight) => { _ = inflight.pop_front().expect("to exist"); - let offset = committed.map_err(|_| Error::IngestionClosed("commit cancelled".into()))?; + let offset = committed.map_err(|_| Error::IngestionError(IngestionError::Closed("commit cancelled")))?; ingress_request_counter.increment(1); trace!( diff --git a/crates/ingress-kafka/src/lib.rs b/crates/ingress-kafka/src/lib.rs index bae3c2030f..b2185fa16b 100644 --- a/crates/ingress-kafka/src/lib.rs +++ b/crates/ingress-kafka/src/lib.rs @@ -16,8 +16,9 @@ mod subscription_controller; use rdkafka::error::KafkaError; use tokio::sync::mpsc; +use restate_ingestion_client::IngestionError; use restate_types::schema::kafka::KafkaCluster; -use restate_types::{partitions::PartitionTableError, schema::subscriptions::Subscription}; +use restate_types::schema::subscriptions::Subscription; #[derive(Debug)] pub enum Command { @@ -42,10 +43,8 @@ pub enum Error { #[source] cause: anyhow::Error, }, - #[error("Ingress stream is closed: {0}")] - IngestionClosed(Box), - #[error(transparent)] - PartitionTableError(#[from] PartitionTableError), + #[error("Ingress error: {0}")] + IngestionError(#[from] IngestionError), #[error( "Received a message on the main partition queue for topic {0} partition {1} despite partitioned queues" )] diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 808155b82d..20672abc25 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -290,11 +290,19 @@ impl Node { .ingestion .inflight_memory_budget .as_non_zero_usize(), - Some(SessionOptions { - batch_size: config.ingress.ingestion.request_batch_size.as_usize(), - connection_retry_policy: config.ingress.ingestion.connection_retry_policy.clone(), - swimlane: Swimlane::IngressData, - }), + SessionOptions::builder() + .batch_size( + config + .ingress + .ingestion + .request_batch_size + .as_non_zero_usize(), + ) + .record_size_limit(config.ingress.request_size_limit()) + .connection_retry_policy(config.ingress.ingestion.connection_retry_policy.clone()) + .swimlane(Swimlane::IngressData) + .build() + .expect("Ingestion session options to build"), ); // Create a node-level RemoteScannerManager shared across all roles. diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index 6a0ef133aa..e16b451980 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -22,6 +22,14 @@ enum ProtocolVersion { // Release in v1.6. Support custom `Record` // encoding that uses Bytes instead of PolyBytes V3 = 3; + + // Release in v1.7 + // Ingress client can differentiate v1.6 PPs + // from v1.7 PPs to choose which ingestion mode + // to use. For v1.6 only single inflight batch + // is allowed. In v1.7 multiple inflight batches + // are allowed. + V4 = 4; } message NodeId { diff --git a/crates/types/src/net/ingest.rs b/crates/types/src/net/ingest.rs index c57f32768d..7ad62e0762 100644 --- a/crates/types/src/net/ingest.rs +++ b/crates/types/src/net/ingest.rs @@ -14,11 +14,12 @@ use bytes::Bytes; use restate_encoding::{ArcedSlice, RestateEncoding}; -use crate::identifiers::PartitionId; +use crate::identifiers::{LeaderEpoch, PartitionId}; use crate::logs::{HasRecordKeys, Keys}; use crate::message::MessageIndex; use crate::net::partition_processor::PartitionLeaderService; use crate::net::{RpcRequest, bilrost_wire_codec, define_rpc}; +use crate::partitions::state::LeadershipState; #[derive(Debug, Eq, PartialEq, Clone, bilrost::Message)] pub struct IngestRecord { @@ -48,6 +49,16 @@ impl HasRecordKeys for IngestRecord { pub struct IngestRequest { #[bilrost(tag(1), encoding(ArcedSlice))] pub records: Arc<[IngestRecord]>, + + /// The expected leader epoch of the target partition. + /// + /// When set, the partition processor only accepts the request if this + /// matches its current leader epoch. This lets it atomically reject an + /// entire stream of ingest requests across a leadership change. + /// + /// Since v1.7 + protocol V4 + #[bilrost(tag(2))] + pub target_leader_epoch: Option, } impl IngestRequest { @@ -58,12 +69,6 @@ impl IngestRequest { } } -impl From> for IngestRequest { - fn from(records: Arc<[IngestRecord]>) -> Self { - Self { records } - } -} - bilrost_wire_codec!(IngestRequest); #[derive(Debug, Clone, bilrost::Oneof, bilrost::Message)] @@ -71,6 +76,8 @@ pub enum ResponseStatus { Unknown, #[bilrost(tag = 1, message)] Ack, + // Retained for wire-compat with <=V3 (pre-v1.7) peers. New code sends/handles + // `NotLeaderWithEpoch` instead. #[bilrost(tag = 2, message)] NotLeader { of: PartitionId, @@ -79,6 +86,11 @@ pub enum ResponseStatus { Internal { msg: String, }, + #[bilrost(tag = 4, message)] + NotLeaderWithEpoch { + of: PartitionId, + last_seen_leadership_state: LeadershipState, + }, } #[derive(Debug, Clone, bilrost::Message)] @@ -101,10 +113,18 @@ define_rpc! { @service=PartitionLeaderService, } +/// [`ReceivedIngestRequest`] must be kept +/// in lockstep with [`IngestRequest`] +/// It uses the same TYPE as [`IngestRequest`] +/// to be able to decode directly to owned Vec +/// on server side. #[derive(Debug, bilrost::Message)] pub struct ReceivedIngestRequest { #[bilrost(tag(1), encoding(packed))] pub records: Vec, + // todo(azmy): make non-optional in Restate v1.8 + #[bilrost(tag(2))] + pub target_leader_epoch: Option, } bilrost_wire_codec!(ReceivedIngestRequest); diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs index d732151f8f..64746f41cd 100644 --- a/crates/types/src/net/log_server.rs +++ b/crates/types/src/net/log_server.rs @@ -397,7 +397,7 @@ impl WireEncode for Store { let msg: compat::Store = self.clone().into(); Ok(encode_as_bilrost(&msg)) } - ProtocolVersion::V3 => Ok(encode_as_bilrost(self)), + ProtocolVersion::V3 | ProtocolVersion::V4 => Ok(encode_as_bilrost(self)), } } } @@ -422,7 +422,9 @@ impl WireDecode for Store { crate::net::codec::decode_as_bilrost(buf, protocol_version)?; Ok(msg.into()) } - ProtocolVersion::V3 => crate::net::codec::decode_as_bilrost(buf, protocol_version), + ProtocolVersion::V3 | ProtocolVersion::V4 => { + crate::net::codec::decode_as_bilrost(buf, protocol_version) + } } } } diff --git a/crates/types/src/net/mod.rs b/crates/types/src/net/mod.rs index c99842ee4c..b6c84e58ec 100644 --- a/crates/types/src/net/mod.rs +++ b/crates/types/src/net/mod.rs @@ -25,7 +25,7 @@ pub use crate::protobuf::common::ProtocolVersion; pub use crate::protobuf::common::ServiceTag; pub const MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V2; -pub const CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V3; +pub const CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V4; pub trait Service: Send + Unpin + 'static { const TAG: ServiceTag; diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index c77dd8c731..be15ef3fc6 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -163,7 +163,7 @@ impl WireEncode for Append { let msg: compat::Append = self.clone().into(); Ok(encode_as_bilrost(&msg)) } - ProtocolVersion::V3 => Ok(encode_as_bilrost(self)), + ProtocolVersion::V3 | ProtocolVersion::V4 => Ok(encode_as_bilrost(self)), } } } @@ -188,7 +188,9 @@ impl WireDecode for Append { crate::net::codec::decode_as_bilrost(buf, protocol_version)?; Ok(msg.into()) } - ProtocolVersion::V3 => crate::net::codec::decode_as_bilrost(buf, protocol_version), + ProtocolVersion::V3 | ProtocolVersion::V4 => { + crate::net::codec::decode_as_bilrost(buf, protocol_version) + } } } } diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 5acda23282..32ecb5b5b6 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -78,6 +78,7 @@ ulid = { workspace = true } [dev-dependencies] restate-bifrost = { workspace = true, features = ["test-util"] } restate-core = { workspace = true, features = ["test-util"] } +restate-ingestion-client = { workspace = true, features = ["test-util"] } restate-rocksdb = { workspace = true, features = ["test-util"] } restate-service-protocol = { workspace = true, features = ["test-util"] } restate-storage-api = { workspace = true, features = ["test-util"] } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index bc69274637..7c477a630b 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use codederror::CodedError; use restate_core::network::Swimlane; use restate_ingestion_client::SessionOptions; +use restate_types::net::connect_opts::GrpcConnectionOptions; use restate_wal_protocol::Envelope; use tracing::info; @@ -156,11 +157,13 @@ where .shuffle .inflight_memory_budget .as_non_zero_usize(), - Some(SessionOptions { - batch_size: config.worker.shuffle.request_batch_size.as_usize(), - connection_retry_policy: config.worker.shuffle.connection_retry_policy.clone(), - swimlane: Swimlane::BifrostData, - }), + SessionOptions::builder() + .batch_size(config.worker.shuffle.request_batch_size.as_non_zero_usize()) + .connection_retry_policy(config.worker.shuffle.connection_retry_policy.clone()) + .record_size_limit(config.networking.message_size_limit()) + .swimlane(Swimlane::IngressData) + .build() + .expect("Ingestion session options to build"), ); let metadata_store_client = metadata_writer.raw_metadata_store_client().clone(); diff --git a/crates/worker/src/metric_definitions.rs b/crates/worker/src/metric_definitions.rs index adff6cdb3b..a7328fb8ef 100644 --- a/crates/worker/src/metric_definitions.rs +++ b/crates/worker/src/metric_definitions.rs @@ -58,8 +58,6 @@ pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_ pub const PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS: &str = "restate.partition.record_committed_to_read_latency.seconds"; -pub const PARTITION_INGESTION_REQUEST_LEN: &str = "restate.partition.ingest.request.len"; -pub const PARTITION_INGESTION_REQUEST_SIZE: &str = "restate.partition.ingest.request.size.bytes"; pub const PARTITION_SHUFFLE_MESSAGE_COUNT: &str = "restate.partition.shuffle.message.count"; pub const PARTITION_SHUFFLE_INFLIGHT_COUNT: &str = "restate.partition.shuffle.inflight.count"; @@ -146,18 +144,6 @@ pub(crate) fn describe_metrics() { "The age of the latest partition snapshot in seconds" ); - describe_histogram!( - PARTITION_INGESTION_REQUEST_LEN, - Unit::Count, - "Number of records in a single ingestion request" - ); - - describe_histogram!( - PARTITION_INGESTION_REQUEST_SIZE, - Unit::Bytes, - "Total size of records in a single ingestion request" - ); - describe_counter!( PARTITION_SHUFFLE_MESSAGE_COUNT, Unit::Count, diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 8342d13f42..3a01bd4d59 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -844,6 +844,7 @@ impl LeadershipState { /// Forward externally-created records to this partition. pub async fn forward_many_with_callback( &mut self, + target_leader_epoch: LeaderEpoch, records: impl ExactSizeIterator, callback: F, ) where @@ -854,6 +855,15 @@ impl LeadershipState { PartitionProcessorRpcError::NotLeader(self.partition.partition_id), )), State::Leader(leader_state) => { + // Defensive check. Make sure that the target epoch + // matches the leader state epoch + if target_leader_epoch != leader_state.leader_epoch { + callback(Err(PartitionProcessorRpcError::NotLeader( + self.partition.partition_id, + ))); + return; + } + leader_state .forward_many_with_callback(records, callback) .await; @@ -861,6 +871,7 @@ impl LeadershipState { } } } + #[derive(Debug, derive_more::From)] struct TimerReader(PartitionStore); @@ -915,7 +926,7 @@ mod tests { use restate_bifrost::Bifrost; use restate_core::partitions::PartitionRouting; use restate_core::{TaskCenter, TestCoreEnv}; - use restate_ingestion_client::IngestionClient; + use restate_ingestion_client::{IngestionClient, SessionOptions}; use restate_limiter::RuleBook; use restate_partition_store::PartitionStoreManager; use restate_rocksdb::RocksDbManager; @@ -971,7 +982,7 @@ mod tests { env.metadata.updateable_partition_table(), PartitionRouting::new(replica_set_states.clone(), TaskCenter::current()), NonZeroUsize::new(10 * 1024 * 1024).unwrap(), - None, + SessionOptions::default(), ); let (leader_query_tx, _leader_query_rx) = restate_worker_api::channel(); diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index e5d5957904..15d1a0de8e 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use futures::never::Never; -use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy, InputRecord}; +use restate_bifrost::{Bifrost, CommitToken, EnqueueError, ErrorRecoveryStrategy, InputRecord}; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; use restate_types::{ identifiers::PartitionKey, logs::LogId, net::ingest::IngestRecord, time::NanosSinceEpoch, @@ -169,10 +169,12 @@ impl SelfProposer { /// Forward externally-created records to Bifrost, returning a [`CommitToken`]. /// /// The records already carry their own dedup information in their headers; no ESN is attached. + /// Internally this uses `enqueue_unchecked` which does not check the record size. Hence + /// the only limit here is the networking max message size. pub async fn forward_many_with_notification( &mut self, records: impl ExactSizeIterator, - ) -> Result where { + ) -> Result> where { let sender = self.bifrost_appender.sender(); // This should ideally be implemented @@ -200,15 +202,12 @@ impl SelfProposer { }; sender - .enqueue(input) + .enqueue_unchecked(input) .await - .map_err(|e| Error::SelfProposer(e.to_string()))?; + .map_err(|e| e.drop_payload())?; } - sender - .notify_committed() - .await - .map_err(|e| Error::SelfProposer(e.to_string())) + sender.notify_committed().await } fn create_self_propose_header(&mut self, partition_key: PartitionKey) -> Header { diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 8e8f1026c5..3a1b3af37c 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -95,8 +95,8 @@ use restate_worker_api::{LeaderQueryCommand, LeaderQueryReceiver}; use self::leadership::trim_queue::TrimQueue; use crate::metric_definitions::{ FLARE_REASON_VERSION_BARRIER, LEADER_LABEL, LEADER_LABEL_FOLLOWER, LEADER_LABEL_LEADER, - PARTITION_BLOCKED_FLARE, PARTITION_INGESTION_REQUEST_LEN, PARTITION_INGESTION_REQUEST_SIZE, - PARTITION_LABEL, PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, REASON_LABEL, + PARTITION_BLOCKED_FLARE, PARTITION_LABEL, PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, + REASON_LABEL, }; use crate::partition::leadership::LeadershipState; use crate::partition::state_machine::{ActionCollector, StateMachine}; @@ -961,25 +961,71 @@ where async fn on_pp_ingest_request(&mut self, msg: Incoming>) { let (reciprocal, request) = msg.split(); - histogram!( - PARTITION_INGESTION_REQUEST_LEN, PARTITION_LABEL => self.partition_id_str.clone() - ) - .record(request.records.len() as f64); - histogram!( - PARTITION_INGESTION_REQUEST_SIZE, PARTITION_LABEL => self.partition_id_str.clone() - ) - .record(request.records.iter().fold(0, |s, r| s + r.estimate_size()) as f64); + // todo(azmy): Once target_leader_epoch becomes mandatory, we need to remove + // this hack. + let Some(target_leader_epoch) = request.target_leader_epoch else { + // Old clients don't send a target leader epoch, so we + // reject these writes to avoid data loss. When such a + // client pipelines requests, a leadership change could + // cause one request to be rejected while later requests + // in the pipeline are accepted, silently dropping data. + // See #4879 for details. + let _ = TaskCenter::spawn_child(TaskKind::Disposable, "reject-ingestion", async move { + // clients will try immediately + // so we need to put some back pressure here + // by delaying the response so they don't + // retry immediately + tokio::time::sleep(Duration::from_secs(1)).await; + + // Older clients (if enabled) will print this error message as debug + // message so it might be a good idea to print this here also as a warn + // to make sure it's visible during a rolling upgrade. + warn!( + "Rejecting ingestion requests from old ingestion \ + clients to prevent data loss. Please upgrade to v1.7 \ + or disable the experimental ingestion feature(s) \ + `experimental_*_batch_ingestion`" + ); + reciprocal.send( + ResponseStatus::Internal { + msg: "Rejecting ingestion requests from old ingestion \ + clients to prevent data loss. Please upgrade to v1.7 \ + or disable the experimental ingestion feature(s) \ + `experimental_*_batch_ingestion`" + .into(), + } + .into(), + ); + Ok(()) + }); + + return; + }; + + let replica_set_states = self.replica_set_states.clone(); self.leadership_state .forward_many_with_callback( + target_leader_epoch, request.records.into_iter(), - |result: Result<(), PartitionProcessorRpcError>| match result { + move |result: Result<(), PartitionProcessorRpcError>| match result { Ok(_) => reciprocal.send(ResponseStatus::Ack.into()), Err(err) => match err { PartitionProcessorRpcError::NotLeader(id) | PartitionProcessorRpcError::LostLeadership(id) => { - reciprocal.send(ResponseStatus::NotLeader { of: id }.into()) + // get and return the most up to date view of the + // leader state. + let leadership_state = + replica_set_states.membership_state(id).current_leader(); + + reciprocal.send( + ResponseStatus::NotLeaderWithEpoch { + of: id, + last_seen_leadership_state: leadership_state, + } + .into(), + ) } PartitionProcessorRpcError::Internal(msg) => { reciprocal.send(ResponseStatus::Internal { msg }.into()) diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index af14e21fb2..50e2baabff 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -426,7 +426,7 @@ mod ingestion_client_tests { use assert2::let_assert; use futures::StreamExt; use restate_core::partitions::PartitionRouting; - use restate_ingestion_client::IngestionClient; + use restate_ingestion_client::{IngestionClient, SessionOptions}; use restate_types::net::RpcRequest; use restate_types::net::ingest::{ReceivedIngestRequest, ResponseStatus}; use restate_types::net::partition_processor::PartitionLeaderService; @@ -652,7 +652,7 @@ mod ingestion_client_tests { env.metadata.updateable_partition_table(), PartitionRouting::new(partition_replica_set_states, TaskCenter::current()), NonZeroUsize::new(10 * 1024 * 1024).unwrap(), - None, + SessionOptions::default(), ); let (truncation_tx, _truncation_rx) = mpsc::channel(1); diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index be4bf64f80..be2dbec9b1 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -1547,7 +1547,7 @@ mod tests { use restate_bifrost::providers::memory_loglet; use restate_core::partitions::PartitionRouting; use restate_core::{TaskCenter, TaskKind, TestCoreEnvBuilder}; - use restate_ingestion_client::IngestionClient; + use restate_ingestion_client::{IngestionClient, SessionOptions}; use restate_partition_store::PartitionStoreManager; use restate_rocksdb::RocksDbManager; use restate_types::config::Configuration; @@ -1602,7 +1602,7 @@ mod tests { env_builder.metadata.updateable_partition_table(), PartitionRouting::new(replica_set_states.clone(), TaskCenter::current()), NonZeroUsize::new(10 * 1024 * 1024).unwrap(), - None, + SessionOptions::default(), ); let partition_processor_manager = PartitionProcessorManager::new( diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index 16d900c2f2..9490e4ae6c 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -22,7 +22,7 @@ use restate_admin::service::AdminService; use restate_core::partitions::PartitionRouting; use restate_core::{TaskCenter, TaskCenterBuilder, TestCoreEnv}; use restate_core::{TaskCenterFutureExt, TaskKind}; -use restate_ingestion_client::IngestionClient; +use restate_ingestion_client::{IngestionClient, SessionOptions}; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol_v4::discovery::ServiceDiscovery; use restate_service_protocol_v4::serdes::SerdesClient; @@ -221,7 +221,7 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { node_env.metadata.updateable_partition_table(), PartitionRouting::new(PartitionReplicaSetStates::default(), TaskCenter::current()), NonZeroUsize::new(1000).unwrap(), - None, + SessionOptions::default(), ); let socket_dir = tempfile::tempdir()?;