diff --git a/flexmeasures/data/models/forecasting/pipelines/base.py b/flexmeasures/data/models/forecasting/pipelines/base.py index 7fb394aa25..24a7f400d7 100644 --- a/flexmeasures/data/models/forecasting/pipelines/base.py +++ b/flexmeasures/data/models/forecasting/pipelines/base.py @@ -277,6 +277,78 @@ def _generate_splits( belief_timestamps_list : list[pd.Timestamp] """ + def _select_per_regressor( + data: pd.DataFrame, + regressor_columns: list[str], + *, + selection: str, + ) -> pd.DataFrame: + """ + Select one known value per `(event_start, regressor)`. + + Parameters + ---------- + data : pd.DataFrame + Input frame with `event_start`, `belief_time`, and regressor columns. + regressor_columns : list[str] + Regressor columns to select values for independently. + selection : str + Selection strategy: + - `"latest"`: choose the highest belief_time per event_start. + - `"closest"`: choose the minimum |event_start - belief_time| per event_start. + + Returns + ------- + pd.DataFrame + Wide frame with one row per event_start and one selected value + per regressor column. + """ + if selection not in {"latest", "closest"}: + raise ValueError( + "selection must be one of {'latest', 'closest'}, " + f"got {selection!r}" + ) + + selected = ( + data[["event_start"]] + .drop_duplicates() + .sort_values("event_start") + .reset_index(drop=True) + ) + + for regressor in regressor_columns: + regressor_data = data[ + ["event_start", "belief_time", regressor] + ].dropna(subset=[regressor]) + if regressor_data.empty: + selected[regressor] = np.nan + continue + + if selection == "latest": + idx = regressor_data.groupby("event_start")[ + "belief_time" + ].idxmax() + else: + regressor_data = regressor_data.copy() + regressor_data["time_diff"] = ( + regressor_data["event_start"] + - regressor_data["belief_time"] + ).abs() + idx = regressor_data.groupby("event_start")[ + "time_diff" + ].idxmin() + + selected_values = regressor_data.loc[ + idx, ["event_start", regressor] + ] + selected = selected.merge( + selected_values, + on="event_start", + how="left", + ) + + return selected + target_sensor_resolution = self.target_sensor.event_resolution # target_start is the timestamp of the event_start of the first event in realizations @@ -327,45 +399,25 @@ def _generate_splits( X_past_regressors_df["belief_time"] > X_past_regressors_df["event_start"] ].copy() - idx = past_obs.groupby("event_start")["belief_time"].idxmax() - past_latest = ( - past_obs.loc[idx].sort_values("event_start").reset_index(drop=True) + past_latest = _select_per_regressor( + past_obs, + self.past_regressors, + selection="latest", ) - past_keep = [c for c in past_latest.columns if c not in ("belief_time")] - past_latest = past_latest[past_keep] future_realized_latest = None - future_all_closest = None if X_future_regressors_df is not None: # Realized-only (belief_time > event_start): take closest per event_start fr = X_future_regressors_df.loc[ X_future_regressors_df["belief_time"] > X_future_regressors_df["event_start"] ].copy() - fr["time_diff"] = (fr["event_start"] - fr["belief_time"]).abs() - idx_fr = fr.groupby("event_start")["time_diff"].idxmin() - fr = ( - fr.loc[idx_fr] - .drop(columns=["time_diff"]) - .sort_values("event_start") - .reset_index(drop=True) - ) - - # All beliefs: closest per event_start (used for forecast slice) - fa = X_future_regressors_df.copy() - fa["time_diff"] = (fa["event_start"] - fa["belief_time"]).abs() - idx_fa = fa.groupby("event_start")["time_diff"].idxmin() - fa = ( - fa.loc[idx_fa] - .drop(columns=["time_diff"]) - .sort_values("event_start") - .reset_index(drop=True) + future_realized_latest = _select_per_regressor( + fr, + self.future_regressors, + selection="closest", ) - keep = [c for c in fr.columns if c not in ("belief_time")] - future_realized_latest = fr[keep] - future_all_closest = fa[keep] - y_clean = ( y.drop(columns=["belief_time"]) .sort_values("event_start") @@ -444,10 +496,7 @@ def _slice_closed( past_covariates = None # Future covariates (realized up to target_end + forecasts up to forecast_end) split - if ( - future_realized_latest is not None - and future_all_closest is not None - ): + if future_realized_latest is not None: realized_slice = _slice_closed( future_realized_latest, target_start, target_end ) @@ -467,24 +516,12 @@ def _slice_closed( # for each event_start in that window, pick the latest belief before the event # (closest from below wrt belief_time) - fc_window["time_diff"] = ( - X_future_regressors_df.loc[fc_window.index, "event_start"] - - X_future_regressors_df.loc[fc_window.index, "belief_time"] - ).abs() - idx_fc = fc_window.groupby("event_start")["belief_time"].idxmax() - forecast_slice = ( - fc_window.loc[idx_fc] - .drop(columns=["time_diff"], errors="ignore") - .sort_values("event_start") - .reset_index(drop=True) + forecast_slice = _select_per_regressor( + fc_window, + self.future_regressors, + selection="latest", ) - # keep only value columns (drop meta) - keep_fc = [ - c for c in forecast_slice.columns if c not in ("belief_time") - ] - forecast_slice = forecast_slice[keep_fc] - future_df = ( pd.concat([realized_slice, forecast_slice], ignore_index=True) .drop_duplicates(subset=["event_start"]) diff --git a/flexmeasures/data/tests/test_forecasting_pipeline.py b/flexmeasures/data/tests/test_forecasting_pipeline.py index 6e47873367..b20a8c269c 100644 --- a/flexmeasures/data/tests/test_forecasting_pipeline.py +++ b/flexmeasures/data/tests/test_forecasting_pipeline.py @@ -5,10 +5,12 @@ import logging import pandas as pd from datetime import timedelta +from types import SimpleNamespace from marshmallow import ValidationError from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException +from flexmeasures.data.models.forecasting.pipelines.base import BasePipeline from flexmeasures.data.models.forecasting.pipelines import TrainPredictPipeline from flexmeasures.utils.job_utils import work_on_rq from flexmeasures.data.services.forecasting import handle_forecasting_exception @@ -588,3 +590,80 @@ def test_prior_restricts_training_beliefs( f"Forecasts after anomaly ({mean_after:.1f}) should be at least 10x higher " f"than forecasts before anomaly ({mean_before:.1f})" ) + + +def test_split_data_selects_latest_future_belief_per_regressor(): + """ + Ensure future covariates keep the latest known value per regressor, not per joined row. + """ + resolution = timedelta(hours=1) + pipeline = BasePipeline( + target_sensor=SimpleNamespace( + name="target", + id=1, + event_resolution=resolution, + ), + future_regressors=[ + SimpleNamespace(name="regressor-a", id=2), + SimpleNamespace(name="regressor-b", id=3), + ], + past_regressors=[], + n_steps_to_predict=1, + max_forecast_horizon=1, + forecast_frequency=1, + event_starts_after=pd.Timestamp("2025-01-08 09:00:00"), + event_ends_before=pd.Timestamp("2025-01-08 10:00:00"), + ) + + captured_future_df = None + + def capture_future_covariates( + df, sensors, sensor_names, start, end, **kwargs + ): # noqa: ARG001 + nonlocal captured_future_df + if sensor_names == pipeline.future_regressors: + captured_future_df = df.copy() + return df + + pipeline.detect_and_fill_missing_values = capture_future_covariates + + regressor_a, regressor_b = pipeline.future_regressors + target_col = pipeline.target + input_df = pd.DataFrame( + [ + { + "event_start": "2025-01-08 09:00:00", + "belief_time": "2025-01-08 10:00:00", + regressor_a: None, + regressor_b: None, + target_col: 1.0, + }, + { + "event_start": "2025-01-08 10:00:00", + "belief_time": "2025-01-08 09:30:00", + regressor_a: 5.0, + regressor_b: None, + target_col: None, + }, + { + "event_start": "2025-01-08 10:00:00", + "belief_time": "2025-01-08 09:45:00", + regressor_a: None, + regressor_b: 7.0, + target_col: None, + }, + ] + ) + input_df["event_start"] = pd.to_datetime(input_df["event_start"]) + input_df["belief_time"] = pd.to_datetime(input_df["belief_time"]) + + pipeline.split_data_all_beliefs(df=input_df) + + assert captured_future_df is not None + selected_rows = captured_future_df.loc[ + captured_future_df["event_start"] == pd.Timestamp("2025-01-08 10:00:00") + ] + assert len(selected_rows) == 1 + selected = selected_rows.iloc[0] + assert selected[regressor_a] == pytest.approx(5.0) + assert selected[regressor_b] == pytest.approx(7.0)