diff --git a/relay-server/src/processing/forward.rs b/relay-server/src/processing/forward.rs index a4ecb1832cd..7fa51de507f 100644 --- a/relay-server/src/processing/forward.rs +++ b/relay-server/src/processing/forward.rs @@ -57,6 +57,22 @@ impl<'a> StoreHandle<'a> { } } + /// Tries to send a message to the [`Objectstore`] service. + /// + /// Returns the message back if the service is not configured, + /// allowing the caller to handle the fallback. + pub fn try_send_to_objectstore(&self, message: M) -> Option + where + Objectstore: FromMessage, + { + if let Some(objectstore) = self.objectstore { + objectstore.send(message); + None + } else { + Some(message) + } + } + /// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service. pub fn send_envelope(&self, envelope: ManagedEnvelope) { use crate::services::store::StoreEnvelope; diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index 812cc838410..7dbc20bd191 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -230,6 +230,7 @@ impl Forward for ProfileChunkOutput { s: processing::forward::StoreHandle<'_>, ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { + use crate::services::objectstore::StoreRawProfile; use crate::services::store::StoreProfileChunk; let expanded = match self { @@ -243,12 +244,34 @@ impl Forward for ProfileChunkOutput { let retention_days = ctx.event_retention().standard; for chunk in expanded.split(|e| e.chunks) { - s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk { - retention_days, - payload: chunk.payload, - quantities: chunk.quantities, - raw_profile: chunk.raw_profile, - })); + if chunk.raw_profile.is_some() { + let msg = chunk.map(|chunk, _| { + let raw_profile = chunk.raw_profile.unwrap(); + StoreRawProfile { + payload: raw_profile.payload, + content_type: raw_profile.content_type, + store_message: StoreProfileChunk { + retention_days, + payload: chunk.payload, + quantities: chunk.quantities, + raw_profile_object_store_key: None, + raw_profile_content_type: None, + }, + retention: retention_days, + } + }); + if let Some(unsent) = s.try_send_to_objectstore(msg) { + s.send_to_store(unsent.map(|profile, _| profile.store_message)); + } + } else { + s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk { + retention_days, + payload: chunk.payload, + quantities: chunk.quantities, + raw_profile_object_store_key: None, + raw_profile_content_type: None, + })); + } } Ok(()) diff --git a/relay-server/src/services/objectstore.rs b/relay-server/src/services/objectstore.rs index 7cdd3d8a6ef..9144bba17e7 100644 --- a/relay-server/src/services/objectstore.rs +++ b/relay-server/src/services/objectstore.rs @@ -20,13 +20,15 @@ use relay_system::{ use sentry_protos::snuba::v1::TraceItem; use crate::constants::DEFAULT_ATTACHMENT_RETENTION; -use crate::envelope::{Item, ItemType}; +use crate::envelope::{ContentType, Item, ItemType}; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; use crate::processing::utils::store::item_id_to_uuid; use crate::services::outcome::DiscardReason; -use crate::services::store::{Store, StoreAttachment, StoreEnvelope, StoreTraceItem}; +use crate::services::store::{ + Store, StoreAttachment, StoreEnvelope, StoreProfileChunk, StoreTraceItem, +}; use crate::services::upload::ByteStream; use crate::statsd::{RelayCounters, RelayTimers}; use crate::utils::{BoundedStream, MeteredStream, RetryableStream, TakeOnce}; @@ -38,6 +40,7 @@ pub enum Objectstore { Envelope(StoreEnvelope), TraceAttachment(Managed), EventAttachment(Managed), + RawProfile(Managed), Stream(Stream, Sender>), } @@ -47,6 +50,7 @@ impl Objectstore { Self::Envelope(_) => MessageKind::Envelope, Self::TraceAttachment(_) => MessageKind::TraceAttachment, Self::EventAttachment(_) => MessageKind::EventAttachment, + Self::RawProfile(_) => MessageKind::RawProfile, Self::Stream { .. } => MessageKind::Stream, } } @@ -60,6 +64,7 @@ impl Objectstore { .count(), Self::TraceAttachment(_) => 1, Self::EventAttachment(_) => 1, + Self::RawProfile(_) => 1, Self::Stream { .. } => 1, } } @@ -91,12 +96,21 @@ impl FromMessage> for Objectstore { } } +impl FromMessage> for Objectstore { + type Response = NoResponse; + + fn from_message(message: Managed, _sender: ()) -> Self { + Self::RawProfile(message) + } +} + /// A type tag used for logging. #[derive(Debug, Clone, Copy)] enum MessageKind { Envelope, EventAttachment, TraceAttachment, + RawProfile, Stream, } @@ -106,6 +120,7 @@ impl MessageKind { Self::Envelope => "envelope", Self::EventAttachment => "attachment", Self::TraceAttachment => "attachment_v2", + Self::RawProfile => "profiles_raw", Self::Stream => "stream", } } @@ -143,6 +158,28 @@ impl Counted for StoreTraceAttachment { } } +/// A raw profile (e.g. Perfetto trace) ready for objectstore upload. +/// +/// After upload, the [`StoreProfileChunk`] is forwarded to the Store service +/// with the objectstore key set, so the Kafka message carries a reference +/// instead of the full binary blob. +pub struct StoreRawProfile { + /// The raw binary profile payload to upload. + pub payload: Bytes, + /// Content type of the raw profile. + pub content_type: ContentType, + /// The profile chunk message to forward to Kafka after upload. + pub store_message: StoreProfileChunk, + /// Data retention in days. + pub retention: u16, +} + +impl Counted for StoreRawProfile { + fn quantities(&self) -> Quantities { + self.store_message.quantities() + } +} + #[derive(Debug, thiserror::Error)] #[error("objectstore upload failed")] pub struct Error { @@ -302,12 +339,15 @@ impl ObjectstoreService { .with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION)); let trace_attachments = Usecase::new("trace_attachments") .with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION)); + let profiles = Usecase::new("profiles_raw") + .with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION)); let inner = ObjectstoreServiceInner { store, objectstore_client, event_attachments, trace_attachments, + profiles, timeout: Duration::from_secs(*timeout), stream_timeout: Duration::from_secs(*stream_timeout), retry_interval: Duration::from_secs_f64(*retry_delay), @@ -344,6 +384,11 @@ impl LoadShed for ObjectstoreService { Objectstore::TraceAttachment(managed) => { let _ = managed.reject_err(error); } + Objectstore::RawProfile(managed) => { + self.inner + .store + .send(managed.map(|profile, _| profile.store_message)); + } Objectstore::Stream(_, sender) => { sender.send(Err(error)); } @@ -357,6 +402,7 @@ struct ObjectstoreServiceInner { objectstore_client: Client, event_attachments: Usecase, trace_attachments: Usecase, + profiles: Usecase, timeout: Duration, stream_timeout: Duration, retry_interval: Duration, @@ -381,6 +427,9 @@ impl ObjectstoreServiceInner { Objectstore::EventAttachment(attachment) => { self.handle_event_attachment(attachment).await; } + Objectstore::RawProfile(profile) => { + self.handle_raw_profile(profile).await; + } Objectstore::Stream(stream, sender) => { let result = self.handle_stream(stream).await; if let Err(error) = &result { @@ -424,6 +473,7 @@ impl ObjectstoreServiceInner { attachment.payload(), retention, None, + None, ) .await; @@ -469,6 +519,7 @@ impl ObjectstoreServiceInner { attachment.attachment.payload(), attachment.retention, None, + None, ) .await; @@ -532,6 +583,7 @@ impl ObjectstoreServiceInner { body, retention, Some(key), + None, ) .await .reject(&trace_item)?; @@ -546,6 +598,53 @@ impl ObjectstoreServiceInner { Ok(()) } + async fn handle_raw_profile(&self, managed: Managed) { + let scoping = managed.scoping(); + let session = self + .profiles + .for_project(scoping.organization_id.value(), scoping.project_id.value()) + .session(&self.objectstore_client); + + let payload = managed.payload.clone(); + let content_type = managed.content_type; + let retention = managed.retention; + + let mut store_message = managed.map(|profile, _| profile.store_message); + + match session { + Err(error) => Error::from(error).log(MessageKind::RawProfile), + Ok(session) if !payload.is_empty() => { + let result = self + .upload_bytes( + MessageKind::RawProfile, + &session, + payload, + retention, + None, + Some(content_type), + ) + .await; + + match result { + Ok(stored_key) => { + store_message.modify(|msg, _| { + msg.raw_profile_object_store_key = Some(stored_key.into_inner()); + msg.raw_profile_content_type = Some(content_type); + }); + } + Err(error) => { + error.log(MessageKind::RawProfile); + } + } + } + Ok(_) => {} + } + + // Always forward to store even if the raw profile upload failed, + // to ensure the kafka message is produced. + self.store.send(store_message); + } + async fn handle_stream(&self, stream: Stream) -> Result { let Stream { organization_id, @@ -564,6 +663,7 @@ impl ObjectstoreServiceInner { Some(key), Body::Stream(TakeOnce::new(stream)), None, + None, ) .await } @@ -575,10 +675,18 @@ impl ObjectstoreServiceInner { payload: Bytes, retention: u16, key: Option, + content_type: Option, ) -> Result { let retention_hours = retention.checked_mul(24); - self.upload(kind, session, key, Body::Bytes(payload), retention_hours) - .await + self.upload( + kind, + session, + key, + Body::Bytes(payload), + retention_hours, + content_type, + ) + .await } async fn upload( @@ -588,6 +696,7 @@ impl ObjectstoreServiceInner { key: Option, body: Body, retention_hours: Option, + content_type: Option, ) -> Result { let mut attempts = 0; let timeout = match &body { @@ -602,8 +711,15 @@ impl ObjectstoreServiceInner { }; attempts += 1; result.replace( - self.attempt_upload(kind, session, key.clone(), body, retention_hours) - .await, + self.attempt_upload( + kind, + session, + key.clone(), + body, + retention_hours, + content_type, + ) + .await, ); if attempts < self.max_attempts.get() @@ -642,12 +758,16 @@ impl ObjectstoreServiceInner { key: Option, body: BodyAttempt, retention_hours: Option, + content_type: Option, ) -> Result { let mut request = match body { BodyAttempt::Bytes(bytes) => session.put(bytes), BodyAttempt::Stream(stream) => session.put_stream(stream.boxed()), }; + if let Some(content_type) = content_type { + request = request.content_type(content_type.as_str()); + } if let Some(retention_hours) = retention_hours { request = request.expiration_policy(ExpirationPolicy::TimeToLive( Duration::from_hours(retention_hours.into()), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1bf6cd7ea92..6e84feda571 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -36,7 +36,6 @@ use relay_threading::AsyncPool; use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; -use crate::processing::profile_chunks::RawProfile; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; @@ -159,11 +158,10 @@ pub struct StoreProfileChunk { /// /// Quantities are different for backend and ui profile chunks. pub quantities: Quantities, - /// Raw binary profile blob (e.g. Perfetto trace). - /// - /// Sent alongside the expanded JSON payload because the expansion only extracts a - /// minimum of information; the raw profile is preserved for further processing downstream. - pub raw_profile: Option, + /// Objectstore key where the raw profile blob is stored. + pub raw_profile_object_store_key: Option, + /// Content type of the raw profile (e.g. Perfetto trace). + pub raw_profile_content_type: Option, } impl Counted for StoreProfileChunk { @@ -854,8 +852,8 @@ impl StoreService { scoping.project_id.to_string(), )]), payload: message.payload, - raw_profile: message.raw_profile.as_ref().map(|r| r.payload.clone()), - raw_profile_content_type: message.raw_profile.map(|r| r.content_type), + raw_profile_object_store_key: message.raw_profile_object_store_key, + raw_profile_content_type: message.raw_profile_content_type, }; self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message)) @@ -1697,7 +1695,7 @@ struct ProfileChunkKafkaMessage { headers: BTreeMap, payload: Bytes, #[serde(skip_serializing_if = "Option::is_none")] - raw_profile: Option, + raw_profile_object_store_key: Option, #[serde(skip_serializing_if = "Option::is_none")] raw_profile_content_type: Option, } @@ -1935,18 +1933,18 @@ mod tests { retention_days: 90, headers: BTreeMap::new(), payload: Bytes::from(b"{\"profile\":true}".as_ref()), - raw_profile: None, + raw_profile_object_store_key: None, raw_profile_content_type: None, }; let json = serde_json::to_value(&message).unwrap(); assert_eq!(json["organization_id"], 1); assert_eq!(json["project_id"], 42); - assert!(json.get("raw_profile").is_none()); + assert!(json.get("raw_profile_object_store_key").is_none()); assert!(json.get("raw_profile_content_type").is_none()); } #[test] - fn test_profile_chunk_kafka_message_with_raw_profile() { + fn test_profile_chunk_kafka_message_with_raw_profile_key() { let message = ProfileChunkKafkaMessage { organization_id: OrganizationId::new(1), project_id: ProjectId::new(42), @@ -1954,13 +1952,13 @@ mod tests { retention_days: 90, headers: BTreeMap::new(), payload: Bytes::from(b"{\"profile\":true}".as_ref()), - raw_profile: Some(Bytes::from(b"perfetto-binary-data".as_ref())), + raw_profile_object_store_key: Some("abc123def456".to_owned()), raw_profile_content_type: Some(crate::envelope::ContentType::PerfettoTrace), }; let json = serde_json::to_value(&message).unwrap(); assert_eq!(json["organization_id"], 1); assert_eq!(json["project_id"], 42); - assert!(json.get("raw_profile").is_some()); + assert_eq!(json["raw_profile_object_store_key"], "abc123def456"); assert_eq!( json["raw_profile_content_type"], "application/x-perfetto-trace" diff --git a/tests/integration/test_profile_chunks_perfetto.py b/tests/integration/test_profile_chunks_perfetto.py index f2a99b7adc7..a0b133ade82 100644 --- a/tests/integration/test_profile_chunks_perfetto.py +++ b/tests/integration/test_profile_chunks_perfetto.py @@ -103,6 +103,49 @@ def test_perfetto_profile_chunk_end_to_end( assert isinstance(tid, str) assert "name" in meta and isinstance(meta["name"], str) - assert "raw_profile" in profile, "expected raw_profile in Kafka message" - assert len(profile["raw_profile"]) == 97252, "raw_profile size mismatch" + assert profile[ + "raw_profile_object_store_key" + ], "expected raw_profile_object_store_key in Kafka message" assert profile.get("raw_profile_content_type") == "application/x-perfetto-trace" + + +def test_perfetto_profile_chunk_objectstore_content_type( + mini_sentry, + relay_with_processing, + outcomes_consumer, + profiles_consumer, + objectstore, +): + """ + Verifies that the raw Perfetto trace is uploaded to objectstore with its + content type preserved on the stored object, so a subsequent GET returns + the correct `Content-Type`. + """ + profiles_consumer = profiles_consumer() + outcomes_consumer = outcomes_consumer() + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id)["config"] + project_config.setdefault("features", []).extend( + [ + "organizations:continuous-profiling", + "organizations:continuous-profiling-perfetto", + ] + ) + + upstream = relay_with_processing(TEST_CONFIG) + + with open(PERFETTO_ENVELOPE_FIXTURE, "rb") as f: + envelope = Envelope.deserialize_from(f) + + upstream.send_envelope(project_id, envelope) + + outcomes_consumer.assert_empty() + + profile, _ = profiles_consumer.get_profile() + object_store_key = profile["raw_profile_object_store_key"] + assert object_store_key, "expected raw_profile_object_store_key in Kafka message" + + stored = objectstore("profiles_raw", project_id).get(object_store_key) + assert stored.metadata.content_type == "application/x-perfetto-trace" + assert len(stored.payload.read()) == 97252