Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8edbf8a
feat: start test cases for deserializing time series data to BeliefsD…
Flix6x Nov 21, 2025
3b2770f
fix: update type annotation
Flix6x Nov 21, 2025
34a2dc5
feat: handle resampling after converting to a BeliefsDataFrame
Flix6x Nov 21, 2025
8d2d2d3
fix: catch NotImplementedError to fix failing test
Flix6x Nov 21, 2025
86ad774
feat: two more test cases
Flix6x Nov 21, 2025
30fb2ad
fix: update test expectations
Flix6x Nov 21, 2025
7f0caa1
Merge branch 'main' into mohamed/dev/floor-event-start
BelhsanHmida May 1, 2026
6c08284
fix: floor direct POST sensor data starts
BelhsanHmida May 2, 2026
f590e58
fix: floor uploaded sensor data datetimes
BelhsanHmida May 2, 2026
0566ceb
fix: floor flex-model scheduling datetimes
BelhsanHmida May 2, 2026
4afd828
Merge remote-tracking branch 'origin/main' into mohamed/dev/floor-eve…
BelhsanHmida May 5, 2026
a857806
fix: preserve 422 validation for invalid flex-model datetimes
BelhsanHmida May 9, 2026
f30429c
style: apply pre-commit
BelhsanHmida May 9, 2026
3a56e0c
fix: return 422 for invalid flex-model timed-event datetimes
Copilot May 9, 2026
dd2fcc4
test: refine invalid flex-model datetime regression assertion
Copilot May 9, 2026
aa93fe1
test: make flex-model datetime error assertion less brittle
Copilot May 9, 2026
e0f88a8
test: assert error payload shape for invalid flex-model datetime
Copilot May 9, 2026
aaebfaa
refactor: simplify test, incl. by not mixing timezone offsets
Flix6x May 12, 2026
43fe0b5
refactor: rename variable to match variable name in test_trigger_sche…
Flix6x May 12, 2026
9624f09
refactor: use search method over writing custom query
Flix6x May 12, 2026
860c700
feat: test resampling from 5 to 15 minutes
Flix6x May 12, 2026
2228410
docs: document flex-model datetime flooring helper
BelhsanHmida May 15, 2026
0d92e52
test: clarify sensor_index assumptions in upload tests
BelhsanHmida May 15, 2026
b7d3cf0
feat: handle floored downsampling posts
BelhsanHmida May 17, 2026
531d2db
test: cover datetime flooring opt-out
BelhsanHmida May 17, 2026
067bd92
fix: reject flooring event collisions
BelhsanHmida May 17, 2026
fa31e0e
docs: document datetime flooring attribute
BelhsanHmida May 17, 2026
cd4cfd2
docs: add changelog entry
BelhsanHmida May 17, 2026
d4e56bc
Merge branch 'main' into mohamed/dev/floor-event-start
BelhsanHmida May 17, 2026
c66b334
Merge remote-tracking branch 'origin/main' into mohamed/dev/floor-eve…
BelhsanHmida May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions flexmeasures/api/common/schemas/sensor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,24 @@ def check_schema_unit_against_type(self, data, **kwargs):
f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}"
)

@validates_schema
def check_single_value_zero_duration_for_non_instantaneous_sensor(
self, data, **kwargs
):
"""Reject inputs where a non-instantaneous sensor cannot infer any resolution."""

required_resolution = data["sensor"].event_resolution
inferred_resolution = data["duration"] / len(data["values"])

if (
required_resolution != timedelta(hours=0)
and len(data["values"]) == 1
and inferred_resolution == timedelta(hours=0)
):
raise ValidationError(
f"Cannot infer a non-zero resolution from one value over zero duration. This sensor requires a resolution of {required_resolution}."
)

