From eaacc0ff6d58380c3318210d3b0b65b74e04910f Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 29 May 2026 12:28:39 +0200 Subject: [PATCH 1/3] ref(transactions): Send extracted spans directly to store --- CHANGELOG.md | 4 + relay-server/src/envelope/mod.rs | 26 --- relay-server/src/managed/counted.rs | 8 + .../src/processing/legacy_spans/store.rs | 1 + .../src/processing/profiles/forward.rs | 56 ------ relay-server/src/processing/profiles/mod.rs | 47 ++++- relay-server/src/processing/spans/store.rs | 1 + .../src/processing/transactions/extraction.rs | 14 +- .../src/processing/transactions/mod.rs | 21 +- .../src/processing/transactions/process.rs | 190 +++++++++++++----- .../src/processing/transactions/spans.rs | 181 ++--------------- .../src/processing/transactions/types.rs | 21 -- .../processing/transactions/types/expanded.rs | 119 ++++++----- .../src/processing/transactions/types/mod.rs | 9 + .../processing/transactions/types/output.rs | 123 ++++++++++-- .../processing/transactions/types/profile.rs | 1 - .../transactions/types/serialized.rs | 8 +- relay-server/src/processing/utils/event.rs | 4 - relay-server/src/services/processor.rs | 8 - relay-server/src/services/store.rs | 131 +----------- 20 files changed, 434 insertions(+), 539 deletions(-) delete mode 100644 relay-server/src/processing/profiles/forward.rs delete mode 100644 relay-server/src/processing/transactions/types.rs create mode 100644 relay-server/src/processing/transactions/types/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9793b94da34..b0ba5fd11c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ - Implement mobile measurements calculation for V2 spans. ([#6022](https://github.com/getsentry/relay/pull/6022)) +**Internal**: + +- Forwards extracted transaction spans directly to Kafka instead of serializing to an intermediate envelope first. 
([#6029](https://github.com/getsentry/relay/pull/6029)) + ## 26.5.1 **Features**: diff --git a/relay-server/src/envelope/mod.rs b/relay-server/src/envelope/mod.rs index b7822af22d5..7b8ab9fb158 100644 --- a/relay-server/src/envelope/mod.rs +++ b/relay-server/src/envelope/mod.rs @@ -103,13 +103,6 @@ pub struct EnvelopeHeaders { #[serde(default, skip_serializing_if = "Option::is_none")] retention: Option, - /// Data retention in days for the items of this envelope. - /// - /// This value is always overwritten in processing mode by the value specified in the project - /// configuration. - #[serde(default, skip_serializing_if = "Option::is_none")] - downsampled_retention: Option, - /// Timestamp when the event has been sent, according to the SDK. /// /// This can be used to perform drift correction. @@ -165,7 +158,6 @@ impl EnvelopeHeaders { event_id: self.event_id, meta: meta.copy_to(request_meta), retention: self.retention, - downsampled_retention: self.downsampled_retention, sent_at: self.sent_at, trace: self.trace, required_features: self.required_features, @@ -231,7 +223,6 @@ where event_id, meta, retention, - downsampled_retention, sent_at, trace, required_features, @@ -247,9 +238,6 @@ where if let Some(retention) = retention { map.entry(&"retention", retention); } - if let Some(downsampled_retention) = downsampled_retention { - map.entry(&"downsampled_retention", downsampled_retention); - } if let Some(sent_at) = sent_at { map.entry(&"sent_at", sent_at); } @@ -302,7 +290,6 @@ impl Envelope { event_id, meta, retention: None, - downsampled_retention: None, sent_at: None, other: BTreeMap::new(), trace: None, @@ -406,14 +393,6 @@ impl Envelope { self.headers.retention.unwrap_or(DEFAULT_EVENT_RETENTION) } - /// Returns the data retention in days for items in this envelope. 
- #[cfg_attr(not(feature = "processing"), allow(dead_code))] - pub fn downsampled_retention(&self) -> u16 { - self.headers - .downsampled_retention - .unwrap_or(self.retention()) - } - /// When the event has been sent, according to the SDK. pub fn sent_at(&self) -> Option> { self.headers.sent_at @@ -471,11 +450,6 @@ impl Envelope { self.headers.retention = Some(retention); } - /// Sets the data retention in days for items in this envelope. - pub fn set_downsampled_retention(&mut self, retention: u16) { - self.headers.downsampled_retention = Some(retention); - } - /// Runs transaction parametrization on the DSC trace transaction. /// /// The purpose is for trace rules to match on the parametrized version of the transaction. diff --git a/relay-server/src/managed/counted.rs b/relay-server/src/managed/counted.rs index 66f078f6bff..377ee132a68 100644 --- a/relay-server/src/managed/counted.rs +++ b/relay-server/src/managed/counted.rs @@ -41,6 +41,14 @@ impl Counted for Option { } } +impl Counted for (T, S) { + fn quantities(&self) -> Quantities { + let mut quantities = self.0.quantities(); + quantities.extend(self.1.quantities()); + quantities + } +} + impl Counted for Either where L: Counted, diff --git a/relay-server/src/processing/legacy_spans/store.rs b/relay-server/src/processing/legacy_spans/store.rs index e8449a1c164..e3de0b7505b 100644 --- a/relay-server/src/processing/legacy_spans/store.rs +++ b/relay-server/src/processing/legacy_spans/store.rs @@ -39,6 +39,7 @@ pub fn convert( routing_key: span.trace_id.value().copied().map(Into::into), retention_days: retentions.standard, downsampled_retention_days: retentions.downsampled, + event_id: None, item: span, })) } diff --git a/relay-server/src/processing/profiles/forward.rs b/relay-server/src/processing/profiles/forward.rs deleted file mode 100644 index 276af930023..00000000000 --- a/relay-server/src/processing/profiles/forward.rs +++ /dev/null @@ -1,56 +0,0 @@ -use smallvec::smallvec; - -use crate::Envelope; 
-#[cfg(feature = "processing")] -use crate::managed::Counted; -use crate::managed::{Managed, Rejected}; -use crate::processing::profiles::{ExpandedProfile, ProfilesOutput}; -use crate::processing::{Forward, ForwardContext}; -#[cfg(feature = "processing")] -use crate::services::store::StoreProfile; - -impl Forward for ProfilesOutput { - fn serialize_envelope( - self, - _ctx: ForwardContext<'_>, - ) -> Result>, Rejected<()>> { - let Self(profile) = self; - let envelope = profile.map( - |ExpandedProfile { - headers, - profile, - profile_type: _, - }, - _| { Envelope::from_parts(headers, smallvec![profile]) }, - ); - - Ok(envelope) - } - - #[cfg(feature = "processing")] - fn forward_store( - self, - s: crate::processing::StoreHandle<'_>, - ctx: ForwardContext<'_>, - ) -> Result<(), Rejected<()>> { - let Self(profile) = self; - - let store_profile = profile.map(|profile, _| convert(profile, &ctx)); - s.send_to_store(store_profile); - - Ok(()) - } -} - -/// Converts a [`ExpandedProfile`] into a [`StoreProfile`]. 
-#[cfg(feature = "processing")] -fn convert(profile: ExpandedProfile, ctx: &ForwardContext) -> StoreProfile { - let retention_days = ctx.event_retention().standard; - let quantities = profile.quantities(); - - StoreProfile { - retention_days, - profile: profile.profile, - quantities, - } -} diff --git a/relay-server/src/processing/profiles/mod.rs b/relay-server/src/processing/profiles/mod.rs index d28a4c9117c..cb8b3f03694 100644 --- a/relay-server/src/processing/profiles/mod.rs +++ b/relay-server/src/processing/profiles/mod.rs @@ -4,15 +4,17 @@ use relay_profiling::{ProfileError, ProfileType}; use relay_quotas::{DataCategory, RateLimits}; use smallvec::smallvec; +use crate::Envelope; use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items}; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; -use crate::processing::{Context, CountRateLimited, Output, Processor, QuotaRateLimiter}; +use crate::processing::{ + Context, CountRateLimited, Forward, ForwardContext, Output, Processor, QuotaRateLimiter, +}; use crate::services::outcome::{DiscardReason, Outcome}; mod filter; -mod forward; mod process; #[derive(Debug, thiserror::Error)] @@ -159,3 +161,44 @@ impl CountRateLimited for Managed { /// Output produced by the [`ProfilesProcessor`]. 
#[derive(Debug)] pub struct ProfilesOutput(Managed); + +impl Forward for ProfilesOutput { + fn serialize_envelope( + self, + _ctx: ForwardContext<'_>, + ) -> Result>, Rejected<()>> { + let Self(profile) = self; + let envelope = profile.map( + |ExpandedProfile { + headers, + profile, + profile_type: _, + }, + _| { Envelope::from_parts(headers, smallvec![profile]) }, + ); + + Ok(envelope) + } + + #[cfg(feature = "processing")] + fn forward_store( + self, + s: crate::processing::StoreHandle<'_>, + ctx: ForwardContext<'_>, + ) -> Result<(), Rejected<()>> { + use crate::services::store::StoreProfile; + + let Self(profile) = self; + + let retention_days = ctx.event_retention().standard; + + let store_profile = profile.map(|profile, _| StoreProfile { + retention_days, + quantities: profile.quantities(), + profile: profile.profile, + }); + s.send_to_store(store_profile); + + Ok(()) + } +} diff --git a/relay-server/src/processing/spans/store.rs b/relay-server/src/processing/spans/store.rs index 3a26425462c..1996b66edd5 100644 --- a/relay-server/src/processing/spans/store.rs +++ b/relay-server/src/processing/spans/store.rs @@ -44,6 +44,7 @@ pub fn convert(span: IndexedSpanOnly, ctx: &Context) -> Result> routing_key, retention_days: ctx.retention.standard, downsampled_retention_days: ctx.retention.downsampled, + event_id: None, item: span, })) } diff --git a/relay-server/src/processing/transactions/extraction.rs b/relay-server/src/processing/transactions/extraction.rs index 51cde420d0c..2795362536d 100644 --- a/relay-server/src/processing/transactions/extraction.rs +++ b/relay-server/src/processing/transactions/extraction.rs @@ -9,7 +9,7 @@ use relay_sampling::evaluation::SamplingDecision; use crate::processing::Context; use crate::processing::utils::event::EventMetricsExtracted; -use crate::services::processor::{ProcessingError, ProcessingExtractedMetrics}; +use crate::services::processor::ProcessingExtractedMetrics; /// Creates a span from the transaction and applies tag 
extraction on it. /// @@ -42,7 +42,7 @@ pub fn extract_metrics( event: &mut Annotated, extracted_metrics: &mut ProcessingExtractedMetrics, ctx: ExtractMetricsContext, -) -> Result { +) -> EventMetricsExtracted { let ExtractMetricsContext { dsc, project_id, @@ -55,11 +55,11 @@ pub fn extract_metrics( // the full metrics extraction config and skip sampling if it is incomplete. if metrics_extracted { - return Ok(EventMetricsExtracted(true)); + return EventMetricsExtracted(true); } let Some(event) = event.value_mut() else { // Nothing to extract, but metrics extraction was called. - return Ok(EventMetricsExtracted(true)); + return EventMetricsExtracted(true); }; // NOTE: This function requires a `metric_extraction` in the project config. Legacy configs @@ -68,7 +68,7 @@ pub fn extract_metrics( let combined_config = { let config = match &ctx.project_info.config.metric_extraction { ErrorBoundary::Ok(config) if config.is_supported() => config, - _ => return Ok(EventMetricsExtracted(false)), + _ => return EventMetricsExtracted(false), }; let global_config = match &ctx.global_config.metric_extraction { ErrorBoundary::Ok(global_config) => global_config, @@ -83,7 +83,7 @@ pub fn extract_metrics( // If there's an error with global metrics extraction, it is safe to assume that this // Relay instance is not up-to-date, and we should skip extraction. 
relay_log::debug!("Failed to parse global extraction config: {e}"); - return Ok(EventMetricsExtracted(false)); + return EventMetricsExtracted(false); } } }; @@ -106,5 +106,5 @@ pub fn extract_metrics( ); extracted_metrics.extend(metrics, Some(sampling_decision)); - Ok(EventMetricsExtracted(true)) + EventMetricsExtracted(true) } diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 07bfc92018c..c4895e7d920 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -3,11 +3,10 @@ use std::sync::Arc; use relay_event_normalization::GeoIpLookup; use relay_event_schema::protocol::Metrics; use relay_quotas::RateLimits; -use relay_sampling::evaluation::SamplingDecision; use crate::envelope::ItemType; use crate::managed::{Managed, ManagedEnvelope, OutcomeError, Rejected}; -use crate::processing::transactions::process::{SamplingOutput, split_indexed_and_total}; +use crate::processing::transactions::process::SamplingOutput; use crate::processing::transactions::types::{SerializedTransaction, TransactionOutput}; use crate::processing::utils::attachments; use crate::processing::utils::event::event_type; @@ -101,11 +100,13 @@ impl Processor for TransactionProcessor { let attachments = envelope .envelope_mut() - .take_items_by(|item| matches!(*item.ty(), ItemType::Attachment)); + .take_items_by(|item| matches!(*item.ty(), ItemType::Attachment)) + .into_vec(); let profiles = envelope .envelope_mut() - .take_items_by(|item| matches!(*item.ty(), ItemType::Profile)); + .take_items_by(|item| matches!(*item.ty(), ItemType::Profile)) + .into_vec(); let work = SerializedTransaction { headers, @@ -144,7 +145,7 @@ impl Processor for TransactionProcessor { process::process_profile(&mut tx, ctx); relay_log::trace!("Sample transaction"); - let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status)? 
+ let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status) { SamplingOutput::Keep { payload, @@ -170,8 +171,6 @@ impl Processor for TransactionProcessor { #[allow(unused_mut)] let mut tx = process::scrub(tx, ctx)?; - tx = process::extract_spans(tx, ctx, server_sample_rate); - relay_log::trace!("Enforce quotas"); let tx = self.limiter.enforce_quotas(tx, ctx).await?; let tx = match tx.transpose() { @@ -188,10 +187,14 @@ impl Processor for TransactionProcessor { ); }; - let (indexed, metrics) = split_indexed_and_total(tx, ctx, SamplingDecision::Keep)?; + let (spans, tx) = process::extract_spans(tx, ctx, server_sample_rate); + let spans = self.limiter.enforce_quotas(spans, ctx).await.ok(); + + let (transaction, spans, metrics) = + process::split_indexed_and_total_with_extracted_spans(tx, spans, ctx); return Ok(Output { - main: Some(TransactionOutput::Indexed(indexed)), + main: Some(TransactionOutput::Indexed { spans, transaction }), metrics: Some(metrics), }); } diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 61dd34f114c..70fa4b19376 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -8,19 +8,18 @@ use relay_sampling::evaluation::SamplingDecision; use relay_statsd::metric; use smallvec::smallvec; -use crate::envelope::Items; +use crate::envelope::Item; use crate::managed::{Counted, Managed, ManagedResult, Quantities, RecordKeeper, Rejected}; use crate::metrics_extraction::ExtractedMetrics; use crate::processing::spans::{Indexed, TotalAndIndexed}; use crate::processing::transactions::extraction::{self, ExtractMetricsContext}; use crate::processing::transactions::spans; use crate::processing::transactions::types::{ - ExpandedProfile, ExpandedTransaction, Flags, StandaloneProfile, + ExpandedProfile, ExpandedTransaction, ExtractedIndexedSpans, ExtractedSpans, Flags, + StandaloneProfile, }; 
use crate::processing::transactions::{Error, SerializedTransaction, profile}; -use crate::processing::utils::event::{ - EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, -}; +use crate::processing::utils::event::{EventFullyNormalized, FiltersStatus}; use crate::processing::{Context, utils}; use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome}; use crate::services::processor::{ProcessingError, ProcessingExtractedMetrics}; @@ -74,7 +73,6 @@ pub fn expand( flags, attachments, profile, - extracted_spans: vec![], category: TotalAndIndexed, })) }) @@ -90,7 +88,7 @@ fn validate_flags(flags: &Flags) { } fn expand_profile( - profiles: Items, + profiles: Vec, record_keeper: &mut RecordKeeper<'_>, ) -> Option { let mut profiles = profiles.into_iter(); @@ -231,21 +229,21 @@ pub fn run_dynamic_sampling( payload: Managed>, ctx: Context<'_>, filters_status: FiltersStatus, -) -> Result> { +) -> SamplingOutput { let sampling_result = make_dynamic_sampling_decision(&payload, ctx, filters_status); let sampling_match = match sampling_result { SamplingResult::Match(m) if m.decision().is_drop() => m, keep => { - return Ok(SamplingOutput::Keep { + return SamplingOutput::Keep { payload, sample_rate: keep.sample_rate(), - }); + }; } }; // At this point the decision is to drop the payload. 
- let (payload, metrics) = split_indexed_and_total(payload, ctx, SamplingDecision::Drop)?; + let (payload, metrics) = split_indexed_and_total(payload, ctx, SamplingDecision::Drop); let (payload, profile) = payload.split_once(|mut payload, _| { let profile = payload.profile.take().map(|profile| StandaloneProfile { @@ -261,10 +259,10 @@ pub fn run_dynamic_sampling( let outcome = Outcome::FilteredSampling(sampling_match.into_matched_rules().into()); let _ = payload.reject_err(outcome); - Ok(SamplingOutput::Drop { + SamplingOutput::Drop { metrics, profile: profile.transpose().map(Managed::boxed), - }) + } } /// Computes the dynamic sampling decision for the unit of work, but does not perform action on data. @@ -298,6 +296,103 @@ fn do_make_dynamic_sampling_decision( utils::dynamic_sampling::run(work.headers.dsc(), work.event.value(), &ctx) } +type IndexedTransactionAndSpanAndMetrics = ( + Managed>>, + Option>, + Managed, +); + +/// Splits transaction into indexed payload and metrics representing the total counts. +/// +/// Like [`split_indexed_and_total`] but works with [`ExtractedSpans`]. 
+pub fn split_indexed_and_total_with_extracted_spans( + transaction: Managed>, + spans: Option>, + ctx: Context<'_>, +) -> IndexedTransactionAndSpanAndMetrics { + let scoping = transaction.scoping(); + + let (transaction, metrics) = transaction.split_once(|mut tx, r| { + r.lenient(DataCategory::MetricBucket); + + let mut metrics = ProcessingExtractedMetrics::new(); + + let had_metrics_extracted = tx.flags.metrics_extracted; + tx.flags.metrics_extracted = extraction::extract_metrics( + &mut tx.event, + &mut metrics, + ExtractMetricsContext { + dsc: tx.headers.dsc(), + project_id: scoping.project_id, + ctx, + sampling_decision: SamplingDecision::Keep, + metrics_extracted: tx.flags.metrics_extracted, + extract_span_metrics: spans.is_some(), + }, + ) + .0; + + if had_metrics_extracted || !tx.flags.metrics_extracted { + // Invalid config or invalid original transaction + r.lenient(DataCategory::Transaction); + r.lenient(DataCategory::Span); + } + + // This really is a bug which we ignore here. + // + // Transactions are counted using a span metric, as transaction payloads should + // eventually be fully transformed into spans. + // + // Since there is no span metric extracted for this transaction, as we already extracted + // the spans from the transaction, there is now no metric carrying the transaction category. + // + // After extracting span metrics the count is accurate again, but attached to the span metrics. + // Unless, the spans have been rate limited, which is an actual potential bug which we + // ignore here for two reasons: + // - Span rate limits should be applied to transaction as well + // - Long-term transactions will no longer exist + if spans.is_none() { + r.lenient(DataCategory::Transaction); + } + + // Since we just extracted span metrics, which account for the total spans, we also need to fix + // these counts, later we correct this again. 
+ if let Some(spans) = &spans { + r.modify_by(DataCategory::Span, spans.0.len() as isize); + } + + (Box::new(tx.into_indexed()), metrics.into_inner()) + }); + + // In an ideal world we would use these extracted spans to also extract span metrics instead of + // re-using the transaction to get the metrics and risking differences in metrics. + // + // The master plan on how to clean this up: + // 1. Migrate dynamic sampling to EAP + // 2. Emit total category outcomes in Relay instead of as a metric + // 3. Remove all span metrics, including extraction (possible after 1., and 2.) + let spans = spans.map(|spans| { + spans.map(|spans, r| { + if let Some((c, q)) = metrics + .quantities() + .iter() + .find(|(c, _)| *c == DataCategory::Span) + { + // "Insurance" that metrics extracted from the transaction spans match the extracted + // spans. + r.modify_by(*c, -(*q as isize)); + } else if transaction.flags.metrics_extracted { + // This `metrics_extracted` flag really isn't necessary anymore and should be deleted. + r.lenient(DataCategory::Span); + } + + spans.into_indexed() + }) + }); + + (transaction, spans, metrics) +} + type IndexedAndMetrics = ( Managed>>, Managed, @@ -308,14 +403,14 @@ pub fn split_indexed_and_total( mut work: Managed>, ctx: Context<'_>, sampling_decision: SamplingDecision, -) -> Result> { +) -> IndexedAndMetrics { let scoping = work.scoping(); let had_metrics_extracted = work.flags.metrics_extracted; let extract_span_metrics = !work.flags.spans_rate_limited; let mut metrics = ProcessingExtractedMetrics::new(); - work.try_modify(|work, _| { + work.modify(|work, _| { work.flags.metrics_extracted = extraction::extract_metrics( &mut work.event, &mut metrics, @@ -327,13 +422,11 @@ pub fn split_indexed_and_total( metrics_extracted: work.flags.metrics_extracted, extract_span_metrics, }, - )? 
+ ) .0; + }); - Ok::<_, Error>(()) - })?; - - Ok(work.split_once(|work, r| { + work.split_once(|work, r| { r.lenient(DataCategory::MetricBucket); if had_metrics_extracted || !work.flags.metrics_extracted { // Invalid config or invalid original transaction @@ -356,7 +449,7 @@ pub fn split_indexed_and_total( } (Box::new(work.into_indexed()), metrics.into_inner()) - })) + }) } /// Processes the profile attached to the transaction. @@ -383,38 +476,37 @@ pub fn process_profile(work: &mut Managed>, ctx: Contex /// /// Only extracts spans in processing. pub fn extract_spans( - mut work: Managed>, + transaction: Managed>, ctx: Context<'_>, server_sample_rate: Option, -) -> Managed> { - if !ctx.is_processing() { - return work; - } +) -> (Managed, Managed>) { + // This could be validated with additional typing, a different type for transactions which have + // spans extracted. + debug_assert!( + !transaction.flags.spans_extracted, + "spans can only be extracted once" + ); - work.modify(|work, r| { - if let Some(results) = spans::extract_from_event( - work.headers.dsc(), - &work.event, - ctx, - server_sample_rate, - EventMetricsExtracted(work.flags.metrics_extracted), - SpansExtracted(work.flags.spans_extracted), - ) { - work.flags.spans_extracted = true; - for result in results { - match result { - Ok(item) => work.extracted_spans.push(item), - Err(_) => { - r.reject_err( - Outcome::Invalid(DiscardReason::InvalidSpan), - IndexedSpans(1), - ); - } + transaction.split_once(|mut tx, r| { + let spans = spans::extract_from_event(tx.headers.dsc(), &tx.event, ctx, server_sample_rate) + .into_iter() + .filter_map(|span| match span { + Ok(span) => Some(span), + Err(()) => { + r.reject_err( + Outcome::Invalid(DiscardReason::InvalidSpan), + IndexedSpans(1), + ); + None } - } - } - }); - work + }) + .collect(); + + // Once spans are extracted, they are no longer counted towards the transaction. 
+ tx.flags.spans_extracted = true; + + (ExtractedSpans(spans), tx) + }) } /// Runs PiiProcessors on the event and its attachments. diff --git a/relay-server/src/processing/transactions/spans.rs b/relay-server/src/processing/transactions/spans.rs index 78a48488aa8..544db4b1d51 100644 --- a/relay-server/src/processing/transactions/spans.rs +++ b/relay-server/src/processing/transactions/spans.rs @@ -1,45 +1,41 @@ use std::error::Error; -use crate::envelope::{ContentType, Item, ItemType}; use crate::processing; -use crate::processing::utils::event::{EventMetricsExtracted, SpansExtracted, event_type}; +use crate::processing::utils::event::event_type; use relay_base_schema::events::EventType; use relay_dynamic_config::Feature; -use relay_event_schema::protocol::{Event, Measurement, Measurements, Span}; +use relay_event_schema::protocol::{Event, Measurement, Measurements, Span, SpanV2}; use relay_metrics::MetricNamespace; use relay_metrics::{FractionUnit, MetricUnit}; use relay_protocol::{Annotated, Empty}; use relay_sampling::DynamicSamplingContext; -#[allow(clippy::too_many_arguments)] pub fn extract_from_event( dsc: Option<&DynamicSamplingContext>, event: &Annotated, ctx: processing::Context<'_>, server_sample_rate: Option, - event_metrics_extracted: EventMetricsExtracted, - spans_extracted: SpansExtracted, -) -> Option>> { +) -> Vec, ()>> { // Only extract spans from transactions (not errors). 
if event_type(event) != Some(EventType::Transaction) { - return None; + return Vec::new(); }; - if spans_extracted.0 { - return None; - } - let client_sample_rate = dsc.and_then(|ctx| ctx.sample_rate); - let event = event.value()?; + let Some(event) = event.value() else { + return Vec::new(); + }; - let transaction_span = processing::transactions::extraction::extract_segment_span( + let Some(transaction_span) = processing::transactions::extraction::extract_segment_span( event, ctx.config .aggregator_config_for(MetricNamespace::Spans) .max_tag_value_length, &[], - )?; + ) else { + return Vec::new(); + }; let mut results = vec![]; @@ -67,7 +63,6 @@ pub fn extract_from_event( ctx, client_sample_rate, server_sample_rate, - event_metrics_extracted.0, )); } } @@ -77,10 +72,9 @@ pub fn extract_from_event( ctx, client_sample_rate, server_sample_rate, - event_metrics_extracted.0, )); - Some(results) + results } fn make_span_item( @@ -88,8 +82,7 @@ fn make_span_item( ctx: processing::Context<'_>, client_sample_rate: Option, server_sample_rate: Option, - metrics_extracted: bool, -) -> Result { +) -> Result, ()> { add_sample_rate( &mut span.measurements, "client_sample_rate", @@ -114,13 +107,12 @@ fn make_span_item( }) .map_err(|_| ())?; - let mut item = create_span_item(span, ctx)?; + let use_measurements_smart_conversion = ctx + .project_info + .has_feature(Feature::MeasurementsSmartConversion); - // If metrics extraction happened for the event, it also happened for its spans: - item.set_metrics_extracted(metrics_extracted); - - relay_log::trace!("Adding span to envelope"); - Ok(item) + Ok(span + .map_value(|span| relay_spans::span_v1_to_span_v2(span, use_measurements_smart_conversion))) } /// Any violation of the span schema. @@ -197,44 +189,6 @@ pub fn validate(span: &mut Annotated) -> Result<(), ValidationError> { Ok(()) } -/// Serializes the given span into an envelope item. -/// -/// In processing relays, creates a Span V2 so it can be published via kafka. 
-pub fn create_span_item(span: Annotated, ctx: processing::Context<'_>) -> Result { - let mut new_item = Item::new(ItemType::Span); - if cfg!(feature = "processing") && ctx.config.processing_enabled() { - let use_measurements_smart_conversion = ctx - .project_info - .has_feature(Feature::MeasurementsSmartConversion); - let span_v2 = span.map_value(|span| { - relay_spans::span_v1_to_span_v2(span, use_measurements_smart_conversion) - }); - let payload = match span_v2.to_json() { - Ok(payload) => payload, - Err(err) => { - relay_log::error!("failed to serialize span V2: {}", err); - return Err(()); - } - }; - if let Some(trace_id) = span_v2.value().and_then(|s| s.trace_id.value()) { - new_item.set_routing_hint(*trace_id.as_ref()); - } - - new_item.set_payload(ContentType::Json, payload); - } else { - let payload = match span.to_json() { - Ok(payload) => payload, - Err(err) => { - relay_log::error!("failed to serialize span: {}", err); - return Err(()); - } - }; - new_item.set_payload(ContentType::Json, payload); - } - - Ok(new_item) -} - fn add_sample_rate(measurements: &mut Annotated, name: &str, value: Option) { let value = match value { Some(value) if value > 0.0 => value, @@ -250,102 +204,3 @@ fn add_sample_rate(measurements: &mut Annotated, name: &str, value .get_or_insert_with(Measurements::default) .insert(name.to_owned(), measurement); } - -#[cfg(test)] -mod tests { - - use std::collections::BTreeMap; - use std::sync::Arc; - - use bytes::Bytes; - use chrono::DateTime; - use relay_event_schema::protocol::{ - Context, ContextInner, Contexts, Span, Timestamp, TraceContext, - }; - use relay_system::Addr; - - use crate::Envelope; - use crate::managed::ManagedEnvelope; - use crate::services::projects::project::ProjectInfo; - - use super::*; - - fn params() -> (ManagedEnvelope, Annotated, Arc) { - let bytes = Bytes::from( - 
r#"{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b","sample_rate":"0.2"}} -{"type":"transaction"} -{} -"#, - ); - - let dummy_envelope = Envelope::parse_bytes(bytes).unwrap(); - let project_info = Arc::new(ProjectInfo::default()); - - let event = Event { - ty: EventType::Transaction.into(), - start_timestamp: Timestamp(DateTime::from_timestamp(0, 0).unwrap()).into(), - timestamp: Timestamp(DateTime::from_timestamp(1, 0).unwrap()).into(), - contexts: Contexts(BTreeMap::from([( - "trace".into(), - ContextInner(Context::Trace(Box::new(TraceContext { - trace_id: Annotated::new("4c79f60c11214eb38604f4ae0781bfb2".parse().unwrap()), - span_id: Annotated::new("fa90fdead5f74053".parse().unwrap()), - exclusive_time: 1000.0.into(), - ..Default::default() - }))) - .into(), - )])) - .into(), - ..Default::default() - }; - - let managed_envelope = ManagedEnvelope::new(dummy_envelope, Addr::dummy()); - let event = Annotated::from(event); - - (managed_envelope, event, project_info) - } - - #[test] - fn extract_sample_rates() { - let (managed_envelope, event, _) = params(); // client sample rate is 0.2 - let spans = extract_from_event( - managed_envelope.envelope().dsc(), - &event, - processing::Context::for_test(), - Some(0.1), - EventMetricsExtracted(false), - SpansExtracted(false), - ) - .unwrap(); - - let span = spans - .into_iter() - .find(|item| item.as_ref().unwrap().ty() == &ItemType::Span) - .unwrap() - .unwrap(); - - let span = Annotated::::from_json_bytes(&span.payload()).unwrap(); - let measurements = span.value().and_then(|s| s.measurements.value()); - - insta::assert_debug_snapshot!(measurements, @r###" - Some( - Measurements( - { - "client_sample_rate": Measurement { - value: 0.2, - unit: Fraction( - Ratio, - ), - }, - "server_sample_rate": Measurement { - value: 0.1, - unit: Fraction( - Ratio, - ), - }, - 
}, - ), - ) - "###); - } -} diff --git a/relay-server/src/processing/transactions/types.rs b/relay-server/src/processing/transactions/types.rs deleted file mode 100644 index d1210870832..00000000000 --- a/relay-server/src/processing/transactions/types.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod expanded; -mod output; -mod profile; -mod serialized; - -pub use self::expanded::{ExpandedProfile, ExpandedTransaction}; -pub use self::output::TransactionOutput; -pub use self::profile::StandaloneProfile; -pub use self::serialized::SerializedTransaction; - -/// Flags extracted from transaction item headers. -/// -/// Ideally `metrics_extracted` and `spans_extracted` will not be needed in the future. Unsure -/// about `fully_normalized`. -#[derive(Debug, Default)] -pub struct Flags { - pub metrics_extracted: bool, - pub spans_extracted: bool, - pub fully_normalized: bool, - pub spans_rate_limited: bool, -} diff --git a/relay-server/src/processing/transactions/types/expanded.rs b/relay-server/src/processing/transactions/types/expanded.rs index 7ae220f0db3..07ef565eabe 100644 --- a/relay-server/src/processing/transactions/types/expanded.rs +++ b/relay-server/src/processing/transactions/types/expanded.rs @@ -1,5 +1,5 @@ use either::Either; -use relay_event_schema::protocol::Event; +use relay_event_schema::protocol::{Event, SpanV2}; use relay_profiling::{ProfileMetadata, ProfileType}; use relay_protocol::Annotated; use relay_quotas::DataCategory; @@ -14,10 +14,21 @@ use crate::metrics_extraction::ExtractedMetrics; use crate::processing::spans::{Indexed, TotalAndIndexed}; use crate::processing::transactions::Error; use crate::processing::transactions::process::split_indexed_and_total; -use crate::processing::transactions::types::Flags; -use crate::processing::{Context, RateLimited, RateLimiter}; +use crate::processing::{Context, CountRateLimited, RateLimited, RateLimiter}; use crate::statsd::RelayTimers; +/// Flags extracted from transaction item headers. 
+/// +/// Ideally `metrics_extracted` and `spans_extracted` will not be needed in the future. Unsure +/// about `fully_normalized`. +#[derive(Debug, Default)] +pub struct Flags { + pub metrics_extracted: bool, + pub spans_extracted: bool, + pub fully_normalized: bool, + pub spans_rate_limited: bool, +} + /// A transaction after parsing. /// /// The type parameter indicates whether metrics were already extracted, which changes how @@ -27,9 +38,8 @@ pub struct ExpandedTransaction { pub headers: EnvelopeHeaders, pub event: Annotated, pub flags: Flags, - pub attachments: Items, + pub attachments: Vec, pub profile: Option, - pub extracted_spans: Vec, #[expect(unused, reason = "marker field, only set never read")] pub category: C, } @@ -55,7 +65,6 @@ impl ExpandedTransaction { flags, attachments, profile, - extracted_spans, category: _, } = self; ExpandedTransaction { @@ -64,7 +73,6 @@ impl ExpandedTransaction { flags, attachments, profile, - extracted_spans, category: Indexed, } } @@ -78,7 +86,6 @@ impl Counted for ExpandedTransaction { flags, attachments, profile, - extracted_spans, category: _, } = self; let mut quantities = smallvec![ @@ -89,18 +96,13 @@ impl Counted for ExpandedTransaction { quantities.extend(attachments.quantities()); quantities.extend(profile.quantities()); - let span_count = if flags.spans_extracted { - extracted_spans.len() - } else { - debug_assert!(extracted_spans.is_empty()); - self.count_embedded_spans_and_self() - }; - if span_count > 0 { + if !flags.spans_extracted { + let span_count = self.count_embedded_spans_and_self(); quantities.extend([ (DataCategory::SpanIndexed, span_count), (DataCategory::Span, span_count), ]); - } + }; quantities } @@ -114,7 +116,6 @@ impl Counted for ExpandedTransaction { flags, attachments, profile, - extracted_spans, category: _, } = self; let mut quantities = smallvec![(DataCategory::TransactionIndexed, 1),]; @@ -122,15 +123,10 @@ impl Counted for ExpandedTransaction { 
quantities.extend(attachments.quantities()); quantities.extend(profile.quantities()); - let span_count = if flags.spans_extracted { - extracted_spans.len() - } else { - debug_assert!(self.extracted_spans.is_empty()); - self.count_embedded_spans_and_self() - }; - if span_count > 0 { + if !flags.spans_extracted { + let span_count = self.count_embedded_spans_and_self(); quantities.extend([(DataCategory::SpanIndexed, span_count)]); - } + }; quantities } @@ -166,7 +162,7 @@ impl RateLimited for Managed>> { .await; if !limits.is_empty() { let error = Error::from(limits); - let (indexed, metrics) = split_indexed_and_total(self, ctx, SamplingDecision::Keep)?; + let (indexed, metrics) = split_indexed_and_total(self, ctx, SamplingDecision::Keep); let _ = indexed.reject_err(error); return Ok(metrics.map(|metrics, _| Either::Right(metrics))); @@ -203,26 +199,6 @@ impl RateLimited for Managed>> { } } - // We assume that span extraction happens after metrics extraction, so safe to check both - // categories: - if !self.extracted_spans.is_empty() { - for category in [DataCategory::Span, DataCategory::SpanIndexed] { - let limits = rate_limiter - .try_consume(scoping.item(category), self.extracted_spans.len()) - .await; - - if !limits.is_empty() { - self.modify(|this, record_keeper| { - this.flags.spans_rate_limited = true; - record_keeper.reject_err( - Error::from(limits), - std::mem::take(&mut this.extracted_spans), - ); - }); - } - } - } - Ok(self.map(|work, _| Either::Left(work))) } } @@ -239,7 +215,6 @@ impl ExpandedTransaction { flags, attachments, profile, - extracted_spans, category: _, } = self; @@ -247,7 +222,6 @@ impl ExpandedTransaction { if let Some(profile) = profile { items.push(profile.serialize_item()); } - items.extend(extracted_spans); // To be compatible with previous code, add the transaction at the end: let data = metric!(timer(RelayTimers::EventProcessingSerialization), { @@ -305,3 +279,52 @@ impl Counted for ExpandedProfile { ] } } + +/// Spans which have 
been extracted from a [`ExpandedTransaction`]. +#[derive(Debug)] +pub struct ExtractedSpans(pub Vec>); + +impl ExtractedSpans { + pub fn into_indexed(self) -> ExtractedIndexedSpans { + ExtractedIndexedSpans(self.0) + } +} + +impl Counted for ExtractedSpans { + fn quantities(&self) -> Quantities { + smallvec![ + (DataCategory::Span, self.0.len()), + (DataCategory::SpanIndexed, self.0.len()), + ] + } +} + +impl CountRateLimited for Managed { + type Error = Error; +} + +/// [`ExtractedSpans`] which had their metrics extracted. +#[derive(Debug)] +pub struct ExtractedIndexedSpans(pub Vec>); + +impl ExtractedIndexedSpans { + pub fn into_iter(self) -> impl IntoIterator { + self.0.into_iter().map(ExtractedIndexedSpan) + } +} + +impl Counted for ExtractedIndexedSpans { + fn quantities(&self) -> Quantities { + smallvec![(DataCategory::SpanIndexed, self.0.len())] + } +} + +/// A single extracted span which had its metrics extracted. +#[derive(Debug)] +pub struct ExtractedIndexedSpan(pub Annotated); + +impl Counted for ExtractedIndexedSpan { + fn quantities(&self) -> Quantities { + smallvec![(DataCategory::SpanIndexed, 1)] + } +} diff --git a/relay-server/src/processing/transactions/types/mod.rs b/relay-server/src/processing/transactions/types/mod.rs new file mode 100644 index 00000000000..499c34ba5ef --- /dev/null +++ b/relay-server/src/processing/transactions/types/mod.rs @@ -0,0 +1,9 @@ +mod expanded; +mod output; +mod profile; +mod serialized; + +pub use self::expanded::*; +pub use self::output::*; +pub use self::profile::*; +pub use self::serialized::*; diff --git a/relay-server/src/processing/transactions/types/output.rs b/relay-server/src/processing/transactions/types/output.rs index aa0c14445f8..4f785223ec5 100644 --- a/relay-server/src/processing/transactions/types/output.rs +++ b/relay-server/src/processing/transactions/types/output.rs @@ -1,11 +1,11 @@ -use relay_quotas::DataCategory; - use crate::Envelope; use crate::managed::{Managed, ManagedResult, Rejected}; 
#[cfg(feature = "processing")] use crate::processing::StoreHandle; use crate::processing::spans::Indexed; -use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile}; +use crate::processing::transactions::types::{ + ExpandedTransaction, ExtractedIndexedSpans, StandaloneProfile, +}; use crate::processing::{Forward, ForwardContext}; use crate::services::outcome::{DiscardReason, Outcome}; @@ -19,7 +19,10 @@ pub enum TransactionOutput { /// The transaction has not been dropped by dynamic sampling, and metrics have been extracted. /// /// This is used in processing relays. - Indexed(Managed>>), + Indexed { + spans: Option>, + transaction: Managed>>, + }, } impl Forward for TransactionOutput { @@ -36,16 +39,12 @@ impl Forward for TransactionOutput { TransactionOutput::Profile(profile) => { Ok(profile.map(|profile, _| profile.serialize_envelope())) } - TransactionOutput::Indexed(managed) => managed.try_map(|work, record_keeper| { - // TODO: This should raise an error, Indexed output should go straight to Kafka - // instead of an envelope. 
As long as we have this hack, ignore bookkeeping - record_keeper.lenient(DataCategory::Transaction); - record_keeper.lenient(DataCategory::Span); - - work.serialize_envelope() - .map_err(drop) - .with_outcome(Outcome::Invalid(DiscardReason::Internal)) - }), + TransactionOutput::Indexed { spans, transaction } => { + if let Some(spans) = spans { + let _ = spans.internal_error("indexed spans can only be stored"); + }; + Err(transaction.internal_error("an indexed transaction can only be stored")) + } } } @@ -55,8 +54,100 @@ impl Forward for TransactionOutput { s: StoreHandle<'_>, ctx: ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - let envelope = self.serialize_envelope(ctx)?; - s.send_envelope(crate::managed::ManagedEnvelope::from(envelope)); + let (spans, transaction) = match self { + TransactionOutput::Full(managed) => { + return Err(managed.internal_error("only indexed transactions can be stored")); + } + TransactionOutput::Profile(profile) => { + s.send_to_store(profile.map(|p, _| store::convert_profile(p.profile, false, ctx))); + return Ok(()); + } + TransactionOutput::Indexed { spans, transaction } => (spans, transaction), + }; + + if let Some(spans) = spans { + let event_id = transaction.headers.event_id(); + let retention = ctx.retention(|r| r.span.as_ref()); + + for span in spans.split(|spans| spans.into_iter()) { + if let Ok(span) = + span.try_map(|span, _| store::convert_span(span, event_id, retention)) + { + s.send_to_store(span) + }; + } + } + + let (profile, transaction) = transaction.split_once(|mut tx, _| (tx.profile.take(), tx)); + if let Some(profile) = profile.transpose() { + s.send_to_store(profile.map(|p, _| store::convert_profile(p, true, ctx))); + } + + let envelope = transaction.try_map(|work, record_keeper| { + // TODO: This should raise an error, Indexed output should go straight to Kafka + // instead of an envelope. 
As long as we have this hack, ignore bookkeeping + record_keeper.lenient(relay_quotas::DataCategory::Transaction); + + work.serialize_envelope() + .map_err(drop) + .with_outcome(Outcome::Invalid(DiscardReason::Internal)) + })?; + + s.send_envelope(envelope.into()); + Ok(()) } } + +#[cfg(feature = "processing")] +mod store { + use relay_event_schema::protocol::EventId; + use relay_protocol::Annotated; + + use super::*; + + use crate::managed::Counted as _; + use crate::processing::Retention; + use crate::processing::transactions::types::{ExpandedProfile, ExtractedIndexedSpan}; + use crate::services::store::{StoreProfile, StoreSpanV2}; + + pub fn convert_profile( + profile: ExpandedProfile, + sampled: bool, + ctx: ForwardContext<'_>, + ) -> StoreProfile { + let retention_days = ctx.event_retention().standard; + + StoreProfile { + retention_days, + quantities: profile.quantities(), + profile: { + let mut item = profile.serialize_item(); + item.set_sampled(sampled); + item + }, + } + } + + pub fn convert_span( + span: ExtractedIndexedSpan, + event_id: Option, + retentions: Retention, + ) -> Result, Outcome> { + let span = match span.0 { + Annotated(Some(span), _) => span, + Annotated(None, meta) => { + relay_log::debug!("dropping empty span with meta {meta:?}"); + return Err(Outcome::Invalid(DiscardReason::InvalidSpan)); + } + }; + + Ok(Box::new(StoreSpanV2 { + routing_key: span.trace_id.value().copied().map(Into::into), + retention_days: retentions.standard, + downsampled_retention_days: retentions.downsampled, + event_id, + item: span, + })) + } +} diff --git a/relay-server/src/processing/transactions/types/profile.rs b/relay-server/src/processing/transactions/types/profile.rs index 951cafe504f..47333b81845 100644 --- a/relay-server/src/processing/transactions/types/profile.rs +++ b/relay-server/src/processing/transactions/types/profile.rs @@ -18,7 +18,6 @@ impl StandaloneProfile { let mut item = self.profile.serialize_item(); // This profile is now standalone, as 
dynamic sampling dropped the transaction. item.set_sampled(false); - Envelope::from_parts(self.headers, smallvec::smallvec![item]) } } diff --git a/relay-server/src/processing/transactions/types/serialized.rs b/relay-server/src/processing/transactions/types/serialized.rs index d8ad3c36893..e47fd60d843 100644 --- a/relay-server/src/processing/transactions/types/serialized.rs +++ b/relay-server/src/processing/transactions/types/serialized.rs @@ -1,6 +1,4 @@ -use smallvec::SmallVec; - -use crate::envelope::{EnvelopeHeaders, Item, Items}; +use crate::envelope::{EnvelopeHeaders, Item}; use crate::managed::{Counted, Quantities}; /// A transaction in its serialized state, as transported in an envelope. @@ -8,8 +6,8 @@ use crate::managed::{Counted, Quantities}; pub struct SerializedTransaction { pub headers: EnvelopeHeaders, pub event: Item, - pub attachments: Items, - pub profiles: SmallVec<[Item; 3]>, + pub attachments: Vec, + pub profiles: Vec, } impl Counted for SerializedTransaction { diff --git a/relay-server/src/processing/utils/event.rs b/relay-server/src/processing/utils/event.rs index 38e932c73b3..ba153108406 100644 --- a/relay-server/src/processing/utils/event.rs +++ b/relay-server/src/processing/utils/event.rs @@ -408,10 +408,6 @@ pub struct EventFullyNormalized(pub bool); #[derive(Debug, Copy, Clone)] pub struct EventMetricsExtracted(pub bool); -/// New type representing whether spans were extracted. -#[derive(Debug, Copy, Clone)] -pub struct SpansExtracted(pub bool); - /// Checks if the Event includes unprintable fields. 
fn has_unprintable_fields(event: &Annotated) -> bool { fn is_unprintable(value: &&str) -> bool { diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index bcfb479ead8..424de988c1e 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1105,14 +1105,6 @@ impl EnvelopeProcessorService { managed_envelope.envelope_mut().set_retention(retention); } - // Set the event retention. Effectively, this value will only be available in processing - // mode when the full project config is queried from the upstream. - if let Some(retention) = ctx.project_info.config.downsampled_event_retention { - managed_envelope - .envelope_mut() - .set_downsampled_retention(retention); - } - // Ensure the project ID is updated to the stored instance for this project cache. This can // differ in two cases: // 1. The envelope was sent to the legacy `/store/` endpoint without a project ID. diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 7c789c3d0fb..50d757fcf41 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -13,7 +13,6 @@ use futures::future::BoxFuture; use prost::Message as _; use sentry_protos::snuba::v1::{TraceItem, TraceItemType}; use serde::Serialize; -use serde_json::value::RawValue; use uuid::Uuid; use relay_base_schema::data_category::DataCategory; @@ -33,7 +32,7 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use relay_threading::AsyncPool; -use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType}; +use crate::envelope::{AttachmentPlaceholder, AttachmentType, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; @@ -137,6 +136,8 @@ pub struct StoreSpanV2 { pub retention_days: 
u16, /// Downsampled retention of the span. pub downsampled_retention_days: u16, + /// Optional event id, if the span was extracted from a transaction. + pub event_id: Option, /// The final Sentry compatible span item. pub item: SpanV2, } @@ -450,7 +451,6 @@ impl StoreService { let scoping = managed_envelope.scoping(); let retention = envelope.retention(); - let downsampled_retention = envelope.downsampled_retention(); let event_id = envelope.event_id(); let event_item = envelope.as_mut().take_item_by(|item| { @@ -477,7 +477,6 @@ impl StoreService { let mut attachments = Vec::new(); for item in envelope.items() { - let content_type = item.content_type(); match item.ty() { ItemType::Attachment => { if let Some(attachment) = self.produce_attachment( @@ -531,15 +530,7 @@ impl StoreService { item, )? } - ItemType::Span if content_type == Some(ContentType::Json) => self.produce_span( - scoping, - received_at, - event_id, - retention, - downsampled_retention, - item, - )?, - ty @ ItemType::Log => { + ty @ (ItemType::Log | ItemType::Span) => { debug_assert!( false, "received {ty} through an envelope, \ @@ -783,7 +774,7 @@ impl StoreService { organization_id: scoping.organization_id, project_id: scoping.project_id, key_id: scoping.key_id, - event_id: None, + event_id: message.event_id, retention_days: message.retention_days, downsampled_retention_days: message.downsampled_retention_days, received: datetime_to_timestamp(received_at), @@ -1279,87 +1270,6 @@ impl StoreService { Ok(()) } - - fn produce_span( - &self, - scoping: Scoping, - received_at: DateTime, - event_id: Option, - retention_days: u16, - downsampled_retention_days: u16, - item: &Item, - ) -> Result<(), StoreError> { - debug_assert_eq!(item.ty(), &ItemType::Span); - debug_assert_eq!(item.content_type(), Some(ContentType::Json)); - - let Scoping { - organization_id, - project_id, - project_key: _, - key_id, - } = scoping; - - let relay_emits_accepted_outcome = !utils::is_rolled_out( - 
scoping.organization_id.value(), - self.global_config - .current() - .options - .eap_span_outcomes_rollout_rate, - ) - .is_keep(); - - let payload = item.payload(); - let message = SpanKafkaMessageRaw { - meta: SpanMeta { - organization_id, - project_id, - key_id, - event_id, - retention_days, - downsampled_retention_days, - received: datetime_to_timestamp(received_at), - accepted_outcome_emitted: relay_emits_accepted_outcome, - }, - span: serde_json::from_slice(&payload) - .map_err(|e| StoreError::EncodingFailed(e.into()))?, - }; - - // Verify that this is a V2 span: - debug_assert!(message.span.contains_key("attributes")); - relay_statsd::metric!( - counter(RelayCounters::SpanV2Produced) += 1, - via = "envelope" - ); - - self.produce( - KafkaTopic::Spans, - KafkaMessage::SpanRaw { - routing_key: item.routing_hint(), - headers: BTreeMap::from([( - "project_id".to_owned(), - scoping.project_id.to_string(), - )]), - message, - org_id: organization_id, - }, - )?; - - if relay_emits_accepted_outcome { - // XXX: Temporarily produce span outcomes. Keep in sync with either EAP - // or the segments consumer, depending on which will produce outcomes later. 
- self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::SpanIndexed, - event_id: None, - outcome: Outcome::Accepted, - quantity: 1, - remote_addr: None, - scoping, - timestamp: received_at, - }); - } - - Ok(()) - } } impl Service for StoreService { @@ -1644,14 +1554,6 @@ struct CheckInKafkaMessage { org_id: OrganizationId, } -#[derive(Debug, Serialize)] -struct SpanKafkaMessageRaw<'a> { - #[serde(flatten)] - meta: SpanMeta, - #[serde(flatten)] - span: BTreeMap<&'a str, &'a RawValue>, -} - #[derive(Debug, Serialize)] struct SpanKafkaMessage<'a> { #[serde(flatten)] @@ -1712,18 +1614,6 @@ enum KafkaMessage<'a> { #[serde(skip)] message: TraceItem, }, - SpanRaw { - #[serde(skip)] - routing_key: Option, - #[serde(skip)] - headers: BTreeMap, - #[serde(flatten)] - message: SpanKafkaMessageRaw<'a>, - - /// Used for [`KafkaMessage::key`] - #[serde(skip)] - org_id: OrganizationId, - }, SpanV2 { #[serde(skip)] routing_key: Option, @@ -1774,7 +1664,7 @@ impl Message for KafkaMessage<'_> { MetricNamespace::Unsupported => "metric_unsupported", }, KafkaMessage::CheckIn(_) => "check_in", - KafkaMessage::SpanRaw { .. } | KafkaMessage::SpanV2 { .. } => "span", + KafkaMessage::SpanV2 { .. } => "span", KafkaMessage::Item { item_type, .. } => item_type.as_str_name(), KafkaMessage::Attachment(_) => "attachment", @@ -1792,12 +1682,7 @@ impl Message for KafkaMessage<'_> { match self { Self::Event(message) => Some((message.event_id.0, message.org_id)), Self::UserReport(message) => Some((message.event_id.0, message.org_id)), - Self::SpanRaw { - routing_key, - org_id, - .. - } - | Self::SpanV2 { + Self::SpanV2 { routing_key, org_id, .. @@ -1834,7 +1719,6 @@ impl Message for KafkaMessage<'_> { fn headers(&self) -> Option<&BTreeMap> { match &self { KafkaMessage::Metric { headers, .. } - | KafkaMessage::SpanRaw { headers, .. } | KafkaMessage::SpanV2 { headers, .. } | KafkaMessage::Item { headers, .. } | KafkaMessage::Profile(ProfileKafkaMessage { headers, .. 
}) @@ -1852,7 +1736,6 @@ impl Message for KafkaMessage<'_> { fn serialize(&self) -> Result, ClientError> { match self { KafkaMessage::Metric { message, .. } => serialize_as_json(message), - KafkaMessage::SpanRaw { message, .. } => serialize_as_json(message), KafkaMessage::SpanV2 { message, .. } => serialize_as_json(message), KafkaMessage::Item { message, .. } => { let mut payload = Vec::new(); From 118d4d64e1df95cc59a5103e93eeb37588ed2077 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 29 May 2026 13:20:25 +0200 Subject: [PATCH 2/3] typo review --- relay-server/src/processing/transactions/process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 70fa4b19376..71aa7807bfb 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -344,7 +344,7 @@ pub fn split_indexed_and_total_with_extracted_spans( // eventually be fully transformed into spans. // // Since there is no span metric extracted for this transaction, as we already extracted - // the spans from the transaction, there is no now metric carrying the transaction category. + // the spans from the transaction, there is now no metric carrying the transaction category. // // After extracting span metrics the count is accurate again, but attached to the span metrics. 
// Unless, the spans have been rate limited, which is an actual potential bug which we From 7b36cea8e52802f313b8e24cd59519cd3f17645b Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 29 May 2026 13:22:14 +0200 Subject: [PATCH 3/3] clippy lint --- relay-server/src/processing/transactions/types/expanded.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/relay-server/src/processing/transactions/types/expanded.rs b/relay-server/src/processing/transactions/types/expanded.rs index 07ef565eabe..3a6f580a08d 100644 --- a/relay-server/src/processing/transactions/types/expanded.rs +++ b/relay-server/src/processing/transactions/types/expanded.rs @@ -308,6 +308,7 @@ impl CountRateLimited for Managed { pub struct ExtractedIndexedSpans(pub Vec>); impl ExtractedIndexedSpans { + #[cfg(feature = "processing")] pub fn into_iter(self) -> impl IntoIterator { self.0.into_iter().map(ExtractedIndexedSpan) } @@ -321,8 +322,10 @@ impl Counted for ExtractedIndexedSpans { /// A single extracted span which had its metrics extracted. #[derive(Debug)] +#[cfg(feature = "processing")] pub struct ExtractedIndexedSpan(pub Annotated); +#[cfg(feature = "processing")] impl Counted for ExtractedIndexedSpan { fn quantities(&self) -> Quantities { smallvec![(DataCategory::SpanIndexed, 1)]