Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 96 additions & 55 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,37 +363,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
self.options.dependency_tracking
}

/// Sets the initial aggregation number for a newly created task. Root tasks get `u32::MAX`
/// to stay at the top. Session-dependent tasks get a high (but not max) aggregation number
/// because they change on every session restore, behaving like dirty leaf nodes — keeping
/// them near the leaves prevents long dirty-propagation chains through intermediate
/// aggregated nodes.
fn set_initial_aggregation_number(
&self,
task_id: TaskId,
is_root: bool,
is_session_dependent: bool,
ctx: &mut impl ExecuteContext<'_>,
) {
let base_aggregation_number = if is_root {
u32::MAX
} else if is_session_dependent && self.should_track_dependencies() {
const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
SESSION_DEPENDENT_AGGREGATION_NUMBER
} else {
return;
};

AggregationUpdateQueue::run(
AggregationUpdateJob::UpdateAggregationNumber {
task_id,
base_aggregation_number,
distance: None,
},
ctx,
);
}

fn should_track_activeness(&self) -> bool {
self.options.active_tracking
}
Expand Down Expand Up @@ -501,6 +470,21 @@ struct TaskExecutionCompletePrepareResult {

// Operations
impl<B: BackingStorage> TurboTasksBackendInner<B> {
fn connect_child(
&self,
parent_task: Option<TaskId>,
child_task: TaskId,
task_type: Option<ArcOrOwned<CachedTaskType>>,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) {
operation::ConnectChildOperation::run(
parent_task,
child_task,
task_type,
self.execute_context(turbo_tasks),
);
}

fn try_read_task_output(
self: &Arc<Self>,
task_id: TaskId,
Expand Down Expand Up @@ -1529,23 +1513,24 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> TaskId {
let is_root = task_type.native_fn.is_root;
let is_session_dependent = task_type.native_fn.is_session_dependent;
// Create a single ExecuteContext for both lookup and connect_child
let mut ctx = self.execute_context(turbo_tasks);

// First check if the task exists in the cache which only uses a read lock
// .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
// before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
self.track_cache_hit(&task_type);
operation::ConnectChildOperation::run(
self.connect_child(
parent_task,
task_id,
Some(ArcOrOwned::Owned(task_type)),
ctx,
turbo_tasks,
);
return task_id;
}

// Create a single ExecuteContext for both lookup and connect_child
let mut ctx = self.execute_context(turbo_tasks);

let mut is_new = false;
let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
// Task exists in backing storage
Expand Down Expand Up @@ -1588,8 +1573,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
};
(task_id, task_type)
};
if is_new {
self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx);
if is_new && is_root {
AggregationUpdateQueue::run(
AggregationUpdateJob::UpdateAggregationNumber {
task_id,
base_aggregation_number: u32::MAX,
distance: None,
},
&mut ctx,
);
}
// Reuse the same ExecuteContext for connect_child
operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);
Expand All @@ -1604,7 +1596,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> TaskId {
let is_root = task_type.native_fn.is_root;
let is_session_dependent = task_type.native_fn.is_session_dependent;

if let Some(parent_task) = parent_task
&& !parent_task.is_transient()
Expand All @@ -1615,17 +1606,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
/* cell_id */ None,
);
}
let mut ctx = self.execute_context(turbo_tasks);
// First check if the task exists in the cache which only uses a read lock.
// .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
// before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
self.track_cache_hit(&task_type);
operation::ConnectChildOperation::run(
self.connect_child(
parent_task,
task_id,
Some(ArcOrOwned::Owned(task_type)),
ctx,
turbo_tasks,
);
return task_id;
}
Expand All @@ -1635,11 +1625,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let task_id = *e.get();
drop(e);
self.track_cache_hit(&task_type);
operation::ConnectChildOperation::run(
self.connect_child(
parent_task,
task_id,
Some(ArcOrOwned::Owned(task_type)),
ctx,
turbo_tasks,
);
task_id
}
Expand All @@ -1649,18 +1639,23 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
e.insert(task_type.clone(), task_id);
self.track_cache_miss(&task_type);

self.set_initial_aggregation_number(
task_id,
is_root,
is_session_dependent,
&mut ctx,
);
if is_root {
let mut ctx = self.execute_context(turbo_tasks);
AggregationUpdateQueue::run(
AggregationUpdateJob::UpdateAggregationNumber {
task_id,
base_aggregation_number: u32::MAX,
distance: None,
},
&mut ctx,
);
}

