Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
4d657c4
fix: retry run_query on empty run_operation result (flaky fusion test)
devin-ai-integration[bot] Feb 25, 2026
c975eef
fix: add short delay between run_query retries
devin-ai-integration[bot] Feb 25, 2026
f1f1ca5
fix: scope run_query retry to fusion runner only
devin-ai-integration[bot] Feb 26, 2026
4384897
fix: apply run_query retry to all runners (not fusion-only)
devin-ai-integration[bot] Feb 26, 2026
268d3d0
fix: bypass run_operation log parsing with direct adapter connection
devin-ai-integration[bot] Feb 26, 2026
ae53f25
fix: fall back to run_operation for queries with non-ref Jinja
devin-ai-integration[bot] Feb 26, 2026
69ac36e
refactor: address review comments - lazy init, source resolution, man…
devin-ai-integration[bot] Feb 26, 2026
0ca8ca3
refactor: use tenacity for run_operation retry
devin-ai-integration[bot] Feb 26, 2026
77f5ddc
chore: add tenacity as direct test dependency
devin-ai-integration[bot] Feb 26, 2026
08a6ebb
fix: surface seed and init failures instead of swallowing them
devin-ai-integration[bot] Feb 26, 2026
6b4acf9
refactor: early exit on init failure to reduce log noise
devin-ai-integration[bot] Feb 26, 2026
148b1d9
Merge branch 'devin/1772105501-investigate-databricks-schema-failures…
devin-ai-integration[bot] Feb 26, 2026
69e1213
Merge remote-tracking branch 'origin/master' into devin/1772040020-fi…
devin-ai-integration[bot] Feb 26, 2026
4869797
refactor: address review comments - BaseAdapter type, custom Unsuppor…
devin-ai-integration[bot] Feb 26, 2026
b722905
refactor: log fallback to run_operation and use tenacity before_sleep…
devin-ai-integration[bot] Feb 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions integration_tests/tests/adapter_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Direct database query execution via dbt adapter connection.
Comment thread
haritamar marked this conversation as resolved.

Bypasses ``run_operation`` log-parsing entirely so that query results are
never lost due to intermittent log-capture issues in the CLI / fusion
runners.
"""

import json
import multiprocessing
import os
import re
from datetime import date, datetime, time
from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, List, Optional

from logger import get_logger

# Module-level logger obtained from the project-local ``logger`` helper.
logger = get_logger(__name__)

# Matches a literal ``{{ ref('name') }}`` or ``{{ ref("name") }}`` call, with
# optional whitespace inside the braces and parentheses. Group 1 captures the
# referenced name. The opening and closing quote characters are matched
# independently (they are not required to pair up), and multi-argument calls
# such as ``ref('package', 'model')`` are not matched at all.
_REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")
Comment thread
haritamar marked this conversation as resolved.


def _serialize_value(val: Any) -> Any:
"""Mimic elementary's ``agate_to_dicts`` serialisation.

