Skip to content

Commit 00769ef

Browse files
committed
simplify session dependent tasks (#91729)
1 parent c275d99 commit 00769ef

File tree

24 files changed

+237
-272
lines changed

24 files changed

+237
-272
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 55 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,37 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
363363
self.options.dependency_tracking
364364
}
365365

366+
/// Sets the initial aggregation number for a newly created task. Root tasks get `u32::MAX`
367+
/// to stay at the top. Session-dependent tasks get a high (but not max) aggregation number
368+
/// because they change on every session restore, behaving like dirty leaf nodes — keeping
369+
/// them near the leaves prevents long dirty-propagation chains through intermediate
370+
/// aggregated nodes.
371+
fn set_initial_aggregation_number(
372+
&self,
373+
task_id: TaskId,
374+
is_root: bool,
375+
is_session_dependent: bool,
376+
ctx: &mut impl ExecuteContext<'_>,
377+
) {
378+
let base_aggregation_number = if is_root {
379+
u32::MAX
380+
} else if is_session_dependent && self.should_track_dependencies() {
381+
const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
382+
SESSION_DEPENDENT_AGGREGATION_NUMBER
383+
} else {
384+
return;
385+
};
386+
387+
AggregationUpdateQueue::run(
388+
AggregationUpdateJob::UpdateAggregationNumber {
389+
task_id,
390+
base_aggregation_number,
391+
distance: None,
392+
},
393+
ctx,
394+
);
395+
}
396+
366397
fn should_track_activeness(&self) -> bool {
367398
self.options.active_tracking
368399
}
@@ -470,21 +501,6 @@ struct TaskExecutionCompletePrepareResult {
470501

471502
// Operations
472503
impl<B: BackingStorage> TurboTasksBackendInner<B> {
473-
fn connect_child(
474-
&self,
475-
parent_task: Option<TaskId>,
476-
child_task: TaskId,
477-
task_type: Option<ArcOrOwned<CachedTaskType>>,
478-
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479-
) {
480-
operation::ConnectChildOperation::run(
481-
parent_task,
482-
child_task,
483-
task_type,
484-
self.execute_context(turbo_tasks),
485-
);
486-
}
487-
488504
fn try_read_task_output(
489505
self: &Arc<Self>,
490506
task_id: TaskId,
@@ -1513,24 +1529,23 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15131529
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
15141530
) -> TaskId {
15151531
let is_root = task_type.native_fn.is_root;
1516-
1532+
let is_session_dependent = task_type.native_fn.is_session_dependent;
1533+
// Create a single ExecuteContext for both lookup and connect_child
1534+
let mut ctx = self.execute_context(turbo_tasks);
15171535
// First check if the task exists in the cache which only uses a read lock
15181536
// .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
15191537
// before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
15201538
if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
15211539
self.track_cache_hit(&task_type);
1522-
self.connect_child(
1540+
operation::ConnectChildOperation::run(
15231541
parent_task,
15241542
task_id,
15251543
Some(ArcOrOwned::Owned(task_type)),
1526-
turbo_tasks,
1544+
ctx,
15271545
);
15281546
return task_id;
15291547
}
15301548

1531-
// Create a single ExecuteContext for both lookup and connect_child
1532-
let mut ctx = self.execute_context(turbo_tasks);
1533-
15341549
let mut is_new = false;
15351550
let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
15361551
// Task exists in backing storage
@@ -1573,15 +1588,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15731588
};
15741589
(task_id, task_type)
15751590
};
1576-
if is_new && is_root {
1577-
AggregationUpdateQueue::run(
1578-
AggregationUpdateJob::UpdateAggregationNumber {
1579-
task_id,
1580-
base_aggregation_number: u32::MAX,
1581-
distance: None,
1582-
},
1583-
&mut ctx,
1584-
);
1591+
if is_new {
1592+
self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx);
15851593
}
15861594
// Reuse the same ExecuteContext for connect_child
15871595
operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);
@@ -1596,6 +1604,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15961604
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
15971605
) -> TaskId {
15981606
let is_root = task_type.native_fn.is_root;
1607+
let is_session_dependent = task_type.native_fn.is_session_dependent;
15991608

16001609
if let Some(parent_task) = parent_task
16011610
&& !parent_task.is_transient()
@@ -1606,16 +1615,17 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16061615
/* cell_id */ None,
16071616
);
16081617
}
1618+
let mut ctx = self.execute_context(turbo_tasks);
16091619
// First check if the task exists in the cache which only uses a read lock.
16101620
// .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
16111621
// before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
16121622
if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
16131623
self.track_cache_hit(&task_type);
1614-
self.connect_child(
1624+
operation::ConnectChildOperation::run(
16151625
parent_task,
16161626
task_id,
16171627
Some(ArcOrOwned::Owned(task_type)),
1618-
turbo_tasks,
1628+
ctx,
16191629
);
16201630
return task_id;
16211631
}
@@ -1625,11 +1635,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16251635
let task_id = *e.get();
16261636
drop(e);
16271637
self.track_cache_hit(&task_type);
1628-
self.connect_child(
1638+
operation::ConnectChildOperation::run(
16291639
parent_task,
16301640
task_id,
16311641
Some(ArcOrOwned::Owned(task_type)),
1632-
turbo_tasks,
1642+
ctx,
16331643
);
16341644
task_id
16351645
}
@@ -1639,23 +1649,18 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16391649
e.insert(task_type.clone(), task_id);
16401650
self.track_cache_miss(&task_type);
16411651

