Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions turbopack/crates/turbo-persistence/benches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ fn prefill_multi_value_database(
) -> Result<Vec<Box<[u8]>>> {
let db_config = TpDbConfig {
family_configs: [FamilyConfig {
name: "test",
kind: FamilyKind::MultiValue,
}],
};
Expand Down Expand Up @@ -648,6 +649,7 @@ fn setup_prefilled_multi_value_db(
fn open_multi_value_db(path: &Path) -> TurboPersistence<SerialScheduler, 1> {
let db_config = TpDbConfig {
family_configs: [FamilyConfig {
name: "test",
kind: FamilyKind::MultiValue,
}],
};
Expand Down Expand Up @@ -914,6 +916,7 @@ fn bench_write_multi_value(c: &mut Criterion) {
|(tempdir, keys, random_data)| {
let db_config = TpDbConfig {
family_configs: [FamilyConfig {
name: "test",
kind: FamilyKind::MultiValue,
}],
};
Expand Down
11 changes: 7 additions & 4 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,10 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
.collect::<Vec<_>>();

// Merge SST files
let span = tracing::trace_span!("merge files");
let span = tracing::trace_span!(
"merge files",
family = self.config.family_configs[family as usize].name
);
enum PartialMergeResult<'l> {
Merged {
new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
Expand Down Expand Up @@ -1406,7 +1409,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
}
let span = tracing::trace_span!(
"database read",
name = family,
name = self.config.family_configs[family].name,
result_size = tracing::field::Empty
)
.entered();
Expand Down Expand Up @@ -1436,7 +1439,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
}
let span = tracing::trace_span!(
"database read multiple",
name = family,
name = self.config.family_configs[family].name,
result_count = tracing::field::Empty,
result_size = tracing::field::Empty
)
Expand Down Expand Up @@ -1573,7 +1576,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
}
let span = tracing::trace_span!(
"database batch read",
name = family,
name = self.config.family_configs[family].name,
keys = keys.len(),
not_found = tracing::field::Empty,
deleted = tracing::field::Empty,
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-persistence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum FamilyKind {
/// Configuration for a single family to describe how the data is stored.
#[derive(Clone, Copy, Debug)]
pub struct FamilyConfig {
// Human-readable family label; used purely for diagnostics — it is interpolated
// into tracing spans ("merge files", "database read", "database batch read",
// and the write_batch `family_name` fields). NOTE(review): appears to have no
// effect on on-disk storage or lookup — confirm it is never used as a key.
pub name: &'static str,
// Storage strategy for this family (e.g. SingleValue vs MultiValue) —
// see the `FamilyKind` enum declared earlier in this file.
pub kind: FamilyKind,
}

Expand All @@ -65,6 +66,7 @@ impl<const FAMILIES: usize> Default for DbConfig<FAMILIES> {
fn default() -> Self {
Self {
family_configs: [FamilyConfig {
name: "unknown",
kind: FamilyKind::SingleValue,
}; FAMILIES],
}
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-persistence/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,7 @@ fn compaction_multi_value_preserves_different_values() -> Result<()> {
fn multi_value_config() -> DbConfig<1> {
let mut config = DbConfig::<1>::default();
config.family_configs[0] = FamilyConfig {
name: "test",
kind: FamilyKind::MultiValue,
};
config
Expand Down Expand Up @@ -2097,6 +2098,7 @@ fn compaction_deletes_blob_multi_value_tombstone() -> Result<()> {

let config = DbConfig {
family_configs: [FamilyConfig {
name: "test",
kind: FamilyKind::MultiValue,
}],
};
Expand Down
6 changes: 3 additions & 3 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
Ok(collector)
}

#[tracing::instrument(level = "trace", skip(self, collector))]
#[tracing::instrument(level = "trace", skip(self, collector), fields(family_name = self.family_configs[usize_from_u32(family)].name))]
fn flush_thread_local_collector(
&self,
family: u32,
Expand Down Expand Up @@ -243,7 +243,7 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
///
/// Caller must ensure that no concurrent put or delete operation is happening on the flushed
/// family.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "trace", skip(self), fields(family_name = self.family_configs[usize_from_u32(family)].name))]
pub unsafe fn flush(&self, family: u32) -> Result<()> {
// Flush the thread local collectors to the global collector.
let mut collectors = Vec::new();
Expand Down Expand Up @@ -452,7 +452,7 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>

/// Creates a new SST file with the given collector data.
/// Returns a tuple of (sequence number, file).
#[tracing::instrument(level = "trace", skip(self, collector_data))]
#[tracing::instrument(level = "trace", skip(self, collector_data), fields(family_name = self.family_configs[usize_from_u32(family)].name))]
fn create_sst_file(
&self,
family: u32,
Expand Down
35 changes: 20 additions & 15 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1912,7 +1912,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_)));
if let Some(tasks) = task.prefetch() {
drop(task);
ctx.prepare_tasks(tasks);
ctx.prepare_tasks(tasks, "prefetch");
task = ctx.task(task_id, TaskDataCategory::All);
}
let in_progress = task.take_in_progress()?;
Expand Down Expand Up @@ -2399,6 +2399,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
output_dependent_tasks
.iter()
.map(|&id| (id, TaskDataCategory::All)),
"invalidate output dependents",
);
}

Expand Down Expand Up @@ -2491,20 +2492,24 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
debug_assert!(!new_children.is_empty());

let mut queue = AggregationUpdateQueue::new();
ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| {
if !child_task.has_output() {
let child_id = child_task.id();
make_task_dirty_internal(
child_task,
child_id,
false,
#[cfg(feature = "trace_task_dirty")]
TaskDirtyCause::InitialDirty,
&mut queue,
ctx,
);
}
});
ctx.for_each_task_all(
new_children.iter().copied(),
"unfinished children dirty",
|child_task, ctx| {
if !child_task.has_output() {
let child_id = child_task.id();
make_task_dirty_internal(
child_task,
child_id,
false,
#[cfg(feature = "trace_task_dirty")]
TaskDirtyCause::InitialDirty,
&mut queue,
ctx,
);
}
},
);

queue.execute(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1256,10 +1256,14 @@ impl AggregationUpdateQueue {
self.find_and_schedule_dirty(jobs, ctx);
false
} else if !self.scheduled_tasks.is_empty() {
ctx.for_each_task_all(self.scheduled_tasks.keys().copied(), |task, ctx| {
let parent_priority = self.scheduled_tasks[&task.id()];
ctx.schedule_task(task, parent_priority);
});
ctx.for_each_task_all(
self.scheduled_tasks.keys().copied(),
"schedule tasks",
|task, ctx| {
let parent_priority = self.scheduled_tasks[&task.id()];
ctx.schedule_task(task, parent_priority);
},
);
self.scheduled_tasks.clear();
false
} else {
Expand Down Expand Up @@ -1444,18 +1448,22 @@ impl AggregationUpdateQueue {
.map(|job| (job.task_id, job.span.clone()))
.collect();
// For performance reasons this should stay `Meta` and not `All`
ctx.for_each_task_meta(jobs.into_iter().map(|job| job.task_id), |task, ctx| {
let task_id = task.id();
// Enter the enqueue-time span and create a per-task child span with the
// task description. Both guards must live until the end of the closure.
#[cfg(feature = "trace_find_and_schedule")]
let _trace = (
spans.remove(&task_id).flatten().map(|s| s.entered()),
trace_span!("find and schedule", %task_id, name = task.get_task_description())
.entered(),
);
self.find_and_schedule_dirty_internal(task_id, task, ctx);
});
ctx.for_each_task_meta(
jobs.into_iter().map(|job| job.task_id),
"find and schedule dirty",
|task, ctx| {
let task_id = task.id();
// Enter the enqueue-time span and create a per-task child span with the
// task description. Both guards must live until the end of the closure.
#[cfg(feature = "trace_find_and_schedule")]
let _trace = (
spans.remove(&task_id).flatten().map(|s| s.entered()),
trace_span!("find and schedule", %task_id, name = task.get_task_description())
.entered(),
);
self.find_and_schedule_dirty_internal(task_id, task, ctx);
},
);
}

fn find_and_schedule_dirty_internal(
Expand Down Expand Up @@ -1507,21 +1515,25 @@ impl AggregationUpdateQueue {
update: AggregatedDataUpdate,
) {
// For performance reasons this should stay `Meta` and not `All`
ctx.for_each_task_meta(upper_ids.iter().copied(), |mut upper, ctx| {
let diff = update.apply(&mut upper, ctx.should_track_activeness(), self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
if !upper_ids.is_empty() {
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
);
ctx.for_each_task_meta(
upper_ids.iter().copied(),
"aggregated data update",
|mut upper, ctx| {
let diff = update.apply(&mut upper, ctx.should_track_activeness(), self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
if !upper_ids.is_empty() {
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
);
}
}
}
});
},
);
}

fn inner_of_uppers_lost_follower(
Expand Down Expand Up @@ -1574,20 +1586,24 @@ impl AggregationUpdateQueue {
if !data.is_empty() {
// remove data from upper
// For performance reasons this should stay `Meta` and not `All`
ctx.for_each_task_meta(removed_uppers.iter().copied(), |mut upper, ctx| {
// STEP 6
let diff = data.apply(&mut upper, ctx.should_track_activeness(), self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
)
}
});
ctx.for_each_task_meta(
removed_uppers.iter().copied(),
"remove data from uppers",
|mut upper, ctx| {
// STEP 6
let diff = data.apply(&mut upper, ctx.should_track_activeness(), self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
)
}
},
);
}
// STEP 7
if !followers.is_empty() {
Expand Down Expand Up @@ -2061,6 +2077,7 @@ impl AggregationUpdateQueue {
upper_ids_with_min_aggregation_number
.iter()
.map(|(entry, _)| entry.task_id()),
"add data to uppers",
|mut upper, ctx| {
// STEP 6d
if has_data {
Expand Down
Loading
Loading