Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 241 additions & 20 deletions turbopack/crates/turbo-tasks-fetch/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
use std::{hash::Hash, sync::LazyLock};
use std::{
cmp::max,
fmt::{Display, Formatter},
hash::Hash,
sync::LazyLock,
time::{Duration, SystemTime},
};

use anyhow::Result;
use quick_cache::sync::Cache;
use turbo_rcstr::RcStr;
use turbo_tasks::{Completion, ReadRef, Vc, duration_span};
use turbo_tasks::{
Completion, FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, ReadRef,
ResolvedVc, Vc, duration_span, util::StaticOrArc,
};

use crate::{FetchError, FetchResult, HttpResponse, HttpResponseBody};

Expand All @@ -19,8 +28,21 @@ static CLIENT_CACHE: LazyLock<Cache<ReadRef<FetchClientConfig>, reqwest::Client>
/// This is needed because [`reqwest::ClientBuilder`] does not implement the required traits. This
/// factory cannot be a closure because closures do not implement `Eq` or `Hash`.
#[turbo_tasks::value(shared)]
#[derive(Hash)]
pub struct FetchClientConfig {
    /// Minimum cache TTL in seconds. Responses with a `Cache-Control: max-age` shorter than this
    /// will be clamped to this value. This prevents pathologically short timeouts from causing an
    /// invalidation bomb. Defaults to 1 hour.
    // NOTE(review): a `std::time::Duration` would be more self-describing than raw seconds —
    // consider migrating; not done here because it changes the public field type.
    pub min_cache_control_secs: u64,
}

impl Default for FetchClientConfig {
fn default() -> Self {
Self {
min_cache_control_secs: 60 * 60,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a duration

}
}
}

