diff --git a/relay-cogs/src/lib.rs b/relay-cogs/src/lib.rs index 8e04c7cf029..55868e4df31 100644 --- a/relay-cogs/src/lib.rs +++ b/relay-cogs/src/lib.rs @@ -173,6 +173,8 @@ pub enum AppFeature { Profiles, /// User Reports UserReports, + /// Event attachments not associated with an event. + StandaloneAttachments, /// Metrics in the spans namespace. MetricsSpans, @@ -208,6 +210,7 @@ impl AppFeature { Self::CheckIns => "check_ins", Self::Replays => "replays", Self::UserReports => "user_reports", + Self::StandaloneAttachments => "standalone_attachments", Self::MetricsSpans => "metrics_spans", Self::MetricsTransactions => "metrics_transactions", Self::MetricsSessions => "metrics_sessions", diff --git a/relay-server/src/endpoints/security_report.rs b/relay-server/src/endpoints/security_report.rs index 579145afd9a..663858aab8b 100644 --- a/relay-server/src/endpoints/security_report.rs +++ b/relay-server/src/endpoints/security_report.rs @@ -46,29 +46,37 @@ impl SecurityReportParams { report_item } - fn extract_envelope(self) -> Result, BadStoreRequest> { + #[expect( + clippy::vec_box, + reason = "Box is created by Envelope::from_request and used for processing" + )] + fn extract_envelopes(self) -> Result>, BadStoreRequest> { let Self { meta, query, body } = self; if body.is_empty() { return Err(BadStoreRequest::EmptyBody); } - let mut envelope = Envelope::from_request(Some(EventId::new()), meta); - let variant = - serde_json::from_slice::>(&body).map_err(BadStoreRequest::InvalidJson); - - if let Ok(items) = variant { - for item in items { - let report_item = - Self::create_security_item(&query, Bytes::from(item.to_owned().to_string())); + Ok(match serde_json::from_slice::>(&body) { + Ok(items) => items + .into_iter() + .map(|item| { + let mut envelope = Envelope::from_request(Some(EventId::new()), meta.clone()); + let report = Self::create_security_item( + &query, + Bytes::from(item.to_owned().to_string()), + ); + envelope.add_item(report); + envelope + }) + .collect(), + Err(_) => { + let mut envelope = Envelope::from_request(Some(EventId::new()), meta); + let report_item = Self::create_security_item(&query, body); envelope.add_item(report_item); + vec![envelope] } - } else { - let report_item = Self::create_security_item(&query, body); - envelope.add_item(report_item); - } - - Ok(envelope) + }) } } @@ -100,10 +108,11 @@ async fn handle( return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response()); } - let envelope = params.extract_envelope()?; - common::handle_envelope(&state, envelope) - .await? - .check_rate_limits()?; + for envelope in params.extract_envelopes()? { + common::handle_envelope(&state, envelope) + .await? + .check_rate_limits()?; + } Ok(().into_response()) } diff --git a/relay-server/src/processing/attachments/mod.rs b/relay-server/src/processing/attachments/mod.rs index 21f7073d9af..c38cd659efa 100644 --- a/relay-server/src/processing/attachments/mod.rs +++ b/relay-server/src/processing/attachments/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_quotas::RateLimits; use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items}; @@ -64,6 +65,10 @@ impl processing::Processor for AttachmentProcessor { type Output = AttachmentsOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::StandaloneAttachments.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { debug_assert!( !envelope.envelope().items().any(Item::creates_event), diff --git a/relay-server/src/processing/check_ins/mod.rs b/relay-server/src/processing/check_ins/mod.rs index 960628c9f8d..814c5fe5fb1 100644 --- a/relay-server/src/processing/check_ins/mod.rs +++ b/relay-server/src/processing/check_ins/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_quotas::{DataCategory, RateLimits}; use crate::Envelope; @@ -63,6 +64,10 @@ impl processing::Processor for CheckInsProcessor { type Output = CheckInsOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::CheckIns.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); @@ -71,6 +76,10 @@ impl processing::Processor for CheckInsProcessor { .take_items_by(|item| matches!(*item.ty(), ItemType::CheckIn)) .into_vec(); + if check_ins.is_empty() { + return None; + } + let work = SerializedCheckIns { headers, check_ins }; Some(Managed::with_meta_from_managed_envelope(envelope, work)) } diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs index 462f7ceb2c8..c1163447722 100644 --- a/relay-server/src/processing/client_reports/mod.rs +++ b/relay-server/src/processing/client_reports/mod.rs @@ -1,3 +1,4 @@ +use relay_cogs::{AppFeature, FeatureWeights}; use relay_common::time::UnixTimestamp; use relay_quotas::DataCategory; use relay_system::Addr; @@ -68,6 +69,10 @@ impl processing::Processor for ClientReportsProcessor { type Output = Nothing; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::ClientReports.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let reports = envelope diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 6a4a6e249bc..7769dc93e1c 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -24,6 +24,7 @@ mod filter; mod process; pub use errors::SwitchProcessingError; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_normalization::GeoIpLookup; use relay_event_schema::protocol::{Event, Metrics}; use relay_protocol::Annotated; @@ -77,6 +78,10 @@ impl processing::Processor for ErrorsProcessor { type Output = ErrorOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Errors.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let has_transaction = envelope .envelope() @@ -87,6 +92,11 @@ impl processing::Processor for ErrorsProcessor { return None; } + let creates_event = envelope.envelope().items().any(Item::creates_event); + if !creates_event { + return None; + } + let items = envelope .envelope_mut() .take_items_by(Item::requires_event) diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index a4ecb1832cd..a69f8d25566 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -152,6 +152,7 @@ impl ForwardContext<'_> { /// The [`Nothing`] output. /// /// Some processors may only produce by-products and not have any output of their own. +#[derive(Debug, Copy, Clone)] pub struct Nothing(std::convert::Infallible); impl Forward for Nothing { diff --git a/relay-server/src/processing/forward_unknown/mod.rs b/relay-server/src/processing/forward_unknown/mod.rs index 5068a3bb41e..ab824b7f69d 100644 --- a/relay-server/src/processing/forward_unknown/mod.rs +++ b/relay-server/src/processing/forward_unknown/mod.rs @@ -1,3 +1,5 @@ +use relay_cogs::{AppFeature, FeatureWeights}; + use crate::Envelope; use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; @@ -34,6 +36,10 @@ impl processing::Processor for ForwardUnknownProcessor { type Output = ForwardUnknownOutput; type Error = UnsupportedItem; + fn cogs() -> FeatureWeights { + AppFeature::UnattributedEnvelope.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let unknown_items = envelope .envelope_mut() diff --git a/relay-server/src/processing/invalid/mod.rs b/relay-server/src/processing/invalid/mod.rs new file mode 100644 index 00000000000..041146a71c5 --- /dev/null +++ b/relay-server/src/processing/invalid/mod.rs @@ -0,0 +1,82 @@ +use relay_cogs::{AppFeature, FeatureWeights}; + +use crate::envelope::{Item, ItemType}; +use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; +use crate::processing::{self, Context, Nothing, Output}; +use crate::services::outcome::{DiscardReason, Outcome}; + +#[derive(Debug, thiserror::Error)] +#[error("the item is not allowed/supported in this envelope")] +pub struct InvalidItem; + +impl OutcomeError for InvalidItem { + type Error = Self; + + fn consume(self) -> (Option, Self::Error) { + (Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)), self) + } +} + +/// A processor which rejects unhandled invalid items. +/// +/// This processor accepts certain envelope items which may be mixed into envelopes incorrectly. +#[derive(Debug)] +pub struct InvalidUnhandledProcessor { + _priv: (), +} + +impl InvalidUnhandledProcessor { + /// Creates a new [`Self`]. + pub fn new() -> Self { + Self { _priv: () } + } +} + +impl processing::Processor for InvalidUnhandledProcessor { + type Input = InvalidUnhandledItems; + type Output = Nothing; + type Error = InvalidItem; + + fn cogs() -> FeatureWeights { + AppFeature::UnattributedEnvelope.into() + } + + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { + let invalid_items = envelope + .envelope_mut() + // `FormData` only makes sense together with an item which creates an event, + // if it's mixed into an envelope which does not create an event, it is invalid. + .take_items_by(|item| matches!(*item.ty(), ItemType::FormData)) + .into_vec(); + + if invalid_items.is_empty() { + return None; + } + + Some(Managed::with_meta_from_managed_envelope( + envelope, + InvalidUnhandledItems { invalid_items }, + )) + } + + async fn process( + &self, + invalid: Managed, + _ctx: Context<'_>, + ) -> Result, Rejected> { + Err(invalid.reject_err(InvalidItem)) + } +} + +/// Unknown items extracted from an envelope. +#[derive(Debug)] +pub struct InvalidUnhandledItems { + /// List of unknown envelope items. + pub invalid_items: Vec, +} + +impl Counted for InvalidUnhandledItems { + fn quantities(&self) -> Quantities { + self.invalid_items.quantities() + } +} diff --git a/relay-server/src/processing/legacy_spans/mod.rs b/relay-server/src/processing/legacy_spans/mod.rs index 9d4fd934b5f..a8e3082c590 100644 --- a/relay-server/src/processing/legacy_spans/mod.rs +++ b/relay-server/src/processing/legacy_spans/mod.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use either::Either; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_normalization::GeoIpLookup; -use relay_event_schema::protocol::Span; +use relay_event_schema::protocol::{Span, SpanV2}; use relay_protocol::Annotated; use relay_quotas::{DataCategory, RateLimits}; use crate::Envelope; -use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items}; +use crate::envelope::{EnvelopeHeaders, Item, ItemContainer, ItemType, Items}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; use crate::metrics_extraction::ExtractedMetrics; use crate::processing::{ @@ -85,12 +86,18 @@ impl processing::Processor for LegacySpansProcessor { type Output = LegacySpanOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Spans.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let spans = envelope .envelope_mut() - .take_items_by(|item| matches!(item.ty(), ItemType::Span)) + .take_items_by(|item| { + matches!(item.ty(), ItemType::Span) && !ItemContainer::::is_container(item) + }) .into_vec(); if spans.is_empty() { diff --git a/relay-server/src/processing/logs/mod.rs b/relay-server/src/processing/logs/mod.rs index 460213955f8..4b39825ce44 100644 --- a/relay-server/src/processing/logs/mod.rs +++ b/relay-server/src/processing/logs/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::{OurLog, ourlog}; use relay_filter::FilterStatKey; @@ -104,6 +105,10 @@ impl processing::Processor for LogsProcessor { type Output = LogOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Logs.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index 7d3e63a14da..7f22562d315 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -6,6 +6,7 @@ //! //! The processor service, will then do its actual work using the processing logic defined here. +use relay_cogs::FeatureWeights; use relay_config::{Config, RelayMode}; use relay_dynamic_config::GlobalConfig; use relay_quotas::RateLimits; @@ -27,10 +28,12 @@ pub mod check_ins; pub mod client_reports; pub mod errors; pub mod forward_unknown; +pub mod invalid; pub mod legacy_spans; pub mod logs; pub mod profile_chunks; pub mod profiles; +pub mod relay; pub mod replays; pub mod sessions; pub mod spans; @@ -55,6 +58,9 @@ pub trait Processor { /// The error returned by [`Self::process`]. type Error: std::error::Error + 'static; + /// Returns [`FeatureWeights`] for this processor to attribute COGS. + fn cogs() -> FeatureWeights; + /// Extracts a [`Self::Input`] from a [`ManagedEnvelope`]. /// /// This is infallible, if a processor wants to report an error, diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index fa813cf0ec0..d769ea1074d 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_profiling::ProfileType; use relay_quotas::{DataCategory, RateLimits}; @@ -76,6 +77,10 @@ impl processing::Processor for ProfileChunksProcessor { type Output = ProfileChunkOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Profiles.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let profile_chunks = envelope .envelope_mut() diff --git a/relay-server/src/processing/profiles/mod.rs b/relay-server/src/processing/profiles/mod.rs index d28a4c9117c..0746abd1754 100644 --- a/relay-server/src/processing/profiles/mod.rs +++ b/relay-server/src/processing/profiles/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_profiling::{ProfileError, ProfileType}; use relay_quotas::{DataCategory, RateLimits}; use smallvec::smallvec; @@ -81,6 +82,10 @@ impl Processor for ProfilesProcessor { type Output = ProfilesOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Profiles.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let profiles = envelope diff --git a/relay-server/src/processing/relay.rs b/relay-server/src/processing/relay.rs new file mode 100644 index 00000000000..631e399b1cd --- /dev/null +++ b/relay-server/src/processing/relay.rs @@ -0,0 +1,205 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use relay_cogs::{Cogs, ResourceId}; +use relay_dynamic_config::Feature; +use relay_event_normalization::GeoIpLookup; +use relay_system::Addr; + +use crate::envelope::Item; +use crate::managed::ManagedEnvelope; +use crate::processing::attachments::AttachmentProcessor; +use crate::processing::check_ins::CheckInsProcessor; +use crate::processing::client_reports::ClientReportsProcessor; +use crate::processing::errors::ErrorsProcessor; +use crate::processing::forward_unknown::ForwardUnknownProcessor; +use crate::processing::invalid::InvalidUnhandledProcessor; +use crate::processing::legacy_spans::LegacySpansProcessor; +use crate::processing::logs::LogsProcessor; +use crate::processing::profile_chunks::ProfileChunksProcessor; +use crate::processing::profiles::ProfilesProcessor; +use crate::processing::replays::ReplaysProcessor; +use crate::processing::sessions::SessionsProcessor; +use crate::processing::spans::SpansProcessor; +use crate::processing::trace_attachments::TraceAttachmentsProcessor; +use crate::processing::trace_metrics::TraceMetricsProcessor; +use crate::processing::transactions::TransactionProcessor; +use crate::processing::user_reports::UserReportsProcessor; +use crate::processing::{Context, Output, Outputs, Processor, QuotaRateLimiter}; +use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; + +/// Implementation of Relays processing pipeline. +/// +/// The processor is able to fully process an envelope and return the processed results. +pub struct RelayProcessor { + cogs: Cogs, + + attachments: AttachmentProcessor, + check_ins: CheckInsProcessor, + client_reports: ClientReportsProcessor, + errors: ErrorsProcessor, + forward_unknown: ForwardUnknownProcessor, + invalid: InvalidUnhandledProcessor, + legacy_spans: LegacySpansProcessor, + logs: LogsProcessor, + profile_chunks: ProfileChunksProcessor, + profiles: ProfilesProcessor, + replays: ReplaysProcessor, + sessions: SessionsProcessor, + spans: SpansProcessor, + trace_attachments: TraceAttachmentsProcessor, + trace_metrics: TraceMetricsProcessor, + transactions: TransactionProcessor, + user_reports: UserReportsProcessor, +} + +impl RelayProcessor { + /// Creates a new [`Self`]. + pub fn new( + cogs: Cogs, + quota_limiter: &Arc, + geoip_lookup: &GeoIpLookup, + outcome_aggregator: Addr, + ) -> Self { + // Just so everything fits in a single line. + let ql = || Arc::clone(quota_limiter); + + Self { + cogs, + + attachments: AttachmentProcessor::new(ql()), + check_ins: CheckInsProcessor::new(ql()), + client_reports: ClientReportsProcessor::new(outcome_aggregator), + errors: ErrorsProcessor::new(ql(), geoip_lookup.clone()), + forward_unknown: ForwardUnknownProcessor::new(), + invalid: InvalidUnhandledProcessor::new(), + legacy_spans: LegacySpansProcessor::new(ql(), geoip_lookup.clone()), + logs: LogsProcessor::new(ql()), + profile_chunks: ProfileChunksProcessor::new(ql()), + profiles: ProfilesProcessor::new(ql()), + replays: ReplaysProcessor::new(ql(), geoip_lookup.clone()), + sessions: SessionsProcessor::new(ql()), + spans: SpansProcessor::new(ql(), geoip_lookup.clone()), + trace_attachments: TraceAttachmentsProcessor::new(ql()), + trace_metrics: TraceMetricsProcessor::new(ql()), + transactions: TransactionProcessor::new(ql(), geoip_lookup.clone()), + user_reports: UserReportsProcessor::new(ql()), + } + } + + /// Fully processes an envelope and returns the resulting outputs. + /// + /// Since an envelope may contain payloads which need to be processed independently, the + /// function may return multiple results at once. + pub async fn run( + &self, + mut envelope: ManagedEnvelope, + ctx: Context<'_>, + ) -> Vec> { + let mut outputs = Vec::with_capacity(5); + + let pi = ctx.project_info; + + macro_rules! run { + ($processor:expr) => {{ + if let Some(output) = self.run_one(&$processor, &mut envelope, ctx).await { + outputs.push(output.map(Into::into)); + } + }}; + } + + // The order of processors is deliberate and must not be changed lightly! + // + // Processors may partially consume items of the envelope and different processors may + // process the same item types. + // + // Currently this allows for never intended combinations of different item types in envelopes, + // ideally we make Relay slowly and slowly more restrictive and properly define and follow + // the rules of item combinations allowed in a single envelope. + + // Item types which can be mixed into any envelope. + run!(self.sessions); + run!(self.client_reports); + + // Primary item types. + run!(self.replays); + if !pi.has_feature(Feature::SpanV2ExperimentalProcessing) { + // To be fully replaced with the npn-legacy span processor. + run!(self.legacy_spans); + } + run!(self.spans); + run!(self.logs); + run!(self.trace_metrics); + run!(self.profile_chunks); + run!(self.check_ins); + + // Event based envelopes. + run!(self.errors); + run!(self.transactions); + + // Item types which can be sent with a primary item type and can also be sent standalone. + // + // These need to be processed after their respective primary type. + run!(self.trace_attachments); + run!(self.attachments); + run!(self.user_reports); + run!(self.profiles); + + // Fallback for forward compatibility. + run!(self.forward_unknown); + // All remaining items which make no sense. + run!(self.invalid); + + // After processing there must not be any items in the original envelope left. + match envelope.envelope().is_empty() { + true => envelope.accept(), + false => { + relay_log::error!( + items = ?envelope.envelope().items().map(Item::ty).collect::>(), + "Processed envelope has items remaining" + ); + envelope.update(); + envelope.reject(Outcome::Invalid(DiscardReason::Internal)); + } + } + + outputs + } + + async fn run_one( + &self, + processor: &T, + envelope: &mut ManagedEnvelope, + ctx: Context<'_>, + ) -> Option> + where + T::Input: Debug, + { + let item = processor.prepare_envelope(envelope)?; + + relay_log::trace!( + processor = std::any::type_name::(), + "processing item: {:?}", + item.as_ref(), + ); + + let _token = self.cogs.timed(ResourceId::Relay, T::cogs()); + let output = processor.process(item, ctx).await; + + match output { + Ok(output) => Some(output), + Err(error) => { + // This is not a fatal error case. For example a processor may reject an + // item because it was filtered by an inbound filter or rate limit. + // This means, other items from the same original envelope must still be processed. + relay_log::debug!( + error = &error as &dyn std::error::Error, + processor = std::any::type_name::(), + "item rejected by processor" + ); + + None + } + } + } +} diff --git a/relay-server/src/processing/replays/mod.rs b/relay-server/src/processing/replays/mod.rs index 5dcf948ba89..b76fb1fc33a 100644 --- a/relay-server/src/processing/replays/mod.rs +++ b/relay-server/src/processing/replays/mod.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use bytes::Bytes; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_normalization::GeoIpLookup; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::Replay; @@ -160,6 +161,10 @@ impl processing::Processor for ReplaysProcessor { type Output = ReplaysOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Replays.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let events = envelope diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs index 3875587c268..3b4aa18596c 100644 --- a/relay-server/src/processing/sessions/mod.rs +++ b/relay-server/src/processing/sessions/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_schema::protocol::{SessionAggregates, SessionUpdate}; use relay_quotas::{DataCategory, RateLimits}; @@ -60,6 +61,10 @@ impl processing::Processor for SessionsProcessor { type Output = SessionsOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Sessions.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); @@ -73,6 +78,10 @@ impl processing::Processor for SessionsProcessor { .take_items_by(|item| matches!(*item.ty(), ItemType::Sessions)) .into_vec(); + if updates.is_empty() && aggregates.is_empty() { + return None; + } + let work = SerializedSessions { headers, updates, diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index 0816427d639..dfbf10d1868 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use either::Either; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_normalization::GeoIpLookup; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::{SpanV2, span_v2}; @@ -117,6 +118,10 @@ impl processing::Processor for SpansProcessor { type Output = SpanOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Spans.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); diff --git a/relay-server/src/processing/trace_attachments/mod.rs b/relay-server/src/processing/trace_attachments/mod.rs index e2a8ddc315e..ab4ef266e6d 100644 --- a/relay-server/src/processing/trace_attachments/mod.rs +++ b/relay-server/src/processing/trace_attachments/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_dynamic_config::Feature; use relay_event_schema::processor::ProcessingAction; use relay_quotas::RateLimits; @@ -93,11 +94,13 @@ impl TraceAttachmentsProcessor { impl Processor for TraceAttachmentsProcessor { type Input = SerializedAttachments; - type Output = Managed; - type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::TraceAttachments.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let items = envelope diff --git a/relay-server/src/processing/trace_metrics/mod.rs b/relay-server/src/processing/trace_metrics/mod.rs index 49f5dff6a5d..0a9897e3cc8 100644 --- a/relay-server/src/processing/trace_metrics/mod.rs +++ b/relay-server/src/processing/trace_metrics/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::{TraceMetric, trace_metric}; use relay_filter::FilterStatKey; @@ -109,6 +110,10 @@ impl processing::Processor for TraceMetricsProcessor { type Output = TraceMetricOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::TraceMetrics.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 07bfc92018c..c50bd4c1f25 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_event_normalization::GeoIpLookup; use relay_event_schema::protocol::Metrics; use relay_quotas::RateLimits; @@ -87,6 +88,10 @@ impl Processor for TransactionProcessor { type Output = TransactionOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::Transactions.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); diff --git a/relay-server/src/processing/user_reports/mod.rs b/relay-server/src/processing/user_reports/mod.rs index 1d72669c47c..5732c835afb 100644 --- a/relay-server/src/processing/user_reports/mod.rs +++ b/relay-server/src/processing/user_reports/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use relay_cogs::{AppFeature, FeatureWeights}; use relay_quotas::RateLimits; use crate::envelope::{EnvelopeHeaders, Item, ItemType}; @@ -66,6 +67,10 @@ impl Processor for UserReportsProcessor { type Output = UserReportsOutput; type Error = Error; + fn cogs() -> FeatureWeights { + AppFeature::UserReports.into() + } + fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option> { let headers = envelope.envelope().headers().clone(); let reports: Vec = envelope diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index bcfb479ead8..84cc2d1260c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::error::Error; -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::future::Future; use std::io::Write; use std::pin::Pin; @@ -20,10 +20,9 @@ use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token}; use relay_common::time::UnixTimestamp; use relay_config::{Config, HttpEncoding, UpstreamDescriptor}; -use relay_dynamic_config::Feature; use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup}; use relay_event_schema::processor::ProcessingAction; -use relay_event_schema::protocol::{ClientReport, EventId, SpanV2}; +use relay_event_schema::protocol::ClientReport; use relay_filter::FilterStatKey; use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace}; use relay_quotas::{DataCategory, RateLimits, Scoping}; @@ -31,32 +30,16 @@ use relay_sampling::evaluation::SamplingDecision; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; -use smallvec::{SmallVec, smallvec}; use zstd::stream::Encoder as ZstdEncoder; -use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType}; +use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType}; use crate::extractors::{PartialDsn, RequestMeta, RequestTrust}; -use crate::integrations::Integration; use crate::managed::ManagedEnvelope; use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket}; use crate::metrics_extraction::ExtractedMetrics; -use crate::processing::attachments::AttachmentProcessor; -use crate::processing::check_ins::CheckInsProcessor; -use crate::processing::client_reports::ClientReportsProcessor; -use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError}; -use crate::processing::forward_unknown::ForwardUnknownProcessor; -use crate::processing::legacy_spans::LegacySpansProcessor; -use crate::processing::logs::LogsProcessor; -use crate::processing::profile_chunks::ProfileChunksProcessor; -use crate::processing::profiles::ProfilesProcessor; -use crate::processing::replays::ReplaysProcessor; -use crate::processing::sessions::SessionsProcessor; -use crate::processing::spans::SpansProcessor; -use crate::processing::trace_attachments::TraceAttachmentsProcessor; -use crate::processing::trace_metrics::TraceMetricsProcessor; -use crate::processing::transactions::TransactionProcessor; -use crate::processing::user_reports::UserReportsProcessor; -use crate::processing::{Forward as _, ForwardContext, Output, Outputs, QuotaRateLimiter}; +use crate::processing::errors::SwitchProcessingError; +use crate::processing::relay::RelayProcessor; +use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets}; @@ -87,346 +70,6 @@ mod metrics; /// The minimum clock drift for correction to apply. pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60); -#[derive(Debug)] -pub struct GroupTypeError; - -impl Display for GroupTypeError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("failed to convert processing group into corresponding type") - } -} - -impl std::error::Error for GroupTypeError {} - -macro_rules! processing_group { - ($ty:ident, $variant:ident$(, $($other:ident),+)?) => { - #[derive(Clone, Copy, Debug)] - pub struct $ty; - - impl From<$ty> for ProcessingGroup { - fn from(_: $ty) -> Self { - ProcessingGroup::$variant - } - } - - impl TryFrom for $ty { - type Error = GroupTypeError; - - fn try_from(value: ProcessingGroup) -> Result { - if matches!(value, ProcessingGroup::$variant) { - return Ok($ty); - } - $($( - if matches!(value, ProcessingGroup::$other) { - return Ok($ty); - } - )+)? - return Err(GroupTypeError); - } - } - }; -} - -processing_group!(TransactionGroup, Transaction); - -processing_group!(ErrorGroup, Error); - -processing_group!(SessionGroup, Session); -processing_group!(ClientReportGroup, ClientReport); -processing_group!(ReplayGroup, Replay); -processing_group!(CheckInGroup, CheckIn); -processing_group!(TraceMetricGroup, TraceMetric); -processing_group!(SpanGroup, Span); - -processing_group!(ProfileChunkGroup, ProfileChunk); -processing_group!(ForwardUnknownGroup, ForwardUnknown); -processing_group!(Ungrouped, Ungrouped); - -/// Describes the groups of the processable items. -#[derive(Clone, Copy, Debug)] -pub enum ProcessingGroup { - /// All the transaction related items. - /// - /// Includes transactions, related attachments, profiles. - Transaction, - /// All the items which require (have or create) events. - /// - /// This includes: errors, NEL, security reports, user reports, some of the - /// attachments. - Error, - /// Session events. - Session, - /// Standalone attachments - /// - /// Attachments that are send without an item that creates an event in the same envelope. - StandaloneAttachments, - /// Standalone user reports - /// - /// User reports that are send without an item that creates an event in the same envelope. - StandaloneUserReports, - /// Standalone profiles - /// - /// Profiles which had their transaction sampled. - StandaloneProfiles, - /// Outcomes. - ClientReport, - /// Replays and ReplayRecordings. - Replay, - /// Crons. - CheckIn, - /// Logs. - Log, - /// Trace metrics. - TraceMetric, - /// Spans. - Span, - /// Span V2 spans. - SpanV2, - /// ProfileChunk. - ProfileChunk, - /// V2 attachments without span / log association. - TraceAttachment, - /// Unknown item types will be forwarded upstream (to processing Relay), where we will - /// decide what to do with them. - ForwardUnknown, - /// All the items in the envelope that could not be grouped. - Ungrouped, -} - -impl ProcessingGroup { - /// Splits provided envelope into list of tuples of groups with associated envelopes. - fn split_envelope( - mut envelope: Envelope, - project_info: &ProjectInfo, - ) -> SmallVec<[(Self, Box); 3]> { - let headers = envelope.headers().clone(); - let mut grouped_envelopes = smallvec![]; - - // Extract replays. - let replay_items = envelope.take_items_by(|item| { - matches!( - item.ty(), - &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo - ) - }); - if !replay_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::Replay, - Envelope::from_parts(headers.clone(), replay_items), - )) - } - - // Keep all the sessions together in one envelope. - let session_items = envelope - .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions)); - if !session_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::Session, - Envelope::from_parts(headers.clone(), session_items), - )) - } - - let span_v2_items = envelope.take_items_by(|item| { - let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing); - - ItemContainer::::is_container(item) - || matches!(item.integration(), Some(Integration::Spans(_))) - // Process standalone spans (v1) via the v2 pipeline - || (exp_feature && matches!(item.ty(), &ItemType::Span)) - || (exp_feature && item.is_span_attachment()) - }); - - if !span_v2_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::SpanV2, - Envelope::from_parts(headers.clone(), span_v2_items), - )) - } - - // Extract spans. - let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span)); - if !span_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::Span, - Envelope::from_parts(headers.clone(), span_items), - )) - } - - // Extract logs. - let logs_items = envelope.take_items_by(|item| { - matches!(item.ty(), &ItemType::Log) - || matches!(item.integration(), Some(Integration::Logs(_))) - }); - if !logs_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::Log, - Envelope::from_parts(headers.clone(), logs_items), - )) - } - - // Extract trace metrics. - let trace_metric_items = - envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric)); - if !trace_metric_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::TraceMetric, - Envelope::from_parts(headers.clone(), trace_metric_items), - )) - } - - // Extract profile chunks. - let profile_chunk_items = - envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk)); - if !profile_chunk_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::ProfileChunk, - Envelope::from_parts(headers.clone(), profile_chunk_items), - )) - } - - let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment); - if !trace_attachment_items.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::TraceAttachment, - Envelope::from_parts(headers.clone(), trace_attachment_items), - )) - } - - if !envelope.items().any(Item::creates_event) { - // Extract the standalone attachments - let standalone_attachments = envelope - .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment)); - if !standalone_attachments.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::StandaloneAttachments, - Envelope::from_parts(headers.clone(), standalone_attachments), - )) - } - - // Extract the standalone user reports - let standalone_user_reports = - envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport)); - if !standalone_user_reports.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::StandaloneUserReports, - Envelope::from_parts(headers.clone(), standalone_user_reports), - )); - } - - // Extract the standalone profiles - let standalone_profiles = - envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile)); - if !standalone_profiles.is_empty() { - grouped_envelopes.push(( - ProcessingGroup::StandaloneProfiles, - Envelope::from_parts(headers.clone(), standalone_profiles), - )); - } - } - - // Make sure we create separate envelopes for each `RawSecurity` report. - let security_reports_items = envelope - .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity)) - .into_iter() - .map(|item| { - let headers = headers.clone(); - let items: SmallVec<[Item; 3]> = smallvec![item.clone()]; - let mut envelope = Envelope::from_parts(headers, items); - envelope.set_event_id(EventId::new()); - (ProcessingGroup::Error, envelope) - }); - grouped_envelopes.extend(security_reports_items); - - // Extract all the items which require an event into separate envelope. - let require_event_items = envelope.take_items_by(Item::requires_event); - if !require_event_items.is_empty() { - let group = if require_event_items - .iter() - .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile)) - { - ProcessingGroup::Transaction - } else { - ProcessingGroup::Error - }; - - grouped_envelopes.push(( - group, - Envelope::from_parts(headers.clone(), require_event_items), - )) - } - - // Get the rest of the envelopes, one per item. - let envelopes = envelope.items_mut().map(|item| { - let headers = headers.clone(); - let items: SmallVec<[Item; 3]> = smallvec![item.clone()]; - let envelope = Envelope::from_parts(headers, items); - let item_type = item.ty(); - let group = if matches!(item_type, &ItemType::CheckIn) { - ProcessingGroup::CheckIn - } else if matches!(item.ty(), &ItemType::ClientReport) { - ProcessingGroup::ClientReport - } else if matches!(item_type, &ItemType::Unknown(_)) { - ProcessingGroup::ForwardUnknown - } else { - // Cannot group this item type. - ProcessingGroup::Ungrouped - }; - - (group, envelope) - }); - grouped_envelopes.extend(envelopes); - - grouped_envelopes - } - - /// Returns the name of the group. - pub fn variant(&self) -> &'static str { - match self { - ProcessingGroup::Transaction => "transaction", - ProcessingGroup::Error => "error", - ProcessingGroup::Session => "session", - ProcessingGroup::StandaloneAttachments => "standalone_attachment", - ProcessingGroup::StandaloneUserReports => "standalone_user_reports", - ProcessingGroup::StandaloneProfiles => "standalone_profiles", - ProcessingGroup::ClientReport => "client_report", - ProcessingGroup::Replay => "replay", - ProcessingGroup::CheckIn => "check_in", - ProcessingGroup::Log => "log", - ProcessingGroup::TraceMetric => "trace_metric", - ProcessingGroup::Span => "span", - ProcessingGroup::SpanV2 => "span_v2", - ProcessingGroup::ProfileChunk => "profile_chunk", - ProcessingGroup::TraceAttachment => "trace_attachment", - ProcessingGroup::ForwardUnknown => "forward_unknown", - ProcessingGroup::Ungrouped => "ungrouped", - } - } -} - -impl From for AppFeature { - fn from(value: ProcessingGroup) -> Self { - match value { - ProcessingGroup::Transaction => AppFeature::Transactions, - ProcessingGroup::Error => AppFeature::Errors, - ProcessingGroup::Session => AppFeature::Sessions, - ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope, - ProcessingGroup::StandaloneUserReports => AppFeature::UserReports, - ProcessingGroup::StandaloneProfiles => AppFeature::Profiles, - ProcessingGroup::ClientReport => AppFeature::ClientReports, - ProcessingGroup::Replay => AppFeature::Replays, - ProcessingGroup::CheckIn => AppFeature::CheckIns, - ProcessingGroup::Log => AppFeature::Logs, - ProcessingGroup::TraceMetric => AppFeature::TraceMetrics, - ProcessingGroup::Span => AppFeature::Spans, - ProcessingGroup::SpanV2 => AppFeature::Spans, - ProcessingGroup::ProfileChunk => AppFeature::Profiles, - ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope, - ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope, - ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments, - } - } -} - /// An error returned when handling [`ProcessEnvelope`]. #[derive(Debug, thiserror::Error)] pub enum ProcessingError { @@ -457,9 +100,6 @@ pub enum ProcessingError { #[error("failed to extract event payload")] NoEventPayload, - #[error("missing project id in DSN")] - MissingProjectId, - #[error("invalid security report type: {0:?}")] InvalidSecurityType(Bytes), @@ -486,17 +126,9 @@ pub enum ProcessingError { #[error("playstation dump processing failed: {0}")] InvalidPlaystationDump(String), - #[error("processing group does not match specific processor")] - ProcessingGroupMismatch, - #[error("new processing pipeline failed")] - ProcessingFailure, - #[cfg(feature = "processing")] #[error("invalid attachment reference")] InvalidAttachmentRef, - - #[error("could not determine processing group for envelope items")] - NoProcessingGroup, } impl ProcessingError { @@ -528,24 +160,14 @@ impl ProcessingError { } #[cfg(feature = "processing")] Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)), - Self::MissingProjectId => None, Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())), - Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)), - // Outcomes are emitted in the new processing pipeline already. - Self::ProcessingFailure => None, #[cfg(feature = "processing")] Self::InvalidAttachmentRef => { Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef)) } - Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)), } } - - fn is_unexpected(&self) -> bool { - self.to_outcome() - .is_some_and(|outcome| outcome.is_unexpected()) - } } impl From for ProcessingError { @@ -676,17 +298,6 @@ pub struct ProcessEnvelope { pub sampling_project_info: Option>, } -/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped. -#[derive(Debug)] -struct ProcessEnvelopeGrouped<'a> { - /// The group the envelope belongs to. - pub group: ProcessingGroup, - /// Envelope to process. - pub envelope: ManagedEnvelope, - /// The processing context. - pub ctx: processing::Context<'a>, -} - /// Parses a list of metrics or metric buckets and pushes them to the project's aggregator. /// /// This parses and validates the metrics: @@ -933,26 +544,7 @@ struct InnerProcessor { #[cfg(feature = "processing")] rate_limiter: Option>, metric_outcomes: MetricOutcomes, - processing: Processing, -} - -struct Processing { - errors: ErrorsProcessor, - logs: LogsProcessor, - trace_metrics: TraceMetricsProcessor, - spans: SpansProcessor, - legacy_spans: LegacySpansProcessor, - check_ins: CheckInsProcessor, - sessions: SessionsProcessor, - transactions: TransactionProcessor, - profile_chunks: ProfileChunksProcessor, - trace_attachments: TraceAttachmentsProcessor, - replays: ReplaysProcessor, - client_reports: ClientReportsProcessor, - attachments: AttachmentProcessor, - user_reports: UserReportsProcessor, - profiles: ProfilesProcessor, - forward_unknown: ForwardUnknownProcessor, + processor: RelayProcessor, } impl EnvelopeProcessorService { @@ -1000,40 +592,21 @@ impl EnvelopeProcessorService { )); #[cfg(feature = "processing")] let rate_limiter = rate_limiter.map(Arc::new); - let outcome_aggregator = addrs.outcome_aggregator.clone(); let inner = InnerProcessor { pool, global_config, project_cache, - cogs, #[cfg(feature = "processing")] rate_limiter, + processor: RelayProcessor::new( + cogs.clone(), + "a_limiter, + &geoip_lookup, + addrs.outcome_aggregator.clone(), + ), + cogs, addrs, metric_outcomes, - processing: Processing { - errors: ErrorsProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), - logs: LogsProcessor::new(Arc::clone("a_limiter)), - trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)), - spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()), - legacy_spans: LegacySpansProcessor::new( - Arc::clone("a_limiter), - geoip_lookup.clone(), - ), - check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)), - sessions: SessionsProcessor::new(Arc::clone("a_limiter)), - transactions: TransactionProcessor::new( - Arc::clone("a_limiter), - geoip_lookup.clone(), - ), - profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)), - trace_attachments: TraceAttachmentsProcessor::new(Arc::clone("a_limiter)), - replays: ReplaysProcessor::new(Arc::clone("a_limiter), geoip_lookup), - client_reports: ClientReportsProcessor::new(outcome_aggregator), - attachments: AttachmentProcessor::new(Arc::clone("a_limiter)), - user_reports: UserReportsProcessor::new(Arc::clone("a_limiter)), - profiles: ProfilesProcessor::new(quota_limiter), - forward_unknown: ForwardUnknownProcessor::new(), - }, config, }; @@ -1042,59 +615,17 @@ impl EnvelopeProcessorService { } } - async fn process_with_processor( - &self, - processor: &P, - mut managed_envelope: ManagedEnvelope, - ctx: processing::Context<'_>, - ) -> Result, ProcessingError> - where - Outputs: From, - { - let Some(work) = processor.prepare_envelope(&mut managed_envelope) else { - debug_assert!( - false, - "there must be work for the {} processor", - std::any::type_name::

(), - ); - return Err(ProcessingError::ProcessingGroupMismatch); - }; - - managed_envelope.update(); - match managed_envelope.envelope().is_empty() { - true => managed_envelope.accept(), - false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)), - } - - processor - .process(work, ctx) - .await - .map_err(|err| { - relay_log::debug!( - error = &err as &dyn std::error::Error, - "processing pipeline failed" - ); - ProcessingError::ProcessingFailure - }) - .map(|o| o.map(Into::into)) - } - async fn process_envelope( &self, project_id: ProjectId, - message: ProcessEnvelopeGrouped<'_>, - ) -> Result, ProcessingError> { - let ProcessEnvelopeGrouped { - group, - envelope: mut managed_envelope, - ctx, - } = message; - + mut envelope: ManagedEnvelope, + ctx: processing::Context<'_>, + ) -> Vec> { // Pre-process the envelope headers. if let Some(sampling_state) = ctx.sampling_project_info { // Both transactions and standalone span envelopes need a normalized DSC header // to make sampling rules based on the segment/transaction name work correctly. - managed_envelope + envelope .envelope_mut() .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules); } @@ -1102,144 +633,25 @@ impl EnvelopeProcessorService { // Set the event retention. Effectively, this value will only be available in processing // mode when the full project config is queried from the upstream. if let Some(retention) = ctx.project_info.config.event_retention { - managed_envelope.envelope_mut().set_retention(retention); + envelope.envelope_mut().set_retention(retention); } // Set the event retention. Effectively, this value will only be available in processing // mode when the full project config is queried from the upstream. if let Some(retention) = ctx.project_info.config.downsampled_event_retention { - managed_envelope - .envelope_mut() - .set_downsampled_retention(retention); + envelope.envelope_mut().set_downsampled_retention(retention); } // Ensure the project ID is updated to the stored instance for this project cache. This can // differ in two cases: // 1. The envelope was sent to the legacy `/store/` endpoint without a project ID. // 2. The DSN was moved and the envelope sent to the old project ID. - managed_envelope + envelope .envelope_mut() .meta_mut() .set_project_id(project_id); - relay_log::trace!("Processing {group} group", group = group.variant()); - - match group { - ProcessingGroup::Error => { - self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx) - .await - } - ProcessingGroup::Transaction => { - self.process_with_processor( - &self.inner.processing.transactions, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::Session => { - self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx) - .await - } - ProcessingGroup::StandaloneAttachments => { - self.process_with_processor( - &self.inner.processing.attachments, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::StandaloneUserReports => { - self.process_with_processor( - &self.inner.processing.user_reports, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::StandaloneProfiles => { - self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx) - .await - } - ProcessingGroup::ClientReport => { - self.process_with_processor( - &self.inner.processing.client_reports, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::Replay => { - self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx) - .await - } - ProcessingGroup::CheckIn => { - self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx) - .await - } - ProcessingGroup::Log => { - self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx) - .await - } - ProcessingGroup::TraceMetric => { - self.process_with_processor( - &self.inner.processing.trace_metrics, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::SpanV2 => { - self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx) - .await - } - ProcessingGroup::TraceAttachment => { - self.process_with_processor( - &self.inner.processing.trace_attachments, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::Span => { - self.process_with_processor( - &self.inner.processing.legacy_spans, - managed_envelope, - ctx, - ) - .await - } - ProcessingGroup::ProfileChunk => { - self.process_with_processor( - &self.inner.processing.profile_chunks, - managed_envelope, - ctx, - ) - .await - } - // Ungrouped items indicate bugs, as all items should have an associated processor, - // or be handled separately (like metrics). - ProcessingGroup::Ungrouped => { - relay_log::error!( - tags.project = %project_id, - items = ?managed_envelope.envelope().items().next().map(Item::ty), - "could not identify the processing group based on the envelope's items" - ); - - Err(ProcessingError::NoProcessingGroup) - } - // Leave this group unchanged. - // - // This will later be forwarded to upstream. - ProcessingGroup::ForwardUnknown => { - self.process_with_processor( - &self.inner.processing.forward_unknown, - managed_envelope, - ctx, - ) - .await - } - } + self.inner.processor.run(envelope, ctx).await } /// Processes the envelope and returns the processed envelope back. @@ -1249,14 +661,9 @@ impl EnvelopeProcessorService { /// to be dropped, this is `None`. async fn process<'a>( &self, - mut message: ProcessEnvelopeGrouped<'a>, - ) -> Result)>, ProcessingError> { - let ProcessEnvelopeGrouped { - ref mut envelope, - ctx, - .. - } = message; - + mut envelope: ManagedEnvelope, + ctx: processing::Context<'a>, + ) -> Vec> { // Prefer the project's project ID, and fall back to the stated project id from the // envelope. The project ID is available in all modes, other than in proxy mode, where // envelopes for unknown projects are forwarded blindly. @@ -1268,20 +675,16 @@ impl EnvelopeProcessorService { .project_id .or_else(|| envelope.envelope().meta().project_id()) else { + relay_log::error!( + tags.project_key = %envelope.envelope().meta().public_key(), + "project info does not contain project id" + ); envelope.reject(Outcome::Invalid(DiscardReason::Internal)); - return Err(ProcessingError::MissingProjectId); + return Vec::new(); }; let client = envelope.envelope().meta().client().map(str::to_owned); let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned); - let project_key = envelope.envelope().meta().public_key(); - // Only allow sending to the sampling key, if we successfully loaded a sampling project - // info relating to it. This filters out unknown/invalid project keys as well as project - // keys from different organizations. - let sampling_key = envelope - .envelope() - .sampling_key() - .filter(|_| ctx.sampling_project_info.is_some()); // We set additional information on the scope, which will be removed after processing the // envelope. @@ -1295,24 +698,7 @@ impl EnvelopeProcessorService { } }); - let result = - self.process_envelope(project_id, message) - .await - .map(|Output { main, metrics }| { - if let Some(metrics) = metrics { - metrics.accept(|metrics| { - send_metrics( - metrics, - project_key, - sampling_key, - &self.inner.addrs.aggregator, - ); - }); - } - - let ctx = ctx.to_forward(); - main.map(|output| (output, ctx)) - }); + let result = self.process_envelope(project_id, envelope, ctx).await; relay_log::configure_scope(|scope| { scope.remove_tag("project"); @@ -1324,67 +710,48 @@ impl EnvelopeProcessorService { } async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) { - let project_key = message.envelope.envelope().meta().public_key(); let wait_time = message.envelope.age(); metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time); // This COGS handling may need an overhaul in the future: - // Cancel the passed in token, to start individual measurements per envelope instead. + // Cancel the passed in token, to start individual measurements per processor instead. cogs.cancel(); - let scoping = message.envelope.scoping(); - for (group, envelope) in ProcessingGroup::split_envelope( - *message.envelope.into_envelope(), - &message.project_info, - ) { - let mut cogs = self - .inner - .cogs - .timed(ResourceId::Relay, AppFeature::from(group)); - - let mut envelope = - ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone()); - envelope.scope(scoping); + let global_config = self.inner.global_config.current(); - let global_config = self.inner.global_config.current(); + let ctx = processing::Context { + config: &self.inner.config, + global_config: &global_config, + project_info: &message.project_info, + sampling_project_info: message.sampling_project_info.as_deref(), + rate_limits: &message.rate_limits, + }; - let ctx = processing::Context { - config: &self.inner.config, - global_config: &global_config, - project_info: &message.project_info, - sampling_project_info: message.sampling_project_info.as_deref(), - rate_limits: &message.rate_limits, - }; + let project_key = message.envelope.meta().public_key(); + // Only allow sending to the sampling key, if we successfully loaded a sampling project + // info relating to it. This filters out unknown/invalid project keys as well as project + // keys from different organizations. + let sampling_key = ctx + .sampling_project_info + .and_then(|p| p.get_public_key_config()) + .map(|pkc| pkc.public_key); - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx, - }; + let outputs = metric!(timer(RelayTimers::EnvelopeProcessingTime), { + self.process(message.envelope, ctx).await + }); - let result = metric!( - timer(RelayTimers::EnvelopeProcessingTime), - group = group.variant(), - { self.process(message).await } - ); + let ctx = ctx.to_forward(); + for Output { main, metrics } in outputs { + if let Some(metrics) = metrics { + let agg = &self.inner.addrs.aggregator; + metrics.accept(|metrics| { + send_metrics(metrics, project_key, sampling_key, agg); + }); + } - match result { - Ok(Some((output, ctx))) => self.submit_upstream(&mut cogs, output, ctx), - Ok(None) => {} - Err(error) if error.is_unexpected() => { - relay_log::error!( - tags.project_key = %project_key, - error = &error as &dyn Error, - "error processing envelope" - ) - } - Err(error) => { - relay_log::debug!( - tags.project_key = %project_key, - error = &error as &dyn Error, - "error processing envelope" - ) - } + if let Some(output) = main { + // Only counting processing time for COGS at the moment. + self.submit_upstream(&mut Token::noop(), output, ctx); } } } @@ -2464,7 +1831,7 @@ mod tests { use relay_event_normalization::{ NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule, }; - use relay_event_schema::protocol::{Event, TransactionSource}; + use relay_event_schema::protocol::{Event, EventId, TransactionSource}; use relay_pii::DataScrubbingConfig; use relay_protocol::Annotated; use similar_asserts::assert_eq; @@ -2480,6 +1847,26 @@ mod tests { use super::*; + async fn process_to_single_envelope<'a>( + processor: &EnvelopeProcessorService, + envelope: ManagedEnvelope, + ctx: processing::Context<'a>, + ) -> Box { + let mut outputs = processor.process(envelope, ctx).await; + assert_eq!(outputs.len(), 1); + + let Output { main, metrics } = outputs.pop().unwrap(); + + if let Some(metrics) = metrics { + metrics.accept(drop); + } + + main.unwrap() + .serialize_envelope(ctx.to_forward()) + .unwrap() + .accept(|envelope| envelope) + } + #[cfg(feature = "processing")] fn mock_quota(id: &str) -> Quota { Quota { @@ -2677,25 +2064,14 @@ mod tests { ..Default::default() }; - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - - let (group, envelope) = envelopes.pop().unwrap(); let envelope = ManagedEnvelope::new(envelope, outcome_aggregator); - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - project_info: &project_info, - ..processing::Context::for_test() - }, + let ctx = processing::Context { + project_info: &project_info, + ..processing::Context::for_test() }; - let Ok(Some((output, ctx))) = processor.process(message).await else { - panic!(); - }; - let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f); + let new_envelope = process_to_single_envelope(&processor, envelope, ctx).await; let event_item = new_envelope.items().last().unwrap(); let annotated_event: Annotated = @@ -2763,22 +2139,17 @@ mod tests { } }); - let message = ProcessEnvelopeGrouped { - group: ProcessingGroup::Error, - envelope: managed_envelope, - ctx: processing::Context { - config: &Config::from_json_value(config.clone()).unwrap(), - project_info: &project_info, - sampling_project_info: Some(&project_info), - ..processing::Context::for_test() - }, + let processor = + create_test_processor(Config::from_json_value(config.clone()).unwrap()).await; + let config = Config::from_json_value(config).unwrap(); + let ctx = processing::Context { + config: &config, + project_info: &project_info, + sampling_project_info: Some(&project_info), + ..processing::Context::for_test() }; - let processor = create_test_processor(Config::from_json_value(config).unwrap()).await; - let Ok(Some((output, ctx))) = processor.process(message).await else { - panic!(); - }; - let envelope = output.serialize_envelope(ctx).unwrap(); + let envelope = process_to_single_envelope(&processor, managed_envelope, ctx).await; let event = envelope .get_item_by(|item| item.ty() == &ItemType::Event) .unwrap();