Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ impl<'a> StoreHandle<'a> {
}
}

/// Tries to send a message to the [`Objectstore`] service.
///
/// Returns the message back if the service is not configured,
/// allowing the caller to handle the fallback.
pub fn try_send_to_objectstore<M>(&self, message: M) -> Option<M>
where
Objectstore: FromMessage<M>,
{
if let Some(objectstore) = self.objectstore {
objectstore.send(message);
None
} else {
Some(message)
}
}

/// Dispatches an envelopes to either the [`Objectstore`] or [`Store`] service.
pub fn send_envelope(&self, envelope: ManagedEnvelope) {
use crate::services::store::StoreEnvelope;
Expand Down
35 changes: 29 additions & 6 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl Forward for ProfileChunkOutput {
s: processing::forward::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::services::objectstore::StoreRawProfile;
use crate::services::store::StoreProfileChunk;

let expanded = match self {
Expand All @@ -243,12 +244,34 @@ impl Forward for ProfileChunkOutput {
let retention_days = ctx.event_retention().standard;

for chunk in expanded.split(|e| e.chunks) {
s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk {
retention_days,
payload: chunk.payload,
quantities: chunk.quantities,
raw_profile: chunk.raw_profile,
}));
if chunk.raw_profile.is_some() {
let msg = chunk.map(|chunk, _| {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to move the if into the map and then use if let Some(raw_profile) = chunk.raw_profile.

let raw_profile = chunk.raw_profile.unwrap();
StoreRawProfile {
payload: raw_profile.payload,
content_type: raw_profile.content_type,
store_message: StoreProfileChunk {
retention_days,
Comment thread
sentry[bot] marked this conversation as resolved.
payload: chunk.payload,
quantities: chunk.quantities,
raw_profile_object_store_key: None,
raw_profile_content_type: None,
},
retention: retention_days,
}
});
if let Some(unsent) = s.try_send_to_objectstore(msg) {
s.send_to_store(unsent.map(|profile, _| profile.store_message));
}
Comment on lines +263 to +265
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need this branch if we want to support perfetto profiles in self-hosted, which has no objectstore yet by default. In that case, we also need the kafka consumer to handler the raw_profile_object_store_key correctly.

So the simpler approach would be to never send raw profiles via kafka, and accept that perfetto is not enabled in self-hosted.

} else {
s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk {
retention_days,
payload: chunk.payload,
quantities: chunk.quantities,
raw_profile_object_store_key: None,
raw_profile_content_type: None,
}));
}
}

Ok(())
Expand Down
93 changes: 91 additions & 2 deletions relay-server/src/services/objectstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use relay_system::{
use sentry_protos::snuba::v1::TraceItem;

use crate::constants::DEFAULT_ATTACHMENT_RETENTION;
use crate::envelope::{Item, ItemType};
use crate::envelope::{ContentType, Item, ItemType};
use crate::managed::{
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
};
use crate::processing::utils::store::item_id_to_uuid;
use crate::services::outcome::DiscardReason;
use crate::services::store::{Store, StoreAttachment, StoreEnvelope, StoreTraceItem};
use crate::services::store::{
Store, StoreAttachment, StoreEnvelope, StoreProfileChunk, StoreTraceItem,
};
use crate::services::upload::ByteStream;
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::{BoundedStream, MeteredStream, RetryableStream, TakeOnce};
Expand All @@ -38,6 +40,7 @@ pub enum Objectstore {
Envelope(StoreEnvelope),
TraceAttachment(Managed<StoreTraceAttachment>),
EventAttachment(Managed<StoreAttachment>),
RawProfile(Managed<StoreRawProfile>),
Stream(Stream, Sender<Result<ObjectstoreKey, Error>>),
}

Expand All @@ -47,6 +50,7 @@ impl Objectstore {
Self::Envelope(_) => MessageKind::Envelope,
Self::TraceAttachment(_) => MessageKind::TraceAttachment,
Self::EventAttachment(_) => MessageKind::EventAttachment,
Self::RawProfile(_) => MessageKind::RawProfile,
Self::Stream { .. } => MessageKind::Stream,
}
}
Expand All @@ -60,6 +64,7 @@ impl Objectstore {
.count(),
Self::TraceAttachment(_) => 1,
Self::EventAttachment(_) => 1,
Self::RawProfile(_) => 1,
Self::Stream { .. } => 1,
}
}
Expand Down Expand Up @@ -91,12 +96,21 @@ impl FromMessage<Managed<StoreAttachment>> for Objectstore {
}
}

