diff --git a/README_neo4j.md b/README_neo4j.md new file mode 100644 index 000000000..c39c95095 --- /dev/null +++ b/README_neo4j.md @@ -0,0 +1,82 @@ +# Elementary Neo4j Lineage Exporter + +## Overview + +This utility exports dbt lineage data from Elementary's dbt manifest into Neo4j +as a property graph. It enables downstream impact analysis, root cause detection, +and lineage visualization directly in Neo4j. + +## Motivation + +Elementary already captures rich lineage metadata via dbt artifacts. This exporter +makes that lineage available in Neo4j, enabling graph traversal queries for: + +- **Impact analysis** — which models are affected if a source schema changes? +- **Root cause detection** — trace data quality issues upstream +- **Lineage visualization** — explore your dbt DAG as a graph + +## Graph Model + +**Nodes** — each dbt model, source, seed, and snapshot becomes a `DbtNode`: +- `unique_id` — dbt unique identifier (primary key) +- `name` — model/source name +- `resource_type` — model, source, seed, snapshot +- `schema` — target schema in your warehouse +- `database` — target database +- `package_name` — dbt package +- `description` — model documentation + +**Relationships** — `(upstream)-[:FEEDS_INTO]->(downstream)` + +## Installation + +```bash +pip install neo4j +``` + +## Usage + +```python +from elementary_neo4j.neo4j_config import Neo4jConfig +from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter + +config = Neo4jConfig( + uri="bolt://localhost:7687", + username="neo4j", + password="your-password" +) + +exporter = Neo4jLineageExporter(config) +result = exporter.export("path/to/manifest.json") +print(result) +# {"nodes_exported": 42, "dependencies_exported": 67} +exporter.close() +``` + +## Environment Variables + +```bash +export NEO4J_URI=bolt://localhost:7687 +export NEO4J_USERNAME=neo4j +export NEO4J_PASSWORD=your-password +export NEO4J_DATABASE=neo4j +``` + +Then use: +```python +config = Neo4jConfig.from_env() +``` + +## Example Neo4j 
@dataclass
class Neo4jConfig:
    """Connection settings for the Neo4j instance that receives the lineage.

    ``uri``, ``username`` and ``password`` identify and authenticate the
    connection; ``database`` names the database sessions are opened against.
    """

    uri: str
    username: str
    password: str
    # Name of the target database; "neo4j" unless overridden.
    database: Optional[str] = "neo4j"

    def __repr__(self) -> str:
        """Return a debug representation that never contains the password."""
        # An explicitly defined __repr__ takes precedence over the one
        # @dataclass would generate, which would print the password verbatim
        # whenever the config is logged or shown in a traceback.
        return (
            f"{type(self).__name__}("
            f"uri={self.uri!r}, "
            f"username={self.username!r}, "
            f"password='***', "
            f"database={self.database!r})"
        )

    @classmethod
    def from_env(cls) -> "Neo4jConfig":
        """Build a config from the ``NEO4J_*`` environment variables.

        Reads ``NEO4J_URI``, ``NEO4J_USERNAME``, ``NEO4J_PASSWORD`` and
        ``NEO4J_DATABASE``. Unset variables fall back to
        ``bolt://localhost:7687``, ``neo4j``, an empty password and ``neo4j``
        respectively.

        Returns:
            A new ``Neo4jConfig`` populated from the environment.
        """
        # Local import keeps this module's top-level imports unchanged.
        import os

        env = os.environ
        return cls(
            uri=env.get("NEO4J_URI", "bolt://localhost:7687"),
            username=env.get("NEO4J_USERNAME", "neo4j"),
            password=env.get("NEO4J_PASSWORD", ""),
            database=env.get("NEO4J_DATABASE", "neo4j"),
        )