1642-
if is_root {
1643-
let mut ctx = self.execute_context(turbo_tasks);
1644-
AggregationUpdateQueue::run(
1645-
AggregationUpdateJob::UpdateAggregationNumber {
1646-
task_id,
1647-
base_aggregation_number: u32::MAX,
1648-
distance: None,
1649-
},
1650-
&mut ctx,
1651-
);
1652-
}
1652+
self.set_initial_aggregation_number(
1653+
task_id,
1654+
is_root,
1655+
is_session_dependent,
1656+
&mut ctx,
1657+
);
16531658

1654-
self.connect_child(
1659+
operation::ConnectChildOperation::run(
16551660
parent_task,
16561661
task_id,
16571662
Some(ArcOrOwned::Arc(task_type)),
1658-
turbo_tasks,
1663+
ctx,
16591664
);
16601665

16611666
task_id
@@ -1927,7 +1932,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
19271932
stale: false,
19281933
once_task,
19291934
done_event,
1930-
session_dependent: false,
19311935
marked_as_completed: false,
19321936
new_children: Default::default(),
19331937
},
@@ -2156,7 +2160,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21562160
let &mut InProgressState::InProgress(box InProgressStateInner {
21572161
stale,
21582162
ref mut new_children,
2159-
session_dependent,
21602163
once_task: is_once_task,
21612164
..
21622165
}) = in_progress
@@ -2252,7 +2255,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
22522255
// Task was previously marked as immutable
22532256
if !is_immutable
22542257
// Task is not session dependent (session dependent tasks can change between sessions)
2255-
&& !session_dependent
2258+
&& !task.is_session_dependent()
22562259
// Task has no invalidator
22572260
&& !task.invalidator()
22582261
// Task has no dependencies on collectibles
@@ -2607,7 +2610,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
26072610
done_event,
26082611
once_task: is_once_task,
26092612
stale,
2610-
session_dependent,
26112613
marked_as_completed: _,
26122614
new_children,
26132615
}) = in_progress
@@ -2646,7 +2648,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
26462648
}
26472649
}
26482650

2649-
// Compute the new dirty state
2651+
// Compute and apply the new dirty state, propagating to aggregating ancestors
2652+
let session_dependent = task.is_session_dependent();
26502653
let new_dirtyness = if session_dependent {
26512654
Some(Dirtyness::SessionDependent)
26522655
} else {
@@ -3041,42 +3044,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
30413044
);
30423045
}
30433046

3044-
fn mark_own_task_as_session_dependent(
3045-
&self,
3046-
task_id: TaskId,
3047-
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3048-
) {
3049-
if !self.should_track_dependencies() {
3050-
// Without dependency tracking we don't need session dependent tasks
3051-
return;
3052-
}
3053-
const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
3054-
let mut ctx = self.execute_context(turbo_tasks);
3055-
let mut task = ctx.task(task_id, TaskDataCategory::Meta);
3056-
let aggregation_number = get_aggregation_number(&task);
3057-
if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
3058-
drop(task);
3059-
// We want to use a high aggregation number to avoid large aggregation chains for
3060-
// session dependent tasks (which change on every run)
3061-
AggregationUpdateQueue::run(
3062-
AggregationUpdateJob::UpdateAggregationNumber {
3063-
task_id,
3064-
base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
3065-
distance: None,
3066-
},
3067-
&mut ctx,
3068-
);
3069-
task = ctx.task(task_id, TaskDataCategory::Meta);
3070-
}
3071-
if let Some(InProgressState::InProgress(box InProgressStateInner {
3072-
session_dependent,
3073-
..
3074-
})) = task.get_in_progress_mut()
3075-
{
3076-
*session_dependent = true;
3077-
}
3078-
}
3079-
30803047
fn mark_own_task_as_finished(
30813048
&self,
30823049
task: TaskId,
@@ -3611,14 +3578,6 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
36113578
self.0.mark_own_task_as_finished(task_id, turbo_tasks);
36123579
}
36133580

