From 5e2233b1dc4c731f7d98ab572c87cfcc6ff215ff Mon Sep 17 00:00:00 2001
From: Shashank Jarmale
Date: Mon, 30 Mar 2026 10:24:09 -0700
Subject: [PATCH] Modify Rust consumer to produce _next_ message offset to
 Snuba commit log, rather than _current_

---
 rust_snuba/src/strategies/processor.rs | 91 +++++++++++++++++++++++++-
 1 file changed, 88 insertions(+), 3 deletions(-)

diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs
index cb87890d63..d3c6d0bf74 100644
--- a/rust_snuba/src/strategies/processor.rs
+++ b/rust_snuba/src/strategies/processor.rs
@@ -54,7 +54,7 @@ pub fn make_rust_processor(
         .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
             partition.index,
             CommitLogEntry {
-                offset,
+                offset: offset + 1,
                 orig_message_ts: timestamp,
                 received_p99: transformed.origin_timestamp.into_iter().collect(),
             },
@@ -131,7 +131,7 @@ pub fn make_rust_processor_with_replacements(
         .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
             partition.index,
             CommitLogEntry {
-                offset,
+                offset: offset + 1,
                 orig_message_ts: timestamp,
                 received_p99: transformed.origin_timestamp.into_iter().collect(),
             },
@@ -214,7 +214,7 @@ pub fn make_rust_processor_row_binary(
         .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
             partition.index,
             CommitLogEntry {
-                offset,
+                offset: offset + 1,
                 orig_message_ts: timestamp,
                 received_p99: transformed.origin_timestamp.into_iter().collect(),
             },
@@ -474,7 +474,15 @@ mod tests {
     use sentry_arroyo::backends::kafka::types::KafkaPayload;
     use sentry_arroyo::types::{Message, Partition, Topic};
 
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    use sentry_arroyo::processing::strategies::{
+        CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
+    };
+
     use crate::types::InsertBatch;
+    use crate::types::RowData;
     use crate::Noop;
 
     #[test]
@@ -518,6 +526,83 @@ mod tests {
         let _ = strategy.join(None);
     }
 
+    /// The commit log offset produced by the processor must use next-to-consume
+    /// semantics (current offset + 1).
+    #[test]
+    fn commit_log_entry_uses_next_offset() {
+        let captured: Arc<std::sync::Mutex<Vec<BytesInsertBatch<RowData>>>> =
+            Arc::new(std::sync::Mutex::new(Vec::new()));
+        let captured_clone = captured.clone();
+
+        struct Capture(Arc<std::sync::Mutex<Vec<BytesInsertBatch<RowData>>>>);
+        impl ProcessingStrategy<BytesInsertBatch<RowData>> for Capture {
+            fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
+                Ok(None)
+            }
+            fn submit(
+                &mut self,
+                message: Message<BytesInsertBatch<RowData>>,
+            ) -> Result<(), SubmitError<BytesInsertBatch<RowData>>> {
+                self.0.lock().unwrap().push(message.into_payload());
+                Ok(())
+            }
+            fn terminate(&mut self) {}
+            fn join(
+                &mut self,
+                _timeout: Option<Duration>,
+            ) -> Result<Option<CommitRequest>, StrategyError> {
+                Ok(None)
+            }
+        }
+
+        fn noop_processor(
+            _payload: KafkaPayload,
+            _metadata: KafkaMessageMetadata,
+            _config: &ProcessorConfig,
+        ) -> anyhow::Result<InsertBatch> {
+            Ok(InsertBatch::default())
+        }
+
+        let partition = Partition::new(Topic::new("events-small"), 7);
+        let raw_offset: u64 = 42;
+        let concurrency = ConcurrencyConfig::new(1);
+
+        let mut strategy = make_rust_processor(
+            Capture(captured_clone),
+            noop_processor,
+            "outcomes",
+            false,
+            &concurrency,
+            ProcessorConfig::default(),
+            None,
+        );
+
+        let payload = KafkaPayload::new(None, None, Some(b"{}".to_vec()));
+        let message = Message::new_broker_message(payload, partition, raw_offset, Utc::now());
+
+        strategy.submit(message).unwrap();
+        strategy.poll().unwrap();
+        let _ = strategy.join(None);
+
+        let batches = captured.lock().unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let offsets = batches[0].commit_log_offsets();
+        let entry = offsets
+            .0
+            .get(&partition.index)
+            .expect("commit log entry missing for partition");
+
+        assert_eq!(
+            entry.offset,
+            raw_offset + 1,
+            "Commit log entry must contain next-to-consume offset (raw + 1). \
+             Got raw offset {}, expected {}.",
+            raw_offset,
+            raw_offset + 1,
+        );
+    }
+
     #[test]
     fn test_ip_addresses() {
         let test_cases = [