feat: add Neo4j lineage exporter for dbt manifest metadata #990
base: master
Changes from 1 commit
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| # Elementary Neo4j Lineage Exporter | ||
|
|
||
| ## Overview | ||
|
|
||
| This utility exports dbt lineage data from Elementary's dbt manifest into Neo4j | ||
| as a property graph. It enables downstream impact analysis, root cause detection, | ||
| and lineage visualization directly in Neo4j. | ||
|
|
||
| ## Motivation | ||
|
|
||
| Elementary already captures rich lineage metadata via dbt artifacts. This exporter | ||
| makes that lineage available in Neo4j, enabling graph traversal queries for: | ||
|
|
||
| - **Impact analysis** — which models are affected if a source schema changes? | ||
| - **Root cause detection** — trace data quality issues upstream | ||
| - **Lineage visualization** — explore your dbt DAG as a graph | ||
|
|
||
| ## Graph Model | ||
|
|
||
| **Nodes** — each dbt model, source, seed, and snapshot becomes a `DbtNode`: | ||
| - `unique_id` — dbt unique identifier (primary key) | ||
| - `name` — model/source name | ||
| - `resource_type` — model, source, seed, snapshot | ||
| - `schema` — target schema in your warehouse | ||
| - `database` — target database | ||
| - `package_name` — dbt package | ||
| - `description` — model documentation | ||
|
|
||
| **Relationships** — `(upstream)-[:FEEDS_INTO]->(downstream)` | ||
|
|
||
| ## Installation | ||
|
|
||
| ```bash | ||
| pip install neo4j | ||
| ``` | ||
|
|
||
| ## Usage | ||
|
|
||
| ```python | ||
| from elementary_neo4j.neo4j_config import Neo4jConfig | ||
| from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter | ||
|
|
||
| config = Neo4jConfig( | ||
| uri="bolt://localhost:7687", | ||
| username="neo4j", | ||
| password="your-password" | ||
| ) | ||
|
|
||
| exporter = Neo4jLineageExporter(config) | ||
| result = exporter.export("path/to/manifest.json") | ||
| print(result) | ||
| # {'nodes_exported': 42, 'dependencies_exported': 67} | ||
| exporter.close() | ||
| ``` | ||
|
|
||
| ## Environment Variables | ||
|
|
||
| ```bash | ||
| export NEO4J_URI=bolt://localhost:7687 | ||
| export NEO4J_USERNAME=neo4j | ||
| export NEO4J_PASSWORD=your-password | ||
| export NEO4J_DATABASE=neo4j | ||
| ``` | ||
|
|
||
| Then use: | ||
| ```python | ||
| config = Neo4jConfig.from_env() | ||
| ``` | ||
|
|
||
| ## Example Neo4j Query | ||
|
|
||
| Find all models impacted by a source change: | ||
| ```cypher | ||
| MATCH (source:DbtNode {name: "raw_customers"})-[:FEEDS_INTO*]->(impacted) | ||
| RETURN impacted.name, impacted.resource_type | ||
| ``` | ||
|
|
||
| ## Running Tests | ||
|
|
||
| ```bash | ||
| PYTHONPATH=. python -m pytest tests/test_neo4j_exporter.py -v --ignore=integration_tests | ||
| ``` | ||
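
The impact-analysis query in the README's "Example Neo4j Query" section can be sanity-checked without a running database. The following is a minimal sketch, assuming only the manifest layout used in this PR (`nodes` entries with `depends_on.nodes`); `downstream_of` and `example_manifest` are hypothetical names, not part of the exporter. It walks the same upstream-to-downstream edges that `FEEDS_INTO*` traverses, but starts from a `unique_id` rather than a `name`.

```python
from collections import deque
from typing import Any, Dict, List, Set


def downstream_of(manifest: Dict[str, Any], start_id: str) -> List[str]:
    """Return unique_ids reachable from start_id along upstream -> downstream edges."""
    # Build an adjacency map: upstream unique_id -> list of downstream unique_ids.
    children: Dict[str, List[str]] = {}
    for unique_id, node in manifest.get("nodes", {}).items():
        for upstream_id in node.get("depends_on", {}).get("nodes", []):
            children.setdefault(upstream_id, []).append(unique_id)

    # Breadth-first walk, visiting each downstream node once.
    seen: Set[str] = set()
    order: List[str] = []
    queue = deque([start_id])
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# Hypothetical two-model manifest fragment for illustration.
example_manifest = {
    "nodes": {
        "model.my_project.dim_customers": {
            "depends_on": {"nodes": ["source.my_project.raw_customers"]}
        },
        "model.my_project.fct_orders": {
            "depends_on": {"nodes": ["model.my_project.dim_customers"]}
        },
    },
}

impacted = downstream_of(example_manifest, "source.my_project.raw_customers")
print(impacted)
# ['model.my_project.dim_customers', 'model.my_project.fct_orders']
```

Comparing this list against the Cypher result for the same source is one way to confirm that an export wrote the expected edges.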
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from dataclasses import dataclass | ||
| from typing import Optional | ||
|
|
||
|
|
||
| @dataclass | ||
| class Neo4jConfig: | ||
| uri: str | ||
| username: str | ||
| password: str | ||
| database: Optional[str] = "neo4j" | ||
|
|
||
| @classmethod | ||
| def from_env(cls) -> "Neo4jConfig": | ||
| import os | ||
| return cls( | ||
| uri=os.environ.get("NEO4J_URI", "bolt://localhost:7687"), | ||
| username=os.environ.get("NEO4J_USERNAME", "neo4j"), | ||
| password=os.environ.get("NEO4J_PASSWORD", ""), | ||
| database=os.environ.get("NEO4J_DATABASE", "neo4j"), | ||
| ) |
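
`from_env` above falls back to an empty password when `NEO4J_PASSWORD` is unset, so a misconfigured environment only surfaces later as an authentication failure. A possible stricter variant is sketched below; `config_from_env_strict` and its `env` parameter are hypothetical, and the `Neo4jConfig` dataclass is repeated only so the snippet is self-contained.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class Neo4jConfig:
    # Mirrors the fields of the dataclass in this PR.
    uri: str
    username: str
    password: str
    database: Optional[str] = "neo4j"


def config_from_env_strict(env: Optional[Mapping[str, str]] = None) -> Neo4jConfig:
    """Hypothetical helper: like from_env, but rejects an unset or empty password."""
    # Accept an explicit mapping so the function can be tested without touching os.environ.
    env = os.environ if env is None else env
    password = env.get("NEO4J_PASSWORD", "")
    if not password:
        raise ValueError("NEO4J_PASSWORD is not set")
    return Neo4jConfig(
        uri=env.get("NEO4J_URI", "bolt://localhost:7687"),
        username=env.get("NEO4J_USERNAME", "neo4j"),
        password=password,
        database=env.get("NEO4J_DATABASE", "neo4j"),
    )
```

Whether failing fast is preferable depends on the deployment; a Neo4j instance with authentication disabled would need the lenient behavior.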
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| import json | ||
| import logging | ||
| from pathlib import Path | ||
| from typing import Any, Dict, List, Optional | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| from neo4j import GraphDatabase | ||
|
🧩 Analysis chain

🏁 Script executed:
#!/bin/bash
# Description: Verify that the Neo4j Python driver is declared in dependency metadata.
set -euo pipefail
mapfile -t metadata_files < <(
  fd -HI '^(pyproject\.toml|setup\.py|setup\.cfg|requirements.*|Pipfile|poetry\.lock)$'
)
printf 'Dependency metadata files found:\n'
printf ' - %s\n' "${metadata_files[@]:-}"
if ((${#metadata_files[@]})); then
  rg -n -i -C2 '\bneo4j\b' "${metadata_files[@]}" || true
fi
Repository: elementary-data/dbt-data-reliability
Length of output: 136

🏁 Script executed:
#!/bin/bash
# Find all dependency metadata files
fd -HI '^(pyproject\.toml|setup\.py|setup\.cfg|requirements.*|Pipfile|poetry\.lock)$' > /tmp/metadata_files.txt
echo "Dependency metadata files found:"
cat /tmp/metadata_files.txt || echo "(none)"
echo ""
echo "Searching for 'neo4j' in dependency files:"
if [ -s /tmp/metadata_files.txt ]; then
  cat /tmp/metadata_files.txt | xargs rg -n -i -C2 '\bneo4j\b' || echo "No neo4j found in dependency files"
fi
Repository: elementary-data/dbt-data-reliability
Length of output: 226

🏁 Script executed:
# Check root level for common Python project files
echo "=== Files in root directory ==="
ls -la | grep -E '(pyproject|setup|requirements|poetry|Pipfile)' || echo "No common files at root"
echo ""
echo "=== All occurrences of 'neo4j' in the repository ==="
rg -i -n 'neo4j' --max-count=50
echo ""
echo "=== Content of integration_tests/requirements.txt ==="
cat integration_tests/requirements.txt
echo ""
echo "=== Search for .txt files with dependencies ==="
fd -i 'requirements.*\.txt$' -x cat -n {}
Repository: elementary-data/dbt-data-reliability
Length of output: 3994

🏁 Script executed:
# Check for any Python project metadata files with different search
echo "=== All Python config files in repo ==="
find . -maxdepth 2 -type f \( -name "pyproject.toml" -o -name "setup.py" -o -name "setup.cfg" -o -name "MANIFEST.in" \) -not -path "./.*"
echo ""
echo "=== Check if elementary_neo4j is a package with metadata ==="
ls -la elementary_neo4j/
echo ""
echo "=== Check for any __init__.py that might have version/dependencies ==="
cat elementary_neo4j/__init__.py 2>/dev/null || echo "No __init__.py found"
Repository: elementary-data/dbt-data-reliability
Length of output: 563

Add The module imports |
||
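
Independently of where the dependency ends up being declared, the hard `from neo4j import GraphDatabase` at module import time means anyone without the driver installed gets a bare `ModuleNotFoundError`. One option, sketched here under the assumption that the driver is meant to stay optional, is to import it lazily with an actionable message. `import_optional` is a hypothetical helper, not an existing function in this repository.

```python
import importlib
from types import ModuleType


def import_optional(module_name: str, install_hint: str) -> ModuleType:
    """Import a module by name, raising an ImportError that says how to install it."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # Chain the original error so the underlying cause stays visible.
        raise ImportError(
            f"'{module_name}' is required for this feature. "
            f"Install it with: {install_hint}"
        ) from exc


# Intended use inside the exporter (left as a comment so the sketch runs
# even where the driver is not installed):
# neo4j = import_optional("neo4j", "pip install neo4j")
# driver = neo4j.GraphDatabase.driver(uri, auth=(username, password))
```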
|
|
||
| from elementary_neo4j.neo4j_config import Neo4jConfig | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Neo4jLineageExporter: | ||
| """ | ||
| Exports dbt lineage from Elementary's dbt manifest into Neo4j. | ||
| Creates nodes for models, seeds, snapshots, and sources, and | ||
| relationships for dependencies between them. | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| """ | ||
|
|
||
| def __init__(self, config: Neo4jConfig): | ||
| self.config = config | ||
| self.driver = GraphDatabase.driver( | ||
| config.uri, | ||
| auth=(config.username, config.password) | ||
| ) | ||
|
|
||
| def close(self): | ||
| self.driver.close() | ||
|
|
||
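
Because the exporter exposes `close()` but no context-manager methods, a failure inside `export()` (for example the `FileNotFoundError` from `load_manifest`) would skip the `exporter.close()` call shown in the README. The standard library's `contextlib.closing` covers this without changing the class. The sketch below uses a hypothetical `FakeExporter` stand-in so it runs without a database.

```python
from contextlib import closing


class FakeExporter:
    """Stand-in for Neo4jLineageExporter; only records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def export(self, manifest_path: str) -> dict:
        # Simulate the failure load_manifest raises for a missing file.
        raise FileNotFoundError(f"Manifest not found: {manifest_path}")

    def close(self) -> None:
        self.closed = True


exporter = FakeExporter()
try:
    # closing() calls exporter.close() on exit, even when export() raises.
    with closing(exporter) as exp:
        exp.export("missing/manifest.json")
except FileNotFoundError:
    pass

print(exporter.closed)  # True
```

The same `with closing(Neo4jLineageExporter(config)) as exporter:` pattern should apply to the real class, since `closing` only requires a `close()` method.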
| def load_manifest(self, manifest_path: str) -> Dict[str, Any]: | ||
| """Load dbt manifest.json from file path.""" | ||
| path = Path(manifest_path) | ||
| if not path.exists(): | ||
| raise FileNotFoundError(f"Manifest not found: {manifest_path}") | ||
| with open(path, "r") as f: | ||
| return json.load(f) | ||
|
|
||
| def extract_nodes(self, manifest: Dict[str, Any]) -> List[Dict]: | ||
| """Extract model, seed, snapshot, and source nodes from manifest.""" | ||
| nodes = [] | ||
| for unique_id, node in manifest.get("nodes", {}).items(): | ||
| if node.get("resource_type") in ("model", "seed", "snapshot"): | ||
| nodes.append({ | ||
| "unique_id": unique_id, | ||
| "name": node.get("name"), | ||
| "resource_type": node.get("resource_type"), | ||
| "schema": node.get("schema"), | ||
| "database": node.get("database"), | ||
| "package_name": node.get("package_name"), | ||
| "description": node.get("description", ""), | ||
| }) | ||
| for unique_id, source in manifest.get("sources", {}).items(): | ||
| nodes.append({ | ||
| "unique_id": unique_id, | ||
| "name": source.get("name"), | ||
| "resource_type": "source", | ||
| "schema": source.get("schema"), | ||
| "database": source.get("database"), | ||
| "package_name": source.get("package_name"), | ||
| "description": source.get("description", ""), | ||
| }) | ||
| return nodes | ||
|
|
||
| def extract_dependencies(self, manifest: Dict[str, Any]) -> List[Dict]: | ||
| """Extract upstream dependencies between nodes.""" | ||
| dependencies = [] | ||
| for unique_id, node in manifest.get("nodes", {}).items(): | ||
| for upstream_id in node.get("depends_on", {}).get("nodes", []): | ||
| dependencies.append({ | ||
| "from_id": upstream_id, | ||
| "to_id": unique_id, | ||
| }) | ||
| return dependencies | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
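
`extract_dependencies` iterates every entry in `nodes`, while `extract_nodes` keeps only models, seeds, and snapshots (plus sources). dbt manifests also store tests under `nodes`, so an edge whose endpoint is a test has no matching `DbtNode`, the `MATCH` in `export_dependencies` finds nothing, and `dependencies_exported` can exceed the number of relationships actually created. A possible filter is sketched below; `exported_ids` and `extract_resolvable_dependencies` are hypothetical names.

```python
from typing import Any, Dict, List, Set

# Resource types that extract_nodes in this PR turns into DbtNode nodes.
EXPORTED_TYPES = ("model", "seed", "snapshot")


def exported_ids(manifest: Dict[str, Any]) -> Set[str]:
    """Collect the unique_ids that would exist as DbtNode nodes after export."""
    ids = {
        unique_id
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") in EXPORTED_TYPES
    }
    ids.update(manifest.get("sources", {}).keys())
    return ids


def extract_resolvable_dependencies(manifest: Dict[str, Any]) -> List[Dict[str, str]]:
    """Keep only edges whose upstream and downstream ends are both exported."""
    known = exported_ids(manifest)
    dependencies: List[Dict[str, str]] = []
    for unique_id, node in manifest.get("nodes", {}).items():
        if unique_id not in known:
            continue  # e.g. a test node, which never becomes a DbtNode
        for upstream_id in node.get("depends_on", {}).get("nodes", []):
            if upstream_id in known:
                dependencies.append({"from_id": upstream_id, "to_id": unique_id})
    return dependencies
```

With this filter the returned count matches the edges that `MERGE` can create, at the cost of silently dropping edges to resource types the exporter does not model.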
| def export_nodes(self, nodes: List[Dict]): | ||
| """Write nodes to Neo4j.""" | ||
| with self.driver.session(database=self.config.database) as session: | ||
| for node in nodes: | ||
| session.run( | ||
| """ | ||
| MERGE (n:DbtNode {unique_id: $unique_id}) | ||
| SET n.name = $name, | ||
| n.resource_type = $resource_type, | ||
| n.schema = $schema, | ||
| n.database = $database, | ||
| n.package_name = $package_name, | ||
| n.description = $description | ||
| """, | ||
| **node | ||
| ) | ||
| logger.info(f"Exported {len(nodes)} nodes to Neo4j") | ||
|
|
||
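
`export_nodes` issues one `session.run` call per node, which means one round trip per node on large manifests. A common alternative is to send rows in batches and let Cypher's `UNWIND` expand them. The sketch below is an assumption about how that could look, not the PR's implementation: `export_nodes_batched`, `chunked`, and the batch size of 500 are hypothetical, and `SET n += row` replaces the explicit per-property `SET` list.

```python
from typing import Any, Dict, Iterator, List

# One statement per batch: UNWIND expands the list parameter into rows.
MERGE_NODES_QUERY = """
UNWIND $rows AS row
MERGE (n:DbtNode {unique_id: row.unique_id})
SET n += row
"""


def chunked(rows: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def export_nodes_batched(session: Any, nodes: List[Dict[str, Any]], batch_size: int = 500) -> int:
    """Write nodes in batches; returns the number of statements sent."""
    batches = 0
    for batch in chunked(nodes, batch_size):
        # `session` is expected to behave like a neo4j driver session:
        # run(query, **parameters).
        session.run(MERGE_NODES_QUERY, rows=batch)
        batches += 1
    return batches
```

The existing `test_export_nodes_calls_session` asserts one call per node, so it would need to assert on batch count instead if this approach were adopted.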
| def export_dependencies(self, dependencies: List[Dict]): | ||
| """Write dependency relationships to Neo4j.""" | ||
| with self.driver.session(database=self.config.database) as session: | ||
| for dep in dependencies: | ||
| session.run( | ||
| """ | ||
| MATCH (a:DbtNode {unique_id: $from_id}) | ||
| MATCH (b:DbtNode {unique_id: $to_id}) | ||
| MERGE (a)-[:FEEDS_INTO]->(b) | ||
| """, | ||
| **dep | ||
| ) | ||
| logger.info(f"Exported {len(dependencies)} dependencies to Neo4j") | ||
|
|
||
| def export(self, manifest_path: str): | ||
| """Full export pipeline — nodes + dependencies.""" | ||
| logger.info(f"Loading manifest from {manifest_path}") | ||
| manifest = self.load_manifest(manifest_path) | ||
| nodes = self.extract_nodes(manifest) | ||
| dependencies = self.extract_dependencies(manifest) | ||
| self.export_nodes(nodes) | ||
| self.export_dependencies(dependencies) | ||
| logger.info("Neo4j lineage export complete") | ||
| return { | ||
| "nodes_exported": len(nodes), | ||
| "dependencies_exported": len(dependencies) | ||
| } | ||
|
Comment on lines +118 to +130
Handle stale graph state on repeated exports. The export only |
||
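
Since both `export_nodes` and `export_dependencies` use `MERGE` and never delete, a model or dependency removed from the dbt project stays in the graph after the next export. One way to address this, sketched under the assumption that every `MERGE` also stamps `last_run_id = $run_id` on the node or relationship it touches, is to prune anything not stamped by the current run. The property name `last_run_id` and the helpers `new_run_id` and `prune_stale` are hypothetical.

```python
import uuid
from typing import Any

# Remove relationships first, then nodes, that the current run did not touch.
PRUNE_RELATIONSHIPS_QUERY = """
MATCH (:DbtNode)-[r:FEEDS_INTO]->(:DbtNode)
WHERE r.last_run_id IS NULL OR r.last_run_id <> $run_id
DELETE r
"""

PRUNE_NODES_QUERY = """
MATCH (n:DbtNode)
WHERE n.last_run_id IS NULL OR n.last_run_id <> $run_id
DETACH DELETE n
"""


def new_run_id() -> str:
    """Generate an identifier that is unique per export run."""
    return uuid.uuid4().hex


def prune_stale(session: Any, run_id: str) -> None:
    """Delete DbtNode nodes and FEEDS_INTO edges not stamped with run_id."""
    # `session` is expected to behave like a neo4j driver session.
    session.run(PRUNE_RELATIONSHIPS_QUERY, run_id=run_id)
    session.run(PRUNE_NODES_QUERY, run_id=run_id)
```

This assumes a single manifest owns all `DbtNode` data in the database; if several projects export into the same graph, the prune would need to be scoped (for example by `package_name`) to avoid deleting another project's lineage.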
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| import json | ||
| import pytest | ||
| from unittest.mock import MagicMock, patch, mock_open | ||
| from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter | ||
| from elementary_neo4j.neo4j_config import Neo4jConfig | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def config(): | ||
| return Neo4jConfig( | ||
| uri="bolt://localhost:7687", | ||
| username="neo4j", | ||
| password="test", | ||
| database="neo4j" | ||
| ) | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def exporter(config): | ||
| with patch("elementary_neo4j.neo4j_exporter.GraphDatabase.driver"): | ||
| return Neo4jLineageExporter(config) | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def sample_manifest(): | ||
| return { | ||
| "nodes": { | ||
| "model.my_project.dim_customers": { | ||
| "name": "dim_customers", | ||
| "resource_type": "model", | ||
| "schema": "analytics", | ||
| "database": "snowflake_db", | ||
| "package_name": "my_project", | ||
| "description": "Customer dimension table", | ||
| "depends_on": { | ||
| "nodes": ["source.my_project.raw_customers"] | ||
| } | ||
| }, | ||
| "model.my_project.fct_orders": { | ||
| "name": "fct_orders", | ||
| "resource_type": "model", | ||
| "schema": "analytics", | ||
| "database": "snowflake_db", | ||
| "package_name": "my_project", | ||
| "description": "Orders fact table", | ||
| "depends_on": { | ||
| "nodes": ["model.my_project.dim_customers"] | ||
| } | ||
| } | ||
| }, | ||
| "sources": { | ||
| "source.my_project.raw_customers": { | ||
| "name": "raw_customers", | ||
| "resource_type": "source", | ||
| "schema": "raw", | ||
| "database": "snowflake_db", | ||
| "package_name": "my_project", | ||
| "description": "Raw customers source" | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| def test_extract_nodes_returns_models_and_sources(exporter, sample_manifest): | ||
| nodes = exporter.extract_nodes(sample_manifest) | ||
| assert len(nodes) == 3 | ||
| resource_types = [n["resource_type"] for n in nodes] | ||
| assert "model" in resource_types | ||
| assert "source" in resource_types | ||
|
|
||
|
|
||
| def test_extract_nodes_contains_correct_fields(exporter, sample_manifest): | ||
| nodes = exporter.extract_nodes(sample_manifest) | ||
| model_node = next(n for n in nodes if n["name"] == "dim_customers") | ||
| assert model_node["schema"] == "analytics" | ||
| assert model_node["database"] == "snowflake_db" | ||
| assert model_node["description"] == "Customer dimension table" | ||
|
|
||
|
|
||
| def test_extract_dependencies_correct_count(exporter, sample_manifest): | ||
| dependencies = exporter.extract_dependencies(sample_manifest) | ||
| assert len(dependencies) == 2 | ||
|
|
||
|
|
||
| def test_extract_dependencies_correct_direction(exporter, sample_manifest): | ||
| dependencies = exporter.extract_dependencies(sample_manifest) | ||
| dep = next( | ||
| d for d in dependencies | ||
| if d["to_id"] == "model.my_project.dim_customers" | ||
| ) | ||
| assert dep["from_id"] == "source.my_project.raw_customers" | ||
|
|
||
|
|
||
| def test_load_manifest_file_not_found(exporter): | ||
| with pytest.raises(FileNotFoundError): | ||
| exporter.load_manifest("nonexistent/path/manifest.json") | ||
|
|
||
|
|
||
| def test_load_manifest_reads_correctly(exporter, sample_manifest): | ||
| mock_data = json.dumps(sample_manifest) | ||
| with patch("builtins.open", mock_open(read_data=mock_data)): | ||
| with patch("pathlib.Path.exists", return_value=True): | ||
| result = exporter.load_manifest("fake/manifest.json") | ||
| assert "nodes" in result | ||
| assert "sources" in result | ||
|
|
||
|
|
||
| def test_export_nodes_calls_session(exporter, sample_manifest): | ||
| nodes = exporter.extract_nodes(sample_manifest) | ||
| mock_session = MagicMock() | ||
| exporter.driver.session.return_value.__enter__ = MagicMock( | ||
| return_value=mock_session | ||
| ) | ||
| exporter.driver.session.return_value.__exit__ = MagicMock( | ||
| return_value=False | ||
| ) | ||
| exporter.export_nodes(nodes) | ||
| assert mock_session.run.call_count == len(nodes) | ||
|
|
||
|
|
||
| def test_export_returns_correct_counts(exporter, sample_manifest): | ||
| with patch.object(exporter, "load_manifest", return_value=sample_manifest): | ||
| with patch.object(exporter, "export_nodes"): | ||
| with patch.object(exporter, "export_dependencies"): | ||
| result = exporter.export("fake/manifest.json") | ||
| assert result["nodes_exported"] == 3 | ||
| assert result["dependencies_exported"] == 2 |
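
The suite above covers `export_nodes` but has no matching test for `export_dependencies`. A possible shape for one is sketched below. To keep the snippet runnable on its own, it uses a hypothetical module-level `export_dependencies` stand-in that mirrors the method in this PR. It also relies on `MagicMock` supporting the context-manager protocol out of the box, so the session can be reached through `__enter__.return_value` without assigning `__enter__` and `__exit__` by hand.

```python
from typing import Any, Dict, List
from unittest.mock import MagicMock


def export_dependencies(driver: Any, database: str, dependencies: List[Dict[str, str]]) -> None:
    """Stand-in mirroring the PR's method: one session.run call per dependency."""
    with driver.session(database=database) as session:
        for dep in dependencies:
            session.run(
                """
                MATCH (a:DbtNode {unique_id: $from_id})
                MATCH (b:DbtNode {unique_id: $to_id})
                MERGE (a)-[:FEEDS_INTO]->(b)
                """,
                **dep,
            )


def test_export_dependencies_calls_session() -> None:
    driver = MagicMock()
    # The object bound by `with driver.session(...) as session`.
    session = driver.session.return_value.__enter__.return_value
    dependencies = [
        {"from_id": "source.my_project.raw_customers", "to_id": "model.my_project.dim_customers"},
        {"from_id": "model.my_project.dim_customers", "to_id": "model.my_project.fct_orders"},
    ]
    export_dependencies(driver, "neo4j", dependencies)
    assert session.run.call_count == len(dependencies)
    driver.session.assert_called_once_with(database="neo4j")


test_export_dependencies_calls_session()
```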