Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 85 additions & 48 deletions flexmeasures/data/models/forecasting/pipelines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,78 @@ def _generate_splits(
belief_timestamps_list : list[pd.Timestamp]
"""

def _select_per_regressor(
data: pd.DataFrame,
regressor_columns: list[str],
*,
selection: str,
) -> pd.DataFrame:
"""
Select one known value per `(event_start, regressor)`.

Parameters
----------
data : pd.DataFrame
Input frame with `event_start`, `belief_time`, and regressor columns.
regressor_columns : list[str]
Regressor columns to select values for independently.
selection : str
Selection strategy:
- `"latest"`: choose the highest belief_time per event_start.
- `"closest"`: choose the minimum |event_start - belief_time| per event_start.

Returns
-------
pd.DataFrame
Wide frame with one row per event_start and one selected value
per regressor column.
"""
if selection not in {"latest", "closest"}:
raise ValueError(
"selection must be one of {'latest', 'closest'}, "
f"got {selection!r}"
)

selected = (
data[["event_start"]]
.drop_duplicates()
.sort_values("event_start")
.reset_index(drop=True)
)

for regressor in regressor_columns:
regressor_data = data[
["event_start", "belief_time", regressor]
].dropna(subset=[regressor])
if regressor_data.empty:
selected[regressor] = np.nan
continue

if selection == "latest":
idx = regressor_data.groupby("event_start")[
"belief_time"
].idxmax()
else:
regressor_data = regressor_data.copy()
regressor_data["time_diff"] = (
regressor_data["event_start"]
- regressor_data["belief_time"]
).abs()
idx = regressor_data.groupby("event_start")[
"time_diff"
].idxmin()

selected_values = regressor_data.loc[
idx, ["event_start", regressor]
]
selected = selected.merge(
selected_values,
on="event_start",
how="left",
)

return selected

target_sensor_resolution = self.target_sensor.event_resolution

# target_start is the timestamp of the event_start of the first event in realizations
Expand Down Expand Up @@ -327,45 +399,25 @@ def _generate_splits(
X_past_regressors_df["belief_time"]
> X_past_regressors_df["event_start"]
].copy()
idx = past_obs.groupby("event_start")["belief_time"].idxmax()
past_latest = (
past_obs.loc[idx].sort_values("event_start").reset_index(drop=True)
past_latest = _select_per_regressor(
past_obs,
self.past_regressors,
selection="latest",
)
past_keep = [c for c in past_latest.columns if c not in ("belief_time")]
past_latest = past_latest[past_keep]

future_realized_latest = None
future_all_closest = None
if X_future_regressors_df is not None:
# Realized-only (belief_time > event_start): take closest per event_start
fr = X_future_regressors_df.loc[
X_future_regressors_df["belief_time"]
> X_future_regressors_df["event_start"]
].copy()
fr["time_diff"] = (fr["event_start"] - fr["belief_time"]).abs()
idx_fr = fr.groupby("event_start")["time_diff"].idxmin()
fr = (
fr.loc[idx_fr]
.drop(columns=["time_diff"])
.sort_values("event_start")
.reset_index(drop=True)
)

# All beliefs: closest per event_start (used for forecast slice)
fa = X_future_regressors_df.copy()
fa["time_diff"] = (fa["event_start"] - fa["belief_time"]).abs()
idx_fa = fa.groupby("event_start")["time_diff"].idxmin()
fa = (
fa.loc[idx_fa]
.drop(columns=["time_diff"])
.sort_values("event_start")
.reset_index(drop=True)
future_realized_latest = _select_per_regressor(
fr,
self.future_regressors,
selection="closest",
)

keep = [c for c in fr.columns if c not in ("belief_time")]
future_realized_latest = fr[keep]
future_all_closest = fa[keep]

y_clean = (
y.drop(columns=["belief_time"])
.sort_values("event_start")
Expand Down Expand Up @@ -444,10 +496,7 @@ def _slice_closed(
past_covariates = None

# Future covariates (realized up to target_end + forecasts up to forecast_end) split
if (
future_realized_latest is not None
and future_all_closest is not None
):
if future_realized_latest is not None:
realized_slice = _slice_closed(
future_realized_latest, target_start, target_end
)
Expand All @@ -467,24 +516,12 @@ def _slice_closed(

# for each event_start in that window, pick the latest belief before the event
# (closest from below wrt belief_time)
fc_window["time_diff"] = (
X_future_regressors_df.loc[fc_window.index, "event_start"]
- X_future_regressors_df.loc[fc_window.index, "belief_time"]
).abs()
idx_fc = fc_window.groupby("event_start")["belief_time"].idxmax()
forecast_slice = (
fc_window.loc[idx_fc]
.drop(columns=["time_diff"], errors="ignore")
.sort_values("event_start")
.reset_index(drop=True)
forecast_slice = _select_per_regressor(
fc_window,
self.future_regressors,
selection="latest",
)

# keep only value columns (drop meta)
keep_fc = [
c for c in forecast_slice.columns if c not in ("belief_time")
]
forecast_slice = forecast_slice[keep_fc]

future_df = (
pd.concat([realized_slice, forecast_slice], ignore_index=True)
.drop_duplicates(subset=["event_start"])
Expand Down
79 changes: 79 additions & 0 deletions flexmeasures/data/tests/test_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import logging
import pandas as pd
from datetime import timedelta
from types import SimpleNamespace

from marshmallow import ValidationError

from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException
from flexmeasures.data.models.forecasting.pipelines.base import BasePipeline
from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline
from flexmeasures.utils.job_utils import work_on_rq
from flexmeasures.data.services.forecasting import handle_forecasting_exception
Expand Down Expand Up @@ -588,3 +590,80 @@ def test_prior_restricts_training_beliefs(
f"Forecasts after anomaly ({mean_after:.1f}) should be at least 10x higher "
f"than forecasts before anomaly ({mean_before:.1f})"
)


def test_split_data_selects_latest_future_belief_per_regressor():
    """
    Check that each future regressor keeps its own latest known value.

    Two beliefs about the 10:00 event each carry a value for only one of the
    two regressors. The frame handed to the missing-value step must hold a
    single row for that event containing both values.
    """
    target_sensor = SimpleNamespace(
        name="target",
        id=1,
        event_resolution=timedelta(hours=1),
    )
    pipeline = BasePipeline(
        target_sensor=target_sensor,
        future_regressors=[
            SimpleNamespace(name="regressor-a", id=2),
            SimpleNamespace(name="regressor-b", id=3),
        ],
        past_regressors=[],
        n_steps_to_predict=1,
        max_forecast_horizon=1,
        forecast_frequency=1,
        event_starts_after=pd.Timestamp("2025-01-08 09:00:00"),
        event_ends_before=pd.Timestamp("2025-01-08 10:00:00"),
    )

    # Holds the frame seen by the stubbed missing-value step for the future regressors.
    captured = {}

    def record_future_covariates(
        df, sensors, sensor_names, start, end, **kwargs
    ):  # noqa: ARG001
        if sensor_names == pipeline.future_regressors:
            captured["future_df"] = df.copy()
        return df

    # Replace the fill step with a pass-through that records its input.
    pipeline.detect_and_fill_missing_values = record_future_covariates

    regressor_a, regressor_b = pipeline.future_regressors
    target_col = pipeline.target
    columns = ["event_start", "belief_time", regressor_a, regressor_b, target_col]
    rows = [
        ("2025-01-08 09:00:00", "2025-01-08 10:00:00", None, None, 1.0),
        ("2025-01-08 10:00:00", "2025-01-08 09:30:00", 5.0, None, None),
        ("2025-01-08 10:00:00", "2025-01-08 09:45:00", None, 7.0, None),
    ]
    input_df = pd.DataFrame([dict(zip(columns, row)) for row in rows])
    for time_column in ("event_start", "belief_time"):
        input_df[time_column] = pd.to_datetime(input_df[time_column])

    pipeline.split_data_all_beliefs(df=input_df)

    future_df = captured.get("future_df")
    assert future_df is not None

    at_ten = future_df["event_start"] == pd.Timestamp("2025-01-08 10:00:00")
    rows_at_ten = future_df[at_ten]
    assert len(rows_at_ten) == 1

    row = rows_at_ten.iloc[0]
    assert row[regressor_a] == pytest.approx(5.0)
    assert row[regressor_b] == pytest.approx(7.0)