@validates_schema
def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
"""Ensure event frequency is compatible with the sensor's event resolution.
Expand All @@ -362,10 +380,6 @@ def check_resolution_compatibility_of_sensor_data(self, data, **kwargs):
# The event frequency is inferred by assuming sequential, equidistant values within a time interval.
# The event resolution is assumed to be equal to the event frequency.
inferred_resolution = data["duration"] / len(data["values"])
if len(data["values"]) == 1 and inferred_resolution == timedelta(hours=0):
raise ValidationError(
f"Cannot infer a non-zero resolution from one value over zero duration. This sensor requires a resolution of {required_resolution}."
)
if inferred_resolution % required_resolution != timedelta(hours=0):
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
Expand All @@ -385,7 +399,7 @@ def check_multiple_instantaneous_values(self, data, **kwargs):
)

@post_load()
def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame:
def post_load_sequence(self, data: dict, **kwargs) -> dict[str, BeliefsDataFrame]:
"""
If needed, upsample and convert units, then deserialize to a BeliefsDataFrame.
Returns a dict with the BDF in it, as that is expected by webargs when used with as_kwargs=True.
Expand Down Expand Up @@ -454,7 +468,11 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
start = sensor_data["start"]
sensor = sensor_data["sensor"]

if frequency := sensor.get_attribute("frequency"):
if sensor.event_resolution != timedelta(0) and sensor.get_attribute(
"round_datetimes_on_ingestion", True
Comment thread
BelhsanHmida marked this conversation as resolved.
Outdated
):
start = pd.Timestamp(start).floor(sensor.event_resolution)
elif frequency := sensor.get_attribute("frequency"):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the historical purpose of this attribute? How does that relate to the new functionality?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this a bit more. The historical purpose of frequency seems to be to align incoming measurements to a configured Pandas frequency by rounding. That is useful when a sensor stores instantaneous data, where event_resolution is zero, but we still want incoming point measurements to land on a predictable time grid.

This PR adds a different behavior for non-instantaneous sensors. For those, the sensor already has an interval grid through event_resolution, so off-clock starts are floored to that resolution before saving. So they are related in that both normalize incoming datetimes, but frequency is a custom rounding grid, while the new behavior is flooring to the sensor’s own event resolution.

I renamed the new flag to floor_datetimes_to_resolution to avoid mixing up those two concepts, and documented the distinction.

start = pd.Timestamp(start).round(frequency)

if event_resolution == timedelta(hours=0):
Expand Down Expand Up @@ -482,6 +500,7 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
s,
source=source,
sensor=sensor_data["sensor"],
event_resolution=event_resolution,
**belief_timing,
)

Expand Down
26 changes: 26 additions & 0 deletions flexmeasures/api/v3_0/sensors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import isodate
from copy import deepcopy
from datetime import datetime, timedelta
import pandas as pd

from flexmeasures.data.services.sensors import (
serialize_sensor_status_data,
Expand Down Expand Up @@ -224,6 +226,27 @@ class TriggerScheduleKwargsSchema(Schema):
)


def floor_timed_event_datetimes(flex_model: dict, sensor: Sensor) -> dict:
Comment thread
BelhsanHmida marked this conversation as resolved.
if sensor.event_resolution == timedelta(0) or not sensor.get_attribute(
"round_datetimes_on_ingestion", True
):
return flex_model

floored_flex_model = deepcopy(flex_model)
for value in floored_flex_model.values():
if not isinstance(value, list):
continue
for timed_event in value:
if not isinstance(timed_event, dict):
continue
for key in ("datetime", "start", "end"):
if key in timed_event:
timed_event[key] = isodate.datetime_isoformat(
pd.Timestamp(timed_event[key]).floor(sensor.event_resolution)
)
Comment thread
BelhsanHmida marked this conversation as resolved.
Outdated
return floored_flex_model


class SensorAPI(FlaskView):
route_base = "/sensors"
trailing_slash = False
Expand Down Expand Up @@ -901,6 +924,9 @@ def trigger_schedule(
f"Resolution of {resolution} is incompatible with the sensor's required resolution of {sensor.event_resolution}."
)

if flex_model is not None:
flex_model = floor_timed_event_datetimes(flex_model, sensor)

