Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/notes/2.32.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ As an aid to developing Pants itself (or plugins!), Pants 2.32 includes two deve

Together these can produce unified C/Rust/Python tracing data suitable for [flamegraphs](https://www.brendangregg.com/flamegraphs.html), <https://www.speedscope.app/>, and other tools.

#### Incremental Dependency Graph Cache

Setting the `PANTS_INCREMENTAL_DEPENDENTS` environment variable persists the forward dependency graph to disk, so that `--changed-dependents=transitive` does not need to resolve dependencies for every target on every run. On subsequent invocations, only targets whose source files have changed (by SHA-256 content hash) have their dependencies re-resolved. In a 53K-target monorepo, this reduces `--changed-dependents=transitive` from ~3.5 minutes to ~43 seconds. The cache file is portable across machines, making it suitable for ephemeral CI agents when shared via S3 or similar.

### Goals

#### Check
Expand Down
153 changes: 141 additions & 12 deletions src/python/pants/backend/project_info/dependents.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import json
import logging
import os
import time
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass
from enum import Enum

from pants.backend.project_info.incremental_dependents import (
CachedEntry,
compute_source_fingerprint,
get_cache_path,
load_persisted_graph,
save_persisted_graph,
)
from pants.base.build_environment import get_buildroot
from pants.engine.addresses import Address, Addresses
from pants.engine.collection import DeduplicatedCollection
from pants.engine.console import Console
Expand All @@ -23,6 +34,8 @@
from pants.util.logging import LogLevel
from pants.util.ordered_set import FrozenOrderedSet

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class AddressToDependents:
Expand All @@ -41,21 +54,137 @@ class DependentsOutputFormat(Enum):


@rule(desc="Map all targets to their dependents", level=LogLevel.DEBUG)
async def map_addresses_to_dependents(all_targets: AllUnexpandedTargets) -> AddressToDependents:
dependencies_per_target = await concurrently(
resolve_dependencies(
DependenciesRequest(
tgt.get(Dependencies), should_traverse_deps_predicate=AlwaysTraverseDeps()
),
**implicitly(),
async def map_addresses_to_dependents(
all_targets: AllUnexpandedTargets,
) -> AddressToDependents:
"""Build a reverse dependency map (target -> set of its dependents).

When incremental mode is enabled via the PANTS_INCREMENTAL_DEPENDENTS environment
variable, the forward dependency graph is persisted to disk. On subsequent runs,
only targets whose source files have changed need their dependencies re-resolved,
dramatically reducing wall time for large repos.
"""
if not os.environ.get("PANTS_INCREMENTAL_DEPENDENTS"):
# Original behavior: resolve all dependencies from scratch.
dependencies_per_target = await concurrently(
resolve_dependencies(
DependenciesRequest(
tgt.get(Dependencies),
should_traverse_deps_predicate=AlwaysTraverseDeps(),
),
**implicitly(),
)
for tgt in all_targets
)

address_to_dependents = defaultdict(set)
for tgt, dependencies in zip(all_targets, dependencies_per_target):
for dependency in dependencies:
address_to_dependents[dependency].add(tgt.address)
return AddressToDependents(
FrozenDict(
{
addr: FrozenOrderedSet(dependents)
for addr, dependents in address_to_dependents.items()
}
)
)
for tgt in all_targets

# --- Incremental mode ---
start_time = time.time()
buildroot = get_buildroot()
cache_path = get_cache_path()

# Step 1: Load previous graph
previous = load_persisted_graph(cache_path, buildroot)
logger.warning(
"Incremental dep graph: loaded %d cached entries from %s",
len(previous),
cache_path,
)

# Step 2: Classify targets as cached or changed
changed_targets = []
cached_results: list[tuple[Address, CachedEntry]] = []

for tgt in all_targets:
spec = tgt.address.spec
fingerprint = compute_source_fingerprint(tgt.address, buildroot)

cached_entry = previous.get(spec)
if cached_entry is not None and cached_entry.fingerprint == fingerprint:
cached_results.append((tgt.address, cached_entry))
else:
changed_targets.append(tgt)

cache_hits = len(cached_results)
cache_misses = len(changed_targets)
logger.warning(
"Incremental dep graph: %d cached, %d changed (out of %d total targets)",
cache_hits,
cache_misses,
len(all_targets),
)

# Step 3: Resolve deps only for changed targets
if changed_targets:
fresh_deps_per_target = await concurrently(
resolve_dependencies(
DependenciesRequest(
tgt.get(Dependencies),
should_traverse_deps_predicate=AlwaysTraverseDeps(),
),
**implicitly(),
)
for tgt in changed_targets
)
else:
fresh_deps_per_target = []

# Step 4: Build the reverse dependency map from merged results
address_to_dependents: dict[Address, set[Address]] = defaultdict(set)

# Build a spec → Address lookup from all_targets for resolving cached specs
spec_to_address: dict[str, Address] = {tgt.address.spec: tgt.address for tgt in all_targets}

# Process cached results (deps stored as address spec strings)
for addr, entry in cached_results:
for dep_spec in entry.deps:
dep_addr = spec_to_address.get(dep_spec)
if dep_addr is not None:
address_to_dependents[dep_addr].add(addr)

# Process freshly resolved results
for tgt, deps in zip(changed_targets, fresh_deps_per_target):
for dep_addr in deps:
address_to_dependents[dep_addr].add(tgt.address)

# Step 5: Save the updated forward graph for next run
new_entries: dict[str, CachedEntry] = {}

# Carry forward cached entries
for addr, entry in cached_results:
new_entries[addr.spec] = entry

# Add fresh entries
for tgt, deps in zip(changed_targets, fresh_deps_per_target):
spec = tgt.address.spec
fingerprint = compute_source_fingerprint(tgt.address, buildroot)
new_entries[spec] = CachedEntry(
fingerprint=fingerprint,
deps=tuple(dep.spec for dep in deps),
)

save_persisted_graph(cache_path, buildroot, new_entries)

elapsed = time.time() - start_time
logger.warning(
"Incremental dep graph: completed in %.1fs (%d from cache, %d resolved fresh)",
elapsed,
cache_hits,
cache_misses,
)

address_to_dependents = defaultdict(set)
for tgt, dependencies in zip(all_targets, dependencies_per_target):
for dependency in dependencies:
address_to_dependents[dependency].add(tgt.address)
return AddressToDependents(
FrozenDict(
{
Expand Down
181 changes: 181 additions & 0 deletions src/python/pants/backend/project_info/incremental_dependents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

"""Incremental dependency graph updates for faster `--changed-dependents` runs.

Instead of resolving dependencies for ALL targets every time, this module persists
the forward dependency graph to disk and only re-resolves dependencies for targets
whose source files have changed since the last run.
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
from dataclasses import dataclass

from pants.base.build_environment import get_pants_cachedir
from pants.engine.addresses import Address
from pants.option.option_types import BoolOption
from pants.option.subsystem import Subsystem
from pants.util.strutil import help_text

logger = logging.getLogger(__name__)


class IncrementalDependents(Subsystem):
    # NOTE(review): the actual gate in dependents.py is the
    # PANTS_INCREMENTAL_DEPENDENTS environment variable, not this option —
    # `enabled` does not appear to be read anywhere in this change. Either
    # wire the option through to `map_addresses_to_dependents` or remove it,
    # so users don't set a no-op flag.
    options_scope = "incremental-dependents"
    help = help_text(
        """
        Persist the forward dependency graph to disk and incrementally update it,
        so that `--changed-dependents=transitive` does not need to resolve
        dependencies for every target on every run.
        """
    )

    # Off by default; see NOTE above — this flag is currently not consulted.
    enabled = BoolOption(
        default=False,
        help="Enable incremental dependency graph caching. "
        "When enabled, the forward dependency graph is persisted to disk and only "
        "targets with changed source files have their dependencies re-resolved.",
    )


# ---------------------------------------------------------------------------
# Persisted graph helpers
# ---------------------------------------------------------------------------

_CACHE_VERSION = 2 # v2: stores structured address components


@dataclass(frozen=True)
class CachedEntry:
    """One persisted node of the forward dependency graph (a single target)."""

    # Content-based fingerprint produced by `compute_source_fingerprint`;
    # compared against a fresh fingerprint to decide cache hit vs. re-resolve.
    fingerprint: str
    # Dependencies stored as address spec strings (e.g. "src/python/foo/bar.py:lib")
    deps: tuple[str, ...]


def get_cache_path() -> str:
    """Absolute path of the on-disk incremental dependency-graph cache file."""
    cache_dir = get_pants_cachedir()
    return os.path.join(cache_dir, "incremental_dep_graph_v2.json")


def load_persisted_graph(path: str, buildroot: str) -> dict[str, CachedEntry]:
    """Load the persisted forward dependency graph from disk.

    Returns an empty dict if the file doesn't exist, is malformed, was written
    by a different cache version, or belongs to a different buildroot.
    """
    try:
        with open(path) as f:
            data = json.load(f)
        # A valid payload is always a JSON object. Anything else (e.g. a bare
        # list or string) would raise AttributeError on `.get()` below — an
        # exception the except clause doesn't catch — so reject it explicitly
        # to honor the "empty dict on invalid input" contract.
        if not isinstance(data, dict):
            logger.debug("Incremental dep graph cache is not a JSON object, rebuilding.")
            return {}
        if data.get("version") != _CACHE_VERSION:
            logger.debug("Incremental dep graph cache version mismatch, rebuilding.")
            return {}
        # NOTE(review): rejecting a cache written under a different buildroot
        # defeats the cross-machine portability described in the release notes
        # (ephemeral CI agents have varying checkout paths). Consider dropping
        # this check, or normalizing the stored value — confirm intent.
        if data.get("buildroot") != buildroot:
            logger.debug("Incremental dep graph cache buildroot mismatch, rebuilding.")
            return {}
        entries: dict[str, CachedEntry] = {}
        for addr_spec, entry in data.get("entries", {}).items():
            entries[addr_spec] = CachedEntry(
                fingerprint=entry["fingerprint"],
                deps=tuple(entry["deps"]),
            )
        return entries
    except (
        FileNotFoundError,
        json.JSONDecodeError,
        KeyError,
        TypeError,
        # A malformed "entries" value (non-dict) raises AttributeError on .items().
        AttributeError,
    ) as e:
        logger.debug("Could not load incremental dep graph cache: %s", e)
        return {}


def save_persisted_graph(
    path: str,
    buildroot: str,
    entries: dict[str, CachedEntry],
) -> None:
    """Save the forward dependency graph to disk.

    Best-effort: any OS-level failure is logged as a warning rather than
    raised, so e.g. a read-only cache directory never breaks the goal itself.
    """
    data = {
        "version": _CACHE_VERSION,
        "buildroot": buildroot,
        "entries": {
            addr_spec: {
                "fingerprint": entry.fingerprint,
                "deps": list(entry.deps),
            }
            for addr_spec, entry in entries.items()
        },
    }

    # Atomic publish: write to a process-unique temp file, then rename over
    # the final path. The pid suffix prevents two concurrent pants runs from
    # clobbering each other's half-written temp file before the rename.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        # makedirs lives inside the try so an unwritable cache dir is handled
        # like any other save failure (warn and continue) instead of crashing.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(tmp_path, "w") as f:
            json.dump(data, f, separators=(",", ":"))
        os.replace(tmp_path, path)
        logger.debug(
            "Saved incremental dep graph cache with %d entries to %s",
            len(entries),
            path,
        )
    except OSError as e:
        logger.warning("Failed to save incremental dep graph cache: %s", e)
        # Clean up the orphaned temp file; ignore failure (it may not exist).
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def _sha256_file(path: str) -> str | None:
"""Return the SHA-256 hex digest of a file's contents, or None if unreadable."""
try:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
except OSError:
return None


def compute_source_fingerprint(target_address: Address, buildroot: str) -> str:
    """Compute a content-based fingerprint for a target.

    Uses SHA-256 of file contents (not mtime) so the fingerprint is stable
    across checkouts — critical for CI where git clone sets all mtimes to the
    same value.

    The fingerprint includes:
    - The BUILD file defining the target
    - The specific source file (for generated/file-level targets)

    Only buildroot-RELATIVE paths are mixed into the hash. Hashing the
    absolute path would embed the machine-specific buildroot, making the
    fingerprint differ between machines with different checkout locations and
    defeating the cache's cross-machine portability.

    NOTE(review): a target that is neither file-addressed nor covered by a
    BUILD/BUILD.pants file hashes nothing, so all such targets share one
    fingerprint and source edits won't invalidate them — confirm whether
    additional sources should be included.
    """
    hasher = hashlib.sha256()

    spec_path = target_address.spec_path
    build_dir = os.path.join(buildroot, spec_path) if spec_path else buildroot

    # Always include the BUILD file(s) in the fingerprint.
    for build_name in ("BUILD", "BUILD.pants"):
        rel_build = os.path.join(spec_path, build_name) if spec_path else build_name
        digest = _sha256_file(os.path.join(build_dir, build_name))
        if digest:
            hasher.update(f"BUILD:{rel_build}:{digest}".encode())

    # For file-addressed targets (e.g. python_source generated from python_sources),
    # include the file's own content hash.
    if target_address.is_generated_target and target_address.generated_name:
        gen_name = target_address.generated_name
        rel_src = os.path.join(spec_path, gen_name) if spec_path else gen_name
        digest = _sha256_file(os.path.join(buildroot, rel_src))
        if digest:
            hasher.update(f"SRC:{rel_src}:{digest}".encode())
        elif spec_path:
            # Fallback: generated_name may already be buildroot-relative.
            digest = _sha256_file(os.path.join(buildroot, gen_name))
            if digest:
                hasher.update(f"SRC:{gen_name}:{digest}".encode())

    return hasher.hexdigest()
Loading
Loading