* ``Decimal`` → ``int`` (no fractional part) or ``float``
* ``datetime`` / ``date`` / ``time`` → ISO-format string
* Everything else is returned unchanged.
"""
if isinstance(val, Decimal):
# Match the Jinja macro: normalize, then int or float
normalized = val.normalize()
if normalized.as_tuple().exponent >= 0:
return int(normalized)
return float(normalized)
if isinstance(val, (datetime, date, time)):
return val.isoformat()
return val


class AdapterQueryRunner:
    """Execute SQL directly through a dbt adapter connection.

    ``{{ ref('name') }}`` placeholders are substituted with relation names
    read from the project's ``target/manifest.json`` before the SQL is sent
    to the adapter; no other Jinja is rendered by this class.

    Parameters
    ----------
    project_dir : str
        Path to the dbt project directory.
    target : str
        Name of the dbt target / profile output to use.
    """

    def __init__(self, project_dir: str, target: str) -> None:
        self._project_dir = project_dir
        self._target = target
        self._adapter = self._create_adapter(project_dir, target)
        # Populated on the first ``resolve_refs`` call and reused afterwards.
        self._ref_map: Optional[Dict[str, str]] = None

    # ------------------------------------------------------------------
    # Adapter bootstrap
    # ------------------------------------------------------------------

    @staticmethod
    def _create_adapter(project_dir: str, target: str) -> Any:
        """Build and return the dbt adapter for *project_dir* / *target*.

        Profiles are always read from ``~/.dbt``. The adapter is registered
        with a ``"spawn"`` multiprocessing context.
        """
        # NOTE(review): reviewers asked for a concrete adapter return type
        # (``BaseAdapter`` from ``dbt.adapters.base``) instead of ``Any``.

        # dbt is imported here rather than at module top level.
        from argparse import Namespace

        from dbt.adapters.factory import get_adapter, register_adapter, reset_adapters
        from dbt.config.runtime import RuntimeConfig
        from dbt.flags import set_from_args

        profiles_dir = os.path.expanduser("~/.dbt")
        # Both the lower-case and upper-case spellings are supplied, as in
        # the original argument set.
        args = Namespace(
            project_dir=project_dir,
            profiles_dir=profiles_dir,
            target=target,
            threads=1,
            vars={},
            profile=None,
            PROFILES_DIR=profiles_dir,
            PROJECT_DIR=project_dir,
        )
        set_from_args(args, None)
        config = RuntimeConfig.from_args(args)

        reset_adapters()
        mp_context = multiprocessing.get_context("spawn")
        register_adapter(config, mp_context)
        return get_adapter(config)

    # ------------------------------------------------------------------
    # Ref resolution
    # ------------------------------------------------------------------

    def _load_ref_map(self) -> Dict[str, str]:
        """Build a ``{name: relation_name}`` map from the dbt manifest.

        Entries come from the manifest's ``nodes`` and ``sources`` sections;
        items without a ``name`` or ``relation_name`` are skipped. When a
        node and a source share a name, the node wins, so ``ref('x')`` never
        resolves to a source relation while a node called ``x`` exists.

        Raises
        ------
        FileNotFoundError
            If ``<project_dir>/target/manifest.json`` does not exist.
        """
        manifest_path = Path(self._project_dir) / "target" / "manifest.json"
        if not manifest_path.exists():
            raise FileNotFoundError(
                f"Manifest not found at {manifest_path}. "
                "Run `dbt run` or `dbt compile` first."
            )
        # JSON interchange is UTF-8; do not depend on the locale's default.
        with open(manifest_path, encoding="utf-8") as fh:
            manifest = json.load(fh)

        ref_map: Dict[str, str] = {}
        for node in manifest.get("nodes", {}).values():
            relation_name = node.get("relation_name")
            name = node.get("name")
            if relation_name and name:
                ref_map[name] = relation_name

        # Also include sources (some queries reference source tables), but
        # never let a source replace a node that has the same name.
        for source in manifest.get("sources", {}).values():
            relation_name = source.get("relation_name")
            name = source.get("name")
            if relation_name and name:
                ref_map.setdefault(name, relation_name)

        return ref_map

    def resolve_refs(self, query: str) -> str:
        """Replace ``{{ ref('name') }}`` with the fully-qualified relation name.

        The manifest is loaded on the first call and cached on the instance.

        Raises
        ------
        ValueError
            If a referenced name is not present in the manifest-derived map.
        FileNotFoundError
            If the manifest has to be loaded and does not exist.
        """
        if self._ref_map is None:
            self._ref_map = self._load_ref_map()
        # Bind to a local so the closure works on a plain, non-Optional dict.
        ref_map = self._ref_map

        def _replace(match: re.Match) -> str:  # type: ignore[type-arg]
            name = match.group(1)
            if name not in ref_map:
                raise ValueError(
                    f"Cannot resolve ref('{name}'): not found in dbt manifest. "
                    f"Known models: {sorted(ref_map)!r}"
                )
            return ref_map[name]

        return _REF_PATTERN.sub(_replace, query)

    # ------------------------------------------------------------------
    # Query execution
    # ------------------------------------------------------------------

    def run_query(self, prerendered_query: str) -> List[Dict[str, Any]]:
        """Render Jinja refs and execute a query, returning rows as dicts.

        Column names are lower-cased and values are serialised to match the
        behaviour of ``elementary.agate_to_dicts``.
        """
        sql = self.resolve_refs(prerendered_query)
        with self._adapter.connection_named("run_query"):
            _response, table = self._adapter.execute(sql, fetch=True)

        # Convert agate Table → list[dict] matching agate_to_dicts behaviour
        columns = [c.lower() for c in table.column_names]
        return [
            {col: _serialize_value(val) for col, val in zip(columns, row)}
            for row in table
        ]
12 changes: 4 additions & 8 deletions integration_tests/tests/dbt_project.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
import os
from contextlib import contextmanager, nullcontext
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Generator, List, Literal, Optional, Union, overload
from uuid import uuid4

from adapter_query_runner import AdapterQueryRunner
from data_seeder import DbtDataSeeder
from dbt_utils import get_database_and_schema_properties
from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
Expand Down Expand Up @@ -59,14 +59,10 @@ def __init__(
self.tmp_models_dir_path = self.models_dir_path / "tmp"
self.seeds_dir_path = self.project_dir_path / "data"

self._query_runner = AdapterQueryRunner(project_dir, target)

def run_query(self, prerendered_query: str):
    """Run *prerendered_query* through the direct adapter connection.

    Delegates to ``self._query_runner`` (an ``AdapterQueryRunner``) and
    returns whatever its ``run_query`` returns. The earlier
    ``run_operation("elementary.render_run_query")`` + ``json.loads`` path
    is removed: it returned first, which left the adapter-backed call
    unreachable.
    """
    return self._query_runner.run_query(prerendered_query)

@staticmethod
def read_table_query(
Expand Down
Loading