diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 9dde575d7221a2..34a8024821f4c8 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -363,37 +363,6 @@ impl TurboTasksBackendInner { self.options.dependency_tracking } - /// Sets the initial aggregation number for a newly created task. Root tasks get `u32::MAX` - /// to stay at the top. Session-dependent tasks get a high (but not max) aggregation number - /// because they change on every session restore, behaving like dirty leaf nodes — keeping - /// them near the leaves prevents long dirty-propagation chains through intermediate - /// aggregated nodes. - fn set_initial_aggregation_number( - &self, - task_id: TaskId, - is_root: bool, - is_session_dependent: bool, - ctx: &mut impl ExecuteContext<'_>, - ) { - let base_aggregation_number = if is_root { - u32::MAX - } else if is_session_dependent && self.should_track_dependencies() { - const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2; - SESSION_DEPENDENT_AGGREGATION_NUMBER - } else { - return; - }; - - AggregationUpdateQueue::run( - AggregationUpdateJob::UpdateAggregationNumber { - task_id, - base_aggregation_number, - distance: None, - }, - ctx, - ); - } - fn should_track_activeness(&self) -> bool { self.options.active_tracking } @@ -501,6 +470,21 @@ struct TaskExecutionCompletePrepareResult { // Operations impl TurboTasksBackendInner { + fn connect_child( + &self, + parent_task: Option, + child_task: TaskId, + task_type: Option>, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { + operation::ConnectChildOperation::run( + parent_task, + child_task, + task_type, + self.execute_context(turbo_tasks), + ); + } + fn try_read_task_output( self: &Arc, task_id: TaskId, @@ -1529,23 +1513,24 @@ impl TurboTasksBackendInner { turbo_tasks: &dyn TurboTasksBackendApi>, ) -> TaskId { let is_root = task_type.native_fn.is_root; - let is_session_dependent = task_type.native_fn.is_session_dependent; - // Create a single ExecuteContext for both lookup and connect_child - let mut ctx = self.execute_context(turbo_tasks); + // First check if the task exists in the cache which only uses a read lock // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock) // before ConnectChildOperation::run, which may re-enter task_cache with a write lock. if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) { self.track_cache_hit(&task_type); - operation::ConnectChildOperation::run( + self.connect_child( parent_task, task_id, Some(ArcOrOwned::Owned(task_type)), - ctx, + turbo_tasks, ); return task_id; } + // Create a single ExecuteContext for both lookup and connect_child + let mut ctx = self.execute_context(turbo_tasks); + let mut is_new = false; let (task_id, task_type) = if let Some(task_id) = ctx.task_by_type(&task_type) { // Task exists in backing storage @@ -1588,8 +1573,15 @@ impl TurboTasksBackendInner { }; (task_id, task_type) }; - if is_new { - self.set_initial_aggregation_number(task_id, is_root, is_session_dependent, &mut ctx); + if is_new && is_root { + AggregationUpdateQueue::run( + AggregationUpdateJob::UpdateAggregationNumber { + task_id, + base_aggregation_number: u32::MAX, + distance: None, + }, + &mut ctx, + ); } // Reuse the same ExecuteContext for connect_child operation::ConnectChildOperation::run(parent_task, task_id, Some(task_type), ctx); @@ -1604,7 +1596,6 @@ impl TurboTasksBackendInner { turbo_tasks: &dyn TurboTasksBackendApi>, ) -> TaskId { let is_root = task_type.native_fn.is_root; - let is_session_dependent = task_type.native_fn.is_session_dependent; if let Some(parent_task) = parent_task && !parent_task.is_transient() @@ -1615,17 +1606,16 @@ impl TurboTasksBackendInner { /* cell_id */ None, ); } - let mut ctx = self.execute_context(turbo_tasks); // First check if the task exists in the cache which only uses a read lock. // .map(|r| *r) copies the TaskId and drops the DashMap Ref (releasing the read lock) // before ConnectChildOperation::run, which may re-enter task_cache with a write lock. if let Some(task_id) = self.task_cache.get(&task_type).map(|r| *r) { self.track_cache_hit(&task_type); - operation::ConnectChildOperation::run( + self.connect_child( parent_task, task_id, Some(ArcOrOwned::Owned(task_type)), - ctx, + turbo_tasks, ); return task_id; } @@ -1635,11 +1625,11 @@ impl TurboTasksBackendInner { let task_id = *e.get(); drop(e); self.track_cache_hit(&task_type); - operation::ConnectChildOperation::run( + self.connect_child( parent_task, task_id, Some(ArcOrOwned::Owned(task_type)), - ctx, + turbo_tasks, ); task_id } @@ -1649,18 +1639,23 @@ impl TurboTasksBackendInner { e.insert(task_type.clone(), task_id); self.track_cache_miss(&task_type); - self.set_initial_aggregation_number( - task_id, - is_root, - is_session_dependent, - &mut ctx, - ); + if is_root { + let mut ctx = self.execute_context(turbo_tasks); + AggregationUpdateQueue::run( + AggregationUpdateJob::UpdateAggregationNumber { + task_id, + base_aggregation_number: u32::MAX, + distance: None, + }, + &mut ctx, + ); + } - operation::ConnectChildOperation::run( + self.connect_child( parent_task, task_id, Some(ArcOrOwned::Arc(task_type)), - ctx, + turbo_tasks, ); task_id @@ -1932,6 +1927,7 @@ impl TurboTasksBackendInner { stale: false, once_task, done_event, + session_dependent: false, marked_as_completed: false, new_children: Default::default(), }, @@ -2160,6 +2156,7 @@ impl TurboTasksBackendInner { let &mut InProgressState::InProgress(box InProgressStateInner { stale, ref mut new_children, + session_dependent, once_task: is_once_task, .. }) = in_progress @@ -2255,7 +2252,7 @@ impl TurboTasksBackendInner { // Task was previously marked as immutable if !is_immutable // Task is not session dependent (session dependent tasks can change between sessions) - && !task.is_session_dependent() + && !session_dependent // Task has no invalidator && !task.invalidator() // Task has no dependencies on collectibles @@ -2605,6 +2602,7 @@ impl TurboTasksBackendInner { done_event, once_task: is_once_task, stale, + session_dependent, marked_as_completed: _, new_children, }) = in_progress @@ -2643,8 +2641,7 @@ impl TurboTasksBackendInner { } } - // Compute and apply the new dirty state, propagating to aggregating ancestors - let session_dependent = task.is_session_dependent(); + // Compute the new dirty state let new_dirtyness = if session_dependent { Some(Dirtyness::SessionDependent) } else { @@ -3039,6 +3036,42 @@ impl TurboTasksBackendInner { ); } + fn mark_own_task_as_session_dependent( + &self, + task_id: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { + if !self.should_track_dependencies() { + // Without dependency tracking we don't need session dependent tasks + return; + } + const SESSION_DEPENDENT_AGGREGATION_NUMBER: u32 = u32::MAX >> 2; + let mut ctx = self.execute_context(turbo_tasks); + let mut task = ctx.task(task_id, TaskDataCategory::Meta); + let aggregation_number = get_aggregation_number(&task); + if aggregation_number < SESSION_DEPENDENT_AGGREGATION_NUMBER { + drop(task); + // We want to use a high aggregation number to avoid large aggregation chains for + // session dependent tasks (which change on every run) + AggregationUpdateQueue::run( + AggregationUpdateJob::UpdateAggregationNumber { + task_id, + base_aggregation_number: SESSION_DEPENDENT_AGGREGATION_NUMBER, + distance: None, + }, + &mut ctx, + ); + task = ctx.task(task_id, TaskDataCategory::Meta); + } + if let Some(InProgressState::InProgress(box InProgressStateInner { + session_dependent, + .. + })) = task.get_in_progress_mut() + { + *session_dependent = true; + } + } + fn mark_own_task_as_finished( &self, task: TaskId, @@ -3573,6 +3606,14 @@ impl Backend for TurboTasksBackend { self.0.mark_own_task_as_finished(task_id, turbo_tasks); } + fn mark_own_task_as_session_dependent( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + self.0.mark_own_task_as_session_dependent(task, turbo_tasks); + } + fn connect_task( &self, task: TaskId, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 47cb900b237055..e06baba14cb68e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -959,9 +959,6 @@ pub trait TaskGuard: Debug + TaskStorageAccessors { panic!("Every task must have a task type {self:?}"); } } - fn is_session_dependent(&self) -> bool { - matches!(self.get_task_type(), TaskTypeRef::Cached(tt) if tt.native_fn.is_session_dependent) - } fn get_task_desc_fn(&self) -> impl Fn() -> String + Send + Sync + 'static { let task_type = self.get_task_type().to_owned(); let task_id = self.id(); diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 0332e0e559d520..b50ffdb97edc5a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -238,6 +238,7 @@ pub struct InProgressStateInner { pub stale: bool, #[allow(dead_code)] pub once_task: bool, + pub session_dependent: bool, /// Early marking as completed. This is set before the output is available and will ignore full /// task completion of the task for strongly consistent reads. pub marked_as_completed: bool, diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs index e7ebcab248f791..38c02229ef6228 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs @@ -3,7 +3,7 @@ #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this use anyhow::Result; -use turbo_tasks::{ResolvedVc, State, Vc}; +use turbo_tasks::{ResolvedVc, State, Vc, mark_session_dependent}; use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); @@ -76,8 +76,9 @@ async fn inner_compute(input: Vc) -> Result> { Ok(last.unwrap()) } -#[turbo_tasks::function(session_dependent)] +#[turbo_tasks::function] async fn compute2(input: Vc) -> Result> { + mark_session_dependent(); println!("compute2()"); let value = *input.await?; Ok(Vc::cell(value)) diff --git a/turbopack/crates/turbo-tasks-env/src/command_line.rs b/turbopack/crates/turbo-tasks-env/src/command_line.rs index 7bb420c6767300..000018c329a440 100644 --- a/turbopack/crates/turbo-tasks-env/src/command_line.rs +++ b/turbopack/crates/turbo-tasks-env/src/command_line.rs @@ -1,5 +1,5 @@ use turbo_rcstr::RcStr; -use turbo_tasks::{FxIndexMap, Vc}; +use turbo_tasks::{FxIndexMap, Vc, mark_session_dependent}; use crate::{GLOBAL_ENV_LOCK, ProcessEnv, TransientEnvMap, sorted_env_vars}; @@ -23,8 +23,9 @@ fn env_snapshot() -> FxIndexMap { #[turbo_tasks::value_impl] impl ProcessEnv for CommandLineProcessEnv { - #[turbo_tasks::function(session_dependent)] + #[turbo_tasks::function] fn read_all(&self) -> Vc { + mark_session_dependent(); Vc::cell(env_snapshot()) } } diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index b8e7626ce6956c..6dafa4099ffe60 100644 --- a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -1,18 +1,9 @@ -use std::{ - cmp::max, - fmt::{Display, Formatter}, - hash::Hash, - sync::LazyLock, - time::{Duration, SystemTime}, -}; +use std::{hash::Hash, sync::LazyLock}; use anyhow::Result; use quick_cache::sync::Cache; use turbo_rcstr::RcStr; -use turbo_tasks::{ - Completion, FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, ReadRef, - ResolvedVc, Vc, duration_span, util::StaticOrArc, -}; +use turbo_tasks::{ReadRef, Vc, duration_span, mark_session_dependent}; use crate::{FetchError, FetchResult, HttpResponse, HttpResponseBody}; @@ -28,21 +19,8 @@ static CLIENT_CACHE: LazyLock, reqwest::Client> /// This is needed because [`reqwest::ClientBuilder`] does not implement the required traits. This /// factory cannot be a closure because closures do not implement `Eq` or `Hash`. #[turbo_tasks::value(shared)] -#[derive(Hash)] -pub struct FetchClientConfig { - /// Minimum cache TTL in seconds. Responses with a `Cache-Control: max-age` shorter than this - /// will be clamped to this value. This prevents pathologically short timeouts from causing an - /// invalidation bomb. Defaults to 1 hour. - pub min_cache_control_secs: u64, -} - -impl Default for FetchClientConfig { - fn default() -> Self { - Self { - min_cache_control_secs: 60 * 60, - } - } -} +#[derive(Hash, Default)] +pub struct FetchClientConfig {} impl FetchClientConfig { /// Returns a cached instance of `reqwest::Client` it exists, otherwise constructs a new one. @@ -56,7 +34,7 @@ impl FetchClientConfig { /// The reqwest client fails to construct if the TLS backend cannot be initialized, or the /// resolver cannot load the system configuration. These failures should be treated as /// cached for some amount of time, but ultimately transient (e.g. using - /// `session_dependent`). + /// [`turbo_tasks::mark_session_dependent`]). pub fn try_get_cached_reqwest_client( self: ReadRef, ) -> reqwest::Result { @@ -100,66 +78,17 @@ impl FetchClientConfig { } } -/// Invalidation was caused by a max-age deadline returned by a server -#[derive(PartialEq, Eq, Hash)] -pub(crate) struct HttpTimeout {} - -impl InvalidationReason for HttpTimeout { - fn kind(&self) -> Option> { - Some(StaticOrArc::Static(&HTTP_TIMEOUT_KIND)) - } -} - -impl Display for HttpTimeout { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "http max-age timeout") - } -} - -/// Invalidation kind for [Write] -#[derive(PartialEq, Eq, Hash)] -struct HttpTimeoutKind; - -static HTTP_TIMEOUT_KIND: HttpTimeoutKind = HttpTimeoutKind; - -impl InvalidationReasonKind for HttpTimeoutKind { - fn fmt( - &self, - reasons: &FxIndexSet>, - f: &mut Formatter<'_>, - ) -> std::fmt::Result { - write!(f, "{} fetches timed out", reasons.len(),) - } -} - -/// Internal result from `fetch_inner` that includes the invalidator for TTL-based re-fetching. -#[turbo_tasks::value(shared)] -struct FetchInnerResult { - result: ResolvedVc, - /// Invalidator for the `fetch_inner` task. Used by the outer `fetch` to set up a timer that - /// triggers re-fetching when the Cache-Control max-age expires. - invalidator: Option, - /// Absolute deadline (seconds since UNIX epoch) after which the cached response should be - /// re-fetched. Computed as `now + max-age` at fetch time. An absolute timestamp is used - /// instead of a relative duration so that the remaining TTL is correct on warm cache restore. - deadline_secs: Option, -} - #[turbo_tasks::value_impl] impl FetchClientConfig { - /// Performs the actual HTTP request. This task is `network` but NOT `session_dependent`, so - /// its cached result survives restarts. The outer `fetch` task (which IS `session_dependent`) - /// reads the cached invalidator and sets up a timer for TTL-based re-fetching. #[turbo_tasks::function(network)] - async fn fetch_inner( + pub async fn fetch( self: Vc, url: RcStr, user_agent: Option, - ) -> Result> { + ) -> Result> { let url_ref = &*url; let this = self.await?; - let min_cache_control_secs = this.min_cache_control_secs; - let response_result: reqwest::Result<(HttpResponse, Option)> = async move { + let response_result: reqwest::Result = async move { let reqwest_client = this.try_get_cached_reqwest_client()?; let mut builder = reqwest_client.get(url_ref); @@ -174,7 +103,6 @@ impl FetchClientConfig { .and_then(|r| r.error_for_status())?; let status = response.status().as_u16(); - let max_age = parse_cache_control(response.headers()); let body = { let _span = duration_span!("fetch response", url = url_ref); @@ -182,144 +110,24 @@ impl FetchClientConfig { } .to_vec(); - Ok(( - HttpResponse { - status, - body: HttpResponseBody(body).resolved_cell(), - }, - max_age, - )) + Ok(HttpResponse { + status, + body: HttpResponseBody(body).resolved_cell(), + }) } .await; match response_result { - Ok((resp, max_age_secs)) => { - if let Some(max_age_secs) = max_age_secs { - let max_age_secs = max(max_age_secs, min_cache_control_secs); - let deadline_secs = { - // Transform the relative offset to an absolute deadline so it can be - // cached. - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("system clock is before UNIX epoch") - .as_secs(); - now + max_age_secs - }; - let invalidator = turbo_tasks::get_invalidator(); - Ok(FetchInnerResult { - result: ResolvedVc::cell(Ok(resp.resolved_cell())), - invalidator, - deadline_secs: Some(deadline_secs), - } - .cell()) - } else { - Completion::session_dependent().await?; - Ok(FetchInnerResult { - result: ResolvedVc::cell(Ok(resp.resolved_cell())), - invalidator: None, - deadline_secs: None, - } - .cell()) - } - } + Ok(resp) => Ok(Vc::cell(Ok(resp.resolved_cell()))), Err(err) => { - // Read session_dependent_completion so that this task is re-dirtied on session - // restore. This ensures transient errors (network down, DNS failure) are retried - // on the next session without a timer or busy-loop. - Completion::session_dependent().await?; - Ok(FetchInnerResult { - result: ResolvedVc::cell(Err( - FetchError::from_reqwest_error(&err, &url).resolved_cell() - )), - - invalidator: None, - deadline_secs: None, - } - .cell()) - } - } - } - - /// Fetches the given URL and returns the response. Results are cached across sessions using - /// TTL from the response's `Cache-Control: max-age` header. - /// - /// This is the outer task in a two-task pattern: - /// - `fetch` (session_dependent): always re-executes on restore, reads the cached inner result, - /// and spawns a timer for mid-session TTL expiry. - /// - `fetch_inner` (network, NOT session_dependent): performs the actual HTTP request and stays - /// cached across restarts. Returns an `Invalidator` that the outer task uses to trigger - /// re-fetching when the TTL expires. - #[turbo_tasks::function(network, session_dependent)] - pub async fn fetch( - self: Vc, - url: RcStr, - user_agent: Option, - ) -> Result> { - let FetchInnerResult { - result, - deadline_secs, - invalidator, - } = *self.fetch_inner(url, user_agent).await?; - - // Set up a timer to invalidate fetch_inner when the TTL expires. - // On warm cache restore, this re-executes (session_dependent), reads the persisted - // deadline from fetch_inner's cached result, and starts a timer for the remaining time. - // - // Skip when dependency tracking is disabled (e.g. one-shot `next build`) since - // invalidation panics without dependency tracking and the timer would be wasted work. - if turbo_tasks::turbo_tasks().is_tracking_dependencies() - && let (Some(deadline_secs), Some(invalidator)) = (deadline_secs, invalidator) - { - // transform absolute deadline back to a relative duration for the sleep call - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("system clock is before UNIX epoch") - .as_secs(); - let remaining = Duration::from_secs(deadline_secs.saturating_sub(now)); - // NOTE: in the case where the deadline is expired on session start this timeout will - // immediately invalidate and race with us returning. This is basically fine since in - // the most common case the actual fetch result is identical so this gives us a kind of - // 'stale while revalidate' feature. - // alternatively we could synchronously invalidate and re-execute `fetch-inner` but that - // simply adds latency in the common case where our fetch is identical. - // NOTE(2): if for some reason `fetch` is re-executed but `fetch-inner` isn't we could - // end up with multiple timers. Currently there is no known case where this could - // happen, if it somehow does we could end up with redundant invalidations and - // re-fetches. The solution is to detect this with a mutable hash map on - // FetchClientConfig to track outstanding timers and cancel them. - turbo_tasks::spawn(async move { - tokio::time::sleep(remaining).await; - invalidator.invalidate_with_reason(&*turbo_tasks::turbo_tasks(), HttpTimeout {}); - }); - } - - Ok(*result) - } -} - -/// Parses the `max-age` directive from a `Cache-Control` header value. -/// Returns the max-age in seconds, or `None` if not present or unparseable. -/// None means we shouldn't cache longer than the current session -fn parse_cache_control(headers: &reqwest::header::HeaderMap) -> Option { - let value = headers.get(reqwest::header::CACHE_CONTROL)?.to_str().ok()?; - let mut max_age = None; - for directive in value.split(',') { - let (key, val) = { - if let Some(index) = directive.find('=') { - (directive[0..index].trim(), Some(&directive[index + 1..])) - } else { - (directive.trim(), None) + // the client failed to construct or the HTTP request failed + mark_session_dependent(); + Ok(Vc::cell(Err( + FetchError::from_reqwest_error(&err, &url).resolved_cell() + ))) } - }; - if key.eq_ignore_ascii_case("max-age") - && let Some(val) = val - { - max_age = val.trim().parse().ok(); - } else if key.eq_ignore_ascii_case("no-cache") || key.eq_ignore_ascii_case("no-store") { - return None; } } - max_age } #[doc(hidden)] @@ -331,35 +139,3 @@ pub fn __test_only_reqwest_client_cache_clear() { pub fn __test_only_reqwest_client_cache_len() -> usize { CLIENT_CACHE.len() } - -#[cfg(test)] -mod tests { - use reqwest::header::{CACHE_CONTROL, HeaderMap, HeaderValue}; - - use super::parse_cache_control; - - fn headers(value: &str) -> HeaderMap { - let mut h = HeaderMap::new(); - h.insert(CACHE_CONTROL, HeaderValue::from_str(value).unwrap()); - h - } - - #[test] - fn max_age() { - assert_eq!(parse_cache_control(&headers("max-age=300")), Some(300)); - assert_eq!(parse_cache_control(&headers("MAX-AGE = 300")), Some(300)); - assert_eq!( - parse_cache_control(&headers("public, max-age=3600, must-revalidate")), - Some(3600) - ); - } - - #[test] - fn no_cache_headers() { - assert_eq!(parse_cache_control(&headers("NO-CACHE")), None); - assert_eq!(parse_cache_control(&headers("no-cache")), None); - assert_eq!(parse_cache_control(&headers("no-store")), None); - assert_eq!(parse_cache_control(&headers("max-age=300, no-store")), None); - assert_eq!(parse_cache_control(&HeaderMap::new()), None); - } -} diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index 43f67b6706023f..fffbe4c0a3721d 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use anyhow::Result; use tokio::sync::Mutex as TokioMutex; use turbo_rcstr::{RcStr, rcstr}; -use turbo_tasks::{ReadRef, TurboTasksApi, Vc}; +use turbo_tasks::{ReadRef, Vc}; use turbo_tasks_fetch::{ __test_only_reqwest_client_cache_clear, __test_only_reqwest_client_cache_len, FetchClientConfig, FetchErrorKind, FetchIssue, @@ -303,235 +303,6 @@ async fn errors_on_404() { .unwrap() } -/// Helper: create a TT instance for fetch tests. When `initial` is true, clears the cache -/// directory first (cold start). When false, reuses existing cache (warm restore). -fn create_fetch_tt(name: &str, initial: bool) -> Arc { - REGISTRATION.create_turbo_tasks(name, initial) -} - -#[turbo_tasks::function(operation)] -async fn fetch_body(url: RcStr) -> Result> { - let client_vc = FetchClientConfig { - min_cache_control_secs: 0, - } - .cell(); - let response = &*client_vc - .fetch(url, /* user_agent */ None) - .await? - .unwrap() - .await?; - Ok(response.body.to_string()) -} - -/// Test that the TTL timer invalidates `fetch_inner` within a session. -/// -/// 1. Server returns body "v1" with `max-age=1` -/// 2. First fetch returns "v1" -/// 3. Server changes to return "v2" -/// 4. Wait 2s for TTL to expire (timer fires, invalidates fetch_inner) -/// 5. Strongly consistent read returns "v2" -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn ttl_invalidates_within_session() { - let _guard = GLOBAL_TEST_LOCK.lock().await; - let mut server = mockito::Server::new_async().await; - let url = RcStr::from(format!("{}/ttl-within", server.url())); - - server - .mock("GET", "/ttl-within") - .with_body("v1") - .with_header("Cache-Control", "max-age=1") - .create_async() - .await; - - let tt = create_fetch_tt("ttl_invalidates_within_session", true); - let body = turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - let body = fetch_body(url).read_strongly_consistent().await?; - Ok((*body).clone()) - } - }) - .await - .unwrap(); - assert_eq!(&*body, "v1"); - - // Change the server response - server.reset(); - server - .mock("GET", "/ttl-within") - .with_body("v2") - .with_header("Cache-Control", "max-age=1") - .create_async() - .await; - - // Wait for the TTL timer to fire (max-age=1, so wait 2s to be safe) - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - - // The timer should have invalidated fetch_inner, so a new strongly consistent read - // should re-fetch and return the updated body. - let body = turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - let body = fetch_body(url).read_strongly_consistent().await?; - Ok((*body).clone()) - } - }) - .await - .unwrap(); - assert_eq!(&*body, "v2"); - - tt.stop_and_wait().await; -} - -/// Test that after a session restore, an expired TTL causes a re-fetch. -/// -/// 1. Server returns "v1" with `max-age=1` -/// 2. Fetch, stop TT -/// 3. Wait for TTL to expire -/// 4. Create new TT (warm restore), server now returns "v2" -/// 5. Fetch should return "v2" (deadline expired, timer fires immediately on restore) -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn ttl_invalidates_on_session_restore() { - let _guard = GLOBAL_TEST_LOCK.lock().await; - let mut server = mockito::Server::new_async().await; - let url = RcStr::from(format!("{}/ttl-restore", server.url())); - - server - .mock("GET", "/ttl-restore") - .with_body("v1") - .with_header("Cache-Control", "max-age=1") - .create_async() - .await; - - // Session 1: fetch and cache - let tt = create_fetch_tt("ttl_invalidates_on_session_restore", true); - let body = turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - let body = fetch_body(url).read_strongly_consistent().await?; - Ok((*body).clone()) - } - }) - .await - .unwrap(); - assert_eq!(&*body, "v1"); - tt.stop_and_wait().await; - - // Wait for TTL to expire - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - - // Change server response - server.reset(); - server - .mock("GET", "/ttl-restore") - .with_body("v2") - .with_header("Cache-Control", "max-age=1") - .create_async() - .await; - - // Session 2: warm restore — TTL expired, should re-fetch. - // On restore, `fetch` (session_dependent) re-executes and reads the cached `fetch_inner` - // result. The deadline is expired, so it spawns a zero-duration timer. That timer - // invalidates `fetch_inner` asynchronously, which triggers a second round of execution. - // We need to read twice: the first read returns the stale cached value, then wait for the - // timer-triggered re-execution to settle. - let tt = create_fetch_tt("ttl_invalidates_on_session_restore", false); - turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - // First read returns the stale cached value, but triggers the timer - let _body = fetch_body(url).read_strongly_consistent().await?; - Ok(()) - } - }) - .await - .unwrap(); - - // Wait for the timer to fire and re-execution to settle - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - - let body = turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - let body = fetch_body(url).read_strongly_consistent().await?; - Ok((*body).clone()) - } - }) - .await - .unwrap(); - assert_eq!(&*body, "v2"); - tt.stop_and_wait().await; -} - -/// Test that fetch errors are retried on session restore. -/// -/// 1. Server returns connection refused (error) -/// 2. Fetch returns error -/// 3. Stop TT, start new session -/// 4. Server now returns 200 -/// 5. Fetch should succeed (error was session-dependent, retried on restore) -/// -/// TODO: Consider retrying errors within a session with backoff. -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn errors_retried_on_session_restore() { - let _guard = GLOBAL_TEST_LOCK.lock().await; - let mut server = mockito::Server::new_async().await; - let url = RcStr::from(format!("{}/error-restore", server.url())); - - // Session 1: server returns 500 - server - .mock("GET", "/error-restore") - .with_status(500) - .create_async() - .await; - - let tt = create_fetch_tt("errors_retried_on_session_restore", true); - let is_err = turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - #[turbo_tasks::function(operation)] - async fn fetch_is_err(url: RcStr) -> Result> { - let client_vc = FetchClientConfig::default().cell(); - let result = &*client_vc.fetch(url, None).await?; - Ok(Vc::cell(result.is_err())) - } - let is_err = *fetch_is_err(url).read_strongly_consistent().await?; - Ok(is_err) - } - }) - .await - .unwrap(); - assert!(is_err, "first fetch should be an error"); - tt.stop_and_wait().await; - - // Session 2: server now returns 200 - server.reset(); - server - .mock("GET", "/error-restore") - .with_body("success") - .create_async() - .await; - - let tt = create_fetch_tt("errors_retried_on_session_restore", false); - let is_err = turbo_tasks::run_once(tt.clone(), { - let url = url.clone(); - async move { - #[turbo_tasks::function(operation)] - async fn fetch_is_err2(url: RcStr) -> Result> { - let client_vc = FetchClientConfig::default().cell(); - let result = &*client_vc.fetch(url, None).await?; - Ok(Vc::cell(result.is_err())) - } - let is_err = *fetch_is_err2(url).read_strongly_consistent().await?; - Ok(is_err) - } - }) - .await - .unwrap(); - assert!(!is_err, "second fetch should succeed after session restore"); - tt.stop_and_wait().await; -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn client_cache() { let mut server = mockito::Server::new_async().await; diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index b6ec734ea4675c..3a6c81c59ddf68 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -66,8 +66,8 @@ use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ ApplyEffectsContext, Completion, Effect, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, TaskInput, TurboTasksApi, ValueToString, ValueToStringRef, Vc, - debug::ValueDebugFormat, emit_effect, parallel, trace::TraceRawVcs, turbo_tasks_weak, - turbobail, turbofmt, + debug::ValueDebugFormat, emit_effect, mark_session_dependent, parallel, trace::TraceRawVcs, + turbo_tasks_weak, turbobail, turbofmt, }; use turbo_tasks_hash::{ DeterministicHash, DeterministicHasher, HashAlgorithm, deterministic_hash, hash_xxh3_hash64, @@ -752,8 +752,10 @@ impl Debug for DiskFileSystem { #[turbo_tasks::value_impl] impl FileSystem for DiskFileSystem { - #[turbo_tasks::function(fs, session_dependent)] + #[turbo_tasks::function(fs)] async fn read(&self, fs_path: FileSystemPath) -> Result> { + mark_session_dependent(); + // Check if path is denied - if so, treat as NotFound if self.inner.is_path_denied(&fs_path) { return Ok(FileContent::NotFound.cell()); @@ -778,8 +780,10 @@ impl FileSystem for DiskFileSystem { Ok(content.cell()) } - #[turbo_tasks::function(fs, session_dependent)] + #[turbo_tasks::function(fs)] async fn raw_read_dir(&self, fs_path: FileSystemPath) -> Result> { + mark_session_dependent(); + // Check if directory itself is denied - if so, treat as NotFound if self.inner.is_path_denied(&fs_path) { return Ok(RawDirectoryContent::not_found()); @@ -866,8 +870,10 @@ impl FileSystem for DiskFileSystem { Ok(RawDirectoryContent::new(entries)) } - #[turbo_tasks::function(fs, session_dependent)] + #[turbo_tasks::function(fs)] async fn read_link(&self, fs_path: FileSystemPath) -> Result> { + mark_session_dependent(); + // Check if path is denied - if so, treat as NotFound if self.inner.is_path_denied(&fs_path) { return Ok(LinkContent::NotFound.cell()); @@ -955,9 +961,9 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn write(&self, fs_path: FileSystemPath, content: Vc) -> Result<()> { - // You might be tempted to use `session_dependent` here, but `write` purely declares a side - // effect and does not need to be reexecuted in the next session. All side effects are - // reexecuted in general. + // You might be tempted to use `mark_session_dependent` here, but + // `write` purely declares a side effect and does not need to be reexecuted in the next + // session. All side effects are reexecuted in general. // Check if path is denied - if so, return an error if self.inner.is_path_denied(&fs_path) { @@ -1108,7 +1114,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn write_link(&self, fs_path: FileSystemPath, target: Vc) -> Result<()> { - // You might be tempted to use `session_dependent` here, but we purely declare a side + // You might be tempted to use `mark_session_dependent` here, but we purely declare a side // effect and does not need to be re-executed in the next session. All side effects are // re-executed in general. @@ -1341,8 +1347,9 @@ impl FileSystem for DiskFileSystem { Ok(()) } - #[turbo_tasks::function(fs, session_dependent)] + #[turbo_tasks::function(fs)] async fn metadata(&self, fs_path: FileSystemPath) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(&fs_path); // Check if path is denied - if so, return an error (metadata shouldn't be readable) diff --git a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr index 63bf140473f312..c7c745e37ef60f 100644 --- a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr +++ b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args.stderr @@ -1,4 +1,4 @@ -error: unexpected token, expected one of: "fs", "network", "operation", "root", or "session_dependent" +error: unexpected token, expected one of: "fs", "network", "operation", or "root" --> tests/function/fail_attribute_invalid_args.rs:10:25 | 10 | #[turbo_tasks::function(invalid_argument)] diff --git a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr index 835ae956f4b4b6..b8657215643293 100644 --- a/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr +++ b/turbopack/crates/turbo-tasks-macros-tests/tests/function/fail_attribute_invalid_args_inherent_impl.stderr @@ -1,4 +1,4 @@ -error: unexpected token, expected one of: "fs", "network", "operation", "root", or "session_dependent" +error: unexpected token, expected one of: "fs", "network", "operation", or "root" --> tests/function/fail_attribute_invalid_args_inherent_impl.rs:15:29 | 15 | #[turbo_tasks::function(invalid_argument)] diff --git a/turbopack/crates/turbo-tasks-macros/src/func.rs b/turbopack/crates/turbo-tasks-macros/src/func.rs index cc1e55d7d446ce..0a8e687f5e9daf 100644 --- a/turbopack/crates/turbo-tasks-macros/src/func.rs +++ b/turbopack/crates/turbo-tasks-macros/src/func.rs @@ -728,10 +728,6 @@ pub struct FunctionArguments { /// Should the task be marked as a root in the aggregation graph on initial creation? /// Root tasks start with aggregation number `u32::MAX`. pub root: Option, - /// Should the task be marked as session dependent? Session dependent tasks are re-executed - /// when restored from persistent cache because they depend on external state (filesystem, - /// environment, network) that may change between sessions. - pub session_dependent: Option, } impl Parse for FunctionArguments { @@ -759,14 +755,11 @@ impl Parse for FunctionArguments { ("root", Meta::Path(_)) => { parsed_args.root = Some(meta.span()); } - ("session_dependent", Meta::Path(_)) => { - parsed_args.session_dependent = Some(meta.span()); - } (_, meta) => { return Err(syn::Error::new_spanned( meta, "unexpected token, expected one of: \"fs\", \"network\", \"operation\", \ - \"root\", or \"session_dependent\"", + or \"root\"", )); } } @@ -1093,7 +1086,6 @@ pub struct NativeFn { pub is_self_used: bool, pub filter_trait_call_args: Option, pub is_root: bool, - pub is_session_dependent: bool, } impl NativeFn { @@ -1110,7 +1102,6 @@ impl NativeFn { is_self_used, filter_trait_call_args, is_root, - is_session_dependent, } = self; let task_fn = if *is_method && *is_self_used { @@ -1146,7 +1137,6 @@ impl NativeFn { #arg_meta, &#task_fn, #is_root, - #is_session_dependent, ) } } diff --git a/turbopack/crates/turbo-tasks-macros/src/function_macro.rs b/turbopack/crates/turbo-tasks-macros/src/function_macro.rs index 255534ee654680..fe0c2e87158c53 100644 --- a/turbopack/crates/turbo-tasks-macros/src/function_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/function_macro.rs @@ -43,7 +43,6 @@ pub fn function(args: TokenStream, input: TokenStream) -> TokenStream { .unwrap_or_default(); let is_self_used = args.operation.is_some() || is_self_used(&block); let is_root = args.root.is_some(); - let is_session_dependent = args.session_dependent.is_some(); let Some(turbo_fn) = TurboFn::new(&sig, DefinitionContext::NakedFn, args, is_self_used) else { return quote! { @@ -67,7 +66,6 @@ pub fn function(args: TokenStream, input: TokenStream) -> TokenStream { is_self_used, filter_trait_call_args: None, // not a trait method is_root, - is_session_dependent, }; let native_function_ident = get_native_function_ident(ident); let native_function_ty = native_fn.ty(); diff --git a/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs b/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs index c73394c60cac9b..fe3d6b58f16c82 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_impl_macro.rs @@ -101,8 +101,6 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { } }; let is_self_used = func_args.operation.is_some() || is_self_used(block); - let is_root = func_args.root.is_some(); - let is_session_dependent = func_args.session_dependent.is_some(); let Some(turbo_fn) = TurboFn::new( sig, @@ -125,8 +123,7 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { is_method: turbo_fn.is_method(), is_self_used, filter_trait_call_args: None, // not a trait method - is_root, - is_session_dependent, + is_root: false, }; let native_function_ident = get_inherent_impl_function_ident(ty_ident, ident); @@ -211,8 +208,6 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { }; // operations are not currently compatible with methods let is_self_used = func_args.operation.is_some() || is_self_used(block); - let is_root = func_args.root.is_some(); - let is_session_dependent = func_args.session_dependent.is_some(); let Some(turbo_fn) = TurboFn::new( sig, @@ -245,8 +240,7 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { is_method: turbo_fn.is_method(), is_self_used, filter_trait_call_args: turbo_fn.filter_trait_call_args(), - is_root, - is_session_dependent, + is_root: false, }; let native_function_ident = diff --git a/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs b/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs index 28e12877511bba..dd83dc8cac7e35 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_trait_macro.rs @@ -214,7 +214,6 @@ pub fn value_trait(args: TokenStream, input: TokenStream) -> TokenStream { is_self_used, filter_trait_call_args: turbo_fn.filter_trait_call_args(), is_root: false, - is_session_dependent: false, }; let native_function_ident = get_trait_default_impl_function_ident(trait_ident, ident); diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 931a04f3d951e9..11fa2c9aec8b51 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -294,6 +294,10 @@ impl TurboTasksApi for VcStorage { // no-op } + fn mark_own_task_as_session_dependent(&self, _task: TaskId) { + // no-op + } + fn spawn_detached_for_testing( &self, _f: std::pin::Pin + Send + 'static>>, diff --git a/turbopack/crates/turbo-tasks/function.md b/turbopack/crates/turbo-tasks/function.md index c64f019191618f..bff5dd9146a653 100644 --- a/turbopack/crates/turbo-tasks/function.md +++ b/turbopack/crates/turbo-tasks/function.md @@ -71,78 +71,6 @@ fn foo( ) -> Vc; // was: impl Future>> ``` -## Attributes - -The `#[turbo_tasks::function]` macro accepts optional attributes that modify the behavior of the -task. Multiple attributes can be combined by separating them with commas. - -```rust -#[turbo_tasks::function(fs, session_dependent)] -async fn read_file(path: RcStr) -> Result> { - // ... -} -``` - -### `operation` - -Marks the task as an **operation**. The external signature will return an [`OperationVc`] instead -of a [`Vc`], and all arguments must implement `OperationValue`. Operation tasks serve as explicit -entry points into the task graph and can be used to connect non-reactive code to the reactive -computation graph. Mutually exclusive with `&self` receivers. - -### `root` - -Marks the task as a **root** in the aggregation graph. Root tasks start with the maximum aggregation -number (`u32::MAX`), which places them at the top of the aggregation tree. This is used for tasks -that represent top-level entry points into the computation. - -### `fs` - -An **I/O marker** indicating the task directly performs filesystem operations. This should only be -applied to the task that directly performs the I/O, not tasks that transitively call it. - -### `network` - -An **I/O marker** indicating the task directly performs network operations. Like `fs`, this should -only be applied to the task that directly performs the I/O. - -### `session_dependent` - -Marks the task as **session dependent**. Session-dependent tasks are re-executed when restored from -persistent cache because they depend on external state (filesystem, environment, network) that may -have changed between sessions. - -When a session-dependent task completes, it is not marked as fully clean — it retains a special -"session dependent" dirty state. If the task is later restored from persistent cache in a new -session, this state causes it to be re-executed rather than reusing the cached result. - -Typical use cases: - -- **Filesystem reads** — file contents may have changed on disk between sessions. -- **Environment variable reads** — process environment may differ between sessions. -- **Network requests** — remote resources may return different results. - -```rust -#[turbo_tasks::function(fs, session_dependent)] -async fn read(&self, path: FileSystemPath) -> Result> { - // File contents may have changed since the last session, - // so this task is always re-executed on cache restore. - // ... -} - -#[turbo_tasks::function(session_dependent)] -fn read_all_env(&self) -> Vc { - // Environment variables may differ between sessions. - Vc::cell(self.vars.clone()) -} -``` - -Note: `session_dependent` should be applied to the **leaf task** that directly reads external state. -Tasks that transitively depend on a session-dependent task do not need this attribute — they will -naturally re-execute when the session-dependent task they depend on produces a new result. - -[`OperationVc`]: crate::OperationVc - ## Methods and Self Tasks can be methods associated with a value or a trait implementation using the [`arbitrary_self_types` nightly compiler feature][self-types]. diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index 4b20c369d38cfe..97c9f5be63b239 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -624,6 +624,14 @@ pub trait Backend: Sync + Send { // Do nothing by default } + fn mark_own_task_as_session_dependent( + &self, + _task: TaskId, + _turbo_tasks: &dyn TurboTasksBackendApi, + ) { + // Do nothing by default + } + fn create_transient_task( &self, task_type: TransientTaskType, diff --git a/turbopack/crates/turbo-tasks/src/completion.rs b/turbopack/crates/turbo-tasks/src/completion.rs index f6adc63b17459d..821630af22f8fb 100644 --- a/turbopack/crates/turbo-tasks/src/completion.rs +++ b/turbopack/crates/turbo-tasks/src/completion.rs @@ -21,14 +21,6 @@ impl Completion { pub fn immutable() -> Vc { Completion::cell(Completion) } - - /// Returns a completion from a session-dependent task. Awaiting this creates a dependency on - /// a session-dependent task, which will cause the calling task to be re-executed when - /// restored from persistent cache. - #[turbo_tasks::function(session_dependent)] - pub fn session_dependent() -> Vc { - Completion::cell(Completion) - } } // no #[turbo_tasks::value_impl] to inline new into the caller task diff --git a/turbopack/crates/turbo-tasks/src/invalidation.rs b/turbopack/crates/turbo-tasks/src/invalidation.rs index e4f8d7c88d701b..1b966f414bd484 100644 --- a/turbopack/crates/turbo-tasks/src/invalidation.rs +++ b/turbopack/crates/turbo-tasks/src/invalidation.rs @@ -27,7 +27,7 @@ pub fn get_invalidator() -> Option { /// A lightweight handle to invalidate a task. Only stores the task ID. /// The caller must provide the `TurboTasksApi` when calling invalidation methods. -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Encode, Decode)] +#[derive(Clone, Copy, Hash, PartialEq, Eq, Encode, Decode)] pub struct Invalidator { task: TaskId, } @@ -55,10 +55,8 @@ impl TraceRawVcs for Invalidator { } } -unsafe impl OperationValue for Invalidator {} -// Safety: Invalidator only contains a TaskId (a NonZero wrapper) and does not contain any -// local Vc references. unsafe impl NonLocalValue for Invalidator {} +unsafe impl OperationValue for Invalidator {} /// A user-facing reason why a task was invalidated. This should only be used /// for invalidation that were triggered by the user. diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 77cd2dcc7881d3..baf7a57dee7a8f 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -93,8 +93,8 @@ pub use crate::{ CurrentCellRef, ReadCellTracking, ReadConsistency, ReadTracking, TaskPersistence, TaskPriority, TurboTasks, TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, dynamic_call, emit, get_serialization_invalidator, mark_finished, - mark_stateful, mark_top_level_task, prevent_gc, run, run_once, run_once_with_reason, - trait_call, turbo_tasks, turbo_tasks_scope, turbo_tasks_weak, + mark_session_dependent, mark_stateful, mark_top_level_task, prevent_gc, run, run_once, + run_once_with_reason, trait_call, turbo_tasks, turbo_tasks_scope, turbo_tasks_weak, unmark_top_level_task_may_leak_eventually_consistent_state, with_turbo_tasks, }, mapped_read_ref::MappedReadRef, @@ -104,7 +104,7 @@ pub use crate::{ read_ref::ReadRef, serialization_invalidation::SerializationInvalidator, spawn::{JoinHandle, block_for_future, block_in_place, spawn, spawn_blocking, spawn_thread}, - state::State, + state::{State, TransientState}, task::{ SharedReference, TypedSharedReference, task_input::{EitherTaskInput, TaskInput}, diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index f2369115733cb7..76fea4ae929577 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -177,6 +177,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { verification_mode: VerificationMode, ); fn mark_own_task_as_finished(&self, task: TaskId); + fn mark_own_task_as_session_dependent(&self, task: TaskId); fn connect_task(&self, task: TaskId); @@ -1591,6 +1592,10 @@ impl TurboTasksApi for TurboTasks { self.backend.mark_own_task_as_finished(task, self); } + fn mark_own_task_as_session_dependent(&self, task: TaskId) { + self.backend.mark_own_task_as_session_dependent(task, self); + } + /// Creates a future that inherits the current task id and task state. The current global task /// will wait for this future to be dropped before exiting. fn spawn_detached_for_testing(&self, fut: Pin + Send + 'static>>) { @@ -1886,6 +1891,13 @@ pub fn current_task_for_testing() -> Option { CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id) } +/// Marks the current task as dirty when restored from filesystem cache. +pub fn mark_session_dependent() { + with_turbo_tasks(|tt| { + tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()")) + }); +} + /// Marks the current task as finished. This excludes it from waiting for /// strongly consistency. pub fn mark_finished() { @@ -1931,7 +1943,7 @@ pub fn mark_invalidator() { } /// Marks the current task as stateful. This is used to indicate that the task -/// has interior mutability (e.g., via State), which means +/// has interior mutability (e.g., via State or TransientState), which means /// the task may produce different outputs even with the same inputs. /// /// Only has an effect when the `verify_determinism` feature is enabled. diff --git a/turbopack/crates/turbo-tasks/src/marker_trait.rs b/turbopack/crates/turbo-tasks/src/marker_trait.rs index 7e103b3b4c7880..8cd89b15381d12 100644 --- a/turbopack/crates/turbo-tasks/src/marker_trait.rs +++ b/turbopack/crates/turbo-tasks/src/marker_trait.rs @@ -82,6 +82,7 @@ macro_rules! impl_auto_marker_trait { <::Read as $crate::VcRead>::Target: $trait {} unsafe impl $trait for $crate::State {} + unsafe impl $trait for $crate::TransientState {} unsafe impl $trait for $crate::TransientValue {} unsafe impl $trait for $crate::TransientInstance {} unsafe impl $trait for $crate::event::Event {} diff --git a/turbopack/crates/turbo-tasks/src/native_function.rs b/turbopack/crates/turbo-tasks/src/native_function.rs index b4d1ed1f8d4671..4cbc1c8f3daab4 100644 --- a/turbopack/crates/turbo-tasks/src/native_function.rs +++ b/turbopack/crates/turbo-tasks/src/native_function.rs @@ -193,11 +193,6 @@ pub struct NativeFunction { /// Whether this function's tasks should be treated as root nodes in the aggregation graph. /// Root tasks start with aggregation number `u32::MAX` on initial creation. pub is_root: bool, - - /// Whether this function's tasks are session dependent. Session dependent tasks are - /// re-executed when restored from persistent cache because they depend on external state - /// (filesystem, environment, network) that may change between sessions. - pub is_session_dependent: bool, } impl Debug for NativeFunction { @@ -224,7 +219,6 @@ impl NativeFunction { implementation: &into_task_fn(default_fn) as &dyn TaskFn, ty: RegistryType::new::<()>("", ""), is_root: false, - is_session_dependent: false, }; pub const fn new( @@ -233,14 +227,12 @@ impl NativeFunction { arg_meta: ArgMeta, implementation: &'static T, is_root: bool, - is_session_dependent: bool, ) -> Self { Self { ty: RegistryType::new::(name, global_name), arg_meta, implementation, is_root, - is_session_dependent, } } diff --git a/turbopack/crates/turbo-tasks/src/state.rs b/turbopack/crates/turbo-tasks/src/state.rs index 6d42c4e2582587..cc7cfe892affed 100644 --- a/turbopack/crates/turbo-tasks/src/state.rs +++ b/turbopack/crates/turbo-tasks/src/state.rs @@ -12,7 +12,8 @@ use tracing::trace_span; use crate::{ Invalidator, OperationValue, SerializationInvalidator, get_invalidator, - get_serialization_invalidator, manager::with_turbo_tasks, trace::TraceRawVcs, + get_serialization_invalidator, manager::with_turbo_tasks, mark_session_dependent, + mark_stateful, trace::TraceRawVcs, }; #[derive(Encode, Decode)] @@ -290,3 +291,109 @@ impl State { self.serialization_invalidator.invalidate(); } } + +#[derive(Encode, Decode)] +#[bincode(bounds = "")] +pub struct TransientState { + #[bincode(skip, default = "default_transient_state_inner")] + inner: Mutex>>, +} + +fn default_transient_state_inner() -> Mutex>> { + Mutex::new(StateInner::new(None)) +} + +impl Debug for TransientState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TransientState") + .field("value", &self.inner.lock().value) + .finish() + } +} + +impl TraceRawVcs for TransientState { + fn trace_raw_vcs(&self, trace_context: &mut crate::trace::TraceRawVcsContext) { + self.inner.lock().value.trace_raw_vcs(trace_context); + } +} + +impl Default for TransientState { + fn default() -> Self { + // Need to be explicit to ensure marking as stateful. + Self::new() + } +} + +impl PartialEq for TransientState { + fn eq(&self, _other: &Self) -> bool { + false + } +} +impl Eq for TransientState {} + +impl TransientState { + pub fn new() -> Self { + mark_stateful(); + Self { + inner: Mutex::new(StateInner::new(None)), + } + } + + /// Gets the current value of the state. The current task will be registered + /// as dependency of the state and will be invalidated when the state + /// changes. + pub fn get(&self) -> StateRef<'_, Option> { + mark_session_dependent(); + let invalidator = get_invalidator(); + let mut inner = self.inner.lock(); + if let Some(invalidator) = invalidator { + inner.add_invalidator(invalidator); + } + StateRef { + serialization_invalidator: None, + inner, + mutated: false, + } + } + + /// Gets the current value of the state. Untracked. + pub fn get_untracked(&self) -> StateRef<'_, Option> { + let inner = self.inner.lock(); + StateRef { + serialization_invalidator: None, + inner, + mutated: false, + } + } + + /// Sets the current state without comparing it with the old value. This + /// should only be used if one is sure that the value has changed. + pub fn set_unconditionally(&self, value: T) { + let mut inner = self.inner.lock(); + inner.set_unconditionally(Some(value)); + } + + /// Updates the current state with the `update` function. The `update` + /// function need to return `true` when the value was modified. Exposing + /// the current value from the `update` function is not allowed and will + /// result in incorrect cache invalidation. + pub fn update_conditionally(&self, update: impl FnOnce(&mut Option) -> bool) { + let mut inner = self.inner.lock(); + inner.update_conditionally(update); + } +} + +impl TransientState { + /// Update the current state when the `value` is different from the current + /// value. `T` must implement [PartialEq] for this to work. + pub fn set(&self, value: T) { + let mut inner = self.inner.lock(); + inner.set(Some(value)); + } + + /// Unset the current value. + pub fn unset(&self) { + let mut inner = self.inner.lock(); + inner.set(None); + } +} diff --git a/turbopack/crates/turbopack-ecmascript/src/lib.rs b/turbopack/crates/turbopack-ecmascript/src/lib.rs index 428758a9f2cabc..2133af15819954 100644 --- a/turbopack/crates/turbopack-ecmascript/src/lib.rs +++ b/turbopack/crates/turbopack-ecmascript/src/lib.rs @@ -364,35 +364,6 @@ impl EcmascriptModuleAssetBuilder { } } -/// A transient cache that stores a value across task re-executions within a session but is lost -/// when restored from persistent cache. -struct TransientCache(parking_lot::Mutex>); - -impl Default for TransientCache { - fn default() -> Self { - Self(parking_lot::Mutex::new(None)) - } -} - -// All caches are alwaqys eq, this doesn't really make sense on its own but fits the purpose of -// embedding in EcmascriptModuleAsset -impl PartialEq for TransientCache { - fn eq(&self, _other: &Self) -> bool { - true - } -} -impl Eq for TransientCache {} - -impl TransientCache { - fn get(&self) -> parking_lot::MutexGuard<'_, Option> { - self.0.lock() - } - - fn set(&self, value: T) { - *self.0.lock() = Some(value); - } -} - #[turbo_tasks::value] pub struct EcmascriptModuleAsset { pub source: ResolvedVc>, @@ -403,13 +374,8 @@ pub struct EcmascriptModuleAsset { pub compile_time_info: ResolvedVc, pub side_effect_free_packages: Option>, pub inner_assets: Option>, - /// A transient cache of successful parse results - /// Used when EcmascriptOptions::keep_last_successful_parse is enabled (only in dev) - /// This ensures that parse errors don't invalidate large portions of the task graph, so we - /// still report the issue but serve the previous AST - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip, default = "Default::default")] - last_successful_parse: TransientCache>, + #[turbo_tasks(debug_ignore)] + last_successful_parse: turbo_tasks::TransientState>, } impl core::fmt::Debug for EcmascriptModuleAsset { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { @@ -541,11 +507,12 @@ impl EcmascriptParsable for EcmascriptModuleAsset { if self.options.await?.keep_last_successful_parse { let real_result_value = real_result.await?; let result_value = if matches!(*real_result_value, ParseResult::Ok { .. }) { - self.last_successful_parse.set(real_result_value.clone()); + self.last_successful_parse + .set_unconditionally(real_result_value.clone()); real_result_value } else { - let guard = self.last_successful_parse.get(); - guard.as_ref().unwrap_or(&real_result_value).clone() + let state_ref = self.last_successful_parse.get(); + state_ref.as_ref().unwrap_or(&real_result_value).clone() }; Ok(ReadRef::cell(result_value)) } else {