Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions relay-cogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum AppFeature {
Profiles,
/// User Reports
UserReports,
/// Event attachments not associated with an event.
StandaloneAttachments,

/// Metrics in the spans namespace.
MetricsSpans,
Expand Down Expand Up @@ -208,6 +210,7 @@ impl AppFeature {
Self::CheckIns => "check_ins",
Self::Replays => "replays",
Self::UserReports => "user_reports",
Self::StandaloneAttachments => "standalone_attachments",
Self::MetricsSpans => "metrics_spans",
Self::MetricsTransactions => "metrics_transactions",
Self::MetricsSessions => "metrics_sessions",
Expand Down
47 changes: 28 additions & 19 deletions relay-server/src/endpoints/security_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,37 @@ impl SecurityReportParams {
report_item
}

fn extract_envelope(self) -> Result<Box<Envelope>, BadStoreRequest> {
#[expect(
clippy::vec_box,
reason = "Box<Envelope> is created by Envelope::from_request and used for processing"
)]
fn extract_envelopes(self) -> Result<Vec<Box<Envelope>>, BadStoreRequest> {
let Self { meta, query, body } = self;

if body.is_empty() {
return Err(BadStoreRequest::EmptyBody);
}

let mut envelope = Envelope::from_request(Some(EventId::new()), meta);
let variant =
serde_json::from_slice::<Vec<&RawValue>>(&body).map_err(BadStoreRequest::InvalidJson);

if let Ok(items) = variant {
for item in items {
let report_item =
Self::create_security_item(&query, Bytes::from(item.to_owned().to_string()));
Ok(match serde_json::from_slice::<Vec<&RawValue>>(&body) {
Ok(items) => items
.into_iter()
.map(|item| {
let mut envelope = Envelope::from_request(Some(EventId::new()), meta.clone());
let report = Self::create_security_item(
&query,
Bytes::from(item.to_owned().to_string()),
);
envelope.add_item(report);
envelope
})
.collect(),
Err(_) => {
let mut envelope = Envelope::from_request(Some(EventId::new()), meta);
let report_item = Self::create_security_item(&query, body);
envelope.add_item(report_item);
vec![envelope]
}
} else {
let report_item = Self::create_security_item(&query, body);
envelope.add_item(report_item);
}

Ok(envelope)
})
}
}

Expand Down Expand Up @@ -100,10 +108,11 @@ async fn handle(
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
}

let envelope = params.extract_envelope()?;
common::handle_envelope(&state, envelope)
.await?
.check_rate_limits()?;
for envelope in params.extract_envelopes()? {
common::handle_envelope(&state, envelope)
.await?
.check_rate_limits()?;
}
Comment thread
Dav1dde marked this conversation as resolved.
Comment thread
Dav1dde marked this conversation as resolved.

Ok(().into_response())
}
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/attachments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_quotas::RateLimits;

