Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions relay-cogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum AppFeature {
Profiles,
/// User Reports
UserReports,
/// Event attachments not associated with an event.
StandaloneAttachments,

/// Metrics in the spans namespace.
MetricsSpans,
Expand Down Expand Up @@ -208,6 +210,7 @@ impl AppFeature {
Self::CheckIns => "check_ins",
Self::Replays => "replays",
Self::UserReports => "user_reports",
Self::StandaloneAttachments => "standalone_attachments",
Self::MetricsSpans => "metrics_spans",
Self::MetricsTransactions => "metrics_transactions",
Self::MetricsSessions => "metrics_sessions",
Expand Down
47 changes: 28 additions & 19 deletions relay-server/src/endpoints/security_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,37 @@ impl SecurityReportParams {
report_item
}

fn extract_envelope(self) -> Result<Box<Envelope>, BadStoreRequest> {
#[expect(
clippy::vec_box,
reason = "Box<Envelope> is created by Envelope::from_request and used for processing"
)]
fn extract_envelopes(self) -> Result<Vec<Box<Envelope>>, BadStoreRequest> {
let Self { meta, query, body } = self;

if body.is_empty() {
return Err(BadStoreRequest::EmptyBody);
}

let mut envelope = Envelope::from_request(Some(EventId::new()), meta);
let variant =
serde_json::from_slice::<Vec<&RawValue>>(&body).map_err(BadStoreRequest::InvalidJson);

if let Ok(items) = variant {
for item in items {
let report_item =
Self::create_security_item(&query, Bytes::from(item.to_owned().to_string()));
Ok(match serde_json::from_slice::<Vec<&RawValue>>(&body) {
Ok(items) => items
.into_iter()
.map(|item| {
let mut envelope = Envelope::from_request(Some(EventId::new()), meta.clone());
let report = Self::create_security_item(
&query,
Bytes::from(item.to_owned().to_string()),
);
envelope.add_item(report);
envelope
})
.collect(),
Err(_) => {
let mut envelope = Envelope::from_request(Some(EventId::new()), meta);
let report_item = Self::create_security_item(&query, body);
envelope.add_item(report_item);
vec![envelope]
}
} else {
let report_item = Self::create_security_item(&query, body);
envelope.add_item(report_item);
}

Ok(envelope)
})
}
}

Expand Down Expand Up @@ -100,10 +108,11 @@ async fn handle(
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
}

let envelope = params.extract_envelope()?;
common::handle_envelope(&state, envelope)
.await?
.check_rate_limits()?;
for envelope in params.extract_envelopes()? {
common::handle_envelope(&state, envelope)
.await?
.check_rate_limits()?;
}
Comment thread
Dav1dde marked this conversation as resolved.
Comment thread
Dav1dde marked this conversation as resolved.

Ok(().into_response())
}
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/attachments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_quotas::RateLimits;

