diff --git a/documentation/api/notation.rst b/documentation/api/notation.rst index 75a9e0eb26..7f75ee838b 100644 --- a/documentation/api/notation.rst +++ b/documentation/api/notation.rst @@ -120,6 +120,12 @@ In all current versions of the FlexMeasures API, only equidistant timeseries dat - "start" should be a timestamp on the hour or a multiple of the sensor resolution thereafter (e.g. "16:10" works if the resolution is 5 minutes), and - "duration" should also be a multiple of the sensor resolution. +For non-instantaneous sensors, FlexMeasures floors off-clock datetimes to the +sensor's resolution by default when ingesting sensor data. For example, data +posted with ``"start": "2026-05-12T08:29:58+02:00"`` to a 15-minute sensor is +saved from ``2026-05-12T08:15:00+02:00``. Set the sensor attribute +``"floor_datetimes_to_resolution": false`` to disable this behaviour. + .. _beliefs: @@ -249,7 +255,10 @@ FlexMeasures handles two types of time series, which can be distinguished by def Specifying a frequency and resolution is redundant for POST requests that contain both "values" and a "duration" ― FlexMeasures computes the frequency by dividing the duration by the number of values, and, for sensors that record non-instantaneous events, assumes the resolution of the data is equal to the frequency. When POSTing data, FlexMeasures checks this inferred resolution against the required resolution of the sensors that are posted to. -If these can't be matched (through upsampling), an error will occur. +If these can't be matched through upsampling or downsampling, an error will occur. +Off-clock event starts for non-instantaneous sensors are floored to the sensor's resolution by default. +The sensor attribute ``floor_datetimes_to_resolution`` can be set to ``false`` to keep incoming datetimes unchanged. +This flooring behaviour is distinct from the existing ``frequency`` sensor attribute, which rounds incoming instantaneous measurements to a configured Pandas frequency. GET requests (such as */sensors/data*) return data with a frequency either equal to the resolution that the sensor is configured for (for non-instantaneous sensors), or a default frequency befitting (in our opinion) the requested time interval. A "resolution" may be specified explicitly to obtain the data in downsampled form, which can be very beneficial for download speed. diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 9689b03588..8b50b2367e 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -25,6 +25,7 @@ New features * Support sensor references for efficiency fields in storage flex-models [see `PR #2142 `_] * Added a unified job status endpoint ``GET /api/v3_0/jobs/`` to retrieve the current execution status and result message for any background job [see `PR #2141 `_] * New ``GET /api/v3_0/sources`` endpoint to list accessible data sources and defined types, with ``only_latest=true`` by default to return only the most recent version per source [see `PR #2126 `_] +* Floor off-clock API datetimes to a non-instantaneous sensor's resolution by default when ingesting sensor data, uploading sensor data, and handling scheduler flex-model timed events; configurable with the ``floor_datetimes_to_resolution`` sensor attribute [see `PR #2146 `_] * Add support for filtering sensor data GET requests by ``source-type`` on ``/api/v3_0/sensors//data`` [see `PR #2127 `_] * Making monitoring alerts more flexible: allow ``flexmeasures monitor`` alerts to target one or more user IDs or email addresses with ``--recipient``; ``flexmeasures monitor last-seen`` can now narrow monitored users to one or more accounts with ``--account`` or to client accounts with ``--consultancy`` [see `PR #2158 `_] diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py index e39235f89a..ff44de1898 100644 --- a/flexmeasures/api/common/schemas/sensor_data.py +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -381,13 +381,32 @@ def check_schema_unit_against_type(self, data, **kwargs): f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}" ) + @validates_schema + def check_single_value_zero_duration_for_non_instantaneous_sensor( + self, data, **kwargs + ): + """Reject inputs where a non-instantaneous sensor cannot infer any resolution.""" + + required_resolution = data["sensor"].event_resolution + inferred_resolution = data["duration"] / len(data["values"]) + + if ( + required_resolution != timedelta(hours=0) + and len(data["values"]) == 1 + and inferred_resolution == timedelta(hours=0) + ): + raise ValidationError( + f"Cannot infer a non-zero resolution from one value over zero duration. This sensor requires a resolution of {required_resolution}." + ) + @validates_schema def check_resolution_compatibility_of_sensor_data(self, data, **kwargs): """Ensure event frequency is compatible with the sensor's event resolution. For a sensor recording instantaneous values, any event frequency is compatible. For a sensor recording non-instantaneous values, the event frequency must fit the sensor's event resolution. - Currently, only upsampling is supported (e.g. converting hourly events to 15-minute events). + Upsampling and downsampling are supported when the inferred resolution and + the sensor resolution are multiples of each other. """ required_resolution = data["sensor"].event_resolution @@ -398,11 +417,9 @@ def check_resolution_compatibility_of_sensor_data(self, data, **kwargs): # The event frequency is inferred by assuming sequential, equidistant values within a time interval. # The event resolution is assumed to be equal to the event frequency. inferred_resolution = data["duration"] / len(data["values"]) - if len(data["values"]) == 1 and inferred_resolution == timedelta(hours=0): - raise ValidationError( - f"Cannot infer a non-zero resolution from one value over zero duration. This sensor requires a resolution of {required_resolution}." - ) - if inferred_resolution % required_resolution != timedelta(hours=0): + if inferred_resolution % required_resolution != timedelta( + hours=0 + ) and required_resolution % inferred_resolution != timedelta(hours=0): raise ValidationError( f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}." ) @@ -421,7 +438,7 @@ def check_multiple_instantaneous_values(self, data, **kwargs): ) @post_load() - def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame: + def post_load_sequence(self, data: dict, **kwargs) -> dict[str, BeliefsDataFrame]: """ If needed, upsample and convert units, then deserialize to a BeliefsDataFrame. Returns a dict with the BDF in it, as that is expected by webargs when used with as_kwargs=True. @@ -429,6 +446,7 @@ def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame: data = self.possibly_upsample_values(data) data = self.possibly_convert_units(data) bdf = self.load_bdf(data) + bdf = self.possibly_downsample_bdf(bdf, data["sensor"].event_resolution) # Post-load validation against message type _type = data.get("type", None) @@ -451,7 +469,7 @@ def possibly_convert_units(data): data["values"], from_unit=data["unit"], to_unit=data["sensor"].unit, - event_resolution=data["sensor"].event_resolution, + event_resolution=data["duration"] / len(data["values"]), ) return data @@ -479,6 +497,21 @@ def possibly_upsample_values(data): ) return data + @staticmethod + def possibly_downsample_bdf( + bdf: BeliefsDataFrame, required_resolution: timedelta + ) -> BeliefsDataFrame: + """ + Downsample the data if needed, to fit the sensor's resolution. + Marshmallow runs this after validation. + """ + if required_resolution == timedelta(hours=0): + return bdf + + if bdf.event_resolution < required_resolution: + bdf = bdf.resample_events(required_resolution) + return bdf + def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame: """ Turn the de-serialized and validated data into a BeliefsDataFrame. @@ -489,7 +522,11 @@ def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame: start = sensor_data["start"] sensor = sensor_data["sensor"] - if frequency := sensor.get_attribute("frequency"): + if sensor.event_resolution != timedelta(0) and sensor.get_attribute( + "floor_datetimes_to_resolution", True + ): + start = pd.Timestamp(start).floor(sensor.event_resolution) + elif frequency := sensor.get_attribute("frequency"): start = pd.Timestamp(start).round(frequency) if event_resolution == timedelta(hours=0): @@ -517,6 +554,7 @@ def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame: s, source=source, sensor=sensor_data["sensor"], + event_resolution=event_resolution, **belief_timing, ) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 615b68d9b3..85febd182f 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1,7 +1,9 @@ from __future__ import annotations import isodate +from copy import deepcopy from datetime import datetime, timedelta +import pandas as pd from flexmeasures.data.services.sensors import ( serialize_sensor_status_data, @@ -287,6 +289,47 @@ def support_legacy_field_name(self, data, **kwargs): return data +def floor_timed_event_datetimes(flex_model: dict, sensor: Sensor) -> dict: + """Floor timed-event datetimes in list-valued flex-model fields. + + This only touches list entries that look like timed-event dictionaries, + such as ``soc-minima``, ``soc-maxima`` and ``soc-targets``. + """ + if sensor.event_resolution == timedelta(0) or not sensor.get_attribute( + "floor_datetimes_to_resolution", True + ): + return flex_model + + floored_flex_model = deepcopy(flex_model) + for value_name, value in floored_flex_model.items(): + if not isinstance(value, list): + continue + for index, timed_event in enumerate(value): + if not isinstance(timed_event, dict): + continue + for key in ("datetime", "start", "end"): + if key in timed_event: + try: + timed_event[key] = isodate.datetime_isoformat( + pd.Timestamp(timed_event[key]).floor( + sensor.event_resolution + ) + ) + except (TypeError, ValueError) as exc: + raise ValidationError( + { + value_name: { + index: { + key: [ + f"Not a valid datetime: {timed_event[key]!r}." + ] + } + } + } + ) from exc + return floored_flex_model + + class SensorAPI(FlaskView): route_base = "/sensors" trailing_slash = False @@ -1002,6 +1045,12 @@ def trigger_schedule( f"Resolution of {resolution} is incompatible with the sensor's required resolution of {sensor.event_resolution}." ) + if flex_model is not None: + try: + flex_model = floor_timed_event_datetimes(flex_model, sensor) + except ValidationError as err: + return unprocessable_entity(err.messages) + end_of_schedule = start_of_schedule + duration scheduler_kwargs = dict( asset_or_sensor=sensor, @@ -1306,6 +1355,8 @@ def fetch_one(self, id, sensor: Sensor): timezone: Europe/Amsterdam event_resolution: PT15M entity_address: ea1.2021-01.io.flexmeasures:fm1.14 + attributes: + floor_datetimes_to_resolution: true generic_asset_id: 1 power_sensor: summary: A power sensor recording average consumption every 5 minutes. @@ -1369,6 +1420,7 @@ def post(self, sensor_data: dict): "event_resolution": "PT1H" "unit": "kWh" "generic_asset_id": 1 + "attributes": '{"floor_datetimes_to_resolution": false}' responses: 201: description: New Sensor @@ -1385,6 +1437,7 @@ def post(self, sensor_data: dict): "entity_address": "ea1.2023-08.localhost:fm1.1" "event_resolution": "PT1H" "generic_asset_id": 1 + "attributes": '{"floor_datetimes_to_resolution": false}' "timezone": "UTC" "id": 2 400: diff --git a/flexmeasures/api/v3_0/tests/test_sensor_data.py b/flexmeasures/api/v3_0/tests/test_sensor_data.py index 9d310ff0b7..797eb9f1e6 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_data.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_data.py @@ -2,6 +2,7 @@ from datetime import timedelta from flask import current_app, url_for +import pandas as pd import pytest from sqlalchemy import event from sqlalchemy.engine import Engine @@ -357,10 +358,10 @@ def test_post_sensor_data_rejects_large_json( ("start", "2021-06-07T00:00:00", "start", "Not a valid aware datetime"), ( "duration", - "PT30M", + "PT25M", "_schema", - "Resolution of 0:05:00 is incompatible", - ), # downsampling not supported + "Resolution of 0:04:10 is incompatible", + ), ("unit", "m", "_schema", "Required unit"), ("type", "GetSensorDataRequest", "type", "Must be one of"), ], @@ -427,6 +428,69 @@ def test_post_sensor_data_rejects_unknown_sensor_before_queueing( assert current_app.queues["ingestion"].count == 0 +@pytest.mark.parametrize( + "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True +) +@pytest.mark.parametrize( + "offclock_start, precise_start, precise_end, values, expected_values", + [ + ( + "2021-06-08T00:00:40+02:00", + "2021-06-08T00:00:00+02:00", + "2021-06-08T01:00:00+02:00", + [-11.28] * 6, + [-11.28] * 6, + ), + ( + "2021-06-09T00:00:40+02:00", + "2021-06-09T00:00:00+02:00", + "2021-06-09T01:00:00+02:00", + [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200], + [150, 350, 550, 750, 950, 1150], + ), + ], +) +def test_post_non_instantaneous_sensor_data_floor( + client, + setup_api_test_data, + requesting_user, + offclock_start, + precise_start, + precise_end, + values, + expected_values, +): + post_data = make_sensor_data_request_for_gas_sensor( + num_values=len(values), unit="m³/h" + ) + post_data["start"] = offclock_start + post_data["values"] = values + sensor = setup_api_test_data["some gas sensor"] + + assert ( + len(sensor.search_beliefs(precise_start, precise_end)) == 0 + ), "No beliefs were expected before we post our test data." + + response = client.post( + url_for("SensorAPI:post_data", id=sensor.id), + json=post_data, + ) + + assert response.status_code == 200 + + new_data = sensor.search_beliefs(precise_start, precise_end).reset_index() + assert len(new_data) == 6 + assert list(new_data["event_start"]) == [ + pd.Timestamp(precise_start), + pd.Timestamp(precise_start) + pd.Timedelta(minutes=10), + pd.Timestamp(precise_start) + pd.Timedelta(minutes=20), + pd.Timestamp(precise_start) + pd.Timedelta(minutes=30), + pd.Timestamp(precise_start) + pd.Timedelta(minutes=40), + pd.Timestamp(precise_start) + pd.Timedelta(minutes=50), + ] + assert new_data["event_value"].to_list() == expected_values + + @pytest.mark.parametrize( "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True ) diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py index c6602773b4..1e71006fff 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py @@ -32,6 +32,10 @@ def setup_capacity_sensor_on_asset_in_supplier_account(db, setup_generic_assets) return sensor +def get_sensor_by_name(asset, name: str) -> Sensor: + return next(sensor for sensor in asset.sensors if sensor.name == name) + + @pytest.mark.parametrize( "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True ) @@ -114,6 +118,120 @@ def test_trigger_schedule_with_invalid_flexmodel( ) +@pytest.mark.parametrize( + "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True +) +def test_trigger_schedule_floors_flex_model_datetimes( + app, + add_market_prices, + add_battery_assets, + battery_soc_sensor, + add_charging_station_assets, + keep_scheduling_queue_empty, + requesting_user, +): + message = message_for_trigger_schedule(with_targets=True) + offclock_target = "2015-01-02T23:00:40+01:00" + expected_target = "2015-01-02T23:00:00+01:00" + for field in ("soc-targets", "soc-minima", "soc-maxima"): + message["flex-model"][field][0]["datetime"] = offclock_target + + sensor = get_sensor_by_name( + add_charging_station_assets["Test charging station"], "power" + ) + with app.test_client() as client: + trigger_schedule_response = client.post( + url_for("SensorAPI:trigger_schedule", id=sensor.id), + json=message, + ) + + assert trigger_schedule_response.status_code == 200 + assert len(app.queues["scheduling"]) == 1 + + job = app.queues["scheduling"].jobs[0] + for field in ("soc-targets", "soc-minima", "soc-maxima"): + target = job.kwargs["flex_model"][field][0] + assert target["datetime"] == expected_target + if "start" in target: + assert parse_datetime(target["start"]) == parse_datetime(expected_target) + if "end" in target: + assert parse_datetime(target["end"]) == parse_datetime(expected_target) + + +@pytest.mark.parametrize( + "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True +) +def test_trigger_schedule_keeps_flex_model_datetimes_when_flooring_disabled( + app, + add_market_prices, + add_battery_assets, + battery_soc_sensor, + add_charging_station_assets, + keep_scheduling_queue_empty, + requesting_user, +): + message = message_for_trigger_schedule(with_targets=True) + offclock_target = "2015-01-02T23:00:40+01:00" + for field in ("soc-targets", "soc-minima", "soc-maxima"): + message["flex-model"][field][0]["datetime"] = offclock_target + + sensor = get_sensor_by_name( + add_charging_station_assets["Test charging station"], "power" + ) + original_attributes = dict(sensor.attributes or {}) + sensor.attributes = { + **original_attributes, + "floor_datetimes_to_resolution": False, + } + with app.test_client() as client: + trigger_schedule_response = client.post( + url_for("SensorAPI:trigger_schedule", id=sensor.id), + json=message, + ) + sensor.attributes = original_attributes + + assert trigger_schedule_response.status_code == 200 + assert len(app.queues["scheduling"]) == 1 + + job = app.queues["scheduling"].jobs[0] + for field in ("soc-targets", "soc-minima", "soc-maxima"): + target = job.kwargs["flex_model"][field][0] + assert target["datetime"] == offclock_target + + +@pytest.mark.parametrize( + "requesting_user", ["test_prosumer_user@seita.nl"], indirect=True +) +def test_trigger_schedule_with_invalid_flex_model_datetime( + app, + add_battery_assets, + keep_scheduling_queue_empty, + requesting_user, +): + message = message_for_trigger_schedule(with_targets=True) + message["flex-model"]["soc-minima"][0]["datetime"] = "not-a-datetime" + + sensor = add_battery_assets["Test battery"].sensors[0] + with app.test_client() as client: + trigger_schedule_response = client.post( + url_for("SensorAPI:trigger_schedule", id=sensor.id), + json=message, + ) + + assert trigger_schedule_response.status_code == 422 + assert "message" in trigger_schedule_response.json + assert "json" in trigger_schedule_response.json["message"] + errors = trigger_schedule_response.json["message"]["json"] + assert "soc-minima" in errors + minima_errors = errors.get("soc-minima", {}) + assert "0" in minima_errors + timed_event_errors = minima_errors.get("0", {}) + assert "datetime" in timed_event_errors + datetime_errors = timed_event_errors.get("datetime", []) + assert datetime_errors + assert "Not a valid datetime" in datetime_errors[0] + + @pytest.mark.parametrize( "flex_config, field", [ @@ -250,7 +368,9 @@ def test_get_schedule_fallback( start = "2015-01-02T00:00:00+01:00" epex_da = get_test_sensor(db) - charging_station = add_charging_station_assets[charging_station_name].sensors[0] + charging_station = get_sensor_by_name( + add_charging_station_assets[charging_station_name], "power" + ) capacity = charging_station.get_attribute( "capacity_in_mw", @@ -408,7 +528,9 @@ def test_get_schedule_fallback_not_redirect( start = "2015-01-02T00:00:00+01:00" epex_da = get_test_sensor(db) - charging_station = add_charging_station_assets[charging_station_name].sensors[0] + charging_station = get_sensor_by_name( + add_charging_station_assets[charging_station_name], "power" + ) capacity = charging_station.get_attribute( "capacity_in_mw", diff --git a/flexmeasures/api/v3_0/tests/test_sensors_api_freshdb.py b/flexmeasures/api/v3_0/tests/test_sensors_api_freshdb.py index f0c7e12e0c..01cb7e95c6 100644 --- a/flexmeasures/api/v3_0/tests/test_sensors_api_freshdb.py +++ b/flexmeasures/api/v3_0/tests/test_sensors_api_freshdb.py @@ -1,6 +1,7 @@ import io import pytest from datetime import timedelta +import pandas as pd from flask import url_for from sqlalchemy import select @@ -10,6 +11,20 @@ from flexmeasures.api.v3_0.tests.utils import generate_csv_content +TEST_BATTERY_SENSOR_NAMES = ( + "power", + "power (kW)", + "energy (kWh)", + "state of charge", + "consumption sensor", + "cost sensor", +) + + +def assert_test_battery_sensor(sensor, sensor_index: int) -> None: + assert sensor.name == TEST_BATTERY_SENSOR_NAMES[sensor_index] + + @pytest.mark.parametrize( "requesting_user, sensor_index, data_unit, data_resolution, data_values, expected_event_values, expected_status", [ @@ -186,6 +201,7 @@ def test_upload_sensor_data_with_unit_conversion_success( ) test_battery = add_battery_assets_fresh_db["Test battery"] sensor = test_battery.sensors[sensor_index] + assert_test_battery_sensor(sensor, sensor_index) num_test_intervals = len(data_values) print( f"Uploading data to sensor '{sensor.name}' with unit={sensor.unit} and resolution={sensor.event_resolution}." @@ -232,6 +248,88 @@ def test_upload_sensor_data_with_unit_conversion_success( assert [b.event_value for b in beliefs] == expected_event_values +@pytest.mark.parametrize( + "requesting_user, sensor_index, start_date, data_resolution, data_values, expected_event_starts, expected_event_values", + [ + ( + "test_prosumer_user_2@seita.nl", + 1, + "2025-01-01T10:00:40+00:00", + timedelta(minutes=15), + [2, 3, 4], + [ + pd.Timestamp("2025-01-01T10:00:00+00:00"), + pd.Timestamp("2025-01-01T10:15:00+00:00"), + pd.Timestamp("2025-01-01T10:30:00+00:00"), + ], + [2, 3, 4], + ), + ( + "test_prosumer_user_2@seita.nl", + 2, + "2025-01-01T10:00:40+00:00", + timedelta(minutes=30), + [10, 20, 20, 40], + [ + pd.Timestamp("2025-01-01T10:00:00+00:00"), + pd.Timestamp("2025-01-01T11:00:00+00:00"), + ], + [30, 60], + ), + ( + "test_prosumer_user_2@seita.nl", + 1, + "2025-01-01T10:00:40+00:00", + timedelta(minutes=5), + [10, 20, 30, 40], + [ + pd.Timestamp("2025-01-01T10:00:00+00:00"), + pd.Timestamp("2025-01-01T10:15:00+00:00"), + ], + [20, 40], + ), + ], + indirect=["requesting_user"], +) +def test_upload_sensor_data_floors_offclock_datetimes( + fresh_db, + client, + add_battery_assets_fresh_db, + requesting_user, + sensor_index, + start_date, + data_resolution, + data_values, + expected_event_starts, + expected_event_values, +): + test_battery = add_battery_assets_fresh_db["Test battery"] + sensor = test_battery.sensors[sensor_index] + assert_test_battery_sensor(sensor, sensor_index) + + csv_content = generate_csv_content( + start_time_str=start_date, + interval=data_resolution, + values=data_values, + ) + file_obj = io.BytesIO(csv_content.encode("utf-8")) + + response = client.post( + url_for("SensorAPI:upload_data", id=sensor.id), + data={"uploaded-files": (file_obj, "data.csv"), "unit": sensor.unit}, + content_type="multipart/form-data", + ) + assert response.status_code == 200 + + bdf = sensor.search_beliefs( + expected_event_starts[0], expected_event_starts[-1] + sensor.event_resolution + ) + pd.testing.assert_index_equal( + bdf.event_starts, pd.DatetimeIndex(expected_event_starts, name="event_start") + ) + assert bdf["event_value"].to_list() == expected_event_values + + @pytest.mark.parametrize( "requesting_user, sensor_index, data_unit, data_resolution, data_values, expected_err_msg, expected_status", [ @@ -289,6 +387,7 @@ def test_upload_sensor_data_with_unit_conversion_failure( ) test_battery = add_battery_assets_fresh_db["Test battery"] sensor = test_battery.sensors[sensor_index] + assert_test_battery_sensor(sensor, sensor_index) print( f"Uploading data to sensor '{sensor.name}' with unit={sensor.unit} and resolution={sensor.event_resolution}." ) diff --git a/flexmeasures/data/schemas/scheduling/storage.py b/flexmeasures/data/schemas/scheduling/storage.py index e1c60c7143..9dd32c2ed9 100644 --- a/flexmeasures/data/schemas/scheduling/storage.py +++ b/flexmeasures/data/schemas/scheduling/storage.py @@ -237,6 +237,12 @@ def __init__( self.start = start self.sensor = sensor self.timezone = sensor.timezone if sensor is not None else None + self.flooring_resolution = ( + sensor.event_resolution + if sensor is not None + and sensor.get_attribute("floor_datetimes_to_resolution", True) + else None + ) # guess default soc-unit if default_soc_unit is None: @@ -251,6 +257,7 @@ def __init__( to_unit="MWh", default_src_unit=default_soc_unit, timezone=self.timezone, + event_resolution=self.flooring_resolution, data_key="soc-maxima", ) @@ -258,6 +265,7 @@ def __init__( to_unit="MWh", default_src_unit=default_soc_unit, timezone=self.timezone, + event_resolution=self.flooring_resolution, data_key="soc-minima", value_validator=validate.Range(min=0), ) @@ -265,6 +273,7 @@ def __init__( to_unit="MWh", default_src_unit=default_soc_unit, timezone=self.timezone, + event_resolution=self.flooring_resolution, data_key="soc-targets", ) diff --git a/flexmeasures/data/schemas/sensors.py b/flexmeasures/data/schemas/sensors.py index 2ceeb2ac0c..102a142e5b 100644 --- a/flexmeasures/data/schemas/sensors.py +++ b/flexmeasures/data/schemas/sensors.py @@ -69,6 +69,7 @@ class TimedEventSchema(Schema): def __init__( self, timezone: str | None = None, + event_resolution: timedelta | None = None, value_validator: Validator | None = None, to_unit: str | None = None, default_src_unit: str | None = None, @@ -81,6 +82,7 @@ def __init__( :param timezone: Optionally, set a timezone to be able to interpret nominal durations. """ self.timezone = timezone + self.event_resolution = event_resolution self.value_validator = value_validator super().__init__(*args, **kwargs) if to_unit is not None: @@ -99,12 +101,23 @@ def validate_value(self, _value, **kwargs): if self.value_validator is not None: self.value_validator(_value) + def floor_timing_fields(self, data: dict) -> None: + if self.event_resolution in (None, timedelta(0)): + return + + for key in ("datetime", "start", "end"): + if data.get(key) is not None: + data[key] = ( + pd.Timestamp(data[key]).floor(self.event_resolution).to_pydatetime() + ) + @validates_schema def check_time_window(self, data, **kwargs): """Checks whether a complete time interval can be derived from the timing fields. The data is updated in-place, guaranteeing that the 'start' and 'end' fields are filled out. """ + self.floor_timing_fields(data) dt = data.get("datetime") start = data.get("start") end = data.get("end") @@ -224,8 +237,18 @@ def timezone_validator(value: str): attributes = JSON( required=False, metadata=dict( - description="JSON serializable attributes to store arbitrary information on the sensor. A few attributes lead to special behaviour, such as `consumption_is_positive`, which informs the platform whether consumption values should be saved (and shown in charts) as positive or negative values.", - example="{consumption_is_positive: True}", + description=( + "JSON serializable attributes to store arbitrary information on " + "the sensor. A few attributes lead to special behaviour, such as " + "`consumption_is_positive`, which informs the platform whether " + "consumption values should be saved (and shown in charts) as " + "positive or negative values, `floor_datetimes_to_resolution`, " + "which controls whether off-clock datetimes are floored to a " + "non-instantaneous sensor's resolution, and `frequency`, which " + "rounds incoming instantaneous measurements to a configured " + "Pandas frequency." + ), + example='{"consumption_is_positive": true, "floor_datetimes_to_resolution": true}', ), ) @@ -338,6 +361,7 @@ def __init__( default_src_unit: str | None = None, return_magnitude: bool = False, timezone: str | None = None, + event_resolution: timedelta | None = None, value_validator: Validator | None = None, additional_sensor_units: list[str] | None = None, **kwargs, @@ -382,6 +406,7 @@ def __init__( value_validator = RepurposeValidatorToIgnoreSensorsAndLists(value_validator) self.validators.insert(0, value_validator) self.timezone = timezone + self.event_resolution = event_resolution self.value_validator = value_validator if to_unit.startswith("/") and len(to_unit) < 2: raise ValueError( @@ -445,6 +470,7 @@ def _deserialize_list(self, value: list[dict]) -> list[dict]: fields.Nested( TimedEventSchema( timezone=self.timezone, + event_resolution=self.event_resolution, value_validator=self.value_validator, to_unit=self.to_unit, default_src_unit=self.default_src_unit, @@ -715,6 +741,11 @@ def post_load(self, fields, **kwargs): # Reraise the error if an event frequency could not be inferred pd.infer_freq(bdf.index.unique("event_start")) + if sensor.event_resolution != timedelta(0) and sensor.get_attribute( + "floor_datetimes_to_resolution", True + ): + bdf = floor_bdf_event_starts(bdf, bdf.event_resolution) + bdf["event_value"] = convert_units( bdf["event_value"], from_unit, @@ -760,6 +791,36 @@ def post_load(self, fields, **kwargs): return fields +def floor_bdf_event_starts( + bdf: tb.BeliefsDataFrame, event_resolution: timedelta +) -> tb.BeliefsDataFrame: + floored_event_starts = bdf.index.get_level_values("event_start").floor( + event_resolution + ) + + new_index = pd.MultiIndex.from_arrays( + [ + ( + floored_event_starts + if name == "event_start" + else bdf.index.get_level_values(name) + ) + for name in bdf.index.names + ], + names=bdf.index.names, + ) + if new_index.duplicated().any(): + raise ValidationError( + "Flooring event_start would merge multiple beliefs with the same " + "source, belief_time and event_start. Please provide data already " + "aligned to the event resolution or use distinct belief/source metadata." + ) + + floored_bdf = bdf.copy() + floored_bdf.index = new_index + return floored_bdf + + class SensorDataFileRequestSchema(SensorDataFileSchema): """Validate a sensor data upload without parsing or resampling its files.""" diff --git a/flexmeasures/data/schemas/tests/test_sensor.py b/flexmeasures/data/schemas/tests/test_sensor.py index a547eaa66a..50606a2bc4 100644 --- a/flexmeasures/data/schemas/tests/test_sensor.py +++ b/flexmeasures/data/schemas/tests/test_sensor.py @@ -1,8 +1,11 @@ import pytest +import pandas as pd +import timely_beliefs as tb from flexmeasures import Sensor from flexmeasures.data.schemas.sensors import ( QuantityOrSensor, VariableQuantityField, + floor_bdf_event_starts, ) from flexmeasures.utils.unit_utils import ur from marshmallow import ValidationError @@ -175,3 +178,61 @@ def test_time_series_field(input_param, dst_unit, fails, db): assert not fails except Exception as e: assert fails, e + + +def test_floor_bdf_event_starts(setup_dummy_sensors, setup_sources): + sensor1, _, _, _ = setup_dummy_sensors + belief_time = pd.Timestamp("2025-01-01T09:00:00+00:00") + bdf = tb.BeliefsDataFrame( + pd.DataFrame( + { + "event_start": pd.to_datetime( + [ + "2025-01-01T10:00:40+00:00", + "2025-01-01T10:15:40+00:00", + ] + ), + "belief_time": [belief_time, belief_time], + "source": [setup_sources["Seita"], setup_sources["Seita"]], + "event_value": [1.0, 2.0], + } + ), + sensor=sensor1, + event_resolution=pd.Timedelta(minutes=15), + ) + + floored_bdf = floor_bdf_event_starts(bdf, pd.Timedelta(minutes=15)) + + pd.testing.assert_index_equal( + floored_bdf.event_starts, + pd.DatetimeIndex( + ["2025-01-01T10:00:00+00:00", "2025-01-01T10:15:00+00:00"], + name="event_start", + ), + ) + assert floored_bdf["event_value"].to_list() == [1.0, 2.0] + + +def test_floor_bdf_event_starts_rejects_collisions(setup_dummy_sensors, setup_sources): + sensor1, _, _, _ = setup_dummy_sensors + belief_time = pd.Timestamp("2025-01-01T09:00:00+00:00") + bdf = tb.BeliefsDataFrame( + pd.DataFrame( + { + "event_start": pd.to_datetime( + [ + "2025-01-01T10:00:40+00:00", + "2025-01-01T10:08:10+00:00", + ] + ), + "belief_time": [belief_time, belief_time], + "source": [setup_sources["Seita"], setup_sources["Seita"]], + "event_value": [1.0, 2.0], + } + ), + sensor=sensor1, + event_resolution=pd.Timedelta(minutes=7, seconds=30), + ) + + with pytest.raises(ValidationError, match="would merge multiple beliefs"): + floor_bdf_event_starts(bdf, pd.Timedelta(minutes=15)) diff --git a/flexmeasures/ui/static/openapi-specs.json b/flexmeasures/ui/static/openapi-specs.json index 9d4363b9cc..b2c6ca38f7 100644 --- a/flexmeasures/ui/static/openapi-specs.json +++ b/flexmeasures/ui/static/openapi-specs.json @@ -177,6 +177,9 @@ "timezone": "Europe/Amsterdam", "event_resolution": "PT15M", "entity_address": "ea1.2021-01.io.flexmeasures:fm1.14", + "attributes": { + "floor_datetimes_to_resolution": true + }, "generic_asset_id": 1 } }, @@ -1190,7 +1193,8 @@ "name": "power", "event_resolution": "PT1H", "unit": "kWh", - "generic_asset_id": 1 + "generic_asset_id": 1, + "attributes": "{\"floor_datetimes_to_resolution\": false}" } } } @@ -1215,6 +1219,7 @@ "entity_address": "ea1.2023-08.localhost:fm1.1", "event_resolution": "PT1H", "generic_asset_id": 1, + "attributes": "{\"floor_datetimes_to_resolution\": false}", "timezone": "UTC", "id": 2 } @@ -5729,8 +5734,8 @@ "description": "Obsolete identifier from [USEF](https://www.usef.energy/)." }, "attributes": { - "description": "JSON serializable attributes to store arbitrary information on the sensor. A few attributes lead to special behaviour, such as `consumption_is_positive`, which informs the platform whether consumption values should be saved (and shown in charts) as positive or negative values.", - "example": "{consumption_is_positive: True}" + "description": "JSON serializable attributes to store arbitrary information on the sensor. A few attributes lead to special behaviour, such as `consumption_is_positive`, which informs the platform whether consumption values should be saved (and shown in charts) as positive or negative values, `floor_datetimes_to_resolution`, which controls whether off-clock datetimes are floored to a non-instantaneous sensor's resolution, and `frequency`, which rounds incoming instantaneous measurements to a configured Pandas frequency.", + "example": "{\"consumption_is_positive\": true, \"floor_datetimes_to_resolution\": true}" }, "generic_asset_id": { "type": "integer",