Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Implement mobile measurements calculation for V2 spans. ([#6022](https://github.com/getsentry/relay/pull/6022))

**Internal**:

- Forwards extracted transaction spans directly to Kafka instead of serializing to an intermediate envelope first. ([#6029](https://github.com/getsentry/relay/pull/6029))

## 26.5.1

**Features**:
Expand Down
26 changes: 0 additions & 26 deletions relay-server/src/envelope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ pub struct EnvelopeHeaders<M = RequestMeta> {
#[serde(default, skip_serializing_if = "Option::is_none")]
retention: Option<u16>,

/// Data retention in days for the items of this envelope.
///
/// This value is always overwritten in processing mode by the value specified in the project
/// configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
downsampled_retention: Option<u16>,

/// Timestamp when the event has been sent, according to the SDK.
///
/// This can be used to perform drift correction.
Expand Down Expand Up @@ -165,7 +158,6 @@ impl EnvelopeHeaders<PartialMeta> {
event_id: self.event_id,
meta: meta.copy_to(request_meta),
retention: self.retention,
downsampled_retention: self.downsampled_retention,
sent_at: self.sent_at,
trace: self.trace,
required_features: self.required_features,
Expand Down Expand Up @@ -231,7 +223,6 @@ where
event_id,
meta,
retention,
downsampled_retention,
sent_at,
trace,
required_features,
Expand All @@ -247,9 +238,6 @@ where
if let Some(retention) = retention {
map.entry(&"retention", retention);
}
if let Some(downsampled_retention) = downsampled_retention {
map.entry(&"downsampled_retention", downsampled_retention);
}
if let Some(sent_at) = sent_at {
map.entry(&"sent_at", sent_at);
}
Expand Down Expand Up @@ -302,7 +290,6 @@ impl Envelope {
event_id,
meta,
retention: None,
downsampled_retention: None,
sent_at: None,
other: BTreeMap::new(),
trace: None,
Expand Down Expand Up @@ -406,14 +393,6 @@ impl Envelope {
self.headers.retention.unwrap_or(DEFAULT_EVENT_RETENTION)
}

/// Returns the data retention in days for items in this envelope.
#[cfg_attr(not(feature = "processing"), allow(dead_code))]
pub fn downsampled_retention(&self) -> u16 {
self.headers
.downsampled_retention
.unwrap_or(self.retention())
}

/// When the event has been sent, according to the SDK.
pub fn sent_at(&self) -> Option<DateTime<Utc>> {
self.headers.sent_at
Expand Down Expand Up @@ -471,11 +450,6 @@ impl Envelope {
self.headers.retention = Some(retention);
}

/// Sets the data retention in days for items in this envelope.
pub fn set_downsampled_retention(&mut self, retention: u16) {
self.headers.downsampled_retention = Some(retention);
}

/// Runs transaction parametrization on the DSC trace transaction.
///
/// The purpose is for trace rules to match on the parametrized version of the transaction.
Expand Down
8 changes: 8 additions & 0 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ impl<T: Counted> Counted for Option<T> {
}
}

impl<T: Counted, S: Counted> Counted for (T, S) {
fn quantities(&self) -> Quantities {
let mut quantities = self.0.quantities();
quantities.extend(self.1.quantities());
quantities
}
}

impl<L, R> Counted for Either<L, R>
where
L: Counted,
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/legacy_spans/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn convert(
routing_key: span.trace_id.value().copied().map(Into::into),
retention_days: retentions.standard,
downsampled_retention_days: retentions.downsampled,
event_id: None,
item: span,
}))
}
56 changes: 0 additions & 56 deletions relay-server/src/processing/profiles/forward.rs

This file was deleted.

47 changes: 45 additions & 2 deletions relay-server/src/processing/profiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ use relay_profiling::{ProfileError, ProfileType};
use relay_quotas::{DataCategory, RateLimits};
use smallvec::smallvec;

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::processing::{Context, CountRateLimited, Output, Processor, QuotaRateLimiter};
use crate::processing::{
Context, CountRateLimited, Forward, ForwardContext, Output, Processor, QuotaRateLimiter,
};
use crate::services::outcome::{DiscardReason, Outcome};

mod filter;
mod forward;
mod process;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -159,3 +161,44 @@ impl CountRateLimited for Managed<ExpandedProfile> {
/// Output produced by the [`ProfilesProcessor`].
#[derive(Debug)]
pub struct ProfilesOutput(Managed<ExpandedProfile>);

impl Forward for ProfilesOutput {
fn serialize_envelope(
self,
_ctx: ForwardContext<'_>,
) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
let Self(profile) = self;
let envelope = profile.map(
|ExpandedProfile {
headers,
profile,
profile_type: _,
},
_| { Envelope::from_parts(headers, smallvec![profile]) },
);

Ok(envelope)
}

#[cfg(feature = "processing")]
fn forward_store(
self,
s: crate::processing::StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::services::store::StoreProfile;

let Self(profile) = self;

let retention_days = ctx.event_retention().standard;

let store_profile = profile.map(|profile, _| StoreProfile {
retention_days,
quantities: profile.quantities(),
profile: profile.profile,
});
s.send_to_store(store_profile);
Comment thread
Dav1dde marked this conversation as resolved.

Ok(())
}
}
1 change: 1 addition & 0 deletions relay-server/src/processing/spans/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn convert(span: IndexedSpanOnly, ctx: &Context) -> Result<Box<StoreSpanV2>>
routing_key,
retention_days: ctx.retention.standard,
downsampled_retention_days: ctx.retention.downsampled,
event_id: None,
item: span,
}))
}
Expand Down
14 changes: 7 additions & 7 deletions relay-server/src/processing/transactions/extraction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use relay_sampling::evaluation::SamplingDecision;