class Neo4jLineageExporter:
    """
    Exports dbt lineage from Elementary's dbt manifest into Neo4j.
    Creates nodes for models, sources, seeds, and snapshots, and
    relationships for dependencies between them.

    The exporter owns a Neo4j driver. Call ``close()`` when done, or use the
    instance as a context manager so the driver is closed even if the export
    raises.
    """

    # Resource types taken from manifest["nodes"]; every entry of
    # manifest["sources"] is exported in addition to these.
    EXPORTED_RESOURCE_TYPES = ("model", "seed", "snapshot")

    # MERGE is keyed on unique_id, so re-exporting updates an existing node
    # instead of creating a duplicate.
    _MERGE_NODE_QUERY = """
        MERGE (n:DbtNode {unique_id: $unique_id})
        SET n.name = $name,
            n.resource_type = $resource_type,
            n.schema = $schema,
            n.database = $database,
            n.package_name = $package_name,
            n.description = $description
    """

    # Both endpoints must already exist; the relationship is merged so a
    # repeated export does not add parallel edges.
    _MERGE_DEPENDENCY_QUERY = """
        MATCH (a:DbtNode {unique_id: $from_id})
        MATCH (b:DbtNode {unique_id: $to_id})
        MERGE (a)-[:FEEDS_INTO]->(b)
    """

    def __init__(self, config: "Neo4jConfig"):
        """Create the Neo4j driver from ``config`` (uri, username, password)."""
        self.config = config
        self.driver = GraphDatabase.driver(
            config.uri,
            auth=(config.username, config.password),
        )

    def __enter__(self) -> "Neo4jLineageExporter":
        """Return the exporter itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the driver on leaving the ``with`` block; never suppress errors."""
        self.close()

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def load_manifest(self, manifest_path: str) -> Dict[str, Any]:
        """Load dbt manifest.json from file path.

        Args:
            manifest_path: Path to the manifest file.

        Returns:
            The parsed manifest.

        Raises:
            FileNotFoundError: If ``manifest_path`` does not exist.
        """
        path = Path(manifest_path)
        if not path.exists():
            raise FileNotFoundError(f"Manifest not found: {manifest_path}")
        # Decode explicitly as UTF-8: the default encoding is locale
        # dependent and can mangle non-ASCII descriptions on some platforms.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _node_record(unique_id: str, entry: Dict[str, Any], resource_type: Any) -> Dict:
        """Build the flat property dict written to Neo4j for one manifest entry."""
        return {
            "unique_id": unique_id,
            "name": entry.get("name"),
            "resource_type": resource_type,
            "schema": entry.get("schema"),
            "database": entry.get("database"),
            "package_name": entry.get("package_name"),
            "description": entry.get("description", ""),
        }

    def _exported_ids(self, manifest: Dict[str, Any]) -> set:
        """Return the unique ids of every manifest entry that becomes a node."""
        ids = {
            unique_id
            for unique_id, node in (manifest.get("nodes") or {}).items()
            if node.get("resource_type") in self.EXPORTED_RESOURCE_TYPES
        }
        ids.update(manifest.get("sources") or {})
        return ids

    def extract_nodes(self, manifest: Dict[str, Any]) -> List[Dict]:
        """Extract model, seed, snapshot and source nodes from manifest.

        Returns:
            One property dict per exported node: first the matching entries
            of ``manifest["nodes"]``, then every entry of
            ``manifest["sources"]`` (always typed ``"source"``). Missing or
            null sections yield no nodes.
        """
        records = [
            self._node_record(unique_id, node, node.get("resource_type"))
            for unique_id, node in (manifest.get("nodes") or {}).items()
            if node.get("resource_type") in self.EXPORTED_RESOURCE_TYPES
        ]
        records.extend(
            self._node_record(unique_id, source, "source")
            for unique_id, source in (manifest.get("sources") or {}).items()
        )
        return records

    def extract_dependencies(self, manifest: Dict[str, Any]) -> List[Dict]:
        """Extract upstream dependencies between exported nodes only.

        Returns:
            A list of ``{"from_id": upstream, "to_id": downstream}`` dicts.
            Edges whose upstream or downstream is not an exported node (for
            example dbt tests) are dropped.
        """
        exported_ids = self._exported_ids(manifest)

        dependencies: List[Dict] = []
        for unique_id, node in (manifest.get("nodes") or {}).items():
            if unique_id not in exported_ids:
                continue
            # "depends_on" (or its "nodes" list) may be absent or null;
            # treat both as "no upstream nodes" instead of crashing.
            upstream_ids = (node.get("depends_on") or {}).get("nodes") or []
            dependencies.extend(
                {"from_id": upstream_id, "to_id": unique_id}
                for upstream_id in upstream_ids
                if upstream_id in exported_ids
            )
        return dependencies

    def export_nodes(self, nodes: List[Dict]):
        """Write nodes to Neo4j, one MERGE statement per node."""
        with self.driver.session(database=self.config.database) as session:
            for node in nodes:
                # Pass the record as the parameters dict so a property name
                # can never collide with a keyword argument of run().
                session.run(self._MERGE_NODE_QUERY, node)
        logger.info("Exported %d nodes to Neo4j", len(nodes))

    def export_dependencies(self, dependencies: List[Dict]):
        """Write dependency relationships to Neo4j, one MERGE per edge."""
        with self.driver.session(database=self.config.database) as session:
            for dep in dependencies:
                session.run(self._MERGE_DEPENDENCY_QUERY, dep)
        logger.info("Exported %d dependencies to Neo4j", len(dependencies))

    def export(self, manifest_path: str) -> Dict[str, int]:
        """Full export pipeline — nodes + dependencies.

        Args:
            manifest_path: Path to the dbt ``manifest.json``.

        Returns:
            ``{"nodes_exported": ..., "dependencies_exported": ...}`` with the
            number of node records and edge records sent to Neo4j.

        Raises:
            FileNotFoundError: If the manifest file does not exist.
        """
        logger.info("Loading manifest from %s", manifest_path)
        manifest = self.load_manifest(manifest_path)
        nodes = self.extract_nodes(manifest)
        dependencies = self.extract_dependencies(manifest)
        # Nodes first: the dependency query only MATCHes existing nodes.
        self.export_nodes(nodes)
        self.export_dependencies(dependencies)
        logger.info("Neo4j lineage export complete")
        return {
            "nodes_exported": len(nodes),
            "dependencies_exported": len(dependencies),
        }
