Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8edbf8a
feat: start test cases for deserializing time series data to BeliefsD…
Flix6x Nov 21, 2025
3b2770f
fix: update type annotation
Flix6x Nov 21, 2025
34a2dc5
feat: handle resampling after converting to a BeliefsDataFrame
Flix6x Nov 21, 2025
8d2d2d3
fix: catch NotImplementedError to fix failing test
Flix6x Nov 21, 2025
86ad774
feat: two more test cases
Flix6x Nov 21, 2025
30fb2ad
fix: update test expectations
Flix6x Nov 21, 2025
7f0caa1
Merge branch 'main' into mohamed/dev/floor-event-start
BelhsanHmida May 1, 2026
6c08284
fix: floor direct POST sensor data starts
BelhsanHmida May 2, 2026
f590e58
fix: floor uploaded sensor data datetimes
BelhsanHmida May 2, 2026
0566ceb
fix: floor flex-model scheduling datetimes
BelhsanHmida May 2, 2026
4afd828
Merge remote-tracking branch 'origin/main' into mohamed/dev/floor-eve…
BelhsanHmida May 5, 2026
a857806
fix: preserve 422 validation for invalid flex-model datetimes
BelhsanHmida May 9, 2026
f30429c
style: apply pre-commit
BelhsanHmida May 9, 2026
3a56e0c
fix: return 422 for invalid flex-model timed-event datetimes
Copilot May 9, 2026
dd2fcc4
test: refine invalid flex-model datetime regression assertion
Copilot May 9, 2026
aa93fe1
test: make flex-model datetime error assertion less brittle
Copilot May 9, 2026
e0f88a8
test: assert error payload shape for invalid flex-model datetime
Copilot May 9, 2026
aaebfaa
refactor: simplify test, incl. by not mixing timezone offsets
Flix6x May 12, 2026
43fe0b5
refactor: rename variable to match variable name in test_trigger_sche…
Flix6x May 12, 2026
9624f09
refactor: use search method over writing custom query
Flix6x May 12, 2026
860c700
feat: test resampling from 5 to 15 minutes
Flix6x May 12, 2026
2228410
docs: document flex-model datetime flooring helper
BelhsanHmida May 15, 2026
0d92e52
test: clarify sensor_index assumptions in upload tests
BelhsanHmida May 15, 2026
b7d3cf0
feat: handle floored downsampling posts
BelhsanHmida May 17, 2026
531d2db
test: cover datetime flooring opt-out
BelhsanHmida May 17, 2026
067bd92
fix: reject flooring event collisions
BelhsanHmida May 17, 2026
fa31e0e
docs: document datetime flooring attribute
BelhsanHmida May 17, 2026
cd4cfd2
docs: add changelog entry
BelhsanHmida May 17, 2026
d4e56bc
Merge branch 'main' into mohamed/dev/floor-event-start
BelhsanHmida May 17, 2026
c66b334
Merge remote-tracking branch 'origin/main' into mohamed/dev/floor-eve…
BelhsanHmida May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion documentation/api/notation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ In all current versions of the FlexMeasures API, only equidistant timeseries dat
- "start" should be a timestamp on the hour or a multiple of the sensor resolution thereafter (e.g. "16:10" works if the resolution is 5 minutes), and
- "duration" should also be a multiple of the sensor resolution.

For non-instantaneous sensors, FlexMeasures floors off-clock datetimes to the
sensor's resolution by default when ingesting sensor data. For example, data
posted with ``"start": "2026-05-12T08:29:58+02:00"`` to a 15-minute sensor is
saved from ``2026-05-12T08:15:00+02:00``. Set the sensor attribute
``"floor_datetimes_to_resolution": false`` to disable this behaviour.


.. _beliefs:

Expand Down Expand Up @@ -249,7 +255,10 @@ FlexMeasures handles two types of time series, which can be distinguished by def
Specifying a frequency and resolution is redundant for POST requests that contain both "values" and a "duration" ― FlexMeasures computes the frequency by dividing the duration by the number of values, and, for sensors that record non-instantaneous events, assumes the resolution of the data is equal to the frequency.

When POSTing data, FlexMeasures checks this inferred resolution against the required resolution of the sensors that are posted to.
If these can't be matched (through upsampling), an error will occur.
If these can't be matched through upsampling or downsampling, an error will occur.
Off-clock event starts for non-instantaneous sensors are floored to the sensor's resolution by default.
The sensor attribute ``floor_datetimes_to_resolution`` can be set to ``false`` to keep incoming datetimes unchanged.
This flooring behaviour is distinct from the existing ``frequency`` sensor attribute, which rounds incoming instantaneous measurements to a configured Pandas frequency.