use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
Expand Down Expand Up @@ -64,6 +65,10 @@ impl processing::Processor for AttachmentProcessor {
type Output = AttachmentsOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::StandaloneAttachments.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
debug_assert!(
!envelope.envelope().items().any(Item::creates_event),
Expand Down
9 changes: 9 additions & 0 deletions relay-server/src/processing/check_ins/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
Expand Down Expand Up @@ -63,6 +64,10 @@ impl processing::Processor for CheckInsProcessor {
type Output = CheckInsOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::CheckIns.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

Expand All @@ -71,6 +76,10 @@ impl processing::Processor for CheckInsProcessor {
.take_items_by(|item| matches!(*item.ty(), ItemType::CheckIn))
.into_vec();

if check_ins.is_empty() {
return None;
}

let work = SerializedCheckIns { headers, check_ins };
Some(Managed::with_meta_from_managed_envelope(envelope, work))
}
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/client_reports/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use relay_cogs::{AppFeature, FeatureWeights};
use relay_common::time::UnixTimestamp;
use relay_quotas::DataCategory;
use relay_system::Addr;
Expand Down Expand Up @@ -68,6 +69,10 @@ impl processing::Processor for ClientReportsProcessor {
type Output = Nothing;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::ClientReports.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();
let reports = envelope
Expand Down
10 changes: 10 additions & 0 deletions relay-server/src/processing/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod filter;
mod process;

pub use errors::SwitchProcessingError;
use relay_cogs::{AppFeature, FeatureWeights};
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::protocol::{Event, Metrics};
use relay_protocol::Annotated;
Expand Down Expand Up @@ -77,6 +78,10 @@ impl processing::Processor for ErrorsProcessor {
type Output = ErrorOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Errors.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let has_transaction = envelope
.envelope()
Expand All @@ -87,6 +92,11 @@ impl processing::Processor for ErrorsProcessor {
return None;
}

let creates_event = envelope.envelope().items().any(Item::creates_event);
if !creates_event {
return None;
}

let items = envelope
.envelope_mut()
.take_items_by(Item::requires_event)
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl ForwardContext<'_> {
/// The [`Nothing`] output.
///
/// Some processors may only produce by-products and not have any output of their own.
#[derive(Debug, Copy, Clone)]
pub struct Nothing(std::convert::Infallible);

impl Forward for Nothing {
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/processing/forward_unknown/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use relay_cogs::{AppFeature, FeatureWeights};

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
Expand Down Expand Up @@ -34,6 +36,10 @@ impl processing::Processor for ForwardUnknownProcessor {
type Output = ForwardUnknownOutput;
type Error = UnsupportedItem;

fn cogs() -> FeatureWeights {
AppFeature::UnattributedEnvelope.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let unknown_items = envelope
.envelope_mut()
Expand Down
82 changes: 82 additions & 0 deletions relay-server/src/processing/invalid/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use relay_cogs::{AppFeature, FeatureWeights};

use crate::envelope::{Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, Context, Nothing, Output};
use crate::services::outcome::{DiscardReason, Outcome};

#[derive(Debug, thiserror::Error)]
#[error("the item is not allowed/supported in this envelope")]
pub struct InvalidItem;

impl OutcomeError for InvalidItem {
type Error = Self;

fn consume(self) -> (Option<crate::services::outcome::Outcome>, Self::Error) {
(Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)), self)
}
}

/// A processor which rejects unhandled invalid items.
///
/// This processor accepts certain envelope items which may be mixed into envelopes incorrectly.
#[derive(Debug)]
pub struct InvalidUnhandledProcessor {
_priv: (),
}

impl InvalidUnhandledProcessor {
/// Creates a new [`Self`].
pub fn new() -> Self {
Self { _priv: () }
}
}

impl processing::Processor for InvalidUnhandledProcessor {
type Input = InvalidUnhandledItems;
type Output = Nothing;
type Error = InvalidItem;

fn cogs() -> FeatureWeights {
AppFeature::UnattributedEnvelope.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let invalid_items = envelope
.envelope_mut()
// `FormData` only makes sense together with an item which creates an event,
// if it's mixed into an envelope which does not create an event, it is invalid.
.take_items_by(|item| matches!(*item.ty(), ItemType::FormData))
.into_vec();

if invalid_items.is_empty() {
return None;
}

Some(Managed::with_meta_from_managed_envelope(
envelope,
InvalidUnhandledItems { invalid_items },
))
}

async fn process(
&self,
invalid: Managed<Self::Input>,
_ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
Err(invalid.reject_err(InvalidItem))
}
}

/// Unknown items extracted from an envelope.
#[derive(Debug)]
pub struct InvalidUnhandledItems {
/// List of unknown envelope items.
pub invalid_items: Vec<Item>,
}

impl Counted for InvalidUnhandledItems {
fn quantities(&self) -> Quantities {
self.invalid_items.quantities()
}
}
13 changes: 10 additions & 3 deletions relay-server/src/processing/legacy_spans/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::sync::Arc;

use either::Either;
use relay_cogs::{AppFeature, FeatureWeights};
use relay_event_normalization::GeoIpLookup;
use relay_event_schema::protocol::Span;
use relay_event_schema::protocol::{Span, SpanV2};
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::envelope::{EnvelopeHeaders, Item, ItemContainer, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::metrics_extraction::ExtractedMetrics;
use crate::processing::{
Expand Down Expand Up @@ -85,12 +86,18 @@ impl processing::Processor for LegacySpansProcessor {
type Output = LegacySpanOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Spans.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

let spans = envelope
.envelope_mut()
.take_items_by(|item| matches!(item.ty(), ItemType::Span))
.take_items_by(|item| {
matches!(item.ty(), ItemType::Span) && !ItemContainer::<SpanV2>::is_container(item)
})
.into_vec();

if spans.is_empty() {
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/logs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{OurLog, ourlog};
use relay_filter::FilterStatKey;
Expand Down Expand Up @@ -104,6 +105,10 @@ impl processing::Processor for LogsProcessor {
type Output = LogOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Logs.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let headers = envelope.envelope().headers().clone();

Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
//! The processor service, will then do its actual work using the processing logic defined here.

use relay_cogs::FeatureWeights;
use relay_config::{Config, RelayMode};
use relay_dynamic_config::GlobalConfig;
use relay_quotas::RateLimits;
Expand All @@ -27,10 +28,12 @@ pub mod check_ins;
pub mod client_reports;
pub mod errors;
pub mod forward_unknown;
pub mod invalid;
pub mod legacy_spans;
pub mod logs;
pub mod profile_chunks;
pub mod profiles;
pub mod relay;
pub mod replays;
pub mod sessions;
pub mod spans;
Expand All @@ -55,6 +58,9 @@ pub trait Processor {
/// The error returned by [`Self::process`].
type Error: std::error::Error + 'static;

/// Returns [`FeatureWeights`] for this processor to attribute COGS.
fn cogs() -> FeatureWeights;

/// Extracts a [`Self::Input`] from a [`ManagedEnvelope`].
///
/// This is infallible, if a processor wants to report an error,
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use relay_cogs::{AppFeature, FeatureWeights};
use relay_profiling::ProfileType;
use relay_quotas::{DataCategory, RateLimits};

Expand Down Expand Up @@ -76,6 +77,10 @@ impl processing::Processor for ProfileChunksProcessor {
type Output = ProfileChunkOutput;
type Error = Error;

fn cogs() -> FeatureWeights {
AppFeature::Profiles.into()
}

fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
let profile_chunks = envelope
.envelope_mut()
Expand Down
Loading
Loading