Skip to content

Commit 4a6d432

Browse files
committed
Revert "simplify session dependent tasks and add TTL support (#91729)"
This reverts commit fe99f0d.
1 parent 9cb2048 commit 4a6d432

File tree

25 files changed

+289
-704
lines changed

25 files changed

+289
-704
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 96 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -363,37 +363,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
363363
self.options.dependency_tracking
364364
}
365365

366-
/// Sets the initial aggregation number for a newly created task. Root tasks get `u32::MAX`
367-
/// to stay at the top. Session-dependent tasks get a high (but not max) aggregation number
368-
/// because they change on every session restore, behaving like dirty leaf nodes — keeping
369-
/// them near the leaves prevents long dirty-propagation chains through intermediate
370-
/// aggregated nodes.
371-
fn set_initial_aggregation_number(
372-
&self,
373-
task_id: TaskId,
374-
is_root: bool,
375-
is_session_dependent: bool,
376-
ctx: &mut impl ExecuteContext<'_>,
377-
) {
378-
let base_aggregation_number = if is_root {
379-
u32::MAX
380-
} else if is_session_dependent && self.should_track_dependencies() {
381-
const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
382-
SESSION_DEPENDENT_AGGREGATION_NUMBER
383-
} else {
384-
return;
385-
};
386-
387-
AggregationUpdateQueue::run(
388-
AggregationUpdateJob::UpdateAggregationNumber {
389-
task_id,
390-
base_aggregation_number,
391-
distance: None,
392-
},
393-
ctx,
394-
);
395-
}
396-
397366
fn should_track_activeness(&self) -> bool {
398367
self.options.active_tracking
399368
}
@@ -501,6 +470,21 @@ struct TaskExecutionCompletePrepareResult {
501470

502471
// Operations
503472
impl<B: BackingStorage> TurboTasksBackendInner<B> {
473+
fn connect_child(
474+
&self,
475+
parent_task: Option<TaskId>,
476+
child_task: TaskId,
477+
task_type: Option<ArcOrOwned<CachedTaskType>>,
478+
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
479+
) {
480+
operation::ConnectChildOperation::run(
481+
parent_task,
482+
child_task,
483+
task_type,
484+
self.execute_context(turbo_tasks),
485+
);
486+
}
487+
504488
fn try_read_task_output(
505489
self: &Arc<Self>,
506490
task_id: TaskId,
@@ -1529,23 +1513,24 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15291513
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
15301514
) -> TaskId {
15311515
let is_root = task_type.native_fn.is_root;
1532-
let is_session_dependent = task_type.native_fn.is_session_dependent;
1533-
// Create a single ExecuteContext for both lookup and connect_child
1534-
let mut ctx = self.execute_context(turbo_tasks);
1516+
15351517
// First check if the task exists in the cache which only uses a read lock
15361518
// .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
15371519
// before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
15381520
if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
15391521
self.track_cache_hit(&task_type);
1540-
operation::ConnectChildOperation::run(
1522+
self.connect_child(
15411523
parent_task,
15421524
task_id,
15431525
Some(ArcOrOwned::Owned(task_type)),
1544-
ctx,
1526+
turbo_tasks,
15451527
);
15461528
return task_id;
15471529
}
15481530

1531+
// Create a single ExecuteContext for both lookup and connect_child
1532+
let mut ctx = self.execute_context(turbo_tasks);
1533+
15491534
let mut is_new = false;
15501535
let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) {
15511536
// Task exists in backing storage
@@ -1588,8 +1573,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15881573
};
15891574
(task_id, task_type)
15901575
};
1591-
if is_new {
1592-
self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx);
1576+
if is_new && is_root {
1577+
AggregationUpdateQueue::run(
1578+
AggregationUpdateJob::UpdateAggregationNumber {
1579+
task_id,
1580+
base_aggregation_number: u32::MAX,
1581+
distance: None,
1582+
},
1583+
&mut ctx,
1584+
);
15931585
}
15941586
// Reuse the same ExecuteContext for connect_child
15951587
operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx);
@@ -1604,7 +1596,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16041596
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
16051597
) -> TaskId {
16061598
let is_root = task_type.native_fn.is_root;
1607-
let is_session_dependent = task_type.native_fn.is_session_dependent;
16081599

16091600
if let Some(parent_task) = parent_task
16101601
&& !parent_task.is_transient()
@@ -1615,17 +1606,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16151606
/* cell_id */ None,
16161607
);
16171608
}
1618-
let mut ctx = self.execute_context(turbo_tasks);
16191609
// First check if the task exists in the cache which only uses a read lock.
16201610
// .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock)
16211611
// before ConnectChildOperation::run, which may re-enter task_cache with a write lock.
16221612
if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) {
16231613
self.track_cache_hit(&task_type);
1624-
operation::ConnectChildOperation::run(
1614+
self.connect_child(
16251615
parent_task,
16261616
task_id,
16271617
Some(ArcOrOwned::Owned(task_type)),
1628-
ctx,
1618+
turbo_tasks,
16291619
);
16301620
return task_id;
16311621
}
@@ -1635,11 +1625,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16351625
let task_id = *e.get();
16361626
drop(e);
16371627
self.track_cache_hit(&task_type);
1638-
operation::ConnectChildOperation::run(
1628+
self.connect_child(
16391629
parent_task,
16401630
task_id,
16411631
Some(ArcOrOwned::Owned(task_type)),
1642-
ctx,
1632+
turbo_tasks,
16431633
);
16441634
task_id
16451635
}
@@ -1649,18 +1639,23 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16491639
e.insert(task_type.clone(), task_id);
16501640
self.track_cache_miss(&task_type);
16511641

1652-
self.set_initial_aggregation_number(
1653-
task_id,
1654-
is_root,
1655-
is_session_dependent,
1656-
&mut ctx,
1657-
);
1642+
if is_root {
1643+
let mut ctx = self.execute_context(turbo_tasks);
1644+
AggregationUpdateQueue::run(
1645+
AggregationUpdateJob::UpdateAggregationNumber {
1646+
task_id,
1647+
base_aggregation_number: u32::MAX,
1648+
distance: None,
1649+
},
1650+
&mut ctx,
1651+
);
1652+
}
16581653

1659-
operation::ConnectChildOperation::run(
1654+
self.connect_child(
16601655
parent_task,
16611656
task_id,
16621657
Some(ArcOrOwned::Arc(task_type)),
1663-
ctx,
1658+
turbo_tasks,
16641659
);
16651660

16661661
task_id
@@ -1932,6 +1927,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
19321927
stale: false,
19331928
once_task,
19341929
done_event,
1930+
session_dependent: false,
19351931
marked_as_completed: false,
19361932
new_children: Default::default(),
19371933
},
@@ -2160,6 +2156,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21602156
let &mut InProgressState::InProgress(box InProgressStateInner {
21612157
stale,
21622158
ref mut new_children,
2159+
session_dependent,
21632160
once_task: is_once_task,
21642161
..
21652162
}) = in_progress
@@ -2255,7 +2252,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
22552252
// Task was previously marked as immutable
22562253
if !is_immutable
22572254
// Task is not session dependent (session dependent tasks can change between sessions)
2258-
&& !task.is_session_dependent()
2255+
&& !session_dependent
22592256
// Task has no invalidator
22602257
&& !task.invalidator()
22612258
// Task has no dependencies on collectibles
@@ -2605,6 +2602,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
26052602
done_event,
26062603
once_task: is_once_task,
26072604
stale,
2605+
session_dependent,
26082606
marked_as_completed: _,
26092607
new_children,
26102608
}) = in_progress
@@ -2643,8 +2641,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
26432641
}
26442642
}
26452643

