Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions checks/coordinate_checks/check_bounds_contiguity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#!/usr/bin/env python
"""
This module provides atomic checks that verify whether bounds intervals
are contiguous (no gaps or overlaps between adjacent intervals).
"""

from compliance_checker.base import BaseCheck, TestCtx
import numpy as np

from ..utils import get_bounds_data


def _check_bounds_contiguity(ds, bnds_var_name, check_id, severity):
    """
    Verify that bounds intervals form a contiguous sequence.

    Adjacent intervals are contiguous when the upper edge of interval i
    coincides with the lower edge of interval i+1, i.e. there is neither
    a gap nor an overlap between consecutive intervals.

    Parameters
    ----------
    ds : netCDF4.Dataset
        An open netCDF dataset.
    bnds_var_name : str
        The name of the bounds variable (e.g., 'lat_bnds', 'lon_bnds').
    check_id : str
        The unique check identifier (e.g., 'V043' for lat_bnds).
    severity : int
        The severity level (BaseCheck.HIGH, BaseCheck.MEDIUM, BaseCheck.LOW).

    Returns
    -------
    list[Result]
        A single-element list holding the pass/fail Result.
    """
    ctx = TestCtx(severity, f"[{check_id}] Bounds Contiguity: '{bnds_var_name}'")

    bnds, error_msg = get_bounds_data(ds, bnds_var_name)
    if error_msg:
        ctx.add_failure(error_msg)
        return [ctx.to_result()]

    try:
        if bnds.shape[0] < 2:
            # A single interval is trivially contiguous.
            ctx.add_pass()
            return [ctx.to_result()]

        # Shared edge of each consecutive pair: upper of i vs lower of i+1.
        hi_edges = bnds[:-1, 1]
        lo_edges = bnds[1:, 0]

        # Floating-point tolerant comparison of the shared edges.
        mismatch = ~np.isclose(hi_edges, lo_edges)

        if mismatch.any():
            bad = np.where(mismatch)[0]

            # Describe up to three offending pairs in the report.
            examples = []
            for i in bad[:3]:
                delta = lo_edges[i] - hi_edges[i]
                if delta > 0:
                    examples.append(f"gap at idx {i}: [{hi_edges[i]}, {lo_edges[i]}] (gap={delta})")
                else:
                    examples.append(f"overlap at idx {i}: [{hi_edges[i]}, {lo_edges[i]}] (overlap={-delta})")

            ctx.add_failure(
                f"{len(bad)} gap(s)/overlap(s) found between intervals. "
                f"Examples: {'; '.join(examples)}"
            )
        else:
            ctx.add_pass()

    except Exception as e:
        ctx.add_failure(f"Error checking bounds contiguity for '{bnds_var_name}': {e}")

    return [ctx.to_result()]


# V043: lat_bnds contiguity
def check_lat_bnds_contiguity(ds, severity=BaseCheck.MEDIUM):
    """V043: verify 'lat_bnds' intervals are contiguous (no gaps or overlaps)."""
    return _check_bounds_contiguity(ds, "lat_bnds", "V043", severity)


# V081: lon_bnds contiguity
def check_lon_bnds_contiguity(ds, severity=BaseCheck.MEDIUM):
    """V081: verify 'lon_bnds' intervals are contiguous (no gaps or overlaps)."""
    return _check_bounds_contiguity(ds, "lon_bnds", "V081", severity)
83 changes: 83 additions & 0 deletions checks/coordinate_checks/check_bounds_monotonicity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
"""
This module provides atomic checks that verify whether bounds values
are monotonically non-decreasing.
"""

from compliance_checker.base import BaseCheck, TestCtx
import numpy as np

from ..utils import get_bounds_data