GET requests (such as */sensors/data*) return data with a frequency either equal to the resolution that the sensor is configured for (for non-instantaneous sensors), or a default frequency befitting (in our opinion) the requested time interval.
A "resolution" may be specified explicitly to obtain the data in downsampled form, which can be very beneficial for download speed.
Expand Down
1 change: 1 addition & 0 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ New features
* Support sensor references for efficiency fields in storage flex-models [see `PR #2142 <https://www.github.com/FlexMeasures/flexmeasures/pull/2142>`_]
* Added a unified job status endpoint ``GET /api/v3_0/jobs/<uuid>`` to retrieve the current execution status and result message for any background job [see `PR #2141 <https://www.github.com/FlexMeasures/flexmeasures/pull/2141>`_]
* New ``GET /api/v3_0/sources`` endpoint to list accessible data sources and defined types, with ``only_latest=true`` by default to return only the most recent version per source [see `PR #2126 <https://www.github.com/FlexMeasures/flexmeasures/pull/2126>`_]
* Floor off-clock API datetimes to a non-instantaneous sensor's resolution by default when ingesting sensor data, uploading sensor data, and handling scheduler flex-model timed events; configurable with the ``floor_datetimes_to_resolution`` sensor attribute [see `PR #2146 <https://www.github.com/FlexMeasures/flexmeasures/pull/2146>`_]
* Add support for filtering sensor data GET requests by ``source-type`` on ``/api/v3_0/sensors/<id>/data`` [see `PR #2127 <https://www.github.com/FlexMeasures/flexmeasures/pull/2127>`_]
* Making monitoring alerts more flexible: allow ``flexmeasures monitor`` alerts to target one or more user IDs or email addresses with ``--recipient``; ``flexmeasures monitor last-seen`` can now narrow monitored users to one or more accounts with ``--account`` or to client accounts with ``--consultancy`` [see `PR #2158 <https://www.github.com/FlexMeasures/flexmeasures/pull/2158>`_]

Expand Down
56 changes: 47 additions & 9 deletions flexmeasures/api/common/schemas/sensor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,32 @@ def check_schema_unit_against_type(self, data, **kwargs):
f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}"
)

@validates_schema
def check_single_value_zero_duration_for_non_instantaneous_sensor(
self, data, **kwargs
):
"""Reject inputs where a non-instantaneous sensor cannot infer any resolution."""

required_resolution = data["sensor"].event_resolution
inferred_resolution = data["duration"] / len(data["values"])

if (
required_resolution != timedelta(hours=0)
and len(data["values"]) == 1
and inferred_resolution == timedelta(hours=0)
):
raise ValidationError(
f"Cannot infer a non-zero resolution from one value over zero duration. This sensor requires a resolution of {required_resolution}."
)

@validates_schema
def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
"""Ensure event frequency is compatible with the sensor's event resolution.

For a sensor recording instantaneous values, any event frequency is compatible.
For a sensor recording non-instantaneous values, the event frequency must fit the sensor's event resolution.
Currently, only upsampling is supported (e.g. converting hourly events to 15-minute events).
Upsampling and downsampling are supported when the inferred resolution and
the sensor resolution are multiples of each other.
"""
required_resolution = data["sensor"].event_resolution

Expand All @@ -398,11 +417,9 @@ def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
# The event frequency is inferred by assuming sequential, equidistant values within a time interval.
# The event resolution is assumed to be equal to the event frequency.
inferred_resolution = data["duration"] / len(data["values"])
if len(data["values"]) == 1 and inferred_resolution == timedelta(hours=0):
raise ValidationError(
f"Cannot infer a non-zero resolution from one value over zero duration. This sensor requires a resolution of {required_resolution}."
)
if inferred_resolution % required_resolution != timedelta(hours=0):
if inferred_resolution % required_resolution != timedelta(
hours=0
) and required_resolution % inferred_resolution != timedelta(hours=0):
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
)
Expand All @@ -421,14 +438,15 @@ def check_multiple_instantaneous_values(self, data, **kwargs):
)