2646-
// Compute and apply the new dirty state, propagating to aggregating ancestors
2647-
let session_dependent = task.is_session_dependent();
2644+
// Compute the new dirty state
26482645
let new_dirtyness = if session_dependent {
26492646
Some(Dirtyness::SessionDependent)
26502647
} else {
@@ -3039,6 +3036,42 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
30393036
);
30403037
}
30413038

3039+
fn mark_own_task_as_session_dependent(
3040+
&self,
3041+
task_id: TaskId,
3042+
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
3043+
) {
3044+
if !self.should_track_dependencies() {
3045+
// Without dependency tracking we don't need session dependent tasks
3046+
return;
3047+
}
3048+
const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2;
3049+
let mut ctx = self.execute_context(turbo_tasks);
3050+
let mut task = ctx.task(task_id, TaskDataCategory::Meta);
3051+
let aggregation_number = get_aggregation_number(&task);
3052+
if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER {
3053+
drop(task);
3054+
// We want to use a high aggregation number to avoid large aggregation chains for
3055+
// session dependent tasks (which change on every run)
3056+
AggregationUpdateQueue::run(
3057+
AggregationUpdateJob::UpdateAggregationNumber {
3058+
task_id,
3059+
base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER,
3060+
distance: None,
3061+
},
3062+
&mut ctx,
3063+
);
3064+
task = ctx.task(task_id, TaskDataCategory::Meta);
3065+
}
3066+
if let Some(InProgressState::InProgress(box InProgressStateInner {
3067+
session_dependent,
3068+
..
3069+
})) = task.get_in_progress_mut()
3070+
{
3071+
*session_dependent = true;
3072+
}
3073+
}
3074+
30423075
fn mark_own_task_as_finished(
30433076
&self,
30443077
task: TaskId,
@@ -3573,6 +3606,14 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
35733606
self.0.mark_own_task_as_finished(task_id, turbo_tasks);
35743607
}
35753608

