diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 34a8024821f4c8..9dde575d7221a2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -363,6 +363,37 @@ impl TurboTasksBackendInner { self.options.dependency_tracking } + /// Sets the initial aggregation number for a newly created task. Root tasks get `u32::MAX` + /// to stay at the top. Session-dependent tasks get a high (but not max) aggregation number + /// because they change on every session restore, behaving like dirty leaf nodes — keeping + /// them near the leaves prevents long dirty-propagation chains through intermediate + /// aggregated nodes. + fn set_initial_aggregation_number( + &self, + task_id: TaskId, + is_root: bool, + is_session_dependent: bool, + ctx: &mut impl ExecuteContext<'_>, + ) { + let base_aggregation_number = if is_root { + u32::MAX + } else if is_session_dependent && self.should_track_dependencies() { + const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2; + SESSION_DEPENDENT_AGGREGATION_NUMBER + } else { + return; + }; + + AggregationUpdateQueue::run( + AggregationUpdateJob::UpdateAggregationNumber { + task_id, + base_aggregation_number, + distance: None, + }, + ctx, + ); + } + fn should_track_activeness(&self) -> bool { self.options.active_tracking } @@ -470,21 +501,6 @@ struct TaskExecutionCompletePrepareResult { // Operations impl TurboTasksBackendInner { - fn connect_child( - &self, - parent_task: Option, - child_task: TaskId, - task_type: Option>, - turbo_tasks: &dyn TurboTasksBackendApi>, - ) { - operation::ConnectChildOperation::run( - parent_task, - child_task, - task_type, - self.execute_context(turbo_tasks), - ); - } - fn try_read_task_output( self: &Arc, task_id: TaskId, @@ -1513,24 +1529,23 @@ impl TurboTasksBackendInner { turbo_tasks: &dyn TurboTasksBackendApi>, ) -> TaskId { let is_root = 
task_type.native_fn.is_root; - + let is_session_dependent = task_type.native_fn.is_session_dependent; + // Create a single ExecuteContext for both lookup and connect_child + let mut ctx = self.execute_context(turbo_tasks); // First check if the task exists in the cache which only uses a read lock // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock) // before ConnectChildOperation::run, which may re-enter task_cache with a write lock. if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) { self.track_cache_hit(&task_type); - self.connect_child( + operation::ConnectChildOperation::run( parent_task, task_id, Some(ArcOrOwned::Owned(task_type)), - turbo_tasks, + ctx, ); return task_id; } - // Create a single ExecuteContext for both lookup and connect_child - let mut ctx = self.execute_context(turbo_tasks); - let mut is_new = false; let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) { // Task exists in backing storage @@ -1573,15 +1588,8 @@ impl TurboTasksBackendInner { }; (task_id, task_type) }; - if is_new && is_root { - AggregationUpdateQueue::run( - AggregationUpdateJob::UpdateAggregationNumber { - task_id, - base_aggregation_number: u32::MAX, - distance: None, - }, - &mut ctx, - ); + if is_new { + self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx); } // Reuse the same ExecuteContext for connect_child operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx); @@ -1596,6 +1604,7 @@ impl TurboTasksBackendInner { turbo_tasks: &dyn TurboTasksBackendApi>, ) -> TaskId { let is_root = task_type.native_fn.is_root; + let is_session_dependent = task_type.native_fn.is_session_dependent; if let Some(parent_task) = parent_task && !parent_task.is_transient() @@ -1606,16 +1615,17 @@ impl TurboTasksBackendInner { /* cell_id */ None, ); } + let mut ctx = self.execute_context(turbo_tasks); // First check if the task exists in the cache which only uses a 
read lock. // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock) // before ConnectChildOperation::run, which may re-enter task_cache with a write lock. if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) { self.track_cache_hit(&task_type); - self.connect_child( + operation::ConnectChildOperation::run( parent_task, task_id, Some(ArcOrOwned::Owned(task_type)), - turbo_tasks, + ctx, ); return task_id; } @@ -1625,11 +1635,11 @@ impl TurboTasksBackendInner { let task_id = *e.get(); drop(e); self.track_cache_hit(&task_type); - self.connect_child( + operation::ConnectChildOperation::run( parent_task, task_id, Some(ArcOrOwned::Owned(task_type)), - turbo_tasks, + ctx, ); task_id } @@ -1639,23 +1649,18 @@ impl TurboTasksBackendInner { e.insert(task_type.clone(), task_id); self.track_cache_miss(&task_type); - if is_root { - let mut ctx = self.execute_context(turbo_tasks); - AggregationUpdateQueue::run( - AggregationUpdateJob::UpdateAggregationNumber { - task_id, - base_aggregation_number: u32::MAX, - distance: None, - }, - &mut ctx, - ); - } + self.set_initial_aggregation_number( + task_id, + is_root, + is_session_dependent, + &mut ctx, + ); - self.connect_child( + operation::ConnectChildOperation::run( parent_task, task_id, Some(ArcOrOwned::Arc(task_type)), - turbo_tasks, + ctx, ); task_id @@ -1927,7 +1932,6 @@ impl TurboTasksBackendInner { stale: false, once_task, done_event, - session_dependent: false, marked_as_completed: false, new_children: Default::default(), }, @@ -2156,7 +2160,6 @@ impl TurboTasksBackendInner { let &mut InProgressState::InProgress(box InProgressStateInner { stale, ref mut new_children, - session_dependent, once_task: is_once_task, .. 
}) = in_progress @@ -2252,7 +2255,7 @@ impl TurboTasksBackendInner { // Task was previously marked as immutable if !is_immutable // Task is not session dependent (session dependent tasks can change between sessions) - && !session_dependent + && !task.is_session_dependent() // Task has no invalidator && !task.invalidator() // Task has no dependencies on collectibles @@ -2602,7 +2605,6 @@ impl TurboTasksBackendInner { done_event, once_task: is_once_task, stale, - session_dependent, marked_as_completed: _, new_children, }) = in_progress @@ -2641,7 +2643,8 @@ impl TurboTasksBackendInner { } } - // Compute the new dirty state + // Compute and apply the new dirty state, propagating to aggregating ancestors + let session_dependent = task.is_session_dependent(); let new_dirtyness = if session_dependent { Some(Dirtyness::SessionDependent) } else { @@ -3036,42 +3039,6 @@ impl TurboTasksBackendInner { ); } - fn mark_own_task_as_session_dependent( - &self, - task_id: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi>, - ) { - if !self.should_track_dependencies() { - // Without dependency tracking we don't need session dependent tasks - return; - } - const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2; - let mut ctx = self.execute_context(turbo_tasks); - let mut task = ctx.task(task_id, TaskDataCategory::Meta); - let aggregation_number = get_aggregation_number(&task); - if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER { - drop(task); - // We want to use a high aggregation number to avoid large aggregation chains for - // session dependent tasks (which change on every run) - AggregationUpdateQueue::run( - AggregationUpdateJob::UpdateAggregationNumber { - task_id, - base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER, - distance: None, - }, - &mut ctx, - ); - task = ctx.task(task_id, TaskDataCategory::Meta); - } - if let Some(InProgressState::InProgress(box InProgressStateInner { - session_dependent, - .. 
- })) = task.get_in_progress_mut() - { - *session_dependent = true; - } - } - fn mark_own_task_as_finished( &self, task: TaskId, @@ -3606,14 +3573,6 @@ impl Backend for TurboTasksBackend { self.0.mark_own_task_as_finished(task_id, turbo_tasks); } - fn mark_own_task_as_session_dependent( - &self, - task: TaskId, - turbo_tasks: &dyn TurboTasksBackendApi, - ) { - self.0.mark_own_task_as_session_dependent(task, turbo_tasks); - } - fn connect_task( &self, task: TaskId, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index e06baba14cb68e..47cb900b237055 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -959,6 +959,9 @@ pub trait TaskGuard: Debug + TaskStorageAccessors { panic!("Every task must have a task type {self:?}"); } } + fn is_session_dependent(&self) -> bool { + matches!(self.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent) + } fn get_task_desc_fn(&self) -> impl Fn() -> String + Send + Sync + 'static { let task_type = self.get_task_type().to_owned(); let task_id = self.id(); diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index b50ffdb97edc5a..0332e0e559d520 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -238,7 +238,6 @@ pub struct InProgressStateInner { pub stale: bool, #[allow(dead_code)] pub once_task: bool, - pub session_dependent: bool, /// Early marking as completed. This is set before the output is available and will ignore full /// task completion of the task for strongly consistent reads. 
pub marked_as_completed: bool, diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs index 38c02229ef6228..e7ebcab248f791 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs @@ -3,7 +3,7 @@ #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this use anyhow::Result; -use turbo_tasks::{ResolvedVc, State, Vc, mark_session_dependent}; +use turbo_tasks::{ResolvedVc, State, Vc}; use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); @@ -76,9 +76,8 @@ async fn inner_compute(input: Vc) -> Result> { Ok(last.unwrap()) } -#[turbo_tasks::function] +#[turbo_tasks::function(session_dependent)] async fn compute2(input: Vc) -> Result> { - mark_session_dependent(); println!("compute2()"); let value = *input.await?; Ok(Vc::cell(value)) diff --git a/turbopack/crates/turbo-tasks-env/src/command_line.rs b/turbopack/crates/turbo-tasks-env/src/command_line.rs index 000018c329a440..7bb420c6767300 100644 --- a/turbopack/crates/turbo-tasks-env/src/command_line.rs +++ b/turbopack/crates/turbo-tasks-env/src/command_line.rs @@ -1,5 +1,5 @@ use turbo_rcstr::RcStr; -use turbo_tasks::{FxIndexMap, Vc, mark_session_dependent}; +use turbo_tasks::{FxIndexMap, Vc}; use crate::{GLOBAL_ENV_LOCK, ProcessEnv, TransientEnvMap, sorted_env_vars}; @@ -23,9 +23,8 @@ fn env_snapshot() -> FxIndexMap { #[turbo_tasks::value_impl] impl ProcessEnv for CommandLineProcessEnv { - #[turbo_tasks::function] + #[turbo_tasks::function(session_dependent)] fn read_all(&self) -> Vc { - mark_session_dependent(); Vc::cell(env_snapshot()) } } diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index 6dafa4099ffe60..bc3925097afc4b 100644 --- 
a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -3,7 +3,7 @@ use std::{hash::Hash, sync::LazyLock}; use anyhow::Result; use quick_cache::sync::Cache; use turbo_rcstr::RcStr; -use turbo_tasks::{ReadRef, Vc, duration_span, mark_session_dependent}; +use turbo_tasks::{Completion, ReadRef, Vc, duration_span}; use crate::{FetchError, FetchResult, HttpResponse, HttpResponseBody}; @@ -121,7 +121,10 @@ impl FetchClientConfig { Ok(resp) => Ok(Vc::cell(Ok(resp.resolved_cell()))), Err(err) => { // the client failed to construct or the HTTP request failed - mark_session_dependent(); + // Mark session dependent so we get retried in the next sessions + // In dev our caller will keep going, but in prod builds this will fail the build + // anyway. + Completion::session_dependent().as_side_effect().await?; Ok(Vc::cell(Err( FetchError::from_reqwest_error(&err, &url).resolved_cell() ))) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 3a6c81c59ddf68..b6ec734ea4675c 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -66,8 +66,8 @@ use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ ApplyEffectsContext, Completion, Effect, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, TaskInput, TurboTasksApi, ValueToString, ValueToStringRef, Vc, - debug::ValueDebugFormat, emit_effect, mark_session_dependent, parallel, trace::TraceRawVcs, - turbo_tasks_weak, turbobail, turbofmt, + debug::ValueDebugFormat, emit_effect, parallel, trace::TraceRawVcs, turbo_tasks_weak, + turbobail, turbofmt, }; use turbo_tasks_hash::{ DeterministicHash, DeterministicHasher, HashAlgorithm, deterministic_hash, hash_xxh3_hash64, @@ -752,10 +752,8 @@ impl Debug for DiskFileSystem { #[turbo_tasks::value_impl] impl FileSystem for DiskFileSystem { - #[turbo_tasks::function(fs)] + #[turbo_tasks::function(fs, session_dependent)] async fn 
read(&self, fs_path: FileSystemPath) -> Result> { - mark_session_dependent(); - // Check if path is denied - if so, treat as NotFound if self.inner.is_path_denied(&fs_path) { return Ok(FileContent::NotFound.cell()); @@ -780,10 +778,8 @@ impl FileSystem for DiskFileSystem { Ok(content.cell()) } - #[turbo_tasks::function(fs)] + #[turbo_tasks::function(fs, session_dependent)] async fn raw_read_dir(&self, fs_path: FileSystemPath) -> Result> { - mark_session_dependent(); - // Check if directory itself is denied - if so, treat as NotFound if self.inner.is_path_denied(&fs_path) { return Ok(RawDirectoryContent::not_found()); @@ -870,10 +866,8 @@ impl FileSystem for DiskFileSystem { Ok(RawDirectoryContent::new(entries)) } - #[turbo_tasks::function(fs)] + #[turbo_tasks::function(fs, session_dependent)] async fn read_link(&self, fs_path: FileSystemPath) -> Result> { - mark_session_dependent(); - // Check if path is denied - if so, treat as NotFound if self.inner.is_path_denied(&fs_path) { return Ok(LinkContent::NotFound.cell()); @@ -961,9 +955,9 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn write(&self, fs_path: FileSystemPath, content: Vc) -> Result<()> { - // You might be tempted to use `mark_session_dependent` here, but - // `write` purely declares a side effect and does not need to be reexecuted in the next - // session. All side effects are reexecuted in general. + // You might be tempted to use `session_dependent` here, but `write` purely declares a side + // effect and does not need to be reexecuted in the next session. All side effects are + // reexecuted in general. 
// Check if path is denied - if so, return an error if self.inner.is_path_denied(&fs_path) { @@ -1114,7 +1108,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn write_link(&self, fs_path: FileSystemPath, target: Vc) -> Result<()> { - // You might be tempted to use `mark_session_dependent` here, but we purely declare a side + // You might be tempted to use `session_dependent` here, but we purely declare a side // effect and does not need to be re-executed in the next session. All side effects are // re-executed in general. @@ -1347,9 +1341,8 @@ impl FileSystem for DiskFileSystem { Ok(()) } - #[turbo_tasks::function(fs)] + #[turbo_tasks::function(fs, session_dependent)] async fn metadata(&self, fs_path: FileSystemPath) -> Result> { - mark_session_dependent(); let full_path = self.to_sys_path(&fs_path); // Check if path is denied - if so, return an error (metadata shouldn't be readable) diff --git a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr index c7c745e37ef60f..63bf140473f312 100644 --- a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr +++ b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr @@ -1,4 +1,4 @@ -error: unexpected token, expected one of: "fs", "network", "operation", or "root" +error: unexpected token, expected one of: "fs", "network", "operation", "root", or "session_dependent" --> tests/function/fail_attribute_invalid_args.rs:10:25 | 10 | #[turbo_tasks::function(invalid_argument)] diff --git a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr index b8657215643293..835ae956f4b4b6 100644 --- 
a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr +++ b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr @@ -1,4 +1,4 @@ -error: unexpected token, expected one of: "fs", "network", "operation", or "root" +error: unexpected token, expected one of: "fs", "network", "operation", "root", or "session_dependent" --> tests/function/fail_attribute_invalid_args_inherent_impl.rs:15:29 | 15 | #[turbo_tasks::function(invalid_argument)] diff --git a/turbopack/crates/turbo-tasks-macros/src/func.rs b/turbopack/crates/turbo-tasks-macros/src/func.rs index 0a8e687f5e9daf..cc1e55d7d446ce 100644 --- a/turbopack/crates/turbo-tasks-macros/src/func.rs +++ b/turbopack/crates/turbo-tasks-macros/src/func.rs @@ -728,6 +728,10 @@ pub struct FunctionArguments { /// Should the task be marked as a root in the aggregation graph on initial creation? /// Root tasks start with aggregation number `u32::MAX`. pub root: Option, + /// Should the task be marked as session dependent? Session dependent tasks are re-executed + /// when restored from persistent cache because they depend on external state (filesystem, + /// environment, network) that may change between sessions. 
+ pub session_dependent: Option, } impl Parse for FunctionArguments { @@ -755,11 +759,14 @@ impl Parse for FunctionArguments { ("root", Meta::Path(_)) => { parsed_args.root = Some(meta.span()); } + ("session_dependent", Meta::Path(_)) => { + parsed_args.session_dependent = Some(meta.span()); + } (_, meta) => { return Err(syn::Error::new_spanned( meta, "unexpected token, expected one of: \"fs\", \"network\", \"operation\", \ - or \"root\"", + \"root\", or \"session_dependent\"", )); } } @@ -1086,6 +1093,7 @@ pub struct NativeFn { pub is_self_used: bool, pub filter_trait_call_args: Option, pub is_root: bool, + pub is_session_dependent: bool, } impl NativeFn { @@ -1102,6 +1110,7 @@ impl NativeFn { is_self_used, filter_trait_call_args, is_root, + is_session_dependent, } = self; let task_fn = if *is_method && *is_self_used { @@ -1137,6 +1146,7 @@ impl NativeFn { #arg_meta, &#task_fn, #is_root, + #is_session_dependent, ) } } diff --git a/turbopack/crates/turbo-tasks-macros/src/function_macro.rs b/turbopack/crates/turbo-tasks-macros/src/function_macro.rs index fe0c2e87158c53..255534ee654680 100644 --- a/turbopack/crates/turbo-tasks-macros/src/function_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/function_macro.rs @@ -43,6 +43,7 @@ pub fn function(args: TokenStream, input: TokenStream) -> TokenStream { .unwrap_or_default(); let is_self_used = args.operation.is_some() || is_self_used(&block); let is_root = args.root.is_some(); + let is_session_dependent = args.session_dependent.is_some(); let Some(turbo_fn) = TurboFn::new(&sig, DefinitionContext::NakedFn, args, is_self_used) else { return quote! 
{ @@ -66,6 +67,7 @@ pub fn function(args: TokenStream, input: TokenStream) -> TokenStream { is_self_used, filter_trait_call_args: None, // not a trait method is_root, + is_session_dependent, }; let native_function_ident = get_native_function_ident(ident); let native_function_ty = native_fn.ty(); diff --git a/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs b/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs index fe3d6b58f16c82..c73394c60cac9b 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs @@ -101,6 +101,8 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { } }; let is_self_used = func_args.operation.is_some() || is_self_used(block); + let is_root = func_args.root.is_some(); + let is_session_dependent = func_args.session_dependent.is_some(); let Some(turbo_fn) = TurboFn::new( sig, @@ -123,7 +125,8 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { is_method: turbo_fn.is_method(), is_self_used, filter_trait_call_args: None, // not a trait method - is_root: false, + is_root, + is_session_dependent, }; let native_function_ident = get_inherent_impl_function_ident(ty_ident, ident); @@ -208,6 +211,8 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { }; // operations are not currently compatible with methods let is_self_used = func_args.operation.is_some() || is_self_used(block); + let is_root = func_args.root.is_some(); + let is_session_dependent = func_args.session_dependent.is_some(); let Some(turbo_fn) = TurboFn::new( sig, @@ -240,7 +245,8 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { is_method: turbo_fn.is_method(), is_self_used, filter_trait_call_args: turbo_fn.filter_trait_call_args(), - is_root: false, + is_root, + is_session_dependent, }; let native_function_ident = diff --git a/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs 
b/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs index dd83dc8cac7e35..28e12877511bba 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs @@ -214,6 +214,7 @@ pub fn value_trait(args: TokenStream, input: TokenStream) -> TokenStream { is_self_used, filter_trait_call_args: turbo_fn.filter_trait_call_args(), is_root: false, + is_session_dependent: false, }; let native_function_ident = get_trait_default_impl_function_ident(trait_ident, ident); diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 11fa2c9aec8b51..931a04f3d951e9 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -294,10 +294,6 @@ impl TurboTasksApi for VcStorage { // no-op } - fn mark_own_task_as_session_dependent(&self, _task: TaskId) { - // no-op - } - fn spawn_detached_for_testing( &self, _f: std::pin::Pin + Send + 'static>>, diff --git a/turbopack/crates/turbo-tasks/function.md b/turbopack/crates/turbo-tasks/function.md index bff5dd9146a653..c64f019191618f 100644 --- a/turbopack/crates/turbo-tasks/function.md +++ b/turbopack/crates/turbo-tasks/function.md @@ -71,6 +71,78 @@ fn foo( ) -> Vc; // was: impl Future>> ``` +## Attributes + +The `#[turbo_tasks::function]` macro accepts optional attributes that modify the behavior of the +task. Multiple attributes can be combined by separating them with commas. + +```rust +#[turbo_tasks::function(fs, session_dependent)] +async fn read_file(path: RcStr) -> Result> { + // ... +} +``` + +### `operation` + +Marks the task as an **operation**. The external signature will return an [`OperationVc`] instead +of a [`Vc`], and all arguments must implement `OperationValue`. Operation tasks serve as explicit +entry points into the task graph and can be used to connect non-reactive code to the reactive +computation graph. 
Mutually exclusive with `&self` receivers. + +### `root` + +Marks the task as a **root** in the aggregation graph. Root tasks start with the maximum aggregation +number (`u32::MAX`), which places them at the top of the aggregation tree. This is used for tasks +that represent top-level entry points into the computation. + +### `fs` + +An **I/O marker** indicating the task directly performs filesystem operations. This should only be +applied to the task that directly performs the I/O, not tasks that transitively call it. + +### `network` + +An **I/O marker** indicating the task directly performs network operations. Like `fs`, this should +only be applied to the task that directly performs the I/O. + +### `session_dependent` + +Marks the task as **session dependent**. Session-dependent tasks are re-executed when restored from +persistent cache because they depend on external state (filesystem, environment, network) that may +have changed between sessions. + +When a session-dependent task completes, it is not marked as fully clean — it retains a special +"session dependent" dirty state. If the task is later restored from persistent cache in a new +session, this state causes it to be re-executed rather than reusing the cached result. + +Typical use cases: + +- **Filesystem reads** — file contents may have changed on disk between sessions. +- **Environment variable reads** — process environment may differ between sessions. +- **Network requests** — remote resources may return different results. + +```rust +#[turbo_tasks::function(fs, session_dependent)] +async fn read(&self, path: FileSystemPath) -> Result> { + // File contents may have changed since the last session, + // so this task is always re-executed on cache restore. + // ... +} + +#[turbo_tasks::function(session_dependent)] +fn read_all_env(&self) -> Vc { + // Environment variables may differ between sessions. 
+ Vc::cell(self.vars.clone()) +} +``` + +Note: `session_dependent` should be applied to the **leaf task** that directly reads external state. +Tasks that transitively depend on a session-dependent task do not need this attribute — they will +naturally re-execute when the session-dependent task they depend on produces a new result. + +[`OperationVc`]: crate::OperationVc + ## Methods and Self Tasks can be methods associated with a value or a trait implementation using the [`arbitrary_self_types` nightly compiler feature][self-types]. diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index 97c9f5be63b239..4b20c369d38cfe 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -624,14 +624,6 @@ pub trait Backend: Sync + Send { // Do nothing by default } - fn mark_own_task_as_session_dependent( - &self, - _task: TaskId, - _turbo_tasks: &dyn TurboTasksBackendApi, - ) { - // Do nothing by default - } - fn create_transient_task( &self, task_type: TransientTaskType, diff --git a/turbopack/crates/turbo-tasks/src/completion.rs b/turbopack/crates/turbo-tasks/src/completion.rs index 821630af22f8fb..f6adc63b17459d 100644 --- a/turbopack/crates/turbo-tasks/src/completion.rs +++ b/turbopack/crates/turbo-tasks/src/completion.rs @@ -21,6 +21,14 @@ impl Completion { pub fn immutable() -> Vc { Completion::cell(Completion) } + + /// Returns a completion from a session-dependent task. Awaiting this creates a dependency on + /// a session-dependent task, which will cause the calling task to be re-executed when + /// restored from persistent cache. 
+ #[turbo_tasks::function(session_dependent)] + pub fn session_dependent() -> Vc { + Completion::cell(Completion) + } } // no #[turbo_tasks::value_impl] to inline new into the caller task diff --git a/turbopack/crates/turbo-tasks/src/invalidation.rs b/turbopack/crates/turbo-tasks/src/invalidation.rs index 1b966f414bd484..e4f8d7c88d701b 100644 --- a/turbopack/crates/turbo-tasks/src/invalidation.rs +++ b/turbopack/crates/turbo-tasks/src/invalidation.rs @@ -27,7 +27,7 @@ pub fn get_invalidator() -> Option { /// A lightweight handle to invalidate a task. Only stores the task ID. /// The caller must provide the `TurboTasksApi` when calling invalidation methods. -#[derive(Clone, Copy, Hash, PartialEq, Eq, Encode, Decode)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Encode, Decode)] pub struct Invalidator { task: TaskId, } @@ -55,8 +55,10 @@ impl TraceRawVcs for Invalidator { } } -unsafe impl NonLocalValue for Invalidator {} unsafe impl OperationValue for Invalidator {} +// Safety: Invalidator only contains a TaskId (a NonZero wrapper) and does not contain any +// local Vc references. +unsafe impl NonLocalValue for Invalidator {} /// A user-facing reason why a task was invalidated. This should only be used /// for invalidation that were triggered by the user. 
diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index baf7a57dee7a8f..77cd2dcc7881d3 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -93,8 +93,8 @@ pub use crate::{ CurrentCellRef, ReadCellTracking, ReadConsistency, ReadTracking, TaskPersistence, TaskPriority, TurboTasks, TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, dynamic_call, emit, get_serialization_invalidator, mark_finished, - mark_session_dependent, mark_stateful, mark_top_level_task, prevent_gc, run, run_once, - run_once_with_reason, trait_call, turbo_tasks, turbo_tasks_scope, turbo_tasks_weak, + mark_stateful, mark_top_level_task, prevent_gc, run, run_once, run_once_with_reason, + trait_call, turbo_tasks, turbo_tasks_scope, turbo_tasks_weak, unmark_top_level_task_may_leak_eventually_consistent_state, with_turbo_tasks, }, mapped_read_ref::MappedReadRef, @@ -104,7 +104,7 @@ pub use crate::{ read_ref::ReadRef, serialization_invalidation::SerializationInvalidator, spawn::{JoinHandle, block_for_future, block_in_place, spawn, spawn_blocking, spawn_thread}, - state::{State, TransientState}, + state::State, task::{ SharedReference, TypedSharedReference, task_input::{EitherTaskInput, TaskInput}, diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 76fea4ae929577..f2369115733cb7 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -177,7 +177,6 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { verification_mode: VerificationMode, ); fn mark_own_task_as_finished(&self, task: TaskId); - fn mark_own_task_as_session_dependent(&self, task: TaskId); fn connect_task(&self, task: TaskId); @@ -1592,10 +1591,6 @@ impl TurboTasksApi for TurboTasks { self.backend.mark_own_task_as_finished(task, self); } - fn mark_own_task_as_session_dependent(&self, task: TaskId) { - 
self.backend.mark_own_task_as_session_dependent(task, self); - } - /// Creates a future that inherits the current task id and task state. The current global task /// will wait for this future to be dropped before exiting. fn spawn_detached_for_testing(&self, fut: Pin + Send + 'static>>) { @@ -1891,13 +1886,6 @@ pub fn current_task_for_testing() -> Option { CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id) } -/// Marks the current task as dirty when restored from filesystem cache. -pub fn mark_session_dependent() { - with_turbo_tasks(|tt| { - tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()")) - }); -} - /// Marks the current task as finished. This excludes it from waiting for /// strongly consistency. pub fn mark_finished() { @@ -1943,7 +1931,7 @@ pub fn mark_invalidator() { } /// Marks the current task as stateful. This is used to indicate that the task -/// has interior mutability (e.g., via State or TransientState), which means +/// has interior mutability (e.g., via State), which means /// the task may produce different outputs even with the same inputs. /// /// Only has an effect when the `verify_determinism` feature is enabled. diff --git a/turbopack/crates/turbo-tasks/src/marker_trait.rs b/turbopack/crates/turbo-tasks/src/marker_trait.rs index 8cd89b15381d12..7e103b3b4c7880 100644 --- a/turbopack/crates/turbo-tasks/src/marker_trait.rs +++ b/turbopack/crates/turbo-tasks/src/marker_trait.rs @@ -82,7 +82,6 @@ macro_rules! 
impl_auto_marker_trait { <::Read as $crate::VcRead>::Target: $trait {} unsafe impl $trait for $crate::State {} - unsafe impl $trait for $crate::TransientState {} unsafe impl $trait for $crate::TransientValue {} unsafe impl $trait for $crate::TransientInstance {} unsafe impl $trait for $crate::event::Event {} diff --git a/turbopack/crates/turbo-tasks/src/native_function.rs b/turbopack/crates/turbo-tasks/src/native_function.rs index 4cbc1c8f3daab4..b4d1ed1f8d4671 100644 --- a/turbopack/crates/turbo-tasks/src/native_function.rs +++ b/turbopack/crates/turbo-tasks/src/native_function.rs @@ -193,6 +193,11 @@ pub struct NativeFunction { /// Whether this function's tasks should be treated as root nodes in the aggregation graph. /// Root tasks start with aggregation number `u32::MAX` on initial creation. pub is_root: bool, + + /// Whether this function's tasks are session dependent. Session dependent tasks are + /// re-executed when restored from persistent cache because they depend on external state + /// (filesystem, environment, network) that may change between sessions. 
+ pub is_session_dependent: bool, } impl Debug for NativeFunction { @@ -219,6 +224,7 @@ impl NativeFunction { implementation: &into_task_fn(default_fn) as &dyn TaskFn, ty: RegistryType::new::<()>("", ""), is_root: false, + is_session_dependent: false, }; pub const fn new( @@ -227,12 +233,14 @@ impl NativeFunction { arg_meta: ArgMeta, implementation: &'static T, is_root: bool, + is_session_dependent: bool, ) -> Self { Self { ty: RegistryType::new::(name, global_name), arg_meta, implementation, is_root, + is_session_dependent, } } diff --git a/turbopack/crates/turbo-tasks/src/state.rs b/turbopack/crates/turbo-tasks/src/state.rs index cc7cfe892affed..6d42c4e2582587 100644 --- a/turbopack/crates/turbo-tasks/src/state.rs +++ b/turbopack/crates/turbo-tasks/src/state.rs @@ -12,8 +12,7 @@ use tracing::trace_span; use crate::{ Invalidator, OperationValue, SerializationInvalidator, get_invalidator, - get_serialization_invalidator, manager::with_turbo_tasks, mark_session_dependent, - mark_stateful, trace::TraceRawVcs, + get_serialization_invalidator, manager::with_turbo_tasks, trace::TraceRawVcs, }; #[derive(Encode, Decode)] @@ -291,109 +290,3 @@ impl State { self.serialization_invalidator.invalidate(); } } - -#[derive(Encode, Decode)] -#[bincode(bounds = "")] -pub struct TransientState { - #[bincode(skip, default = "default_transient_state_inner")] - inner: Mutex>>, -} - -fn default_transient_state_inner() -> Mutex>> { - Mutex::new(StateInner::new(None)) -} - -impl Debug for TransientState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TransientState") - .field("value", &self.inner.lock().value) - .finish() - } -} - -impl TraceRawVcs for TransientState { - fn trace_raw_vcs(&self, trace_context: &mut crate::trace::TraceRawVcsContext) { - self.inner.lock().value.trace_raw_vcs(trace_context); - } -} - -impl Default for TransientState { - fn default() -> Self { - // Need to be explicit to ensure marking as stateful. 
- Self::new() - } -} - -impl PartialEq for TransientState { - fn eq(&self, _other: &Self) -> bool { - false - } -} -impl Eq for TransientState {} - -impl TransientState { - pub fn new() -> Self { - mark_stateful(); - Self { - inner: Mutex::new(StateInner::new(None)), - } - } - - /// Gets the current value of the state. The current task will be registered - /// as dependency of the state and will be invalidated when the state - /// changes. - pub fn get(&self) -> StateRef<'_, Option> { - mark_session_dependent(); - let invalidator = get_invalidator(); - let mut inner = self.inner.lock(); - if let Some(invalidator) = invalidator { - inner.add_invalidator(invalidator); - } - StateRef { - serialization_invalidator: None, - inner, - mutated: false, - } - } - - /// Gets the current value of the state. Untracked. - pub fn get_untracked(&self) -> StateRef<'_, Option> { - let inner = self.inner.lock(); - StateRef { - serialization_invalidator: None, - inner, - mutated: false, - } - } - - /// Sets the current state without comparing it with the old value. This - /// should only be used if one is sure that the value has changed. - pub fn set_unconditionally(&self, value: T) { - let mut inner = self.inner.lock(); - inner.set_unconditionally(Some(value)); - } - - /// Updates the current state with the `update` function. The `update` - /// function need to return `true` when the value was modified. Exposing - /// the current value from the `update` function is not allowed and will - /// result in incorrect cache invalidation. - pub fn update_conditionally(&self, update: impl FnOnce(&mut Option) -> bool) { - let mut inner = self.inner.lock(); - inner.update_conditionally(update); - } -} - -impl TransientState { - /// Update the current state when the `value` is different from the current - /// value. `T` must implement [PartialEq] for this to work. - pub fn set(&self, value: T) { - let mut inner = self.inner.lock(); - inner.set(Some(value)); - } - - /// Unset the current value. 
- pub fn unset(&self) { - let mut inner = self.inner.lock(); - inner.set(None); - } -} diff --git a/turbopack/crates/turbopack-ecmascript/src/lib.rs b/turbopack/crates/turbopack-ecmascript/src/lib.rs index 2133af15819954..428758a9f2cabc 100644 --- a/turbopack/crates/turbopack-ecmascript/src/lib.rs +++ b/turbopack/crates/turbopack-ecmascript/src/lib.rs @@ -364,6 +364,35 @@ impl EcmascriptModuleAssetBuilder { } } +/// A transient cache that stores a value across task re-executions within a session but is lost +/// when restored from persistent cache. +struct TransientCache(parking_lot::Mutex>); + +impl Default for TransientCache { + fn default() -> Self { + Self(parking_lot::Mutex::new(None)) + } +} + +// All caches are always eq, this doesn't really make sense on its own but fits the purpose of +// embedding in EcmascriptModuleAsset +impl PartialEq for TransientCache { + fn eq(&self, _other: &Self) -> bool { + true + } +} +impl Eq for TransientCache {} + +impl TransientCache { + fn get(&self) -> parking_lot::MutexGuard<'_, Option> { + self.0.lock() + } + + fn set(&self, value: T) { + *self.0.lock() = Some(value); + } +} + #[turbo_tasks::value] pub struct EcmascriptModuleAsset { pub source: ResolvedVc>, @@ -374,8 +403,13 @@ pub struct EcmascriptModuleAsset { pub compile_time_info: ResolvedVc, pub side_effect_free_packages: Option>, pub inner_assets: Option>, - #[turbo_tasks(debug_ignore)] - last_successful_parse: turbo_tasks::TransientState>, + /// A transient cache of successful parse results + /// Used when EcmascriptOptions::keep_last_successful_parse is enabled (only in dev) + /// This ensures that parse errors don't invalidate large portions of the task graph, so we + /// still report the issue but serve the previous AST + #[turbo_tasks(debug_ignore, trace_ignore)] + #[bincode(skip, default = "Default::default")] + last_successful_parse: TransientCache>, } impl core::fmt::Debug for EcmascriptModuleAsset { fn fmt(&self, f: &mut core::fmt::Formatter) -> 
core::fmt::Result { @@ -507,12 +541,11 @@ impl EcmascriptParsable for EcmascriptModuleAsset { if self.options.await?.keep_last_successful_parse { let real_result_value = real_result.await?; let result_value = if matches!(*real_result_value, ParseResult::Ok { .. }) { - self.last_successful_parse - .set_unconditionally(real_result_value.clone()); + self.last_successful_parse.set(real_result_value.clone()); real_result_value } else { - let state_ref = self.last_successful_parse.get(); - state_ref.as_ref().unwrap_or(&real_result_value).clone() + let guard = self.last_successful_parse.get(); + guard.as_ref().unwrap_or(&real_result_value).clone() }; Ok(ReadRef::cell(result_value)) } else {