Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,26 @@ impl<T: StorageEncode> LogSender<T> {
Ok(())
}

/// Waits for capacity on the channel and returns an error if the appender is
/// draining or drained.
///
/// Unlike [`enqueue`](Self::enqueue), this does not check the record size and
/// accepts a record of any size.
Comment on lines +363 to +364

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add: "Callers have to ensure that record is not larger than the network message size limit."

///
/// Callers have to ensure that record is not larger than the network message size limit.
pub async fn enqueue_unchecked<A>(&mut self, record: A) -> Result<(), EnqueueError<A>>
where
A: Into<InputRecord<T>>,
{
let Ok(permit) = self.tx.reserve().await else {
return Err(EnqueueError::Closed(record));
};
let record = record.into().into_record().ensure_encoded(&mut self.arena);
permit.send(AppendOperation::Enqueue(record));

Ok(())
}

/// Attempt to enqueue a record to the appender. Returns immediately if the
/// appender is pushing back or if the appender is draining or drained.
///
Expand Down
12 changes: 12 additions & 0 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ pub enum EnqueueError<T> {
},
}

impl<T> EnqueueError<T> {
pub fn drop_payload(self) -> EnqueueError<()> {
match self {
Self::Closed(_) => EnqueueError::Closed(()),
Self::Full(_) => EnqueueError::Full(()),
Self::RecordTooLarge { record_size, limit } => {
EnqueueError::RecordTooLarge { record_size, limit }
}
}
}
}

#[derive(Clone, Debug, thiserror::Error)]
pub enum AdminError {
#[error("log {0} is permanently sealed")]
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use appender::Appender;
pub use background_appender::{AppenderHandle, BackgroundAppender, CommitToken, LogSender};
pub use bifrost::{Bifrost, ErrorRecoveryStrategy};
pub use bifrost_admin::{BifrostAdmin, MaybeSealedSegment};
pub use error::{Error, Result};
pub use error::{EnqueueError, Error, Result};
pub use read_stream::LogReadStream;
pub use read_stream_registry::ActiveReadStreamRegistry;
pub use record::{InputRecord, LogEntry, MaybeRecord, RecordKind};
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,8 @@ impl PartitionRouting {
.expect("partition replica set states are never dropped while in use");
*state
}

pub fn partition_replica_set_state(&self) -> &PartitionReplicaSetStates {
&self.partition_replica_set_states
}
}
2 changes: 2 additions & 0 deletions crates/core/src/task_center/task_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub enum TaskKind {
LogStoreWriter,
// - Datafusion
DfScanner,
#[strum(props(runtime = "default"))]
IngestionSession,
Comment on lines +161 to +162

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was running the ingestion session on the pp runtime causing the shutdown problems you mentioned offline?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. The shutdown issue was that we spawned the task as managed task and not using the task centre cancellation token (since we use our own managed cancellation token).

The decision to use the default runtime is to make sure session tasks (which are lazily initialised) won't get torn down with the PP runtime when it's shutting down. This way the client can be safely shared across multiple partitions or other components.

}

impl TaskKind {
Expand Down
4 changes: 4 additions & 0 deletions crates/ingestion-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ rust-version.workspace = true
license.workspace = true
publish = false

[features]
test-util = []

[dependencies]

restate-core = { workspace = true }
Expand All @@ -15,6 +18,7 @@ restate-workspace-hack = { workspace = true }

bytes = { workspace = true }
dashmap = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
pin-project = { workspace = true }
thiserror = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/ingestion-client/src/chunks_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl<F, S: Stream> ChunksSize<F, S>
where
F: Fn(&S::Item) -> usize,
{
pub fn new(stream: S, max_size: usize, size_fn: F) -> Self {
Self::with_buffered(stream, max_size, size_fn, Vec::default())
}

/// Creates a chunker, pre-seeding the accumulation buffer with `buffered` items so they lead the
/// next emitted chunk. Pass an empty `Vec` to start fresh. Seeding is used to carry a previously
/// over-pulled item across a recreated `ChunksSize` (e.g. after a reconnect) without losing it or
Expand Down
Loading
Loading