Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,10 @@ async def _execute_deployment_plan(self, plan: DeploymentPlan) -> list:
if r.deploy_type == DeploymentResult.Type.LINK
and r.status != DeploymentResult.Status.SKIPPED
}

async def _add_history(event, session):
session.add(event)

with timer.phase("propagate impact") as p:
downstream = await propagate_impact(
session=self.session,
Expand All @@ -1354,6 +1358,8 @@ async def _execute_deployment_plan(self, plan: DeploymentPlan) -> list:
spec.rendered_name for spec in plan.to_delete
),
changed_link_node_names=changed_link_names,
current_user=self.context.current_user,
save_history=_add_history,
)
p.append(f"{len(downstream)} downstream")

Expand Down
23 changes: 23 additions & 0 deletions datajunction-server/datajunction_server/internal/impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sqlalchemy.orm import joinedload, selectinload

from datajunction_server.database.node import Node, NodeRevision, NodeRelationship
from datajunction_server.internal.nodes import bump_cube_versions
from datajunction_server.instrumentation.provider import get_metrics_provider
from datajunction_server.internal.deployment.dimension_reachability import (
DimensionReachability,
Expand Down Expand Up @@ -64,6 +65,8 @@ async def propagate_impact(
changed_node_names: set[str],
deleted_node_names: frozenset[str] = frozenset(),
changed_link_node_names: set[str] | None = None,
current_user=None,
save_history=None,
) -> list[DownstreamImpact]:
"""BFS downstream impact analysis with revalidation.

Expand All @@ -79,6 +82,10 @@ async def propagate_impact(
deleted_node_names: Names of nodes about to be deleted (still in DB at
call time — caller must invoke this before hard_delete_node).
changed_link_node_names: Names of nodes whose dimension links changed.
current_user: If provided (with save_history), bumps the minor version of
every downstream cube so version-gated consumers see the change.
save_history: Callable(event, session) used to record the version-bump
history events. Required when current_user is provided.

Returns:
List of DownstreamImpact describing each affected downstream node.
Expand Down Expand Up @@ -112,6 +119,22 @@ async def propagate_impact(
# Phase 3: revalidate all downstream nodes and apply status changes
results = await _revalidate_and_apply(session, ctx, all_impacts)

# Phase 4: bump the minor version of every downstream cube so that
# version-gated consumers see the change. Cubes whose element list is
# unchanged don't get a new version from the normal diff path, but their
# effective SQL may have changed due to an upstream metric being updated.
# Only bumps cubes downstream of *changed* nodes — deleted-node downstreams
# become invalid and don't need a version bump.
if current_user and save_history and changed_node_names:
cube_names = [r.name for r in results if r.node_type == NodeType.CUBE]
if cube_names:
await bump_cube_versions(
session,
cube_names,
current_user,
save_history,
)

_emit_metrics(start, results)
return results

Expand Down
145 changes: 121 additions & 24 deletions datajunction-server/datajunction_server/internal/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,8 @@ async def update_node_with_query(

background_tasks.add_task(
propagate_update_downstream,
node,
node.name, # type: ignore
node.current_version, # type: ignore
current_user=current_user,
save_history=save_history,
cache=cache,
Expand Down Expand Up @@ -1662,7 +1663,8 @@ async def update_cube_node(


async def propagate_update_downstream(
node: Node,
node_name: str,
node_current_version: str,
current_user: User,
save_history: Callable,
cache: Cache | None = None,
Expand All @@ -1674,21 +1676,23 @@ async def propagate_update_downstream(
async with session_context() as session:
await _propagate_update_downstream(
session=session,
node=node,
node_name=node_name,
node_current_version=node_current_version,
current_user=current_user,
save_history=save_history,
cache=cache,
)
except Exception:
_logger.exception(
"Error propagating update of node %s downstream",
node.name,
node_name,
)


async def _propagate_update_downstream(
session: AsyncSession,
node: Node,
node_name: str,
node_current_version: str,
current_user: User,
save_history: Callable,
cache: Cache | None = None,
Expand All @@ -1700,32 +1704,38 @@ async def _propagate_update_downstream(
- altered column types: may invalidate downstream nodes
- new columns: won't affect downstream nodes
"""
_logger.info("Propagating update of node %s downstream", node.name)
downstreams = await get_downstream_nodes(
_logger.info("Propagating update of node %s downstream", node_name)
all_downstreams = await get_downstream_nodes(
session,
node.name,
node_name,
include_deactivated=False,
include_cubes=False,
include_cubes=True,
)
downstreams = topological_sort(downstreams)
non_cube_downstreams = topological_sort(
[n for n in all_downstreams if n.type != NodeType.CUBE],
)
cube_downstreams = [n for n in all_downstreams if n.type == NodeType.CUBE]
# Extract names now — commits in the loop below expire the Node objects,
# and accessing .name on an expired async-session object raises MissingGreenlet.
cube_names = [cube.name for cube in cube_downstreams]
_logger.info(
"Node %s updated — revalidating %s downstreams",
node.name,
len(downstreams),
"Node %s updated — revalidating %s downstreams, bumping %s cubes",
node_name,
len(non_cube_downstreams),
len(cube_downstreams),
)

# The downstreams need to be sorted topologically in order for the updates to be done
# in the right order. Otherwise it is possible for a leaf node like a metric to be updated
# before its upstreams are updated.
for idx, downstream in enumerate(downstreams):
# Revalidate non-cube downstreams in topological order so parents are
# processed before their children.
for idx, downstream in enumerate(non_cube_downstreams):
original_node_revision = downstream.current
previous_status = original_node_revision.status
_logger.info(
"[%s/%s] Revalidating downstream %s due to update of node %s",
idx + 1,
len(downstreams),
len(non_cube_downstreams),
downstream.name,
node.name,
node_name,
)
node_validator = await revalidate_node(
downstream.name,
Expand All @@ -1742,7 +1752,7 @@ async def _propagate_update_downstream(
_logger.info(
"Clearing upstream cache for node %s due to update of node %s (cache key: %s)",
downstream.name,
node.name,
node_name,
upstream_cache_key,
)
cache.delete(upstream_cache_key)
Expand All @@ -1765,11 +1775,11 @@ async def _propagate_update_downstream(
),
},
"upstream": {
"node": node.name,
"version": node.current_version,
"node": node_name,
"version": node_current_version,
},
"reason": f"Caused by update of `{node.name}` to "
f"{node.current_version}",
"reason": f"Caused by update of `{node_name}` to "
f"{node_current_version}",
},
pre={
"status": previous_status,
Expand All @@ -1785,6 +1795,93 @@ async def _propagate_update_downstream(
)
await session.commit()

# A cube's element list doesn't change when an upstream metric's SQL
# changes, so update_cube_node's diff logic won't fire. Create a new
# minor revision so callers that gate on version ID see the change.
await bump_cube_versions(
session,
cube_names,
current_user,
save_history,
upstream_node_name=node_name,
upstream_node_version=node_current_version,
)


async def bump_cube_versions(
session: AsyncSession,
cubes: List[str],
current_user: User,
save_history: Callable,
upstream_node_name: str = "",
upstream_node_version: str = "",
) -> None:
"""
Create a new minor-version revision for each cube name in ``cubes``.

A cube's element list is unchanged when an upstream metric's SQL changes,
so the normal diff-based update path produces no version bump. This function
forces one, ensuring any version-gated downstream consumer sees the change.

Accepts cube names (not Node objects) so callers can extract names before
any session commits that would expire the Node identity-map entries.
"""
for cube_name in cubes:
cube_node = await Node.get_cube_by_name(session, cube_name)
if not cube_node or not cube_node.current:
continue
current_rev = cube_node.current

# Load relationships not covered by get_cube_by_name's default eager-load
# before entering no_autoflush. Lazy-loading inside an async session raises
# MissingGreenlet; explicit refresh is the correct async pattern.
await session.refresh(current_rev, ["parents", "dimension_links"])

with session.no_autoflush:
# Copy the existing revision directly so column partition settings
# (granularity, format, type) are preserved exactly. Using
# create_cube_node_revision would rebuild columns from scratch and
# lose those settings, breaking materialization SQL generation.
new_cube_revision = copy_existing_node_revision(current_rev, current_user)
new_cube_revision.cube_elements = current_rev.cube_elements
new_cube_revision.cube_filters = current_rev.cube_filters
new_cube_revision.materializations = []
old_version = Version.parse(current_rev.version)
new_cube_revision.version = str(old_version.next_minor_version())
new_cube_revision.node = current_rev.node
new_cube_revision.node.current_version = new_cube_revision.version

await save_history(
event=History(
entity_type=EntityType.NODE,
entity_name=new_cube_revision.name,
node=new_cube_revision.name,
activity_type=ActivityType.UPDATE,
details={
"version": new_cube_revision.version,
"upstream": {
"node": upstream_node_name,
"version": upstream_node_version,
},
"reason": f"Caused by update of `{upstream_node_name}` to "
f"{upstream_node_version}",
},
pre={"version": current_rev.version},
post={"version": new_cube_revision.version},
user=current_user.username,
),
session=session,
)

await session.commit()
_logger.info(
"Bumped cube %s from %s to %s due to upstream change of %s",
cube_name,
current_rev.version,
new_cube_revision.version,
upstream_node_name,
)


def copy_existing_node_revision(old_revision: NodeRevision, current_user: User):
"""
Expand Down
67 changes: 67 additions & 0 deletions datajunction-server/tests/api/nodes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5018,6 +5018,73 @@ async def test_propagate_update_downstream(
# Node may be valid or invalid depending on whether removed columns were used
assert data["status"] in ("valid", "invalid")

@pytest.mark.asyncio
async def test_propagate_update_bumps_dependent_cube_version(
self,
client_with_roads: AsyncClient,
session: AsyncSession,
):
"""
When a metric's query changes, every cube that depends on that metric must
get a minor version bump so that version-gated downstream consumers see
the change and regenerate any derived artifacts.
"""
from sqlalchemy.orm import joinedload
from datajunction_server.internal.nodes import _propagate_update_downstream

response = await client_with_roads.post(
"/nodes/cube/",
json={
"name": "default.propagation_cube",
"metrics": ["default.avg_repair_price", "default.num_repair_orders"],
"dimensions": ["default.hard_hat.state"],
"description": "Cube for version-bump propagation test",
"mode": "published",
},
)
assert response.status_code < 400, response.json()

response = await client_with_roads.patch(
"/nodes/default.avg_repair_price/",
json={"query": "SELECT SUM(price) FROM default.repair_order_details"},
)
assert response.status_code == 200

metric = await Node.get_by_name(
session,
"default.avg_repair_price",
options=[joinedload(Node.current)],
)
cube = await Node.get_by_name(
session,
"default.propagation_cube",
options=[joinedload(Node.current)],
)
assert metric is not None
assert cube is not None
version_before = cube.current_version
user = await User.get_by_username(session, "dj")
assert user is not None

async def noop_save_history(event, session):
session.add(event)

# Call propagation directly with the test session so the newly created
# cube (not yet committed to the DB) is visible.
await _propagate_update_downstream(
session=session,
node_name=metric.name,
node_current_version=metric.current_version,
current_user=user,
save_history=noop_save_history,
)

await session.refresh(cube)
assert cube.current_version != version_before, (
f"Cube version should have bumped when upstream metric changed "
f"(was {version_before}, still {cube.current_version})"
)

@pytest.mark.asyncio
async def test_update_dimension_remove_pk_column(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ async def test_propagate_update_downstream_swallows_exceptions(caplog):
logger="datajunction_server.internal.nodes",
):
await propagate_update_downstream(
node=MagicMock(name="test.node"),
"test.node",
"v1.0",
current_user=MagicMock(),
save_history=MagicMock(),
)
Expand Down
Loading