diff --git a/turbopack/crates/turbo-tasks-fetch/src/client.rs b/turbopack/crates/turbo-tasks-fetch/src/client.rs index bc3925097afc4..7d8c9d5fe8eaa 100644 --- a/turbopack/crates/turbo-tasks-fetch/src/client.rs +++ b/turbopack/crates/turbo-tasks-fetch/src/client.rs @@ -1,9 +1,18 @@ -use std::{hash::Hash, sync::LazyLock}; +use std::{ + cmp::max, + fmt::{Display, Formatter}, + hash::Hash, + sync::LazyLock, + time::{Duration, SystemTime}, +}; use anyhow::Result; use quick_cache::sync::Cache; use turbo_rcstr::RcStr; -use turbo_tasks::{Completion, ReadRef, Vc, duration_span}; +use turbo_tasks::{ + Completion, FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, ReadRef, + ResolvedVc, Vc, duration_span, util::StaticOrArc, +}; use crate::{FetchError, FetchResult, HttpResponse, HttpResponseBody}; @@ -19,8 +28,21 @@ static CLIENT_CACHE: LazyLock, reqwest::Client> /// This is needed because [`reqwest::ClientBuilder`] does not implement the required traits. This /// factory cannot be a closure because closures do not implement `Eq` or `Hash`. #[turbo_tasks::value(shared)] -#[derive(Hash, Default)] -pub struct FetchClientConfig {} +#[derive(Hash)] +pub struct FetchClientConfig { + /// Minimum cache TTL in seconds. Responses with a `Cache-Control: max-age` shorter than this + /// will be clamped to this value. This prevents pathologically short timeouts from causing an + /// invalidation bomb. Defaults to 1 hour. + pub min_cache_control_secs: u64, +} + +impl Default for FetchClientConfig { + fn default() -> Self { + Self { + min_cache_control_secs: 60 * 60, + } + } +} impl FetchClientConfig { /// Returns a cached instance of `reqwest::Client` it exists, otherwise constructs a new one. @@ -34,7 +56,7 @@ impl FetchClientConfig { /// The reqwest client fails to construct if the TLS backend cannot be initialized, or the /// resolver cannot load the system configuration. 
These failures should be treated as /// cached for some amount of time, but ultimately transient (e.g. using - /// [`turbo_tasks::mark_session_dependent`]). + /// `session_dependent`). pub fn try_get_cached_reqwest_client( self: ReadRef, ) -> reqwest::Result { @@ -78,17 +100,66 @@ impl FetchClientConfig { } } +/// Invalidation was caused by a max-age deadline returned by a server +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct HttpTimeout {} + +impl InvalidationReason for HttpTimeout { + fn kind(&self) -> Option> { + Some(StaticOrArc::Static(&HTTP_TIMEOUT_KIND)) + } +} + +impl Display for HttpTimeout { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "http max-age timeout") + } +} + +/// Invalidation kind for [HttpTimeout] +#[derive(PartialEq, Eq, Hash)] +struct HttpTimeoutKind; + +static HTTP_TIMEOUT_KIND: HttpTimeoutKind = HttpTimeoutKind; + +impl InvalidationReasonKind for HttpTimeoutKind { + fn fmt( + &self, + reasons: &FxIndexSet>, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "{} fetches timed out", reasons.len(),) + } +} + +/// Internal result from `fetch_inner` that includes the invalidator for TTL-based re-fetching. +#[turbo_tasks::value(shared)] +struct FetchInnerResult { + result: ResolvedVc, + /// Invalidator for the `fetch_inner` task. Used by the outer `fetch` to set up a timer that + /// triggers re-fetching when the Cache-Control max-age expires. + invalidator: Option, + /// Absolute deadline (seconds since UNIX epoch) after which the cached response should be + /// re-fetched. Computed as `now + max-age` at fetch time. An absolute timestamp is used + /// instead of a relative duration so that the remaining TTL is correct on warm cache restore. + deadline_secs: Option, +} + #[turbo_tasks::value_impl] impl FetchClientConfig { + /// Performs the actual HTTP request. This task is `network` but NOT `session_dependent`, so + /// its cached result survives restarts. 
The outer `fetch` task (which IS `session_dependent`) + /// reads the cached invalidator and sets up a timer for TTL-based re-fetching. #[turbo_tasks::function(network)] - pub async fn fetch( + async fn fetch_inner( self: Vc, url: RcStr, user_agent: Option, - ) -> Result> { + ) -> Result> { let url_ref = &*url; let this = self.await?; - let response_result: reqwest::Result = async move { + let min_cache_control_secs = this.min_cache_control_secs; + let response_result: reqwest::Result<(HttpResponse, Option)> = async move { let reqwest_client = this.try_get_cached_reqwest_client()?; let mut builder = reqwest_client.get(url_ref); @@ -103,6 +174,7 @@ impl FetchClientConfig { .and_then(|r| r.error_for_status())?; let status = response.status().as_u16(); + let max_age = parse_cache_control(response.headers()); let body = { let _span = duration_span!("fetch response", url = url_ref); @@ -110,27 +182,144 @@ impl FetchClientConfig { } .to_vec(); - Ok(HttpResponse { - status, - body: HttpResponseBody(body).resolved_cell(), - }) + Ok(( + HttpResponse { + status, + body: HttpResponseBody(body).resolved_cell(), + }, + max_age, + )) } .await; match response_result { - Ok(resp) => Ok(Vc::cell(Ok(resp.resolved_cell()))), + Ok((resp, max_age_secs)) => { + if let Some(max_age_secs) = max_age_secs { + let max_age_secs = max(max_age_secs, min_cache_control_secs); + let deadline_secs = { + // Transform the relative offset to an absolute deadline so it can be + // cached. 
+ let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("system clock is before UNIX epoch") + .as_secs(); + now + max_age_secs + }; + let invalidator = turbo_tasks::get_invalidator(); + Ok(FetchInnerResult { + result: ResolvedVc::cell(Ok(resp.resolved_cell())), + invalidator, + deadline_secs: Some(deadline_secs), + } + .cell()) + } else { + Completion::session_dependent().await?; + Ok(FetchInnerResult { + result: ResolvedVc::cell(Ok(resp.resolved_cell())), + invalidator: None, + deadline_secs: None, + } + .cell()) + } + } Err(err) => { - // the client failed to construct or the HTTP request failed - // Mark session dependent so we get retried in the next sessions - // In dev our caller will keep going, but in prod builds this will fail the build - // anyway. + // Read session_dependent_completion so that this task is re-dirtied on session + // restore. This ensures transient errors (network down, DNS failure) are retried + // on the next session without a timer or busy-loop. Completion::session_dependent().as_side_effect().await?; - Ok(Vc::cell(Err( - FetchError::from_reqwest_error(&err, &url).resolved_cell() - ))) + Ok(FetchInnerResult { + result: ResolvedVc::cell(Err( + FetchError::from_reqwest_error(&err, &url).resolved_cell() + )), + + invalidator: None, + deadline_secs: None, + } + .cell()) + } + } + } + + /// Fetches the given URL and returns the response. Results are cached across sessions using + /// TTL from the response's `Cache-Control: max-age` header. + /// + /// This is the outer task in a two-task pattern: + /// - `fetch` (session_dependent): always re-executes on restore, reads the cached inner result, + /// and spawns a timer for mid-session TTL expiry. + /// - `fetch_inner` (network, NOT session_dependent): performs the actual HTTP request and stays + /// cached across restarts. Returns an `Invalidator` that the outer task uses to trigger + /// re-fetching when the TTL expires. 
+ #[turbo_tasks::function(network, session_dependent)] + pub async fn fetch( + self: Vc, + url: RcStr, + user_agent: Option, + ) -> Result> { + let FetchInnerResult { + result, + deadline_secs, + invalidator, + } = *self.fetch_inner(url, user_agent).await?; + + // Set up a timer to invalidate fetch_inner when the TTL expires. + // On warm cache restore, this re-executes (session_dependent), reads the persisted + // deadline from fetch_inner's cached result, and starts a timer for the remaining time. + // + // Skip when dependency tracking is disabled (e.g. one-shot `next build`) since + // invalidation panics without dependency tracking and the timer would be wasted work. + if turbo_tasks::turbo_tasks().is_tracking_dependencies() + && let (Some(deadline_secs), Some(invalidator)) = (deadline_secs, invalidator) + { + // transform absolute deadline back to a relative duration for the sleep call + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("system clock is before UNIX epoch") + .as_secs(); + let remaining = Duration::from_secs(deadline_secs.saturating_sub(now)); + // NOTE: in the case where the deadline is expired on session start this timeout will + // immediately invalidate and race with us returning. This is basically fine since in + // the most common case the actual fetch result is identical so this gives us a kind of + // 'stale while revalidate' feature. + // Alternatively we could synchronously invalidate and re-execute `fetch_inner` but that + // simply adds latency in the common case where our fetch is identical. + // NOTE(2): if for some reason `fetch` is re-executed but `fetch_inner` isn't we could + // end up with multiple timers. Currently there is no known case where this could + // happen, if it somehow does we could end up with redundant invalidations and + // re-fetches. The solution is to detect this with a mutable hash map on + // FetchClientConfig to track outstanding timers and cancel them. 
+ turbo_tasks::spawn(async move { + tokio::time::sleep(remaining).await; + invalidator.invalidate_with_reason(&*turbo_tasks::turbo_tasks(), HttpTimeout {}); + }); + } + + Ok(*result) + } +} + +/// Parses the `max-age` directive from a `Cache-Control` header value. +/// Returns the max-age in seconds, or `None` if not present or unparseable. +/// None means we shouldn't cache longer than the current session +fn parse_cache_control(headers: &reqwest::header::HeaderMap) -> Option { + let value = headers.get(reqwest::header::CACHE_CONTROL)?.to_str().ok()?; + let mut max_age = None; + for directive in value.split(',') { + let (key, val) = { + if let Some(index) = directive.find('=') { + (directive[0..index].trim(), Some(&directive[index + 1..])) + } else { + (directive.trim(), None) } + }; + if key.eq_ignore_ascii_case("max-age") + && let Some(val) = val + { + max_age = val.trim().parse().ok(); + } else if key.eq_ignore_ascii_case("no-cache") || key.eq_ignore_ascii_case("no-store") { + return None; } } + max_age } #[doc(hidden)] @@ -142,3 +331,35 @@ pub fn __test_only_reqwest_client_cache_clear() { pub fn __test_only_reqwest_client_cache_len() -> usize { CLIENT_CACHE.len() } + +#[cfg(test)] +mod tests { + use reqwest::header::{CACHE_CONTROL, HeaderMap, HeaderValue}; + + use super::parse_cache_control; + + fn headers(value: &str) -> HeaderMap { + let mut h = HeaderMap::new(); + h.insert(CACHE_CONTROL, HeaderValue::from_str(value).unwrap()); + h + } + + #[test] + fn max_age() { + assert_eq!(parse_cache_control(&headers("max-age=300")), Some(300)); + assert_eq!(parse_cache_control(&headers("MAX-AGE = 300")), Some(300)); + assert_eq!( + parse_cache_control(&headers("public, max-age=3600, must-revalidate")), + Some(3600) + ); + } + + #[test] + fn no_cache_headers() { + assert_eq!(parse_cache_control(&headers("NO-CACHE")), None); + assert_eq!(parse_cache_control(&headers("no-cache")), None); + assert_eq!(parse_cache_control(&headers("no-store")), None); + 
assert_eq!(parse_cache_control(&headers("max-age=300, no-store")), None); + assert_eq!(parse_cache_control(&HeaderMap::new()), None); + } +} diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index fffbe4c0a3721..43f67b6706023 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use anyhow::Result; use tokio::sync::Mutex as TokioMutex; use turbo_rcstr::{RcStr, rcstr}; -use turbo_tasks::{ReadRef, Vc}; +use turbo_tasks::{ReadRef, TurboTasksApi, Vc}; use turbo_tasks_fetch::{ __test_only_reqwest_client_cache_clear, __test_only_reqwest_client_cache_len, FetchClientConfig, FetchErrorKind, FetchIssue, @@ -303,6 +303,235 @@ async fn errors_on_404() { .unwrap() } +/// Helper: create a TT instance for fetch tests. When `initial` is true, clears the cache +/// directory first (cold start). When false, reuses existing cache (warm restore). +fn create_fetch_tt(name: &str, initial: bool) -> Arc { + REGISTRATION.create_turbo_tasks(name, initial) +} + +#[turbo_tasks::function(operation)] +async fn fetch_body(url: RcStr) -> Result> { + let client_vc = FetchClientConfig { + min_cache_control_secs: 0, + } + .cell(); + let response = &*client_vc + .fetch(url, /* user_agent */ None) + .await? + .unwrap() + .await?; + Ok(response.body.to_string()) +} + +/// Test that the TTL timer invalidates `fetch_inner` within a session. +/// +/// 1. Server returns body "v1" with `max-age=1` +/// 2. First fetch returns "v1" +/// 3. Server changes to return "v2" +/// 4. Wait 2s for TTL to expire (timer fires, invalidates fetch_inner) +/// 5. 
Strongly consistent read returns "v2" +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ttl_invalidates_within_session() { + let _guard = GLOBAL_TEST_LOCK.lock().await; + let mut server = mockito::Server::new_async().await; + let url = RcStr::from(format!("{}/ttl-within", server.url())); + + server + .mock("GET", "/ttl-within") + .with_body("v1") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + let tt = create_fetch_tt("ttl_invalidates_within_session", true); + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v1"); + + // Change the server response + server.reset(); + server + .mock("GET", "/ttl-within") + .with_body("v2") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + // Wait for the TTL timer to fire (max-age=1, so wait 2s to be safe) + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // The timer should have invalidated fetch_inner, so a new strongly consistent read + // should re-fetch and return the updated body. + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v2"); + + tt.stop_and_wait().await; +} + +/// Test that after a session restore, an expired TTL causes a re-fetch. +/// +/// 1. Server returns "v1" with `max-age=1` +/// 2. Fetch, stop TT +/// 3. Wait for TTL to expire +/// 4. Create new TT (warm restore), server now returns "v2" +/// 5. 
Fetch should return "v2" (deadline expired, timer fires immediately on restore) +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ttl_invalidates_on_session_restore() { + let _guard = GLOBAL_TEST_LOCK.lock().await; + let mut server = mockito::Server::new_async().await; + let url = RcStr::from(format!("{}/ttl-restore", server.url())); + + server + .mock("GET", "/ttl-restore") + .with_body("v1") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + // Session 1: fetch and cache + let tt = create_fetch_tt("ttl_invalidates_on_session_restore", true); + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v1"); + tt.stop_and_wait().await; + + // Wait for TTL to expire + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Change server response + server.reset(); + server + .mock("GET", "/ttl-restore") + .with_body("v2") + .with_header("Cache-Control", "max-age=1") + .create_async() + .await; + + // Session 2: warm restore — TTL expired, should re-fetch. + // On restore, `fetch` (session_dependent) re-executes and reads the cached `fetch_inner` + // result. The deadline is expired, so it spawns a zero-duration timer. That timer + // invalidates `fetch_inner` asynchronously, which triggers a second round of execution. + // We need to read twice: the first read returns the stale cached value, then wait for the + // timer-triggered re-execution to settle. 
+ let tt = create_fetch_tt("ttl_invalidates_on_session_restore", false); + turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + // First read returns the stale cached value, but triggers the timer + let _body = fetch_body(url).read_strongly_consistent().await?; + Ok(()) + } + }) + .await + .unwrap(); + + // Wait for the timer to fire and re-execution to settle + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let body = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + let body = fetch_body(url).read_strongly_consistent().await?; + Ok((*body).clone()) + } + }) + .await + .unwrap(); + assert_eq!(&*body, "v2"); + tt.stop_and_wait().await; +} + +/// Test that fetch errors are retried on session restore. +/// +/// 1. Server returns connection refused (error) +/// 2. Fetch returns error +/// 3. Stop TT, start new session +/// 4. Server now returns 200 +/// 5. Fetch should succeed (error was session-dependent, retried on restore) +/// +/// TODO: Consider retrying errors within a session with backoff. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn errors_retried_on_session_restore() { + let _guard = GLOBAL_TEST_LOCK.lock().await; + let mut server = mockito::Server::new_async().await; + let url = RcStr::from(format!("{}/error-restore", server.url())); + + // Session 1: server returns 500 + server + .mock("GET", "/error-restore") + .with_status(500) + .create_async() + .await; + + let tt = create_fetch_tt("errors_retried_on_session_restore", true); + let is_err = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + #[turbo_tasks::function(operation)] + async fn fetch_is_err(url: RcStr) -> Result> { + let client_vc = FetchClientConfig::default().cell(); + let result = &*client_vc.fetch(url, None).await?; + Ok(Vc::cell(result.is_err())) + } + let is_err = *fetch_is_err(url).read_strongly_consistent().await?; + Ok(is_err) + } + }) + .await + .unwrap(); + assert!(is_err, "first fetch should be an error"); + tt.stop_and_wait().await; + + // Session 2: server now returns 200 + server.reset(); + server + .mock("GET", "/error-restore") + .with_body("success") + .create_async() + .await; + + let tt = create_fetch_tt("errors_retried_on_session_restore", false); + let is_err = turbo_tasks::run_once(tt.clone(), { + let url = url.clone(); + async move { + #[turbo_tasks::function(operation)] + async fn fetch_is_err2(url: RcStr) -> Result> { + let client_vc = FetchClientConfig::default().cell(); + let result = &*client_vc.fetch(url, None).await?; + Ok(Vc::cell(result.is_err())) + } + let is_err = *fetch_is_err2(url).read_strongly_consistent().await?; + Ok(is_err) + } + }) + .await + .unwrap(); + assert!(!is_err, "second fetch should succeed after session restore"); + tt.stop_and_wait().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn client_cache() { let mut server = mockito::Server::new_async().await;