impl FetchClientConfig {
/// Returns a cached instance of `reqwest::Client` it exists, otherwise constructs a new one.
Expand All @@ -34,7 +56,7 @@ impl FetchClientConfig {
/// The reqwest client fails to construct if the TLS backend cannot be initialized, or the
/// resolver cannot load the system configuration. These failures should be treated as
/// cached for some amount of time, but ultimately transient (e.g. using
/// [`turbo_tasks::mark_session_dependent`]).
/// `session_dependent`).
pub fn try_get_cached_reqwest_client(
self: ReadRef<FetchClientConfig>,
) -> reqwest::Result<reqwest::Client> {
Expand Down Expand Up @@ -78,17 +100,66 @@ impl FetchClientConfig {
}
}

/// Invalidation was caused by a max-age deadline returned by a server
#[derive(PartialEq, Eq, Hash)]
pub(crate) struct HttpTimeout {}

impl InvalidationReason for HttpTimeout {
    /// All HTTP timeout reasons share a single static kind so they can be
    /// aggregated into one summary line.
    fn kind(&self) -> Option<StaticOrArc<dyn InvalidationReasonKind>> {
        Some(StaticOrArc::Static(&HTTP_TIMEOUT_KIND))
    }
}

impl Display for HttpTimeout {
    /// Human-readable description shown for a single timed-out fetch.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("http max-age timeout")
    }
}

/// Invalidation reason kind aggregating individual [`HttpTimeout`] reasons.
// (The previous doc comment referenced `[Write]` — a copy-paste error.)
#[derive(PartialEq, Eq, Hash)]
struct HttpTimeoutKind;

static HTTP_TIMEOUT_KIND: HttpTimeoutKind = HttpTimeoutKind;

impl InvalidationReasonKind for HttpTimeoutKind {
    /// Formats an aggregated summary, e.g. "3 fetches timed out".
    fn fmt(
        &self,
        reasons: &FxIndexSet<StaticOrArc<dyn InvalidationReason>>,
        f: &mut Formatter<'_>,
    ) -> std::fmt::Result {
        write!(f, "{} fetches timed out", reasons.len())
    }
}

/// Internal result from `fetch_inner` that includes the invalidator for TTL-based re-fetching.
#[turbo_tasks::value(shared)]
struct FetchInnerResult {
    /// The fetch outcome (successful response or fetch error) produced by `fetch_inner`.
    result: ResolvedVc<FetchResult>,
    /// Invalidator for the `fetch_inner` task. Used by the outer `fetch` to set up a timer that
    /// triggers re-fetching when the Cache-Control max-age expires. `None` when the response
    /// carried no usable `max-age` (in that case the inner task is marked session-dependent
    /// instead of timer-driven).
    invalidator: Option<Invalidator>,
    /// Absolute deadline (seconds since UNIX epoch) after which the cached response should be
    /// re-fetched. Computed as `now + max-age` at fetch time. An absolute timestamp is used
    /// instead of a relative duration so that the remaining TTL is correct on warm cache restore.
    /// `None` if and only if `invalidator` is `None`.
    deadline_secs: Option<u64>,
}

#[turbo_tasks::value_impl]
impl FetchClientConfig {
/// Performs the actual HTTP request. This task is `network` but NOT `session_dependent`, so
/// its cached result survives restarts. The outer `fetch` task (which IS `session_dependent`)
/// reads the cached invalidator and sets up a timer for TTL-based re-fetching.
#[turbo_tasks::function(network)]
pub async fn fetch(
async fn fetch_inner(
self: Vc<FetchClientConfig>,
url: RcStr,
user_agent: Option<RcStr>,
) -> Result<Vc<FetchResult>> {
) -> Result<Vc<FetchInnerResult>> {
let url_ref = &*url;
let this = self.await?;
let response_result: reqwest::Result<HttpResponse> = async move {
let min_cache_control_secs = this.min_cache_control_secs;
let response_result: reqwest::Result<(HttpResponse, Option<u64>)> = async move {
let reqwest_client = this.try_get_cached_reqwest_client()?;

let mut builder = reqwest_client.get(url_ref);
Expand All @@ -103,34 +174,152 @@ impl FetchClientConfig {
.and_then(|r| r.error_for_status())?;

let status = response.status().as_u16();
let max_age = parse_cache_control(response.headers());

let body = {
let _span = duration_span!("fetch response", url = url_ref);
response.bytes().await?
}
.to_vec();

Ok(HttpResponse {
status,
body: HttpResponseBody(body).resolved_cell(),
})
Ok((
HttpResponse {
status,
body: HttpResponseBody(body).resolved_cell(),
},
max_age,
))
}
.await;

match response_result {
Ok(resp) => Ok(Vc::cell(Ok(resp.resolved_cell()))),
Ok((resp, max_age_secs)) => {
if let Some(max_age_secs) = max_age_secs {
let max_age_secs = max(max_age_secs, min_cache_control_secs);
let deadline_secs = {
// Transform the relative offset to an absolute deadline so it can be
// cached.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is before UNIX epoch")
.as_secs();
now + max_age_secs
};
let invalidator = turbo_tasks::get_invalidator();
Ok(FetchInnerResult {
result: ResolvedVc::cell(Ok(resp.resolved_cell())),
invalidator,
deadline_secs: Some(deadline_secs),
}
.cell())
} else {
Completion::session_dependent().await?;
Ok(FetchInnerResult {
result: ResolvedVc::cell(Ok(resp.resolved_cell())),
invalidator: None,
deadline_secs: None,
}
.cell())
}
}
Err(err) => {
// the client failed to construct or the HTTP request failed
// Mark session dependent so we get retried in the next sessions
// In dev our caller will keep going, but in prod builds this will fail the build
// anyway.
// Read session_dependent_completion so that this task is re-dirtied on session
// restore. This ensures transient errors (network down, DNS failure) are retried
// on the next session without a timer or busy-loop.
Completion::session_dependent().as_side_effect().await?;
Ok(Vc::cell(Err(
FetchError::from_reqwest_error(&err, &url).resolved_cell()
)))
Ok(FetchInnerResult {
result: ResolvedVc::cell(Err(
FetchError::from_reqwest_error(&err, &url).resolved_cell()
)),

invalidator: None,
deadline_secs: None,
}
.cell())
}
}
}

/// Fetches the given URL and returns the response. Results are cached across sessions using
/// TTL from the response's `Cache-Control: max-age` header.
///
/// This is the outer task in a two-task pattern:
/// - `fetch` (session_dependent): always re-executes on restore, reads the cached inner result,
///   and spawns a timer for mid-session TTL expiry.
/// - `fetch_inner` (network, NOT session_dependent): performs the actual HTTP request and stays
///   cached across restarts. Returns an `Invalidator` that the outer task uses to trigger
///   re-fetching when the TTL expires.
#[turbo_tasks::function(network, session_dependent)]
pub async fn fetch(
    self: Vc<FetchClientConfig>,
    url: RcStr,
    user_agent: Option<RcStr>,
) -> Result<Vc<FetchResult>> {
    // Read the (possibly cached) inner result; destructure the fields we need.
    let FetchInnerResult {
        result,
        deadline_secs,
        invalidator,
    } = *self.fetch_inner(url, user_agent).await?;

    // Set up a timer to invalidate fetch_inner when the TTL expires.
    // On warm cache restore, this re-executes (session_dependent), reads the persisted
    // deadline from fetch_inner's cached result, and starts a timer for the remaining time.
    //
    // Skip when dependency tracking is disabled (e.g. one-shot `next build`) since
    // invalidation panics without dependency tracking and the timer would be wasted work.
    if turbo_tasks::turbo_tasks().is_tracking_dependencies()
        && let (Some(deadline_secs), Some(invalidator)) = (deadline_secs, invalidator)
    {
        // transform absolute deadline back to a relative duration for the sleep call
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock is before UNIX epoch")
            .as_secs();
        // saturating_sub: a deadline already in the past yields a zero-length sleep
        // (immediate invalidation) rather than an underflow panic.
        let remaining = Duration::from_secs(deadline_secs.saturating_sub(now));
        // NOTE: in the case where the deadline is expired on session start this timeout will
        // immediately invalidate and race with us returning. This is basically fine since in
        // the most common case the actual fetch result is identical so this gives us a kind of
        // 'stale while revalidate' feature.
        // alternatively we could synchronously invalidate and re-execute `fetch-inner` but that
        // simply adds latency in the common case where our fetch is identical.
        // NOTE(2): if for some reason `fetch` is re-executed but `fetch-inner` isn't we could
        // end up with multiple timers. Currently there is no known case where this could
        // happen, if it somehow does we could end up with redundant invalidations and
        // re-fetches. The solution is to detect this with a mutable hash map on
        // FetchClientConfig to track outstanding timers and cancel them.
        turbo_tasks::spawn(async move {
            tokio::time::sleep(remaining).await;
            invalidator.invalidate_with_reason(&*turbo_tasks::turbo_tasks(), HttpTimeout {});
        });
    }

    Ok(*result)
}
}

/// Parses the `max-age` directive from a `Cache-Control` header value.
/// Returns the max-age in seconds, or `None` if not present or unparseable.
/// None means we shouldn't cache longer than the current session
fn parse_cache_control(headers: &reqwest::header::HeaderMap) -> Option<u64> {
let value = headers.get(reqwest::header::CACHE_CONTROL)?.to_str().ok()?;
let mut max_age = None;
for directive in value.split(',') {
let (key, val) = {
if let Some(index) = directive.find('=') {
(directive[0..index].trim(), Some(&directive[index + 1..]))
} else {
(directive.trim(), None)
}
};
if key.eq_ignore_ascii_case("max-age")
&& let Some(val) = val
{
max_age = val.trim().parse().ok();
} else if key.eq_ignore_ascii_case("no-cache") || key.eq_ignore_ascii_case("no-store") {
return None;
}
}
max_age
}

#[doc(hidden)]
Expand All @@ -142,3 +331,35 @@ pub fn __test_only_reqwest_client_cache_clear() {
/// Returns the number of entries currently held in the global reqwest client cache.
/// Exposed only so tests can observe cache behavior; not part of the public API.
pub fn __test_only_reqwest_client_cache_len() -> usize {
    CLIENT_CACHE.len()
}

#[cfg(test)]
mod tests {
    use reqwest::header::{CACHE_CONTROL, HeaderMap, HeaderValue};

    use super::parse_cache_control;

    /// Builds a `HeaderMap` containing a single `Cache-Control` header with the given value.
    fn headers(value: &str) -> HeaderMap {
        let mut h = HeaderMap::new();
        h.insert(CACHE_CONTROL, HeaderValue::from_str(value).unwrap());
        h
    }

    /// `max-age` is extracted case-insensitively, tolerating whitespace around `=`,
    /// and is found among other directives.
    #[test]
    fn max_age() {
        assert_eq!(parse_cache_control(&headers("max-age=300")), Some(300));
        assert_eq!(parse_cache_control(&headers("MAX-AGE = 300")), Some(300));
        assert_eq!(
            parse_cache_control(&headers("public, max-age=3600, must-revalidate")),
            Some(3600)
        );
    }

    /// `no-cache`/`no-store` (any case, any position) disable caching entirely,
    /// even when a `max-age` is also present; a missing header also yields `None`.
    #[test]
    fn no_cache_headers() {
        assert_eq!(parse_cache_control(&headers("NO-CACHE")), None);
        assert_eq!(parse_cache_control(&headers("no-cache")), None);
        assert_eq!(parse_cache_control(&headers("no-store")), None);
        assert_eq!(parse_cache_control(&headers("max-age=300, no-store")), None);
        assert_eq!(parse_cache_control(&HeaderMap::new()), None);
    }
}
Loading
Loading