end_of_schedule = start_of_schedule + duration
scheduler_kwargs = dict(
asset_or_sensor=sensor,
Expand Down
38 changes: 38 additions & 0 deletions flexmeasures/api/v3_0/tests/test_sensor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from datetime import timedelta
from flask import url_for
import pandas as pd
import pytest
from sqlalchemy import event
from sqlalchemy.engine import Engine
Expand Down Expand Up @@ -239,6 +240,43 @@ def test_post_invalid_sensor_data(
)


@pytest.mark.parametrize(
"requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
def test_post_non_instantaneous_sensor_data_floor(
Comment thread
BelhsanHmida marked this conversation as resolved.
client, setup_api_test_data, requesting_user
):
post_data = make_sensor_data_request_for_gas_sensor(unit="m³/h")
post_data["start"] = "2021-06-08T00:00:40+02:00"
sensor = setup_api_test_data["some gas sensor"]

rows = len(sensor.search_beliefs())

response = client.post(
url_for("SensorAPI:post_data", id=sensor.id),
json=post_data,
)

assert response.status_code == 200

data = sensor.search_beliefs().reset_index()
new_data = data[
data["event_start"].between(
pd.Timestamp("2021-06-07 22:00:00+0000", tz="UTC"),
pd.Timestamp("2021-06-07 22:50:00+0000", tz="UTC"),
)
]
Comment thread
BelhsanHmida marked this conversation as resolved.
Outdated
assert len(sensor.search_beliefs()) - rows == 6
assert list(new_data["event_start"]) == [
pd.Timestamp("2021-06-07 22:00:00+0000", tz="UTC"),
pd.Timestamp("2021-06-07 22:10:00+0000", tz="UTC"),
pd.Timestamp("2021-06-07 22:20:00+0000", tz="UTC"),
pd.Timestamp("2021-06-07 22:30:00+0000", tz="UTC"),
pd.Timestamp("2021-06-07 22:40:00+0000", tz="UTC"),
pd.Timestamp("2021-06-07 22:50:00+0000", tz="UTC"),
]


@pytest.mark.parametrize(
"requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True
)
Expand Down
38 changes: 38 additions & 0 deletions flexmeasures/api/v3_0/tests/test_sensor_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,44 @@ def test_trigger_schedule_with_invalid_flexmodel(
)


@pytest.mark.parametrize(
"requesting_user", ["test_prosumer_user@seita.nl"], indirect=True
)
def test_trigger_schedule_floors_flex_model_datetimes(
Comment thread
BelhsanHmida marked this conversation as resolved.
app,
add_market_prices,
add_battery_assets,
battery_soc_sensor,
add_charging_station_assets,
keep_scheduling_queue_empty,
requesting_user,
):
message = message_for_trigger_schedule(with_targets=True)
offclock_target = "2015-01-02T23:00:40+01:00"
expected_target = "2015-01-02T23:00:00+01:00"
for field in ("soc-targets", "soc-minima", "soc-maxima"):
message["flex-model"][field][0]["datetime"] = offclock_target

sensor = add_charging_station_assets["Test charging station"].sensors[0]
with app.test_client() as client:
trigger_schedule_response = client.post(
url_for("SensorAPI:trigger_schedule", id=sensor.id),
json=message,
)

assert trigger_schedule_response.status_code == 200
assert len(app.queues["scheduling"]) == 1

job = app.queues["scheduling"].jobs[0]
for field in ("soc-targets", "soc-minima", "soc-maxima"):
target = job.kwargs["flex_model"][field][0]
assert target["datetime"] == expected_target
if "start" in target:
assert parse_datetime(target["start"]) == parse_datetime(expected_target)
if "end" in target:
assert parse_datetime(target["end"]) == parse_datetime(expected_target)


@pytest.mark.parametrize(
"flex_config, field",
[
Expand Down
79 changes: 79 additions & 0 deletions flexmeasures/api/v3_0/tests/test_sensors_api_freshdb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import pytest
from datetime import timedelta
import pandas as pd

from flask import url_for
from sqlalchemy import select
Expand Down Expand Up @@ -232,6 +233,84 @@ def test_upload_sensor_data_with_unit_conversion_success(
assert [b.event_value for b in beliefs] == expected_event_values


@pytest.mark.parametrize(
"requesting_user, sensor_index, start_date, data_resolution, data_values, expected_event_starts, expected_event_values",
[
(
"test_prosumer_user_2@seita.nl",
1,
"2025-01-01T10:00:40+00:00",
timedelta(minutes=15),
[2, 3, 4],
[
pd.Timestamp("2025-01-01T10:00:00+00:00"),
pd.Timestamp("2025-01-01T10:15:00+00:00"),
pd.Timestamp("2025-01-01T10:30:00+00:00"),
],
[2, 3, 4],
),
(
"test_prosumer_user_2@seita.nl",
2,
"2025-01-01T10:00:40+00:00",
timedelta(minutes=30),
[10, 20, 20, 40],
[
pd.Timestamp("2025-01-01T10:00:00+00:00"),
pd.Timestamp("2025-01-01T11:00:00+00:00"),
],
[30, 60],
),
],
indirect=["requesting_user"],
)
def test_upload_sensor_data_floors_offclock_datetimes(
fresh_db,
client,
add_battery_assets_fresh_db,
requesting_user,
sensor_index,
start_date,
data_resolution,
data_values,
expected_event_starts,
expected_event_values,
):
test_battery = add_battery_assets_fresh_db["Test battery"]
sensor = test_battery.sensors[sensor_index]
Comment thread
BelhsanHmida marked this conversation as resolved.

csv_content = generate_csv_content(
start_time_str=start_date,
interval=data_resolution,
values=data_values,
)
file_obj = io.BytesIO(csv_content.encode("utf-8"))

response = client.post(
url_for("SensorAPI:upload_data", id=sensor.id),
data={"uploaded-files": (file_obj, "data.csv"), "unit": sensor.unit},
content_type="multipart/form-data",
)
assert response.status_code == 200

beliefs = (
fresh_db.session.execute(
select(TimedBelief)
.filter(TimedBelief.sensor_id == sensor.id)
.filter(TimedBelief.event_start >= expected_event_starts[0])
.filter(
TimedBelief.event_start
< expected_event_starts[-1] + sensor.event_resolution
)
.order_by(TimedBelief.event_start)
)
.scalars()
.all()
)
assert [b.event_start for b in beliefs] == expected_event_starts
assert [b.event_value for b in beliefs] == expected_event_values


@pytest.mark.parametrize(
"requesting_user, sensor_index, data_unit, data_resolution, data_values, expected_err_msg, expected_status",
[
Expand Down
9 changes: 9 additions & 0 deletions flexmeasures/data/schemas/scheduling/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ def __init__(
self.start = start
self.sensor = sensor
self.timezone = sensor.timezone if sensor is not None else None
self.rounding_resolution = (
sensor.event_resolution
if sensor is not None
and sensor.get_attribute("round_datetimes_on_ingestion", True)
else None
)
Comment thread
BelhsanHmida marked this conversation as resolved.
Outdated

# guess default soc-unit
if default_soc_unit is None:
Expand All @@ -250,20 +256,23 @@ def __init__(
to_unit="MWh",
default_src_unit=default_soc_unit,
timezone=self.timezone,
event_resolution=self.rounding_resolution,
data_key="soc-maxima",
)

self.soc_minima = VariableQuantityField(
to_unit="MWh",
default_src_unit=default_soc_unit,
timezone=self.timezone,
event_resolution=self.rounding_resolution,
data_key="soc-minima",
value_validator=validate.Range(min=0),
)
self.soc_targets = VariableQuantityField(
to_unit="MWh",
default_src_unit=default_soc_unit,
timezone=self.timezone,
event_resolution=self.rounding_resolution,
data_key="soc-targets",
)

Expand Down
Loading
Loading