@post_load()
def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame:
def post_load_sequence(self, data: dict, **kwargs) -> dict[str, BeliefsDataFrame]:
"""
If needed, upsample and convert units, then deserialize to a BeliefsDataFrame.
Returns a dict with the BDF in it, as that is expected by webargs when used with as_kwargs=True.
"""
data = self.possibly_upsample_values(data)
data = self.possibly_convert_units(data)
bdf = self.load_bdf(data)
bdf = self.possibly_downsample_bdf(bdf, data["sensor"].event_resolution)

# Post-load validation against message type
_type = data.get("type", None)
Expand All @@ -451,7 +469,7 @@ def possibly_convert_units(data):
data["values"],
from_unit=data["unit"],
to_unit=data["sensor"].unit,
event_resolution=data["sensor"].event_resolution,
event_resolution=data["duration"] / len(data["values"]),
)
return data

Expand Down Expand Up @@ -479,6 +497,21 @@ def possibly_upsample_values(data):
)
return data

@staticmethod
def possibly_downsample_bdf(
bdf: BeliefsDataFrame, required_resolution: timedelta
) -> BeliefsDataFrame:
"""
Downsample the data if needed, to fit the sensor's resolution.
Marshmallow runs this after validation.
"""
if required_resolution == timedelta(hours=0):
return bdf

if bdf.event_resolution < required_resolution:
bdf = bdf.resample_events(required_resolution)
return bdf

def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame:
"""
Turn the de-serialized and validated data into a BeliefsDataFrame.
Expand All @@ -489,7 +522,11 @@ def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame:
start = sensor_data["start"]
sensor = sensor_data["sensor"]

if frequency := sensor.get_attribute("frequency"):
if sensor.event_resolution != timedelta(0) and sensor.get_attribute(
"floor_datetimes_to_resolution", True
):
start = pd.Timestamp(start).floor(sensor.event_resolution)
elif frequency := sensor.get_attribute("frequency"):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the historical purpose of this attribute? How does that relate to the new functionality?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this a bit more. The historical purpose of frequency seems to be to align incoming measurements to a configured Pandas frequency by rounding. That is useful when a sensor stores instantaneous data, where event_resolution is zero, but we still want incoming point measurements to land on a predictable time grid.

This PR adds a different behavior for non-instantaneous sensors. For those, the sensor already has an interval grid through event_resolution, so off-clock starts are floored to that resolution before saving. So they are related in that both normalize incoming datetimes, but frequency is a custom rounding grid, while the new behavior is flooring to the sensor’s own event resolution.

I renamed the new flag to floor_datetimes_to_resolution to avoid mixing up those two concepts, and documented the distinction.

start = pd.Timestamp(start).round(frequency)

if event_resolution == timedelta(hours=0):
Expand Down Expand Up @@ -517,6 +554,7 @@ def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame:
s,
source=source,
sensor=sensor_data["sensor"],
event_resolution=event_resolution,
**belief_timing,
)

Expand Down
53 changes: 53 additions & 0 deletions flexmeasures/api/v3_0/sensors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import isodate
from copy import deepcopy
from datetime import datetime, timedelta
import pandas as pd

from flexmeasures.data.services.sensors import (
serialize_sensor_status_data,
Expand Down Expand Up @@ -287,6 +289,47 @@ def support_legacy_field_name(self, data, **kwargs):
return data


def floor_timed_event_datetimes(flex_model: dict, sensor: Sensor) -> dict:
Comment thread
BelhsanHmida marked this conversation as resolved.
"""Floor timed-event datetimes in list-valued flex-model fields.