use crate::processing::Context;
use crate::processing::utils::event::EventMetricsExtracted;
use crate::services::processor::{ProcessingError, ProcessingExtractedMetrics};
use crate::services::processor::ProcessingExtractedMetrics;

/// Creates a span from the transaction and applies tag extraction on it.
///
Expand Down Expand Up @@ -42,7 +42,7 @@ pub fn extract_metrics(
event: &mut Annotated<Event>,
extracted_metrics: &mut ProcessingExtractedMetrics,
ctx: ExtractMetricsContext,
) -> Result<EventMetricsExtracted, ProcessingError> {
) -> EventMetricsExtracted {
let ExtractMetricsContext {
dsc,
project_id,
Expand All @@ -55,11 +55,11 @@ pub fn extract_metrics(
// the full metrics extraction config and skip sampling if it is incomplete.

if metrics_extracted {
return Ok(EventMetricsExtracted(true));
return EventMetricsExtracted(true);
}
let Some(event) = event.value_mut() else {
// Nothing to extract, but metrics extraction was called.
return Ok(EventMetricsExtracted(true));
return EventMetricsExtracted(true);
};

// NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
Expand All @@ -68,7 +68,7 @@ pub fn extract_metrics(
let combined_config = {
let config = match &ctx.project_info.config.metric_extraction {
ErrorBoundary::Ok(config) if config.is_supported() => config,
_ => return Ok(EventMetricsExtracted(false)),
_ => return EventMetricsExtracted(false),
};
let global_config = match &ctx.global_config.metric_extraction {
ErrorBoundary::Ok(global_config) => global_config,
Expand All @@ -83,7 +83,7 @@ pub fn extract_metrics(
// If there's an error with global metrics extraction, it is safe to assume that this
// Relay instance is not up-to-date, and we should skip extraction.
relay_log::debug!("Failed to parse global extraction config: {e}");
return Ok(EventMetricsExtracted(false));
return EventMetricsExtracted(false);
}
}
};
Expand All @@ -106,5 +106,5 @@ pub fn extract_metrics(
);
extracted_metrics.extend(metrics, Some(sampling_decision));

Ok(EventMetricsExtracted(true))
EventMetricsExtracted(true)
}
21 changes: 12 additions & 9 deletions relay-server/src/processing/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::sync::Arc;
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::protocol::Metrics;
use relay_quotas::RateLimits;
use relay_sampling::evaluation::SamplingDecision;

use crate::envelope::ItemType;
use crate::managed::{Managed, ManagedEnvelope, OutcomeError, Rejected};
use crate::processing::transactions::process::{SamplingOutput, split_indexed_and_total};
use crate::processing::transactions::process::SamplingOutput;
use crate::processing::transactions::types::{SerializedTransaction, TransactionOutput};
use crate::processing::utils::attachments;
use crate::processing::utils::event::event_type;
Expand Down Expand Up @@ -101,11 +100,13 @@ impl Processor for TransactionProcessor {

let attachments = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::Attachment));
.take_items_by(|item| matches!(*item.ty(), ItemType::Attachment))
.into_vec();

let profiles = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::Profile));
.take_items_by(|item| matches!(*item.ty(), ItemType::Profile))
.into_vec();

let work = SerializedTransaction {
headers,
Expand Down Expand Up @@ -144,7 +145,7 @@ impl Processor for TransactionProcessor {
process::process_profile(&mut tx, ctx);

relay_log::trace!("Sample transaction");
let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status)?
let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status)
{
SamplingOutput::Keep {
payload,
Expand All @@ -170,8 +171,6 @@ impl Processor for TransactionProcessor {
#[allow(unused_mut)]
let mut tx = process::scrub(tx, ctx)?;

tx = process::extract_spans(tx, ctx, server_sample_rate);

relay_log::trace!("Enforce quotas");
let tx = self.limiter.enforce_quotas(tx, ctx).await?;
let tx = match tx.transpose() {
Expand All @@ -188,10 +187,14 @@ impl Processor for TransactionProcessor {
);
};

let (indexed, metrics) = split_indexed_and_total(tx, ctx, SamplingDecision::Keep)?;
let (spans, tx) = process::extract_spans(tx, ctx, server_sample_rate);
let spans = self.limiter.enforce_quotas(spans, ctx).await.ok();

let (transaction, spans, metrics) =
process::split_indexed_and_total_with_extracted_spans(tx, spans, ctx);

return Ok(Output {
main: Some(TransactionOutput::Indexed(indexed)),
main: Some(TransactionOutput::Indexed { spans, transaction }),
metrics: Some(metrics),
});
}
Expand Down
Loading
Loading