Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
- name: Start Dremio
if: inputs.warehouse-type == 'dremio'
working-directory: ${{ env.TESTS_DIR }}
run: docker compose -f docker-compose-dremio.yml up -d
run: docker compose -f docker-compose-dremio.yml up -d --wait

- name: Setup Python
uses: actions/setup-python@v6
Expand Down
9 changes: 8 additions & 1 deletion integration_tests/docker-compose-dremio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,19 @@ services:
- dremio_data:/opt/dremio/data:rw
# Workaround for permission issues in podman
user: "0"
healthcheck:
test: ["CMD-SHELL", "curl -sf http://localhost:9047 || exit 1"]
interval: 5s
timeout: 5s
retries: 30
start_period: 15s

dremio-setup:
image: alpine:latest
container_name: dremio-setup
depends_on:
- dremio
dremio:
condition: service_healthy
volumes:
- ./docker/dremio/dremio-setup.sh:/dremio-setup.sh
command: sh /dremio-setup.sh
Expand Down
48 changes: 39 additions & 9 deletions integration_tests/tests/data_seeder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Generator, List
Expand All @@ -8,6 +9,11 @@

# TODO: Write more performant data seeders per adapter.

# Retry settings for transient connection errors (e.g. Dremio Docker
# dropping connections under concurrent load from pytest-xdist workers).
_SEED_MAX_RETRIES = 3
_SEED_RETRY_DELAY_SECONDS = 10

logger = get_logger(__name__)


Expand All @@ -29,22 +35,46 @@ def seed(self, data: List[dict], table_name: str) -> Generator[None, None, None]
writer.writeheader()
writer.writerows(data)
seed_file.flush()
success = self.dbt_runner.seed(
select=str(relative_seed_path), full_refresh=True
)
success = self._seed_with_retry(str(relative_seed_path), table_name)
if not success:
logger.error(
"dbt seed failed for '%s'. This usually means the "
"target schema does not exist or could not be created. "
"Downstream queries will fail with "
"TABLE_OR_VIEW_NOT_FOUND.",
"dbt seed failed for '%s' after %d attempts. "
"This usually means the target schema does not "
"exist or could not be created. Downstream "
"queries will fail with TABLE_OR_VIEW_NOT_FOUND.",
table_name,
_SEED_MAX_RETRIES,
)
raise RuntimeError(
f"dbt seed failed for '{table_name}'. Check the dbt "
f"output above for the root cause (e.g. SCHEMA_NOT_FOUND)."
f"dbt seed failed for '{table_name}' after "
f"{_SEED_MAX_RETRIES} attempts. Check the dbt "
f"output above for the root cause "
f"(e.g. SCHEMA_NOT_FOUND)."
)

yield
finally:
seed_path.unlink()

def _seed_with_retry(self, relative_seed_path: str, table_name: str) -> bool:
"""Run dbt seed with retries for transient connection errors.

Dremio OSS (Docker) intermittently drops TCP connections under
concurrent load from pytest-xdist workers, producing
``RemoteDisconnected`` / ``ConnectionError`` during seed.
A simple retry with a back-off delay is sufficient to recover.
"""
for attempt in range(1, _SEED_MAX_RETRIES + 1):
success = self.dbt_runner.seed(select=relative_seed_path, full_refresh=True)
if success:
return True
if attempt < _SEED_MAX_RETRIES:
logger.warning(
"dbt seed failed for '%s' (attempt %d/%d). " "Retrying in %ds...",
table_name,
attempt,
_SEED_MAX_RETRIES,
_SEED_RETRY_DELAY_SECONDS,
)
time.sleep(_SEED_RETRY_DELAY_SECONDS)
return False
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
74 changes: 52 additions & 22 deletions integration_tests/tests/env.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from typing import Optional

import dbt_project
Expand All @@ -6,6 +7,11 @@

logger = get_logger(__name__)

# Retry settings for transient connection errors (e.g. Dremio Docker
# dropping connections under concurrent load from pytest-xdist workers).
_INIT_MAX_RETRIES = 3
_INIT_RETRY_DELAY_SECONDS = 15


class Environment:
def __init__(
Expand All @@ -25,27 +31,51 @@ def clear(self):
self.dbt_runner.run_operation("elementary_tests.clear_env")

def init(self):
init_success = self.dbt_runner.run(selector="init")
if not init_success:
logger.error(
"Environment init failed: 'dbt run --selector init' returned "
"failure. The target schema may not have been created. "
self._run_with_retry(
label="dbt run --selector init",
run_fn=lambda: self.dbt_runner.run(selector="init"),
error_detail=(
"The target schema may not have been created. "
"Subsequent seeds and queries will likely fail with "
"SCHEMA_NOT_FOUND or TABLE_OR_VIEW_NOT_FOUND."
)
raise RuntimeError(
"Test environment initialization failed during "
"'dbt run --selector init'. Check the dbt output above "
"for the root cause."
)
elementary_success = self.dbt_runner.run(select="elementary")
if not elementary_success:
logger.error(
"Environment init failed: 'dbt run --select elementary' "
"returned failure. Elementary models may not be available."
)
raise RuntimeError(
"Test environment initialization failed during "
"'dbt run --select elementary'. Check the dbt output "
"above for the root cause."
)
),
)
self._run_with_retry(
label="dbt run --select elementary",
run_fn=lambda: self.dbt_runner.run(select="elementary"),
error_detail="Elementary models may not be available.",
)

@staticmethod
def _run_with_retry(label: str, run_fn, error_detail: str) -> None:
"""Execute *run_fn* with retries for transient failures.

Dremio OSS (Docker) intermittently drops TCP connections under
concurrent load from pytest-xdist workers, producing
``RemoteDisconnected`` / ``ConnectionError``. Retrying after a
short delay is sufficient to recover.
"""
for attempt in range(1, _INIT_MAX_RETRIES + 1):
if run_fn():
return
if attempt < _INIT_MAX_RETRIES:
logger.warning(
"'%s' failed (attempt %d/%d). Retrying in %ds...",
label,
attempt,
_INIT_MAX_RETRIES,
_INIT_RETRY_DELAY_SECONDS,
)
time.sleep(_INIT_RETRY_DELAY_SECONDS)

logger.error(
"Environment init failed: '%s' returned failure after " "%d attempts. %s",
label,
_INIT_MAX_RETRIES,
error_detail,
)
raise RuntimeError(
f"Test environment initialization failed during "
f"'{label}' after {_INIT_MAX_RETRIES} attempts. "
f"Check the dbt output above for the root cause."
)
Loading