This only touches list entries that look like timed-event dictionaries,
such as ``soc-minima``, ``soc-maxima`` and ``soc-targets``.
"""
if sensor.event_resolution == timedelta(0) or not sensor.get_attribute(
"floor_datetimes_to_resolution", True
):
return flex_model

floored_flex_model = deepcopy(flex_model)
for value_name, value in floored_flex_model.items():
if not isinstance(value, list):
continue
for index, timed_event in enumerate(value):
if not isinstance(timed_event, dict):
continue
for key in ("datetime", "start", "end"):
if key in timed_event:
try:
timed_event[key] = isodate.datetime_isoformat(
pd.Timestamp(timed_event[key]).floor(
sensor.event_resolution
)
)
except (TypeError, ValueError) as exc:
raise ValidationError(
{
value_name: {
index: {
key: [
f"Not a valid datetime: {timed_event[key]!r}."
]
}
}
}
) from exc
return floored_flex_model


class SensorAPI(FlaskView):
route_base = "/sensors"
trailing_slash = False
Expand Down Expand Up @@ -1002,6 +1045,12 @@ def trigger_schedule(
f"Resolution of {resolution} is incompatible with the sensor's required resolution of {sensor.event_resolution}."
)

if flex_model is not None:
try:
flex_model = floor_timed_event_datetimes(flex_model, sensor)
except ValidationError as err:
return unprocessable_entity(err.messages)

end_of_schedule = start_of_schedule + duration
scheduler_kwargs = dict(
asset_or_sensor=sensor,
Expand Down Expand Up @@ -1306,6 +1355,8 @@ def fetch_one(self, id, sensor: Sensor):
timezone: Europe/Amsterdam
event_resolution: PT15M
entity_address: ea1.2021-01.io.flexmeasures:fm1.14
attributes:
floor_datetimes_to_resolution: true
generic_asset_id: 1
power_sensor:
summary: A power sensor recording average consumption every 5 minutes.
Expand Down Expand Up @@ -1369,6 +1420,7 @@ def post(self, sensor_data: dict):
"event_resolution": "PT1H"
"unit": "kWh"
"generic_asset_id": 1
"attributes": '{"floor_datetimes_to_resolution": false}'
responses:
201:
description: New Sensor
Expand All @@ -1385,6 +1437,7 @@ def post(self, sensor_data: dict):
"entity_address": "ea1.2023-08.localhost:fm1.1"
"event_resolution": "PT1H"
"generic_asset_id": 1
"attributes": '{"floor_datetimes_to_resolution": false}'
"timezone": "UTC"
"id": 2
400:
Expand Down
70 changes: 67 additions & 3 deletions flexmeasures/api/v3_0/tests/test_sensor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from datetime import timedelta
from flask import current_app, url_for
import pandas as pd
import pytest
from sqlalchemy import event
from sqlalchemy.engine import Engine
Expand Down Expand Up @@ -357,10 +358,10 @@ def test_post_sensor_data_rejects_large_json(
("start", "2021-06-07T00:00:00", "start", "Not a valid aware datetime"),
(
"duration",
"PT30M",
"PT25M",
"_schema",
"Resolution of 0:05:00 is incompatible",
), # downsampling not supported
"Resolution of 0:04:10 is incompatible",
),
("unit", "m", "_schema", "Required unit"),
("type", "GetSensorDataRequest", "type", "Must be one of"),
],
Expand Down Expand Up @@ -427,6 +428,69 @@ def test_post_sensor_data_rejects_unknown_sensor_before_queueing(
assert current_app.queues["ingestion"].count == 0


@pytest.mark.parametrize(
"requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
@pytest.mark.parametrize(
"offclock_start, precise_start, precise_end, values, expected_values",
[
(
"2021-06-08T00:00:40+02:00",
"2021-06-08T00:00:00+02:00",
"2021-06-08T01:00:00+02:00",
[-11.28] * 6,
[-11.28] * 6,
),
(
"2021-06-09T00:00:40+02:00",
"2021-06-09T00:00:00+02:00",
"2021-06-09T01:00:00+02:00",
[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200],
[150, 350, 550, 750, 950, 1150],
),
],
)
def test_post_non_instantaneous_sensor_data_floor(
Comment thread
BelhsanHmida marked this conversation as resolved.
client,
setup_api_test_data,
requesting_user,
offclock_start,
precise_start,
precise_end,
values,
expected_values,
):
post_data = make_sensor_data_request_for_gas_sensor(
num_values=len(values), unit="m³/h"
)
post_data["start"] = offclock_start
post_data["values"] = values
sensor = setup_api_test_data["some gas sensor"]

assert (
len(sensor.search_beliefs(precise_start, precise_end)) == 0
), "No beliefs were expected before we post our test data."

response = client.post(
url_for("SensorAPI:post_data", id=sensor.id),
json=post_data,
)

assert response.status_code == 200

new_data = sensor.search_beliefs(precise_start, precise_end).reset_index()
assert len(new_data) == 6
assert list(new_data["event_start"]) == [
pd.Timestamp(precise_start),
pd.Timestamp(precise_start) + pd.Timedelta(minutes=10),
pd.Timestamp(precise_start) + pd.Timedelta(minutes=20),
pd.Timestamp(precise_start) + pd.Timedelta(minutes=30),
pd.Timestamp(precise_start) + pd.Timedelta(minutes=40),
pd.Timestamp(precise_start) + pd.Timedelta(minutes=50),
]
assert new_data["event_value"].to_list() == expected_values


@pytest.mark.parametrize(
"requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
Expand Down
Loading
Loading