Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Implement mobile measurements calculation for V2 spans. ([#6022](https://github.com/getsentry/relay/pull/6022))

**Internal**:

- Forwards extracted transaction spans directly to Kafka instead of serializing to an intermediate envelope first. ([#6029](https://github.com/getsentry/relay/pull/6029))

## 26.5.1

**Features**:
Expand Down
26 changes: 0 additions & 26 deletions relay-server/src/envelope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ pub struct EnvelopeHeaders<M = RequestMeta> {
#[serde(default, skip_serializing_if = "Option::is_none")]
retention: Option<u16>,

/// Data retention in days for the items of this envelope.
///
/// This value is always overwritten in processing mode by the value specified in the project
/// configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
downsampled_retention: Option<u16>,

/// Timestamp when the event has been sent, according to the SDK.
///
/// This can be used to perform drift correction.
Expand Down Expand Up @@ -165,7 +158,6 @@ impl EnvelopeHeaders<PartialMeta> {
event_id: self.event_id,
meta: meta.copy_to(request_meta),
retention: self.retention,
downsampled_retention: self.downsampled_retention,
sent_at: self.sent_at,
trace: self.trace,
required_features: self.required_features,
Expand Down Expand Up @@ -231,7 +223,6 @@ where
event_id,
meta,
retention,
downsampled_retention,
sent_at,
trace,
required_features,
Expand All @@ -247,9 +238,6 @@ where
if let Some(retention) = retention {
map.entry(&"retention", retention);
}
if let Some(downsampled_retention) = downsampled_retention {
map.entry(&"downsampled_retention", downsampled_retention);
}
if let Some(sent_at) = sent_at {
map.entry(&"sent_at", sent_at);
}
Expand Down Expand Up @@ -302,7 +290,6 @@ impl Envelope {
event_id,
meta,
retention: None,
downsampled_retention: None,
sent_at: None,
other: BTreeMap::new(),
trace: None,
Expand Down Expand Up @@ -406,14 +393,6 @@ impl Envelope {
self.headers.retention.unwrap_or(DEFAULT_EVENT_RETENTION)
}

/// Returns the data retention in days for items in this envelope.
#[cfg_attr(not(feature = "processing"), allow(dead_code))]
pub fn downsampled_retention(&self) -> u16 {
self.headers
.downsampled_retention
.unwrap_or(self.retention())
}

/// When the event has been sent, according to the SDK.
pub fn sent_at(&self) -> Option<DateTime<Utc>> {
self.headers.sent_at
Expand Down Expand Up @@ -471,11 +450,6 @@ impl Envelope {
self.headers.retention = Some(retention);
}

/// Sets the data retention in days for items in this envelope.
pub fn set_downsampled_retention(&mut self, retention: u16) {
self.headers.downsampled_retention = Some(retention);
}

/// Runs transaction parametrization on the DSC trace transaction.
///
/// The purpose is for trace rules to match on the parametrized version of the transaction.
Expand Down
8 changes: 8 additions & 0 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ impl<T: Counted> Counted for Option<T> {
}
}

impl<T: Counted, S: Counted> Counted for (T, S) {
fn quantities(&self) -> Quantities {
let mut quantities = self.0.quantities();
quantities.extend(self.1.quantities());
quantities
}
}

impl<L, R> Counted for Either<L, R>
where
L: Counted,
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/legacy_spans/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn convert(
routing_key: span.trace_id.value().copied().map(Into::into),
retention_days: retentions.standard,
downsampled_retention_days: retentions.downsampled,
event_id: None,
item: span,
}))
}
56 changes: 0 additions & 56 deletions relay-server/src/processing/profiles/forward.rs

This file was deleted.

47 changes: 45 additions & 2 deletions relay-server/src/processing/profiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ use relay_profiling::{ProfileError, ProfileType};
use relay_quotas::{DataCategory, RateLimits};
use smallvec::smallvec;

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::processing::{Context, CountRateLimited, Output, Processor, QuotaRateLimiter};
use crate::processing::{
Context, CountRateLimited, Forward, ForwardContext, Output, Processor, QuotaRateLimiter,
};
use crate::services::outcome::{DiscardReason, Outcome};

mod filter;
mod forward;
mod process;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -159,3 +161,44 @@ impl CountRateLimited for Managed<ExpandedProfile> {
/// Output produced by the [`ProfilesProcessor`].
#[derive(Debug)]
pub struct ProfilesOutput(Managed<ExpandedProfile>);

impl Forward for ProfilesOutput {
fn serialize_envelope(
self,
_ctx: ForwardContext<'_>,
) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
let Self(profile) = self;
let envelope = profile.map(
|ExpandedProfile {
headers,
profile,
profile_type: _,
},
_| { Envelope::from_parts(headers, smallvec![profile]) },
);

Ok(envelope)
}

#[cfg(feature = "processing")]
fn forward_store(
self,
s: crate::processing::StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::services::store::StoreProfile;

let Self(profile) = self;

let retention_days = ctx.event_retention().standard;

let store_profile = profile.map(|profile, _| StoreProfile {
retention_days,
quantities: profile.quantities(),
profile: profile.profile,
});
s.send_to_store(store_profile);
Comment thread
Dav1dde marked this conversation as resolved.

Ok(())
}
}
1 change: 1 addition & 0 deletions relay-server/src/processing/spans/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn convert(span: IndexedSpanOnly, ctx: &Context) -> Result<Box<StoreSpanV2>>
routing_key,
retention_days: ctx.retention.standard,
downsampled_retention_days: ctx.retention.downsampled,
event_id: None,
item: span,
}))
}
Expand Down
14 changes: 7 additions & 7 deletions relay-server/src/processing/transactions/extraction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use relay_sampling::evaluation::SamplingDecision;