3614-
fn mark_own_task_as_session_dependent(
3615-
&self,
3616-
task: TaskId,
3617-
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3618-
) {
3619-
self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3620-
}
3621-
36223581
fn connect_task(
36233582
&self,
36243583
task: TaskId,

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,9 @@ pub trait TaskGuard: Debug + TaskStorageAccessors {
10501050
panic!("Every task must have a task type {self:?}");
10511051
}
10521052
}
1053+
fn is_session_dependent(&self) -> bool {
1054+
matches!(self.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent)
1055+
}
10531056
fn get_task_desc_fn(&self) -> impl Fn() -> String + Send + Sync + 'static {
10541057
let task_type = self.get_task_type().to_owned();
10551058
let task_id = self.id();

turbopack/crates/turbo-tasks-backend/src/data.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ pub struct InProgressStateInner {
238238
pub stale: bool,
239239
#[allow(dead_code)]
240240
pub once_task: bool,
241-
pub session_dependent: bool,
242241
/// Early marking as completed. This is set before the output is available and will ignore full
243242
/// task completion of the task for strongly consistent reads.
244243
pub marked_as_completed: bool,

turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
44

55
use anyhow::Result;
6-
use turbo_tasks::{ResolvedVc, State, Vc, mark_session_dependent};
6+
use turbo_tasks::{ResolvedVc, State, Vc};
77
use turbo_tasks_testing::{Registration, register, run};
88

99
static REGISTRATION: Registration = register!();
@@ -76,9 +76,8 @@ async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
7676
Ok(last.unwrap())
7777
}
7878

79-
#[turbo_tasks::function]
79+
#[turbo_tasks::function(session_dependent)]
8080
async fn compute2(input: Vc<u32>) -> Result<Vc<u32>> {
81-
mark_session_dependent();
8281
println!("compute2()");
8382
let value = *input.await?;
8483
Ok(Vc::cell(value))

turbopack/crates/turbo-tasks-env/src/command_line.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use turbo_rcstr::RcStr;
2-
use turbo_tasks::{FxIndexMap, Vc, mark_session_dependent};
2+
use turbo_tasks::{FxIndexMap, Vc};
33

44
use crate::{GLOBAL_ENV_LOCK, ProcessEnv, TransientEnvMap, sorted_env_vars};
55

@@ -23,9 +23,8 @@ fn env_snapshot() -> FxIndexMap<RcStr, RcStr> {
2323

2424
#[turbo_tasks::value_impl]
2525
impl ProcessEnv for CommandLineProcessEnv {
26-
#[turbo_tasks::function]
26+
#[turbo_tasks::function(session_dependent)]
2727
fn read_all(&self) -> Vc<TransientEnvMap> {
28-
mark_session_dependent();
2928
Vc::cell(env_snapshot())
3029
}
3130
}

turbopack/crates/turbo-tasks-fetch/src/client.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{hash::Hash, sync::LazyLock};
33
use anyhow::Result;
44
use quick_cache::sync::Cache;
55
use turbo_rcstr::RcStr;
6-
use turbo_tasks::{ReadRef, Vc, duration_span, mark_session_dependent};
6+
use turbo_tasks::{Completion, ReadRef, Vc, duration_span};
77

88
use crate::{FetchError, FetchResult, HttpResponse, HttpResponseBody};
99

@@ -121,7 +121,10 @@ impl FetchClientConfig {
121121
Ok(resp) => Ok(Vc::cell(Ok(resp.resolved_cell()))),
122122
Err(err) => {
123123
// the client failed to construct or the HTTP request failed
124-
mark_session_dependent();
124+
// Mark session dependent so we get retried in the next sessions
125+
// In dev our caller will keep going, but in prod builds this will fail the build
126+
// anyway.
127+
Completion::session_dependent().as_side_effect().await?;
125128
Ok(Vc::cell(Err(
126129
FetchError::from_reqwest_error(&err, &url).resolved_cell()
127130
)))

0 commit comments

Comments
 (0)