operation::ConnectChildOperation::run(
self.connect_child(
parent_task,
task_id,
Some(ArcOrOwned::Arc(task_type)),
ctx,
turbo_tasks,
);

task_id
Expand Down Expand Up @@ -1932,6 +1927,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
stale: false,
once_task,
done_event,
session_dependent: false,
marked_as_completed: false,
new_children: Default::default(),
},
Expand Down Expand Up @@ -2160,6 +2156,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let &mut InProgressState::InProgress(box InProgressStateInner {
stale,
ref mut new_children,
session_dependent,
once_task: is_once_task,
..
}) = in_progress
Expand Down Expand Up @@ -2255,7 +2252,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// Task was previously marked as immutable
if !is_immutable
// Task is not session dependent (session dependent tasks can change between sessions)
&& !task.is_session_dependent()
&& !session_dependent
// Task has no invalidator
&& !task.invalidator()
// Task has no dependencies on collectibles
Expand Down Expand Up @@ -2605,6 +2602,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
done_event,
once_task: is_once_task,
stale,
session_dependent,
marked_as_completed: _,
new_children,
}) = in_progress
Expand Down Expand Up @@ -2643,8 +2641,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

// Compute and apply the new dirty state, propagating to aggregating ancestors
let session_dependent = task.is_session_dependent();
// Compute the new dirty state
let new_dirtyness = if session_dependent {
Some(Dirtyness::SessionDependent)
} else {
Expand Down Expand Up @@ -3039,6 +3036,42 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
);
}

fn mark_own_task_as_session_dependent(
&self,
task_id: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) {
if !self.should_track_dependencies() {
// Without dependency tracking we don't need session dependent tasks
return;
}
const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::Meta);
let aggregation_number = get_aggregation_number(&task);
if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
drop(task);
// We want to use a high aggregation number to avoid large aggregation chains for
// session dependent tasks (which change on every run)
AggregationUpdateQueue::run(
AggregationUpdateJob::UpdateAggregationNumber {
task_id,
base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
distance: None,
},
&mut ctx,
);
task = ctx.task(task_id, TaskDataCategory::Meta);
}
if let Some(InProgressState::InProgress(box InProgressStateInner {
session_dependent,
..
})) = task.get_in_progress_mut()
{
*session_dependent = true;
}
}

fn mark_own_task_as_finished(
&self,
task: TaskId,
Expand Down Expand Up @@ -3573,6 +3606,14 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
self.0.mark_own_task_as_finished(task_id, turbo_tasks);
}

fn mark_own_task_as_session_dependent(
&self,
task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
}

fn connect_task(
&self,
task: TaskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,9 +959,6 @@ pub trait TaskGuard: Debug + TaskStorageAccessors {
panic!("Every task must have a task type {self:?}");
}
}
fn is_session_dependent(&self) -> bool {
matches!(self.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent)
}
fn get_task_desc_fn(&self) -> impl Fn() -> String + Send + Sync + 'static {
let task_type = self.get_task_type().to_owned();
let task_id = self.id();
Expand Down
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ pub struct InProgressStateInner {
pub stale: bool,
#[allow(dead_code)]
pub once_task: bool,
pub session_dependent: bool,
/// Early marking as completed. This is set before the output is available and will ignore full
/// task completion of the task for strongly consistent reads.
pub marked_as_completed: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this

use anyhow::Result;
use turbo_tasks::{ResolvedVc, State, Vc};
use turbo_tasks::{ResolvedVc, State, Vc, mark_session_dependent};
use turbo_tasks_testing::{Registration, register, run};

static REGISTRATION: Registration = register!();
Expand Down Expand Up @@ -76,8 +76,9 @@ async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
Ok(last.unwrap())
}

#[turbo_tasks::function(session_dependent)]
#[turbo_tasks::function]
async fn compute2(input: Vc<u32>) -> Result<Vc<u32>> {
mark_session_dependent();
println!("compute2()");
let value = *input.await?;
Ok(Vc::cell(value))
Expand Down
5 changes: 3 additions & 2 deletions turbopack/crates/turbo-tasks-env/src/command_line.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use turbo_rcstr::RcStr;
use turbo_tasks::{FxIndexMap, Vc};
use turbo_tasks::{FxIndexMap, Vc, mark_session_dependent};

use crate::{GLOBAL_ENV_LOCK, ProcessEnv, TransientEnvMap, sorted_env_vars};

Expand All @@ -23,8 +23,9 @@ fn env_snapshot() -> FxIndexMap<RcStr, RcStr> {

#[turbo_tasks::value_impl]
impl ProcessEnv for CommandLineProcessEnv {
#[turbo_tasks::function(session_dependent)]
#[turbo_tasks::function]
fn read_all(&self) -> Vc<TransientEnvMap> {
mark_session_dependent();
Vc::cell(env_snapshot())
}
}
Loading
Loading