-
Notifications
You must be signed in to change notification settings - Fork 137
feat: add Neo4j lineage exporter for dbt manifest metadata #990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| # Elementary Neo4j Lineage Exporter | ||
|
|
||
| ## Overview | ||
|
|
||
| This utility exports dbt lineage data from Elementary's dbt manifest into Neo4j | ||
| as a property graph. It enables downstream impact analysis, root cause detection, | ||
| and lineage visualization directly in Neo4j. | ||
|
|
||
| ## Motivation | ||
|
|
||
| Elementary already captures rich lineage metadata via dbt artifacts. This exporter | ||
| makes that lineage available in Neo4j, enabling graph traversal queries for: | ||
|
|
||
| - **Impact analysis** — which models are affected if a source schema changes? | ||
| - **Root cause detection** — trace data quality issues upstream | ||
| - **Lineage visualization** — explore your dbt DAG as a graph | ||
|
|
||
| ## Graph Model | ||
|
|
||
| **Nodes** — each dbt model, source, seed, and snapshot becomes a `DbtNode`: | ||
| - `unique_id` — dbt unique identifier (primary key) | ||
| - `name` — model/source name | ||
| - `resource_type` — model, source, seed, snapshot | ||
| - `schema` — target schema in your warehouse | ||
| - `database` — target database | ||
| - `package_name` — dbt package | ||
| - `description` — model documentation | ||
|
|
||
| **Relationships** — `(upstream)-[:FEEDS_INTO]->(downstream)` | ||
|
|
||
| ## Installation | ||
|
|
||
| ```bash | ||
| pip install neo4j | ||
| ``` | ||
|
|
||
| ## Usage | ||
|
|
||
| ```python | ||
| from elementary_neo4j.neo4j_config import Neo4jConfig | ||
| from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter | ||
|
|
||
| config = Neo4jConfig( | ||
| uri="bolt://localhost:7687", | ||
| username="neo4j", | ||
| password="your-password" | ||
| ) | ||
|
|
||
| exporter = Neo4jLineageExporter(config) | ||
| result = exporter.export("path/to/manifest.json") | ||
| print(result) | ||
| # {"nodes_exported": 42, "dependencies_exported": 67} | ||
| exporter.close() | ||
| ``` | ||
|
|
||
| ## Environment Variables | ||
|
|
||
| ```bash | ||
| export NEO4J_URI=bolt://localhost:7687 | ||
| export NEO4J_USERNAME=neo4j | ||
| export NEO4J_PASSWORD=your-password | ||
| export NEO4J_DATABASE=neo4j | ||
| ``` | ||
|
|
||
| Then use: | ||
| ```python | ||
| config = Neo4jConfig.from_env() | ||
| ``` | ||
|
|
||
| ## Example Neo4j Query | ||
|
|
||
| Find all models impacted by a source change: | ||
| ```cypher | ||
| MATCH (source:DbtNode {unique_id: "source.my_project.raw_customers"})-[:FEEDS_INTO*]->(impacted) | ||
| RETURN impacted.unique_id, impacted.name, impacted.resource_type | ||
| ``` | ||
|
|
||
| ## Running Tests | ||
|
|
||
| ```bash | ||
| PYTHONPATH=. python -m pytest tests/test_neo4j_exporter.py -v --ignore=integration_tests | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from dataclasses import dataclass | ||
| from typing import Optional | ||
|
|
||
|
|
||
| @dataclass | ||
| class Neo4jConfig: | ||
| uri: str | ||
| username: str | ||
| password: str | ||
| database: Optional[str] = "neo4j" | ||
|
|
||
| @classmethod | ||
| def from_env(cls) -> "Neo4jConfig": | ||
| import os | ||
| return cls( | ||
| uri=os.environ.get("NEO4J_URI", "bolt://localhost:7687"), | ||
| username=os.environ.get("NEO4J_USERNAME", "neo4j"), | ||
| password=os.environ.get("NEO4J_PASSWORD", ""), | ||
| database=os.environ.get("NEO4J_DATABASE", "neo4j"), | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| import json | ||
| import logging | ||
| from pathlib import Path | ||
| from typing import Any, Dict, List | ||
|
|
||
| from neo4j import GraphDatabase | ||
|
|
||
| from elementary_neo4j.neo4j_config import Neo4jConfig | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Neo4jLineageExporter: | ||
| """ | ||
| Exports dbt lineage from Elementary's dbt manifest into Neo4j. | ||
| Creates nodes for models, sources, seeds, and snapshots, and | ||
| relationships for dependencies between them. | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| """ | ||
|
|
||
| def __init__(self, config: Neo4jConfig): | ||
| self.config = config | ||
| self.driver = GraphDatabase.driver( | ||
| config.uri, | ||
| auth=(config.username, config.password) | ||
| ) | ||
|
|
||
| def close(self): | ||
| self.driver.close() | ||
|
|
||
| def load_manifest(self, manifest_path: str) -> Dict[str, Any]: | ||
| """Load dbt manifest.json from file path.""" | ||
| path = Path(manifest_path) | ||
| if not path.exists(): | ||
| raise FileNotFoundError(f"Manifest not found: {manifest_path}") | ||
| with open(path, "r") as f: | ||
| return json.load(f) | ||
|
|
||
| def extract_nodes(self, manifest: Dict[str, Any]) -> List[Dict]: | ||
| """Extract model and source nodes from manifest.""" | ||
| nodes = [] | ||
| for unique_id, node in manifest.get("nodes", {}).items(): | ||
| if node.get("resource_type") in ("model", "seed", "snapshot"): | ||
| nodes.append({ | ||
| "unique_id": unique_id, | ||
| "name": node.get("name"), | ||
| "resource_type": node.get("resource_type"), | ||
| "schema": node.get("schema"), | ||
| "database": node.get("database"), | ||
| "package_name": node.get("package_name"), | ||
| "description": node.get("description", ""), | ||
| }) | ||
| for unique_id, source in manifest.get("sources", {}).items(): | ||
| nodes.append({ | ||
| "unique_id": unique_id, | ||
| "name": source.get("name"), | ||
| "resource_type": "source", | ||
| "schema": source.get("schema"), | ||
| "database": source.get("database"), | ||
| "package_name": source.get("package_name"), | ||
| "description": source.get("description", ""), | ||
| }) | ||
| return nodes | ||
|
|
||
| def extract_dependencies(self, manifest: Dict[str, Any]) -> List[Dict]: | ||
| """Extract upstream dependencies between exported nodes only.""" | ||
| exported_ids = { | ||
| unique_id | ||
| for unique_id, node in manifest.get("nodes", {}).items() | ||
| if node.get("resource_type") in ("model", "seed", "snapshot") | ||
| } | ||
| exported_ids.update(manifest.get("sources", {}).keys()) | ||
|
|
||
| dependencies = [] | ||
| for unique_id, node in manifest.get("nodes", {}).items(): | ||
| if unique_id not in exported_ids: | ||
| continue | ||
| for upstream_id in node.get("depends_on", {}).get("nodes", []): | ||
| if upstream_id not in exported_ids: | ||
| continue | ||
| dependencies.append({ | ||
| "from_id": upstream_id, | ||
| "to_id": unique_id, | ||
| }) | ||
| return dependencies | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| def export_nodes(self, nodes: List[Dict]): | ||
| """Write nodes to Neo4j.""" | ||
| with self.driver.session(database=self.config.database) as session: | ||
| for node in nodes: | ||
| session.run( | ||
| """ | ||
| MERGE (n:DbtNode {unique_id: $unique_id}) | ||
| SET n.name = $name, | ||
| n.resource_type = $resource_type, | ||
| n.schema = $schema, | ||
| n.database = $database, | ||
| n.package_name = $package_name, | ||
| n.description = $description | ||
| """, | ||
| **node | ||
| ) | ||
| logger.info(f"Exported {len(nodes)} nodes to Neo4j") | ||
|
|
||
| def export_dependencies(self, dependencies: List[Dict]): | ||
| """Write dependency relationships to Neo4j.""" | ||
| with self.driver.session(database=self.config.database) as session: | ||
| for dep in dependencies: | ||
| session.run( | ||
| """ | ||
| MATCH (a:DbtNode {unique_id: $from_id}) | ||
| MATCH (b:DbtNode {unique_id: $to_id}) | ||
| MERGE (a)-[:FEEDS_INTO]->(b) | ||
| """, | ||
| **dep | ||
| ) | ||
| logger.info(f"Exported {len(dependencies)} dependencies to Neo4j") | ||
|
|
||
| def export(self, manifest_path: str): | ||
| """Full export pipeline — nodes + dependencies.""" | ||
| logger.info(f"Loading manifest from {manifest_path}") | ||
| manifest = self.load_manifest(manifest_path) | ||
| nodes = self.extract_nodes(manifest) | ||
| dependencies = self.extract_dependencies(manifest) | ||
| self.export_nodes(nodes) | ||
| self.export_dependencies(dependencies) | ||
| logger.info("Neo4j lineage export complete") | ||
| return { | ||
| "nodes_exported": len(nodes), | ||
| "dependencies_exported": len(dependencies) | ||
| } | ||
|
Comment on lines
+118
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle stale graph state on repeated exports. The export only |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| neo4j>=5.0,<7.0 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| import json | ||
| import pytest | ||
| from unittest.mock import MagicMock, patch, mock_open | ||
| from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter | ||
| from elementary_neo4j.neo4j_config import Neo4jConfig | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def config(): | ||
| return Neo4jConfig( | ||
| uri="bolt://localhost:7687", | ||
| username="neo4j", | ||
| password="test", | ||
| database="neo4j" | ||
| ) | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def exporter(config): | ||
| with patch("elementary_neo4j.neo4j_exporter.GraphDatabase.driver"): | ||
| return Neo4jLineageExporter(config) | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def sample_manifest(): | ||
| return { | ||
| "nodes": { | ||
| "model.my_project.dim_customers": { | ||
| "name": "dim_customers", | ||
| "resource_type": "model", | ||
| "schema": "analytics", | ||
| "database": "snowflake_db", | ||
| "package_name": "my_project", | ||
| "description": "Customer dimension table", | ||
| "depends_on": { | ||
| "nodes": ["source.my_project.raw_customers"] | ||
| } | ||
| }, | ||
| "model.my_project.fct_orders": { | ||
| "name": "fct_orders", | ||
| "resource_type": "model", | ||
| "schema": "analytics", | ||
| "database": "snowflake_db", | ||
| "package_name": "my_project", | ||
| "description": "Orders fact table", | ||
| "depends_on": { | ||
| "nodes": ["model.my_project.dim_customers"] | ||
| } | ||
| } | ||
| }, | ||
| "sources": { | ||
| "source.my_project.raw_customers": { | ||
| "name": "raw_customers", | ||
| "resource_type": "source", | ||
| "schema": "raw", | ||
| "database": "snowflake_db", | ||
| "package_name": "my_project", | ||
| "description": "Raw customers source" | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| def test_extract_nodes_returns_models_and_sources(exporter, sample_manifest): | ||
| nodes = exporter.extract_nodes(sample_manifest) | ||
| assert len(nodes) == 3 | ||
| resource_types = [n["resource_type"] for n in nodes] | ||
| assert "model" in resource_types | ||
| assert "source" in resource_types | ||
|
|
||
|
|
||
| def test_extract_nodes_contains_correct_fields(exporter, sample_manifest): | ||
| nodes = exporter.extract_nodes(sample_manifest) | ||
| model_node = next(n for n in nodes if n["name"] == "dim_customers") | ||
| assert model_node["schema"] == "analytics" | ||
| assert model_node["database"] == "snowflake_db" | ||
| assert model_node["description"] == "Customer dimension table" | ||
|
|
||
|
|
||
| def test_extract_dependencies_correct_count(exporter, sample_manifest): | ||
| dependencies = exporter.extract_dependencies(sample_manifest) | ||
| assert len(dependencies) == 2 | ||
|
|
||
|
|
||
| def test_extract_dependencies_correct_direction(exporter, sample_manifest): | ||
| dependencies = exporter.extract_dependencies(sample_manifest) | ||
| dep = next( | ||
| d for d in dependencies | ||
| if d["to_id"] == "model.my_project.dim_customers" | ||
| ) | ||
| assert dep["from_id"] == "source.my_project.raw_customers" | ||
|
|
||
|
|
||
| def test_load_manifest_file_not_found(exporter): | ||
| with pytest.raises(FileNotFoundError): | ||
| exporter.load_manifest("nonexistent/path/manifest.json") | ||
|
|
||
|
|
||
| def test_load_manifest_reads_correctly(exporter, sample_manifest): | ||
| mock_data = json.dumps(sample_manifest) | ||
| with patch("builtins.open", mock_open(read_data=mock_data)): | ||
| with patch("pathlib.Path.exists", return_value=True): | ||
| result = exporter.load_manifest("fake/manifest.json") | ||
| assert "nodes" in result | ||
| assert "sources" in result | ||
|
|
||
|
|
||
| def test_export_nodes_calls_session(exporter, sample_manifest): | ||
| nodes = exporter.extract_nodes(sample_manifest) | ||
| mock_session = MagicMock() | ||
| exporter.driver.session.return_value.__enter__ = MagicMock( | ||
| return_value=mock_session | ||
| ) | ||
| exporter.driver.session.return_value.__exit__ = MagicMock( | ||
| return_value=False | ||
| ) | ||
| exporter.export_nodes(nodes) | ||
| assert mock_session.run.call_count == len(nodes) | ||
|
|
||
|
|
||
| def test_export_returns_correct_counts(exporter, sample_manifest): | ||
| with patch.object(exporter, "load_manifest", return_value=sample_manifest): | ||
| with patch.object(exporter, "export_nodes"): | ||
| with patch.object(exporter, "export_dependencies"): | ||
| result = exporter.export("fake/manifest.json") | ||
| assert result["nodes_exported"] == 3 | ||
| assert result["dependencies_exported"] == 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: elementary-data/dbt-data-reliability
Length of output: 136
🏁 Script executed:
Repository: elementary-data/dbt-data-reliability
Length of output: 226
🏁 Script executed:
Repository: elementary-data/dbt-data-reliability
Length of output: 3994
🏁 Script executed:
Repository: elementary-data/dbt-data-reliability
Length of output: 563
Add
neo4jto project dependencies.The module imports
neo4jat import time (line 6), but the package is not declared in any dependency metadata. This means environments installing from project metadata will fail to installneo4jautomatically, despite README-only instructions.🤖 Prompt for AI Agents