Skip to content
2 changes: 2 additions & 0 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def get_report_data(
exclude_elementary_models: bool = False,
project_name: Optional[str] = None,
disable_samples: bool = False,
skip_test_result_rows: bool = False,
filter: SelectorFilterSchema = SelectorFilterSchema(),
env: Optional[str] = None,
warehouse_type: Optional[str] = None,
Expand All @@ -86,6 +87,7 @@ def get_report_data(
days_back=days_back,
invocations_per_test=test_runs_amount,
disable_passed_test_metrics=disable_passed_test_metrics,
skip_test_result_rows=skip_test_result_rows,
)
source_freshnesses_api = SourceFreshnessesAPI(
dbt_runner=self.dbt_runner,
Expand Down
4 changes: 4 additions & 0 deletions elementary/monitor/api/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,29 @@ def __init__(
days_back: int = 7,
invocations_per_test: int = 720,
disable_passed_test_metrics: bool = False,
skip_test_result_rows: bool = False,
):
super().__init__(dbt_runner)
self.tests_fetcher = TestsFetcher(dbt_runner=self.dbt_runner)
self.test_results_db_rows = self._get_test_results_db_rows(
days_back=days_back,
invocations_per_test=invocations_per_test,
disable_passed_test_metrics=disable_passed_test_metrics,
skip_test_result_rows=skip_test_result_rows,
)

def _get_test_results_db_rows(
self,
days_back: Optional[int] = 7,
invocations_per_test: int = 720,
disable_passed_test_metrics: bool = False,
skip_test_result_rows: bool = False,
) -> List[TestResultDBRowSchema]:
return self.tests_fetcher.get_all_test_results_db_rows(
days_back=days_back,
invocations_per_test=invocations_per_test,
disable_passed_test_metrics=disable_passed_test_metrics,
skip_test_result_rows=skip_test_result_rows,
)

def get_test_results_summary(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro current_tests_run_results_query(days_back = none, invocation_id = none) %}
{% macro current_tests_run_results_query(days_back = none, invocation_id = none, skip_test_result_rows = false) %}
with elementary_test_results as (
select * from {{ ref('elementary', 'elementary_test_results') }}
{% if days_back %}
Expand Down Expand Up @@ -79,7 +79,7 @@
dbt_tests.short_name,
elementary_test_results.test_alias,
elementary_test_results.failures,
elementary_test_results.result_rows,
{% if skip_test_result_rows %}null{% else %}elementary_test_results.result_rows{% endif %} as result_rows,
dbt_tests.original_path,
dbt_tests.meta,
dbt_tests.description as test_description,
Expand Down
45 changes: 29 additions & 16 deletions elementary/monitor/dbt_project/macros/get_test_results.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{%- macro get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
{{ return(adapter.dispatch('get_test_results', 'elementary_cli')(days_back, invocations_per_test, disable_passed_test_metrics)) }}
{%- macro get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false, skip_test_result_rows = false) -%}
{{ return(adapter.dispatch('get_test_results', 'elementary_cli')(days_back, invocations_per_test, disable_passed_test_metrics, skip_test_result_rows)) }}
{%- endmacro -%}

{#
Shared post-processing helper: filters tests by meta, attaches sample data.
Called by both default__ and fabric__ dispatches to avoid duplicating the
Jinja processing loop.
#}
{%- macro _process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status) -%}
{%- macro _process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status, skip_test_result_rows = false) -%}
{% set test_results = [] %}
{% set tests = elementary.agate_to_dicts(test_results_agate) %}

Expand All @@ -26,7 +26,7 @@
{% set test_params = fromjson(test.test_params) %}
{% set status = test.status | lower %}

{%- if (test_type == 'dbt_test' and status in ['fail', 'warn']) or (test_type != 'dbt_test' and status in elementary_tests_allowlist_status) -%}
{%- if not skip_test_result_rows and ((test_type == 'dbt_test' and status in ['fail', 'warn']) or (test_type != 'dbt_test' and status in elementary_tests_allowlist_status)) -%}
{% set test_rows_sample = elementary_cli.get_test_rows_sample(test.result_rows, test_result_rows_agate.get(test.id)) %}
{%- endif -%}
{% else %}
Expand All @@ -40,11 +40,11 @@
{% do return(test_results) %}
{%- endmacro -%}

{%- macro default__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
{%- macro default__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false, skip_test_result_rows = false) -%}
{% set elementary_tests_allowlist_status = ['fail', 'warn'] if disable_passed_test_metrics else ['fail', 'warn', 'pass'] %}
{% set select_test_results %}
with test_results as (
{{ elementary_cli.current_tests_run_results_query(days_back=days_back) }}
{{ elementary_cli.current_tests_run_results_query(days_back=days_back, skip_test_result_rows=skip_test_result_rows) }}
),

ordered_test_results as (
Expand Down Expand Up @@ -111,15 +111,19 @@
{% endset %}

{% set test_results_agate = elementary.run_query(test_results_agate_sql) %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}
{% if not skip_test_result_rows %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}
{% else %}
{% set test_result_rows_agate = {} %}
{% endif %}
{% if not elementary.has_temp_table_support() %}
{% do elementary.fully_drop_relation(ordered_test_results_relation) %}
{% endif %}

{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status)) %}
{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status, skip_test_result_rows)) %}
{%- endmacro -%}