impl FromMessage<Managed<StoreRawProfile>> for Objectstore {
type Response = NoResponse;

fn from_message(message: Managed<StoreRawProfile>, _sender: ()) -> Self {
Self::RawProfile(message)
}
}

/// A type tag used for logging.
#[derive(Debug, Clone, Copy)]
enum MessageKind {
Envelope,
EventAttachment,
TraceAttachment,
RawProfile,
Stream,
}

Expand All @@ -106,6 +120,7 @@ impl MessageKind {
Self::Envelope => "envelope",
Self::EventAttachment => "attachment",
Self::TraceAttachment => "attachment_v2",
Self::RawProfile => "profiles_raw",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other strings are in the singular.

Suggested change
Self::RawProfile => "profiles_raw",
Self::RawProfile => "profile_raw",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other variants use singular:

Suggested change
Self::RawProfile => "profiles_raw",
Self::RawProfile => "raw_profile",

Self::Stream => "stream",
}
}
Expand Down Expand Up @@ -143,6 +158,28 @@ impl Counted for StoreTraceAttachment {
}
}

/// A raw profile (e.g. Perfetto trace) ready for objectstore upload.
///
/// After upload, the [`StoreProfileChunk`] is forwarded to the Store service
/// with the objectstore key set, so the Kafka message carries a reference
/// instead of the full binary blob.
pub struct StoreRawProfile {
/// The raw binary profile payload to upload.
pub payload: Bytes,
/// Content type of the raw profile.
pub content_type: ContentType,
/// The profile chunk message to forward to Kafka after upload.
pub store_message: StoreProfileChunk,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks unnecessarily nested. The objectstore service should be able to construct the StoreProfileChunk message from scratch when it sends the data to the store service.

/// Data retention in days.
pub retention: u16,
}

impl Counted for StoreRawProfile {
fn quantities(&self) -> Quantities {
self.store_message.quantities()
}
}

#[derive(Debug, thiserror::Error)]
#[error("objectstore upload failed")]
pub struct Error {
Expand Down Expand Up @@ -302,12 +339,15 @@ impl ObjectstoreService {
.with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION));
let trace_attachments = Usecase::new("trace_attachments")
.with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION));
let profiles = Usecase::new("profiles")
Comment thread
markushi marked this conversation as resolved.
Outdated
Comment thread
markushi marked this conversation as resolved.
Outdated
.with_expiration_policy(ExpirationPolicy::TimeToLive(DEFAULT_ATTACHMENT_RETENTION));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what we want?

Copy link
Copy Markdown
Member

@lcian lcian May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that DEFAULT_ATTACHMENT_RETENTION means event attachments here. Even if that happens to be the correct retention you actually want, you likely want to use a different constant here to not mix things up.
Here you likely want to use the profiles retention instead.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I was wondering about this as well. Some quick research showed me that the monolith builds a project config, which then gets propagate to relay. And that config is coming from getsentry. At this point I feel like pulling on a thread a bit 😅 but I guess we need this, maybe @jjbayer could chime in here as well, before I start adding support for this.