def _check_bounds_monotonicity(ds, bnds_var_name, check_id, severity):
    """
    Verify that bounds values are monotonically non-decreasing.

    Both the lower-bound column and the upper-bound column must be sorted
    in non-decreasing order, i.e. every consecutive difference is >= 0.

    Parameters
    ----------
    ds : netCDF4.Dataset
        An open netCDF dataset.
    bnds_var_name : str
        The name of the bounds variable (e.g., 'lat_bnds', 'lon_bnds').
    check_id : str
        The unique check identifier (e.g., 'V042' for lat_bnds).
    severity : int
        The severity level (BaseCheck.HIGH, BaseCheck.MEDIUM, BaseCheck.LOW).

    Returns
    -------
    list[Result]
        A single-element list holding the pass/fail Result.
    """
    ctx = TestCtx(severity, f"[{check_id}] Bounds Monotonicity: '{bnds_var_name}'")

    bnds, error_msg = get_bounds_data(ds, bnds_var_name)
    if error_msg:
        ctx.add_failure(error_msg)
        return [ctx.to_result()]

    try:
        problems = []

        # Examine each bounds column in turn (0 = lower edge, 1 = upper edge).
        for col, label in ((0, "Lower"), (1, "Upper")):
            edge = bnds[:, col]
            # Indices where the sequence strictly decreases (diff < 0).
            drops = np.where(np.diff(edge) < 0)[0]
            if len(drops):
                # Report at most three offending positions.
                sample = ", ".join(f"idx {i}: {edge[i]} > {edge[i+1]}" for i in drops[:3])
                problems.append(f"{label} bounds not monotonic. Examples: {sample}")

        if problems:
            ctx.add_failure("; ".join(problems))
        else:
            ctx.add_pass()

    except Exception as e:
        ctx.add_failure(f"Error checking bounds monotonicity for '{bnds_var_name}': {e}")

    return [ctx.to_result()]


# V042: lat_bnds monotonicity
def check_lat_bnds_monotonicity(ds, severity=BaseCheck.MEDIUM):
    """V042: verify 'lat_bnds' values are monotonically non-decreasing."""
    return _check_bounds_monotonicity(ds, "lat_bnds", "V042", severity)


# V080: lon_bnds monotonicity
def check_lon_bnds_monotonicity(ds, severity=BaseCheck.MEDIUM):
    """V080: verify 'lon_bnds' values are monotonically non-decreasing."""
    return _check_bounds_monotonicity(ds, "lon_bnds", "V080", severity)
97 changes: 97 additions & 0 deletions checks/coordinate_checks/check_data_within_actual_range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python
"""
This module provides atomic checks that verify whether all values of a
variable fall within its declared actual_range attribute.

Note: This is DIFFERENT from CF's check_actual_range which validates that
actual_range matches the data min/max. This check validates that all data
values are contained within the declared actual_range.
"""

from compliance_checker.base import BaseCheck, TestCtx

from ..utils import get_variable_data


def _check_data_within_actual_range(ds, var_name, check_id, severity):
    """
    Internal helper to verify all data values fall within the actual_range attribute.

    Checks that the data min/max are contained within the declared
    actual_range [min, max] attribute on the variable. Skips the check
    (returns an empty list) if no actual_range attribute exists.

    Parameters
    ----------
    ds : netCDF4.Dataset
        An open netCDF dataset.
    var_name : str
        The name of the variable (e.g., 'lat', 'lon').
    check_id : str
        The unique check identifier (e.g., 'V067' for lat).
    severity : int
        The severity level (BaseCheck.HIGH, BaseCheck.MEDIUM, BaseCheck.LOW).

    Returns
    -------
    list[Result]
        A list containing one Result object with pass/fail status,
        or empty list if no actual_range attribute exists.
    """
    ctx = TestCtx(severity, f"[{check_id}] Data Within Actual Range: '{var_name}'")

    if var_name not in ds.variables:
        ctx.add_failure(f"Variable '{var_name}' not found in dataset.")
        return [ctx.to_result()]

    var = ds.variables[var_name]
    actual_range = getattr(var, 'actual_range', None)

    # If no actual_range attribute, skip this check
    if actual_range is None:
        return []

    try:
        # Validate actual_range has 2 elements
        if len(actual_range) != 2:
            ctx.add_failure(
                f"actual_range attribute has {len(actual_range)} elements, expected 2."
            )
            return [ctx.to_result()]

        min_val, max_val = actual_range[0], actual_range[1]

        # A reversed range can never contain any data; report it explicitly
        # instead of emitting a confusing "outside range" failure below.
        if min_val > max_val:
            ctx.add_failure(
                f"actual_range [{min_val}, {max_val}] is invalid: min > max."
            )
            return [ctx.to_result()]

        data, error_msg = get_variable_data(ds, var_name)
        if error_msg:
            ctx.add_failure(error_msg)
            return [ctx.to_result()]

        # Prefer the element count ('size') over len(): len() raises TypeError
        # on 0-d arrays and only counts the first axis of multi-dimensional
        # data, which would misroute into the generic except below.
        n_elements = getattr(data, "size", None)
        if n_elements is None:
            n_elements = len(data)
        if n_elements == 0:
            # Nothing to validate against the range.
            ctx.add_pass()
            return [ctx.to_result()]

        data_min = data.min()
        data_max = data.max()

        if data_min >= min_val and data_max <= max_val:
            ctx.add_pass()
        else:
            ctx.add_failure(
                f"Data range [{data_min}, {data_max}] is outside "
                f"actual_range [{min_val}, {max_val}]."
            )

    except Exception as e:
        ctx.add_failure(f"Error checking data within actual_range for '{var_name}': {e}")

    return [ctx.to_result()]


