Skip to content
Open
348 changes: 348 additions & 0 deletions integration_tests/tests/test_column_value_anomalies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
from datetime import datetime, timedelta
Comment thread
coderabbitai[bot] marked this conversation as resolved.
from typing import Any, Dict, List

from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

TIMESTAMP_COLUMN = "updated_at"
DBT_TEST_NAME = "elementary.column_value_anomalies"
DBT_TEST_ARGS = {
"timestamp_column": TIMESTAMP_COLUMN,
}


def test_anomalyless_column_value_anomalies(test_id: str, dbt_project: DbtProject):
"""Test that normal, consistent numeric data produces no anomalies (test passes)."""
utc_today = datetime.utcnow().date()
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": 100,
}
for cur_date in generate_dates(base_date=utc_today - timedelta(1))
for _ in range(5)
]
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, test_column="amount"
)
assert test_result["status"] == "pass"


def test_anomalous_column_value_anomalies(test_id: str, dbt_project: DbtProject):
"""Test that an extreme outlier in the detection period is flagged (test fails).

Training data: values around 100 (95-105) for 30 days.
Detection data: includes a value of 10000, which is a clear outlier.
"""
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

# Training data: consistent values around 100
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": amount,
}
for cur_date in training_dates
for amount in [95, 100, 105, 100, 100]
]
# Detection data: includes an extreme outlier
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"amount": amount,
}
for amount in [100, 100, 10000]
]

test_args = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 3,
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="amount"
)
assert test_result["status"] == "fail"


def test_column_value_anomalies_spike_direction(test_id: str, dbt_project: DbtProject):
"""Test anomaly_direction='spike' only flags values above threshold.

Training data: values around 100.
Detection data: one very high value (10000) and one very low value (-10000).
With spike direction, only the high value should be flagged.
With drop direction, only the low value should be flagged.
"""
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

# Training data: consistent values around 100
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": amount,
}
for cur_date in training_dates
for amount in [95, 100, 105, 100, 100]
]
# Detection data: one extreme high, one extreme low
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"amount": 10000,
},
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"amount": -10000,
},
]

# spike direction: should fail (10000 is a spike)
test_args_spike = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 3,
"anomaly_direction": "spike",
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args_spike, data=data, test_column="amount"
)
assert test_result["status"] == "fail"

# drop direction: should fail (-10000 is a drop)
test_args_drop = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 3,
"anomaly_direction": "drop",
}
test_result = dbt_project.test(
test_id,
DBT_TEST_NAME,
test_args_drop,
test_column="amount",
test_vars={"force_metrics_backfill": True},
)
assert test_result["status"] == "fail"
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def test_column_value_anomalies_with_where_expression(
test_id: str, dbt_project: DbtProject
):
"""Test that where_expression filters data correctly.

Two categories: 'normal' has consistent values, 'outlier' has extreme values.
Filtering to 'normal' category should pass; filtering to 'outlier' category
should fail due to the extreme detection-period values.
"""
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

# Training data for both categories
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"category": category,
"amount": 100,
}
for cur_date in training_dates
for category in ["normal", "outlier"]
for _ in range(3)
]
# Detection data: normal category is fine, outlier category has extreme value
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"category": "normal",
"amount": 100,
},
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"category": "outlier",
"amount": 10000,
},
]

# Without where: should fail (outlier category has extreme value)
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, test_column="amount"
)
assert test_result["status"] == "fail"

# With where filtering to normal: should pass
test_result = dbt_project.test(
test_id,
DBT_TEST_NAME,
DBT_TEST_ARGS,
test_column="amount",
test_vars={"force_metrics_backfill": True},
test_config={"where": "category = 'normal'"},
)
assert test_result["status"] == "pass"


def test_column_value_anomalies_sensitivity(test_id: str, dbt_project: DbtProject):
"""Test that anomaly_sensitivity threshold controls detection.

Training data: values around 100 with some variance (stddev ~5).
Detection data: value of 130 (~6 stddevs away).
With sensitivity=3, should fail. With sensitivity=10, should pass.
"""
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

# Training data: values with known variance
data: List[Dict[str, Any]] = [
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": amount,
}
for cur_date in training_dates
for amount in [95, 100, 105, 100, 100]
]
# Detection data: moderately high value
data += [
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"amount": 130,
},
]

# Low sensitivity: should fail (130 is ~6 stddevs from mean of ~100)
test_args_low = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 3,
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args_low, data=data, test_column="amount"
)
assert test_result["status"] == "fail"

# High sensitivity: should pass (130 is within 10 stddevs)
test_args_high = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 10,
}
test_result = dbt_project.test(
test_id,
DBT_TEST_NAME,
test_args_high,
test_column="amount",
test_vars={"force_metrics_backfill": True},
)
assert test_result["status"] == "pass"


def test_column_value_anomalies_with_seasonality(test_id: str, dbt_project: DbtProject):
"""Test that seasonality=day_of_week uses per-day-of-week baselines.

Scenario: Weekdays have values ~100, weekends have values ~500.
Detection period falls on a weekend with value 500.
Without seasonality, 500 might look anomalous (overall mean ~200).
With day_of_week seasonality, 500 is normal for weekends → should pass.
"""
utc_today = datetime.utcnow().date()
test_date, *training_dates = generate_dates(
base_date=utc_today - timedelta(1), days_back=60
)

data: List[Dict[str, Any]] = []
# Training data: weekdays ~100, weekends ~500
for cur_date in training_dates:
day_of_week = cur_date.weekday() # 0=Monday, 6=Sunday
if day_of_week >= 5: # Weekend
values = [490, 500, 510]
else: # Weekday
values = [95, 100, 105]
for amount in values:
data.append(
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": amount,
}
)

# Detection data: matches the pattern for test_date's day of week
test_day_of_week = test_date.weekday()
if test_day_of_week >= 5:
detection_value = 500
else:
detection_value = 100
data.append(
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"amount": detection_value,
}
)

# With seasonality: should pass (value matches day-of-week pattern)
test_args_seasonal = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 3,
"seasonality": "day_of_week",
"training_period": {"period": "day", "count": 60},
}
test_result = dbt_project.test(
test_id,
DBT_TEST_NAME,
test_args_seasonal,
data=data,
test_column="amount",
)
assert test_result["status"] == "pass"
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def test_column_value_anomalies_with_training_period(
test_id: str, dbt_project: DbtProject
):
"""Test that training_period controls the baseline window.

30 days of low values (100), then 7 days of high values (500),
then detection with value 500.
With training_period=7 days (only recent high values): should pass.
"""
utc_today = datetime.utcnow().date()
test_date = utc_today - timedelta(1)

data: List[Dict[str, Any]] = []

# 30 days of low values (old data)
for i in range(30):
cur_date = utc_today - timedelta(days=37 - i)
for amount in [95, 100, 105]:
data.append(
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": amount,
}
)

# 7 days of high values (recent training data)
for i in range(7):
cur_date = utc_today - timedelta(days=7 - i)
if cur_date >= test_date:
continue
for amount in [490, 500, 510]:
data.append(
{
TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
"amount": amount,
}
)

# Detection: value consistent with recent training
data.append(
{
TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT),
"amount": 500,
}
)

# Short training period (7 days) - baseline is ~500, detection value 500 is normal
test_args = {
**DBT_TEST_ARGS,
"anomaly_sensitivity": 3,
"training_period": {"period": "day", "count": 7},
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, test_args, data=data, test_column="amount"
)
assert test_result["status"] == "pass"
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Loading
Loading