We would need to

  1. [getsentry] Propagate the config based on the plan, looks like this already happens automatically as it iterates over all data categories
  2. [sentry] Extend the RETENTIONS_CONFIG_MAPPING to also include DataCategory.PROFILE_DURATION and DataCategory.PROFILE_DURATION_UI, so those get propagated to relay
  3. [relay] Extend RetentionsConfig and use that config here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds sensible, yes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retention is already read from passed in by forward_store via StoreRawProfile::retention, and used in handle_raw_profile.

The fact that profiles don't have a custom retention config is out of scope of this PR IMO.

TL;DR: A hard-coded constant is fine for the fallback, but I would define it in a custom constant and set it to 90 days to match the default event retention.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong retention constant used for profile objectstore uploads

Medium Severity

The profiles usecase is initialized with DEFAULT_ATTACHMENT_RETENTION (30 days), which is the retention for event attachments. Profiles typically use event retention (90 days via DEFAULT_EVENT_RETENTION). While each upload call sets a per-request expiration policy via upload_bytes, the Usecase-level default acts as a fallback. If the per-request policy is ever not applied (e.g. retention_days * 24 overflows u16 via checked_mul returning None), stored raw profiles would be garbage-collected after 30 days instead of the correct profile retention, causing silent data loss. This was also flagged by a reviewer in the PR discussion.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 907aa48. Configure here.