# V067: lat data within actual_range
def check_lat_data_within_actual_range(ds, severity=BaseCheck.LOW):
    """V067: verify all 'lat' data values fall within the declared actual_range."""
    return _check_data_within_actual_range(ds, "lat", "V067", severity)


# V105: lon data within actual_range
def check_lon_data_within_actual_range(ds, severity=BaseCheck.LOW):
    """V105: verify all 'lon' data values fall within the declared actual_range."""
    return _check_data_within_actual_range(ds, "lon", "V105", severity)
84 changes: 84 additions & 0 deletions checks/coordinate_checks/check_fill_value_equals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env python
"""
This module provides atomic checks that verify whether a fill value
attribute (_FillValue or missing_value) equals an expected value.
"""

from compliance_checker.base import BaseCheck, TestCtx
import numpy as np


def _check_fill_value_equals(ds, var_name, attr_name, expected_value, check_id, severity):
    """
    Verify a fill value attribute matches an expected value.

    The comparison uses numpy.isclose with a relative tolerance so that
    float32/float64 rounding of the stored attribute does not cause a
    spurious failure.

    Parameters
    ----------
    ds : netCDF4.Dataset
        An open netCDF dataset.
    var_name : str
        The name of the variable (e.g., 'vertices_latitude').
    attr_name : str
        The fill value attribute name ('_FillValue' or 'missing_value').
    expected_value : float
        The expected fill value (e.g., 1.e+20).
    check_id : str
        The unique check identifier (e.g., 'V250').
    severity : int
        The severity level (BaseCheck.HIGH, BaseCheck.MEDIUM, BaseCheck.LOW).

    Returns
    -------
    list[Result]
        A single-element list holding the pass/fail Result.
    """
    ctx = TestCtx(severity, f"[{check_id}] Fill Value: '{var_name}.{attr_name}'")

    # Guard: the variable itself must exist before we can look at attributes.
    if var_name not in ds.variables:
        ctx.add_failure(f"Variable '{var_name}' not found in dataset.")
        return [ctx.to_result()]

    actual = getattr(ds.variables[var_name], attr_name, None)

    # Guard: the fill attribute must be present on the variable.
    if actual is None:
        ctx.add_failure(
            f"Attribute '{attr_name}' not found on variable '{var_name}'."
        )
        return [ctx.to_result()]

    try:
        # Relative tolerance absorbs rounding of large values such as 1.e+20
        # stored as float32.
        if np.isclose(actual, expected_value, rtol=1e-5):
            ctx.add_pass()
        else:
            ctx.add_failure(
                f"Expected {attr_name}={expected_value}, got {actual}."
            )

    except Exception as e:
        ctx.add_failure(f"Error checking fill value for '{var_name}.{attr_name}': {e}")

    return [ctx.to_result()]


# V250: vertices_latitude missing_value = 1.e+20f
def check_vertices_latitude_missing_value(ds, severity=BaseCheck.MEDIUM):
    """V250: verify 'vertices_latitude.missing_value' equals 1.e+20."""
    return _check_fill_value_equals(ds, "vertices_latitude", "missing_value", 1.e+20, "V250", severity)


# V253: vertices_latitude _FillValue = 1.e+20f
def check_vertices_latitude_fill_value(ds, severity=BaseCheck.MEDIUM):
    """V253: verify 'vertices_latitude._FillValue' equals 1.e+20."""
    return _check_fill_value_equals(ds, "vertices_latitude", "_FillValue", 1.e+20, "V253", severity)


# V260: vertices_longitude missing_value = 1.e+20f
def check_vertices_longitude_missing_value(ds, severity=BaseCheck.MEDIUM):
    """V260: verify 'vertices_longitude.missing_value' equals 1.e+20."""
    return _check_fill_value_equals(ds, "vertices_longitude", "missing_value", 1.e+20, "V260", severity)


# V263: vertices_longitude _FillValue = 1.e+20f
def check_vertices_longitude_fill_value(ds, severity=BaseCheck.MEDIUM):
    """V263: verify 'vertices_longitude._FillValue' equals 1.e+20."""
    return _check_fill_value_equals(ds, "vertices_longitude", "_FillValue", 1.e+20, "V263", severity)
Loading