3609+
fn mark_own_task_as_session_dependent(
3610+
&self,
3611+
task: TaskId,
3612+
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
3613+
) {
3614+
self.0.mark_own_task_as_session_dependent(task, turbo_tasks);
3615+
}
3616+
35763617
fn connect_task(
35773618
&self,
35783619
task: TaskId,

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -959,9 +959,6 @@ pub trait TaskGuard: Debug + TaskStorageAccessors {
959959
panic!("Every task must have a task type {self:?}");
960960
}
961961
}
962-
fn is_session_dependent(&self) -> bool {
963-
matches!(self.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent)
964-
}
965962
fn get_task_desc_fn(&self) -> impl Fn() -> String + Send + Sync + 'static {
966963
let task_type = self.get_task_type().to_owned();
967964
let task_id = self.id();

turbopack/crates/turbo-tasks-backend/src/data.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ pub struct InProgressStateInner {
238238
pub stale: bool,
239239
#[allow(dead_code)]
240240
pub once_task: bool,
241+
pub session_dependent: bool,
241242
/// Early marking as completed. This is set before the output is available and will ignore full
242243
/// task completion of the task for strongly consistent reads.
243244
pub marked_as_completed: bool,

turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
44

55
use anyhow::Result;
6-
use turbo_tasks::{ResolvedVc, State, Vc};
6+
use turbo_tasks::{ResolvedVc, State, Vc, mark_session_dependent};
77
use turbo_tasks_testing::{Registration, register, run};
88

99
static REGISTRATION: Registration = register!();
@@ -76,8 +76,9 @@ async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
7676
Ok(last.unwrap())
7777
}
7878

79-
#[turbo_tasks::function(session_dependent)]
79+
#[turbo_tasks::function]
8080
async fn compute2(input: Vc<u32>) -> Result<Vc<u32>> {
81+
mark_session_dependent();
8182
println!("compute2()");
8283
let value = *input.await?;
8384
Ok(Vc::cell(value))

turbopack/crates/turbo-tasks-env/src/command_line.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use turbo_rcstr::RcStr;
2-
use turbo_tasks::{FxIndexMap, Vc};
2+
use turbo_tasks::{FxIndexMap, Vc, mark_session_dependent};
33

44
use crate::{GLOBAL_ENV_LOCK, ProcessEnv, TransientEnvMap, sorted_env_vars};
55

@@ -23,8 +23,9 @@ fn env_snapshot() -> FxIndexMap<RcStr, RcStr> {
2323

2424
#[turbo_tasks::value_impl]
2525
impl ProcessEnv for CommandLineProcessEnv {
26-
#[turbo_tasks::function(session_dependent)]
26+
#[turbo_tasks::function]
2727
fn read_all(&self) -> Vc<TransientEnvMap> {
28+
mark_session_dependent();
2829
Vc::cell(env_snapshot())
2930
}
3031
}

0 commit comments

Comments
 (0)