let inner = ObjectstoreServiceInner {
store,
objectstore_client,
event_attachments,
trace_attachments,
profiles,
timeout: Duration::from_secs(*timeout),
stream_timeout: Duration::from_secs(*stream_timeout),
retry_interval: Duration::from_secs_f64(*retry_delay),
Expand Down Expand Up @@ -344,6 +384,11 @@ impl LoadShed<Objectstore> for ObjectstoreService {
Objectstore::TraceAttachment(managed) => {
let _ = managed.reject_err(error);
}
Objectstore::RawProfile(managed) => {
self.inner
.store
.send(managed.map(|profile, _| profile.store_message));
}
Objectstore::Stream(_, sender) => {
sender.send(Err(error));
}
Expand All @@ -357,6 +402,7 @@ struct ObjectstoreServiceInner {
objectstore_client: Client,
event_attachments: Usecase,
trace_attachments: Usecase,
profiles: Usecase,
timeout: Duration,
stream_timeout: Duration,
retry_interval: Duration,
Expand All @@ -381,6 +427,9 @@ impl ObjectstoreServiceInner {
Objectstore::EventAttachment(attachment) => {
self.handle_event_attachment(attachment).await;
}
Objectstore::RawProfile(profile) => {
self.handle_raw_profile(profile).await;
}
Objectstore::Stream(stream, sender) => {
let result = self.handle_stream(stream).await;
if let Err(error) = &result {
Expand Down Expand Up @@ -546,6 +595,46 @@ impl ObjectstoreServiceInner {
Ok(())
}

async fn handle_raw_profile(&self, managed: Managed<StoreRawProfile>) {
Comment thread
markushi marked this conversation as resolved.
let scoping = managed.scoping();
let session = self
.profiles
.for_project(scoping.organization_id.value(), scoping.project_id.value())
.session(&self.objectstore_client);

let payload = managed.payload.clone();
let content_type = managed.content_type;
let retention = managed.retention;

let mut store_message = managed.map(|profile, _| profile.store_message);

match session {
Err(error) => Error::from(error).log(MessageKind::RawProfile),
Ok(session) if !payload.is_empty() => {
let result = self
.upload_bytes(MessageKind::RawProfile, &session, payload, retention, None)
.await;

match result {
Ok(stored_key) => {
store_message.modify(|msg, _| {
msg.raw_profile_object_store_key = Some(stored_key.into_inner());
msg.raw_profile_content_type = Some(content_type);
});
}
Err(error) => {
error.log(MessageKind::RawProfile);
}
}
}
Ok(_) => {}
}

// Always forward to store even if the raw profile upload failed,
// to ensure the kafka message is produced.
self.store.send(store_message);
}

async fn handle_stream(&self, stream: Stream) -> Result<ObjectstoreKey, Error> {
let Stream {
organization_id,
Expand Down
26 changes: 12 additions & 14 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use relay_threading::AsyncPool;
use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::processing::profile_chunks::RawProfile;
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
Expand Down Expand Up @@ -159,11 +158,10 @@ pub struct StoreProfileChunk {
///
/// Quantities are different for backend and ui profile chunks.
pub quantities: Quantities,
/// Raw binary profile blob (e.g. Perfetto trace).
///
/// Sent alongside the expanded JSON payload because the expansion only extracts a
/// minimum of information; the raw profile is preserved for further processing downstream.
pub raw_profile: Option<RawProfile>,
/// Objectstore key where the raw profile blob is stored.
pub raw_profile_object_store_key: Option<String>,
/// Content type of the raw profile (e.g. Perfetto trace).
pub raw_profile_content_type: Option<ContentType>,
Comment on lines +161 to +164
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a StoreProfileChunk can only have either a byte payload or an objectstore key, I would encode this into the type system as something like

pub payload: StoreProfileChunkPayload

where

enum StoreProfileChunkPayload {
    Bytes(Bytes),
    ObjectstoreKey(String)
}

}

impl Counted for StoreProfileChunk {
Expand Down Expand Up @@ -854,8 +852,8 @@ impl StoreService {
scoping.project_id.to_string(),
)]),
payload: message.payload,
raw_profile: message.raw_profile.as_ref().map(|r| r.payload.clone()),
raw_profile_content_type: message.raw_profile.map(|r| r.content_type),
raw_profile_object_store_key: message.raw_profile_object_store_key,
raw_profile_content_type: message.raw_profile_content_type,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to provide the content type in the kafka message in the first place? Isn't it always ContentType::PerfettoTrace?

};

self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message))
Expand Down Expand Up @@ -1697,7 +1695,7 @@ struct ProfileChunkKafkaMessage {
headers: BTreeMap<String, String>,
payload: Bytes,
#[serde(skip_serializing_if = "Option::is_none")]
raw_profile: Option<Bytes>,
raw_profile_object_store_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
raw_profile_content_type: Option<ContentType>,
}
Expand Down Expand Up @@ -1935,32 +1933,32 @@ mod tests {
retention_days: 90,
headers: BTreeMap::new(),
payload: Bytes::from(b"{\"profile\":true}".as_ref()),
raw_profile: None,
raw_profile_object_store_key: None,
raw_profile_content_type: None,
};
let json = serde_json::to_value(&message).unwrap();
assert_eq!(json["organization_id"], 1);
assert_eq!(json["project_id"], 42);
assert!(json.get("raw_profile").is_none());
assert!(json.get("raw_profile_object_store_key").is_none());
assert!(json.get("raw_profile_content_type").is_none());
}

#[test]
fn test_profile_chunk_kafka_message_with_raw_profile() {
fn test_profile_chunk_kafka_message_with_raw_profile_key() {
let message = ProfileChunkKafkaMessage {
organization_id: OrganizationId::new(1),
project_id: ProjectId::new(42),
received: 1234567890,
retention_days: 90,
headers: BTreeMap::new(),
payload: Bytes::from(b"{\"profile\":true}".as_ref()),
raw_profile: Some(Bytes::from(b"perfetto-binary-data".as_ref())),
raw_profile_object_store_key: Some("abc123def456".to_owned()),
raw_profile_content_type: Some(crate::envelope::ContentType::PerfettoTrace),
};
let json = serde_json::to_value(&message).unwrap();
assert_eq!(json["organization_id"], 1);
assert_eq!(json["project_id"], 42);
assert!(json.get("raw_profile").is_some());
assert_eq!(json["raw_profile_object_store_key"], "abc123def456");
assert_eq!(
json["raw_profile_content_type"],
"application/x-perfetto-trace"
Expand Down
Loading