{%- macro fabric__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
{%- macro fabric__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false, skip_test_result_rows = false) -%}
{#
T-SQL does not allow nested CTEs (WITH inside WITH).
current_tests_run_results_query already starts with WITH, so we
Expand All @@ -132,7 +136,7 @@

{# Step 1 – materialise the base test-results query into a temp table #}
{% set base_query %}
{{ elementary_cli.current_tests_run_results_query(days_back=days_back) }}
{{ elementary_cli.current_tests_run_results_query(days_back=days_back, skip_test_result_rows=skip_test_result_rows) }}
{% endset %}

{% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %}
Expand Down Expand Up @@ -165,16 +169,21 @@
{% endset %}

{% set test_results_agate = elementary.run_query(test_results_agate_sql) %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}
{% if not skip_test_result_rows %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}
{% else %}
{% set test_result_rows_agate = {} %}
{% endif %}

{# Clean up intermediate tables #}
{% do elementary.fully_drop_relation(base_relation) %}
{% do elementary.fully_drop_relation(ordered_relation) %}

{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status)) %}
{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status, skip_test_result_rows)) %}
{%- endmacro -%}

{%- macro clickhouse__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
{%- macro clickhouse__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false, skip_test_result_rows = false) -%}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
{% set elementary_tests_allowlist_status = ['fail', 'warn'] if disable_passed_test_metrics else ['fail', 'warn', 'pass'] %}
{% do elementary.run_query('drop table if exists ordered_test_results') %}
{% set create_table_query %}
CREATE TABLE ordered_test_results (
Expand Down Expand Up @@ -262,7 +271,7 @@
{{ elementary.edr_datediff(elementary.edr_cast_as_timestamp('etr.detected_at'), elementary.edr_current_timestamp(), 'day') }} AS days_diff,
ROW_NUMBER() OVER (PARTITION BY elementary_unique_id ORDER BY etr.detected_at DESC) AS invocations_rank_index,
etr.failures,
etr.result_rows
{% if skip_test_result_rows %}''{% else %}etr.result_rows{% endif %} AS result_rows
FROM {{ ref('elementary', 'elementary_test_results') }} etr
JOIN {{ ref('elementary', 'dbt_tests') }} dt ON etr.test_unique_id = dt.unique_id
LEFT JOIN (
Expand Down Expand Up @@ -302,10 +311,14 @@
{% endset %}

{% set test_results_agate = elementary.run_query(test_results_agate_sql) %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}
{% if not skip_test_result_rows %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}
{% else %}
{% set test_result_rows_agate = {} %}
{% endif %}
{% if not elementary.has_temp_table_support() %}
{% do elementary.fully_drop_relation(ordered_test_results_relation) %}
{% endif %}

{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status)) %}
{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status, skip_test_result_rows)) %}
{%- endmacro -%}
2 changes: 2 additions & 0 deletions elementary/monitor/fetchers/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ def get_all_test_results_db_rows(
days_back: Optional[int] = 7,
invocations_per_test: int = 720,
disable_passed_test_metrics: bool = False,
skip_test_result_rows: bool = False,
) -> List[TestResultDBRowSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_test_results",
macro_args=dict(
days_back=days_back,
invocations_per_test=invocations_per_test,
disable_passed_test_metrics=disable_passed_test_metrics,
skip_test_result_rows=skip_test_result_rows,
),
)
test_results = (
Expand Down
Loading