use crate::processing::Context;
use crate::processing::utils::event::EventMetricsExtracted;
use crate::services::processor::{ProcessingError, ProcessingExtractedMetrics};
use crate::services::processor::ProcessingExtractedMetrics;

/// Creates a span from the transaction and applies tag extraction on it.
///
Expand Down Expand Up @@ -42,7 +42,7 @@ pub fn extract_metrics(
event: &mut Annotated<Event>,
extracted_metrics: &mut ProcessingExtractedMetrics,
ctx: ExtractMetricsContext,
) -> Result<EventMetricsExtracted, ProcessingError> {
) -> EventMetricsExtracted {
let ExtractMetricsContext {
dsc,
project_id,
Expand All @@ -55,11 +55,11 @@ pub fn extract_metrics(
// the full metrics extraction config and skip sampling if it is incomplete.

if metrics_extracted {
return Ok(EventMetricsExtracted(true));
return EventMetricsExtracted(true);
}
let Some(event) = event.value_mut() else {
// Nothing to extract, but metrics extraction was called.
return Ok(EventMetricsExtracted(true));
return EventMetricsExtracted(true);
};

// NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
Expand All @@ -68,7 +68,7 @@ pub fn extract_metrics(
let combined_config = {
let config = match &ctx.project_info.config.metric_extraction {
ErrorBoundary::Ok(config) if config.is_supported() => config,
_ => return Ok(EventMetricsExtracted(false)),
_ => return EventMetricsExtracted(false),
};
let global_config = match &ctx.global_config.metric_extraction {
ErrorBoundary::Ok(global_config) => global_config,
Expand All @@ -83,7 +83,7 @@ pub fn extract_metrics(
// If there's an error with global metrics extraction, it is safe to assume that this
// Relay instance is not up-to-date, and we should skip extraction.
relay_log::debug!("Failed to parse global extraction config: {e}");
return Ok(EventMetricsExtracted(false));
return EventMetricsExtracted(false);
}
}
};
Expand All @@ -106,5 +106,5 @@ pub fn extract_metrics(
);
extracted_metrics.extend(metrics, Some(sampling_decision));

Ok(EventMetricsExtracted(true))
EventMetricsExtracted(true)
}
21 changes: 12 additions & 9 deletions relay-server/src/processing/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::sync::Arc;
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::protocol::Metrics;
use relay_quotas::RateLimits;
use relay_sampling::evaluation::SamplingDecision;

use crate::envelope::ItemType;
use crate::managed::{Managed, ManagedEnvelope, OutcomeError, Rejected};
use crate::processing::transactions::process::{SamplingOutput, split_indexed_and_total};
use crate::processing::transactions::process::SamplingOutput;
use crate::processing::transactions::types::{SerializedTransaction, TransactionOutput};
use crate::processing::utils::attachments;
use crate::processing::utils::event::event_type;
Expand Down Expand Up @@ -101,11 +100,13 @@ impl Processor for TransactionProcessor {

let attachments = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::Attachment));
.take_items_by(|item| matches!(*item.ty(), ItemType::Attachment))
.into_vec();

let profiles = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::Profile));
.take_items_by(|item| matches!(*item.ty(), ItemType::Profile))
.into_vec();

let work = SerializedTransaction {
headers,
Expand Down Expand Up @@ -144,7 +145,7 @@ impl Processor for TransactionProcessor {
process::process_profile(&mut tx, ctx);

relay_log::trace!("Sample transaction");
let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status)?
let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status)
{
SamplingOutput::Keep {
payload,
Expand All @@ -170,8 +171,6 @@ impl Processor for TransactionProcessor {
#[allow(unused_mut)]
let mut tx = process::scrub(tx, ctx)?;

tx = process::extract_spans(tx, ctx, server_sample_rate);

relay_log::trace!("Enforce quotas");
let tx = self.limiter.enforce_quotas(tx, ctx).await?;
let tx = match tx.transpose() {
Expand All @@ -188,10 +187,14 @@ impl Processor for TransactionProcessor {
);
};

let (indexed, metrics) = split_indexed_and_total(tx, ctx, SamplingDecision::Keep)?;
let (spans, tx) = process::extract_spans(tx, ctx, server_sample_rate);
let spans = self.limiter.enforce_quotas(spans, ctx).await.ok();

let (transaction, spans, metrics) =
process::split_indexed_and_total_with_extracted_spans(tx, spans, ctx);

return Ok(Output {
main: Some(TransactionOutput::Indexed(indexed)),
main: Some(TransactionOutput::Indexed { spans, transaction }),
metrics: Some(metrics),
});
}
Expand Down
Loading
Loading