use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
Expand Down Expand Up @@ -64,6 +65,10 @@ impl processing::Processor for AttachmentProcessor {
type Output = AttachmentsOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::StandaloneAttachments.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
debug_assert!(
!envelope.envelope().items().any(Item::creates_event),
Expand Down
9 changes: 9 additions & 0 deletions relay-server/src/processing/check_ins/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
Expand Down Expand Up @@ -63,6 +64,10 @@ impl processing::Processor for CheckInsProcessor {
type Output = CheckInsOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::CheckIns.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

Expand All @@ -71,6 +76,10 @@ impl processing::Processor for CheckInsProcessor {
.take_items_by(|item| matches!(*item.ty(), ItemType::CheckIn))
.into_vec();

if check_ins.is_empty() {
return None;
}

let work = SerializedCheckIns { headers, check_ins };
Some(Managed::with_meta_from_managed_envelope(envelope, work))
}
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/client_reports/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use relay_cogs::{AppFeature, FeatureWeights};
use relay_common::time::UnixTimestamp;
use relay_quotas::DataCategory;
use relay_system::Addr;
Expand Down Expand Up @@ -68,6 +69,10 @@ impl processing::Processor for ClientReportsProcessor {
type Output = Nothing;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::ClientReports.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();
let reports = envelope
Expand Down
10 changes: 10 additions & 0 deletions relay-server/src/processing/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod filter;
mod process;

pub use errors::SwitchProcessingError;
use relay_cogs::{AppFeature, FeatureWeights};
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::protocol::{Event, Metrics};
use relay_protocol::Annotated;
Expand Down Expand Up @@ -77,6 +78,10 @@ impl processing::Processor for ErrorsProcessor {
type Output = ErrorOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Errors.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let has_transaction = envelope
.envelope()
Expand All @@ -87,6 +92,11 @@ impl processing::Processor for ErrorsProcessor {
return None;
}

let creates_event = envelope.envelope().items().any(Item::creates_event);
if !creates_event {
return None;
}

let items = envelope
.envelope_mut()
.take_items_by(Item::requires_event)
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl ForwardContext<'_> {
/// The [`Nothing`] output.
///
/// Some processors may only produce by-products and not have any output of their own.
#[derive(Debug, Copy, Clone)]
pub struct Nothing(std::convert::Infallible);

impl Forward for Nothing {
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/processing/forward_unknown/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use relay_cogs::{AppFeature, FeatureWeights};

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
Expand Down Expand Up @@ -34,6 +36,10 @@ impl processing::Processor for ForwardUnknownProcessor {
type Output = ForwardUnknownOutput;
type Error = UnsupportedItem;

fn cogs() -> FeatureWeights {
AppFeature::UnattributedEnvelope.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let unknown_items = envelope
.envelope_mut()
Expand Down
82 changes: 82 additions & 0 deletions relay-server/src/processing/invalid/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use relay_cogs::{AppFeature, FeatureWeights};

use crate::envelope::{Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, Context, Nothing, Output};
use crate::services::outcome::{DiscardReason, Outcome};

#[derive(Debug, thiserror::Error)]
#[error("the item is not allowed/supported in this envelope")]
pub struct InvalidItem;

impl OutcomeError for InvalidItem {
type Error = Self;

fn consume(self) -> (Option<crate::services::outcome::Outcome>, Self::Error) {
(Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)), self)
}
}

/// A processor which rejects unhandled invalid items.
///
/// This processor accepts certain envelope items which may be mixed into envelopes incorrectly.
#[derive(Debug)]
pub struct InvalidUnhandledProcessor {
_priv: (),
}

impl InvalidUnhandledProcessor {
/// Creates a new [`Self`].
pub fn new() -> Self {
Self { _priv: () }
}
}

impl processing::Processor for InvalidUnhandledProcessor {
type Input = InvalidUnhandledItems;
type Output = Nothing;
type Error = InvalidItem;

fn cogs() -> FeatureWeights {
AppFeature::UnattributedEnvelope.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let invalid_items = envelope
.envelope_mut()
// `FormData` only makes sense together with an item which creates an event,
// if it's mixed into an envelope which does not create an event, it is invalid.
.take_items_by(|item| matches!(*item.ty(), ItemType::FormData))
.into_vec();

if invalid_items.is_empty() {
return None;
}

Some(Managed::with_meta_from_managed_envelope(
envelope,
InvalidUnhandledItems { invalid_items },
))
}

async fn process(
&self,
invalid: Managed<Self::Input>,
_ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
Err(invalid.reject_err(InvalidItem))
}
}

/// Unknown items extracted from an envelope.
#[derive(Debug)]
pub struct InvalidUnhandledItems {
/// List of unknown envelope items.
pub invalid_items: Vec<Item>,
}

impl Counted for InvalidUnhandledItems {
fn quantities(&self) -> Quantities {
self.invalid_items.quantities()
}
}
13 changes: 10 additions & 3 deletions relay-server/src/processing/legacy_spans/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::sync::Arc;

use either::Either;
use relay_cogs::{AppFeature, FeatureWeights};
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::protocol::Span;
use relay_event_schema::protocol::{Span, SpanV2};
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::envelope::{EnvelopeHeaders, Item, ItemContainer, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::metrics_extraction::ExtractedMetrics;
use crate::processing::{
Expand Down Expand Up @@ -85,12 +86,18 @@ impl processing::Processor for LegacySpansProcessor {
type Output = LegacySpanOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Spans.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let spans = envelope
.envelope_mut()
.take_items_by(|item| matches!(item.ty(), ItemType::Span))
.take_items_by(|item| {
matches!(item.ty(), ItemType::Span) && !ItemContainer::<SpanV2>::is_container(item)
})
.into_vec();

if spans.is_empty() {
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/logs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{OurLog, ourlog};
use relay_filter::FilterStatKey;
Expand Down Expand Up @@ -104,6 +105,10 @@ impl processing::Processor for LogsProcessor {
type Output = LogOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Logs.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
//! The processor service, will then do its actual work using the processing logic defined here.

use relay_cogs::FeatureWeights;
use relay_config::{Config, RelayMode};
use relay_dynamic_config::GlobalConfig;
use relay_quotas::RateLimits;
Expand All @@ -27,10 +28,12 @@ pub mod check_ins;
pub mod client_reports;
pub mod errors;
pub mod forward_unknown;
pub mod invalid;
pub mod legacy_spans;
pub mod logs;
pub mod profile_chunks;
pub mod profiles;
pub mod relay;
pub mod replays;
pub mod sessions;
pub mod spans;
Expand All @@ -55,6 +58,9 @@ pub trait Processor {
/// The error returned by [`Self::process`].
type Error: std::error::Error + 'static;

/// Returns [`FeatureWeights`] for this processor to attribute COGS.
fn cogs() -> FeatureWeights;

/// Extracts a [`Self::Input`] from a [`ManagedEnvelope`].
///
/// This is infallible, if a processor wants to report an error,
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_profiling::ProfileType;
use relay_quotas::{DataCategory, RateLimits};

Expand Down Expand Up @@ -76,6 +77,10 @@ impl processing::Processor for ProfileChunksProcessor {
type Output = ProfileChunkOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Profiles.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let profile_chunks = envelope
.envelope_mut()
Expand Down
Loading
Loading