"""Unit tests for the Neo4j lineage exporter; the Neo4j driver is always mocked."""
import json
import pytest
from unittest.mock import MagicMock, patch, mock_open
from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter
from elementary_neo4j.neo4j_config import Neo4jConfig


def _manifest_entry(name, resource_type, schema, description, upstream=None):
    """Build one manifest entry; ``upstream`` adds a ``depends_on`` section."""
    entry = {
        "name": name,
        "resource_type": resource_type,
        "schema": schema,
        "database": "snowflake_db",
        "package_name": "my_project",
        "description": description,
    }
    if upstream is not None:
        entry["depends_on"] = {"nodes": list(upstream)}
    return entry


@pytest.fixture
def config():
    """Connection settings pointing at a local, never-contacted instance."""
    return Neo4jConfig(
        uri="bolt://localhost:7687",
        username="neo4j",
        password="test",
        database="neo4j",
    )


@pytest.fixture
def exporter(config):
    """Exporter whose driver is a mock, so no connection is attempted."""
    with patch("elementary_neo4j.neo4j_exporter.GraphDatabase.driver"):
        instance = Neo4jLineageExporter(config)
    return instance


@pytest.fixture
def sample_manifest():
    """Two chained models fed by one source: raw_customers -> dim -> fct."""
    return {
        "nodes": {
            "model.my_project.dim_customers": _manifest_entry(
                "dim_customers",
                "model",
                "analytics",
                "Customer dimension table",
                upstream=["source.my_project.raw_customers"],
            ),
            "model.my_project.fct_orders": _manifest_entry(
                "fct_orders",
                "model",
                "analytics",
                "Orders fact table",
                upstream=["model.my_project.dim_customers"],
            ),
        },
        "sources": {
            "source.my_project.raw_customers": _manifest_entry(
                "raw_customers",
                "source",
                "raw",
                "Raw customers source",
            ),
        },
    }


def test_extract_nodes_returns_models_and_sources(exporter, sample_manifest):
    extracted = exporter.extract_nodes(sample_manifest)
    assert len(extracted) == 3
    seen_types = {record["resource_type"] for record in extracted}
    assert "model" in seen_types
    assert "source" in seen_types


def test_extract_nodes_contains_correct_fields(exporter, sample_manifest):
    extracted = exporter.extract_nodes(sample_manifest)
    dim = next(filter(lambda record: record["name"] == "dim_customers", extracted))
    assert dim["schema"] == "analytics"
    assert dim["database"] == "snowflake_db"
    assert dim["description"] == "Customer dimension table"


def test_extract_dependencies_correct_count(exporter, sample_manifest):
    edges = exporter.extract_dependencies(sample_manifest)
    assert len(edges) == 2


def test_extract_dependencies_correct_direction(exporter, sample_manifest):
    edges = exporter.extract_dependencies(sample_manifest)
    into_dim = next(
        filter(lambda edge: edge["to_id"] == "model.my_project.dim_customers", edges)
    )
    assert into_dim["from_id"] == "source.my_project.raw_customers"


def test_load_manifest_file_not_found(exporter):
    with pytest.raises(FileNotFoundError):
        exporter.load_manifest("nonexistent/path/manifest.json")


def test_load_manifest_reads_correctly(exporter, sample_manifest):
    serialized = json.dumps(sample_manifest)
    # Patch both the existence check and the file read so no real file is needed.
    with patch("builtins.open", mock_open(read_data=serialized)), patch(
        "pathlib.Path.exists", return_value=True
    ):
        loaded = exporter.load_manifest("fake/manifest.json")
    assert "nodes" in loaded
    assert "sources" in loaded


def test_export_nodes_calls_session(exporter, sample_manifest):
    extracted = exporter.extract_nodes(sample_manifest)
    session = MagicMock()
    # The mocked driver already supports the context-manager protocol; make
    # entering the session context hand back our observable session object.
    exporter.driver.session.return_value.__enter__.return_value = session
    exporter.driver.session.return_value.__exit__.return_value = False
    exporter.export_nodes(extracted)
    assert session.run.call_count == len(extracted)


def test_export_returns_correct_counts(exporter, sample_manifest):
    with patch.object(
        exporter, "load_manifest", return_value=sample_manifest
    ), patch.object(exporter, "export_nodes"), patch.object(
        exporter, "export_dependencies"
    ):
        summary = exporter.export("fake/manifest.json")
    assert summary["nodes_exported"] == 3
    assert summary["dependencies_exported"] == 2