diff --git a/docker-compose.yml b/docker-compose.yml index 538d2e0cfc..38bcbe6d48 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -122,7 +122,7 @@ services: if [ -f /usr/var/flexmeasures-instance/requirements.txt ]; then pip install --no-cache-dir -r /usr/var/flexmeasures-instance/requirements.txt fi - flexmeasures jobs run-worker --name flexmeasures-worker --queue forecasting\|scheduling + flexmeasures jobs run-worker --name flexmeasures-worker --queue forecasting\|scheduling\|ingestion test-db: image: postgres expose: diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 37a795fd9b..a129e1e51f 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -7,6 +7,14 @@ FlexMeasures Changelog v0.33.0 | May XX, 2026 ============================ +.. note:: It is recommended to assign a worker to the ``ingestion`` queue (or configure existing workers to handle it, as well), + so that sensor data posted via the API is processed asynchronously [see `PR #2101 `_]. + For instance, using: + + .. code-block:: bash + + $ flexmeasures jobs run-worker --queue "forecasting|scheduling|ingestion" + .. warning:: We are deprecating ``FLEXMEASURES_MONITORING_MAIL_RECIPIENTS`` in favor of ``FLEXMEASURES_DEFAULT_MONITORING_MAIL_RECIPIENTS``. New features @@ -21,6 +29,7 @@ New features Infrastructure / Support ---------------------- +* Move sensor data ingestion to a job queue for improved performance when POSTing large amounts of data to the sensor data API, returning a ``202 Accepted`` response with a job status URL when queued [see `PR #2101 `_] * Remove legacy rolling viewpoint forecasting code and utilities after migrating to fixed-point forecasting [see `PR #2082 `_] * Upgraded dependencies [see `PR #2114 `_, `PR #2148 `_, `PR #2161 `_ and `PR #2177 `_] * Run ``flexmeasures jobs run-worker`` with RQ's embedded scheduler on by default so jobs created with ``enqueue_in`` are promoted from the scheduled registry when due; pass ``--without-scheduler`` to disable [see `PR #2112 `_] @@ -32,6 +41,7 @@ Bugfixes ----------- * Fix forecasting regressor filtering to use only regressor beliefs known at the forecast ``belief_time`` [see `PR #2134 `_] * Check read permissions for sensors referenced in forecasting and scheduling config payloads, and return a clearer 403 error when a referenced sensor is not readable [see `PR #2096 `_ and `PR #2125 `_] +* Clean up stale sensor references from ``flex-config`` and ``sensors_to_show`` when deleting a sensor, using JSONB queries to find affected assets before pruning those references [see `PR #2106 `_] * Standardize resolution formatting across API endpoints for consistent response payloads [see `PR #2152 `_] * Make the auth check for CLI commands work with ``flask``, too, instead of only with the ``flexmeasures`` alias [see `PR #2169 `_] @@ -93,7 +103,6 @@ New features * Separate the ``StorageScheduler``'s tie-breaking preference for a full :abbr:`SoC (state of charge)` from its reported energy costs [see `PR #2023 `_ and `PR #2108 `_] * Improve asset graph hover interaction with a vertical ruler across subcharts, while keeping hover dots for easier visual tracking [see `PR #2079 `_] * Improve asset audit log messages for JSON field edits (especially ``sensors_to_show`` and nested flex-config values) [see `PR #2055 `_] -* Clean up stale sensor references from ``flex-config`` and ``sensors_to_show`` when deleting a sensor, using JSONB queries to find affected assets before pruning those references [see `PR #2106 `_] Infrastructure / Support ---------------------- diff --git a/documentation/configuration.rst b/documentation/configuration.rst index 7202f94fef..1b823aa585 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -347,6 +347,14 @@ Set a negative value to persist forever. Default: ``3600`` +FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Maximum request body size for sensor data posted to the sensor data API, both for JSON data and file uploads. +Set to ``None`` to disable this FlexMeasures-specific limit. + +Default: ``3 * 1024 * 1024`` + .. _datasource_config: FLEXMEASURES_DEFAULT_DATASOURCE diff --git a/documentation/host/installation.rst b/documentation/host/installation.rst index 2a32705eab..1c7ab6183b 100644 --- a/documentation/host/installation.rst +++ b/documentation/host/installation.rst @@ -385,7 +385,7 @@ Then, start workers in a console (or some other method to keep a long-running pr .. code-block:: bash - $ flexmeasures jobs run-worker --queue "scheduling|forecasting" + $ flexmeasures jobs run-worker --queue "scheduling|forecasting|ingestion" You can go to `http://localhost:5000/tasks/` and see the state of job queues and find individual jobs (and investigate why they failed, for instance). diff --git a/documentation/host/queues.rst b/documentation/host/queues.rst index ac489a2b69..5454061615 100644 --- a/documentation/host/queues.rst +++ b/documentation/host/queues.rst @@ -24,7 +24,7 @@ Here is how to run one worker for each kind of job (in separate terminals): .. code-block:: bash - $ flexmeasures jobs run-worker --name our-only-worker --queue forecasting|scheduling + $ flexmeasures jobs run-worker --name our-only-worker --queue forecasting|scheduling|ingestion Running multiple workers in parallel might be a great idea. @@ -32,6 +32,7 @@ Running multiple workers in parallel might be a great idea. $ flexmeasures jobs run-worker --name forecaster --queue forecasting $ flexmeasures jobs run-worker --name scheduler --queue scheduling + $ flexmeasures jobs run-worker --name ingester --queue ingestion You can also clear the job queues: @@ -39,10 +40,14 @@ You can also clear the job queues: $ flexmeasures jobs clear-queue --queue forecasting $ flexmeasures jobs clear-queue --queue scheduling + $ flexmeasures jobs clear-queue --queue ingestion When the main FlexMeasures process runs (e.g. by ``flexmeasures run``\ ), the queues of forecasting and scheduling jobs can be visited at ``http://localhost:5000/tasks/forecasting`` and ``http://localhost:5000/tasks/schedules``\ , respectively (by admins). +.. note:: + The ``ingestion`` queue is used for sensor data posted via the API. If the queue is not configured, or if no worker is connected to it, data is processed synchronously (in the web process) with a warning logged. Running a dedicated ingestion worker is recommended in production to keep API responses fast when large amounts of data are posted. When ingestion is queued, the API returns ``202 Accepted`` with a job status URL. + Inspect the queue and jobs diff --git a/documentation/tut/forecasting_scheduling.rst b/documentation/tut/forecasting_scheduling.rst index f69d028177..952faf87f7 100644 --- a/documentation/tut/forecasting_scheduling.rst +++ b/documentation/tut/forecasting_scheduling.rst @@ -28,6 +28,7 @@ Start to run one worker for each kind of job (in a separate terminal): $ flexmeasures jobs run-worker --queue forecasting $ flexmeasures jobs run-worker --queue scheduling + $ flexmeasures jobs run-worker --queue ingestion You can also clear the job queues: @@ -36,6 +37,7 @@ You can also clear the job queues: $ flexmeasures jobs clear-queue --queue forecasting $ flexmeasures jobs clear-queue --queue scheduling + $ flexmeasures jobs clear-queue --queue ingestion When the main FlexMeasures process runs (e.g. by ``flexmeasures run``), the queues of forecasting and scheduling jobs can be visited at ``http://localhost:5000/tasks/forecasting`` and ``http://localhost:5000/tasks/schedules``, respectively (by admins). diff --git a/flexmeasures/api/__init__.py b/flexmeasures/api/__init__.py index f7afec5463..c4c8a445a9 100644 --- a/flexmeasures/api/__init__.py +++ b/flexmeasures/api/__init__.py @@ -17,9 +17,11 @@ from flexmeasures.data.models.user import User from flexmeasures.api.common.utils.args_parsing import ( validation_error_handler, + request_entity_too_large_handler, ) from flexmeasures.api.common.responses import invalid_sender from flexmeasures.data.schemas.utils import FMValidationError +from werkzeug.exceptions import RequestEntityTooLarge from flexmeasures.api.v3_0.users import AuthRequestSchema # The api blueprint. It is registered with the Flask app (see app.py) @@ -150,6 +152,7 @@ def register_at(app: Flask): # handle API specific errors app.register_error_handler(FMValidationError, validation_error_handler) + app.register_error_handler(RequestEntityTooLarge, request_entity_too_large_handler) app.register_error_handler(IntegrityError, catch_timed_belief_replacements) app.unauthorized_handler_api = invalid_sender diff --git a/flexmeasures/api/common/responses.py b/flexmeasures/api/common/responses.py index a59ce62762..5b161ffa27 100644 --- a/flexmeasures/api/common/responses.py +++ b/flexmeasures/api/common/responses.py @@ -3,6 +3,8 @@ import inflect from functools import wraps +from flask import url_for + from flexmeasures.auth.error_handling import FORBIDDEN_MSG, FORBIDDEN_STATUS_CODE p = inflect.engine() @@ -62,7 +64,7 @@ def already_received_and_successfully_processed(message: str) -> ResponseTuple: @BaseMessage( - "Some of the data represents a replacement, which is reserved for customized servers. If you are hosting FlexMeasures, you can enable replacements by setting FLEXMEASURES_ALLOW_DATA_OVERWRITE=True in the configuration settings. Alternatively, update the prior in your request." + "Some of the data represents a replacement: existing values would be changed for the same sensor, source, event timestamp and belief time. This is reserved for customized servers. If you are hosting FlexMeasures, you can enable replacements by setting FLEXMEASURES_ALLOW_DATA_OVERWRITE=True in the configuration settings. Alternatively, submit the data with a different prior, or check whether the same file was already uploaded with different values." ) def invalid_replacement(message: str) -> ResponseTuple: return ( @@ -378,6 +380,25 @@ def request_processed(message: str) -> ResponseTuple: return dict(status="PROCESSED", message=message), 200 +def request_accepted_for_processing( + job_id: str, + message: str = "Request has been accepted for processing.", +) -> ResponseTuple: + return ( + dict( + status="ACCEPTED", + message=message, + job_monitor_url=url_for("JobAPI:get_job_status", uuid=job_id), + job_id=job_id, + ), + 202, + ) + + +def request_too_large(message: str) -> ResponseTuple: + return dict(result="Rejected", status="PAYLOAD_TOO_LARGE", message=message), 413 + + def pluralize(usef_role_name: str) -> str: """Adding a trailing 's' works well for USEF roles.""" return "%ss" % usef_role_name diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py index 3692c50646..2407335a76 100644 --- a/flexmeasures/api/common/schemas/sensor_data.py +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -12,6 +12,7 @@ from flexmeasures.data import ma from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.user import User from flexmeasures.api.common.schemas.sensors import ( SensorEntityAddressField, SensorIdField, @@ -328,6 +329,10 @@ class PostSensorDataSchema(SensorDataDescriptionSchema): This schema includes data (values) and still describes it. """ + def __init__(self, *args, source_user: User | None = None, **kwargs): + super().__init__(*args, **kwargs) + self.source_user = source_user + values = PolyField( deserialization_schema_selector=select_schema_to_ensure_list_of_floats, serialization_schema_selector=select_schema_to_ensure_list_of_floats, @@ -469,12 +474,11 @@ def possibly_upsample_values(data): ) return data - @staticmethod - def load_bdf(sensor_data: dict) -> BeliefsDataFrame: + def load_bdf(self, sensor_data: dict) -> BeliefsDataFrame: """ Turn the de-serialized and validated data into a BeliefsDataFrame. """ - source = get_or_create_source(current_user) + source = get_or_create_source(self.source_user or current_user) num_values = len(sensor_data["values"]) event_resolution = sensor_data["duration"] / num_values start = sensor_data["start"] @@ -512,6 +516,29 @@ def load_bdf(sensor_data: dict) -> BeliefsDataFrame: ) +class PostSensorDataRequestSchema(PostSensorDataSchema): + """Validate posted sensor data without building a BeliefsDataFrame.""" + + @post_load() + def post_load_sequence(self, data: dict, **kwargs) -> dict: + sensor_data = { + "values": data["values"], + "start": datetime_isoformat(data["start"]), + "duration": duration_isoformat(data["duration"]), + "unit": data["unit"], + } + if "prior" in data: + sensor_data["prior"] = datetime_isoformat(data["prior"]) + elif "horizon" in data: + sensor_data["horizon"] = duration_isoformat(data["horizon"]) + else: + # Preserve request-time semantics when processing happens later in a worker. + sensor_data["prior"] = datetime_isoformat(server_now()) + if "type" in data: + sensor_data["type"] = data["type"] + return dict(sensor=data["sensor"], sensor_data=sensor_data) + + class GetSensorDataSchemaEntityAddress(GetSensorDataSchema): """DEPRECATED, only here to support deprecated endpoints""" diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index b3dfd90585..9633ed0e2a 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -12,6 +12,7 @@ from werkzeug.exceptions import Forbidden, Unauthorized from numpy import array from psycopg2.errors import UniqueViolation +from rq import Worker from rq.job import Job, JobStatus, NoSuchJobError from sqlalchemy import select from sqlalchemy.exc import IntegrityError @@ -19,14 +20,22 @@ from flexmeasures.data import db from flexmeasures.data.models.audit_log import AssetAuditLog from flexmeasures.data.models.user import Account +from flexmeasures.data.services.data_ingestion import ( + add_beliefs_to_db_and_enqueue_forecasting_jobs, +) from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.utils import save_to_db +from flexmeasures.data.utils import ( + SAVE_TO_DB_SUCCESS, + SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW, + SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED, +) from flexmeasures.auth.policy import check_access from flexmeasures.api.common.responses import ( invalid_replacement, ResponseTuple, request_processed, + request_accepted_for_processing, already_received_and_successfully_processed, ) from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema @@ -146,20 +155,95 @@ def save_and_enqueue( forecasting_jobs: list[Job] | None = None, save_changed_beliefs_only: bool = True, ) -> ResponseTuple: - # Attempt to save - status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only) - db.session.commit() - - # Only enqueue forecasting jobs upon successfully saving new data - if status[:7] == "success" and status != "success_but_nothing_new": - enqueue_forecasting_jobs(forecasting_jobs) + status = add_beliefs_to_db_and_enqueue_forecasting_jobs( + data, + forecasting_jobs=forecasting_jobs, + save_changed_beliefs_only=save_changed_beliefs_only, + ) # Pick a response - if status == "success": + if status == SAVE_TO_DB_SUCCESS: + return request_processed() + elif status in ( + SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED, + SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW, + ): + return already_received_and_successfully_processed() + return invalid_replacement() + + +def process_sensor_data_ingestion( + sensor_id: int, + user_id: int, + sensor_data: dict | None = None, + uploaded_files: list[dict] | None = None, + upload_data: dict | None = None, + forecasting_jobs: list[Job] | None = None, + save_changed_beliefs_only: bool = True, +) -> ResponseTuple: + """Process sensor data ingestion asynchronously when possible. + + If an ingestion queue with connected workers is available, enqueue a background + job and return ``202 Accepted``. Otherwise, process the data synchronously and + return the resulting ingestion response. + """ + ingestion_queue = current_app.queues.get("ingestion") + if ingestion_queue is None: + current_app.logger.warning( + "No ingestion queue configured. Processing sensor data directly." + ) + else: + workers = Worker.all(queue=ingestion_queue) + if workers: + forecasting_job_ids = ( + [job.id for job in forecasting_jobs] + if forecasting_jobs is not None + else None + ) + job = ingestion_queue.enqueue( + add_beliefs_to_db_and_enqueue_forecasting_jobs, + sensor_id=sensor_id, + user_id=user_id, + sensor_data=sensor_data, + uploaded_files=uploaded_files, + upload_data=upload_data, + forecasting_job_ids=forecasting_job_ids, + save_changed_beliefs_only=save_changed_beliefs_only, + meta={"sensor_id": sensor_id}, + ttl=current_app.config.get( + "FLEXMEASURES_JOB_TTL", timedelta(-1) + ).total_seconds(), + # No need to keep ingestion results for the FLEXMEASURES_PLANNING_TTL + result_ttl=int( + current_app.config.get( + "FLEXMEASURES_JOB_TTL", timedelta(-1) + ).total_seconds() + ), + ) + return request_accepted_for_processing( + job.id, + "Sensor data has been accepted for processing.", + ) + else: + current_app.logger.warning( + "No workers connected to the ingestion queue. Processing sensor data directly." + ) + + status = add_beliefs_to_db_and_enqueue_forecasting_jobs( + sensor_id=sensor_id, + user_id=user_id, + sensor_data=sensor_data, + uploaded_files=uploaded_files, + upload_data=upload_data, + forecasting_jobs=forecasting_jobs, + save_changed_beliefs_only=save_changed_beliefs_only, + ) + + if status == SAVE_TO_DB_SUCCESS: return request_processed() elif status in ( - "success_with_unchanged_beliefs_skipped", - "success_but_nothing_new", + SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED, + SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW, ): return already_received_and_successfully_processed() return invalid_replacement() diff --git a/flexmeasures/api/common/utils/args_parsing.py b/flexmeasures/api/common/utils/args_parsing.py index 9665f42708..b9c84e006b 100644 --- a/flexmeasures/api/common/utils/args_parsing.py +++ b/flexmeasures/api/common/utils/args_parsing.py @@ -1,11 +1,14 @@ from flask import jsonify +from flask import current_app from flask import Request from flask_json import JsonError from webargs import ValidationError from webargs.flaskparser import parser from webargs.multidictproxy import MultiDictProxy from werkzeug.datastructures import MultiDict +from werkzeug.exceptions import RequestEntityTooLarge +from flexmeasures.api.common.responses import request_too_large from flexmeasures.data.schemas.utils import FMValidationError """ @@ -45,6 +48,25 @@ def validation_error_handler(error: FMValidationError): return response +def request_entity_too_large_handler(error: RequestEntityTooLarge): + response_data, status_code = request_too_large(error.description) + response = jsonify(response_data) + response.status_code = status_code + return response + + +def _enforce_request_size_limit(request: Request, config_key: str): + max_size = current_app.config.get(config_key) + if max_size is None or request.content_length is None: + return + if request.content_length > max_size: + raise RequestEntityTooLarge( + description=( + f"Request body exceeds the configured limit of {max_size} bytes." + ) + ) + + @parser.location_loader("args_and_json") def load_data(request, schema): """ @@ -99,6 +121,8 @@ def combined_sensor_data_upload(request: Request, schema): MultiDictProxy: A proxy object wrapping the merged data from path parameters and uploaded files. """ + _enforce_request_size_limit(request, "FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES") + data = MultiDict(request.view_args) data.update(request.files) belief_time = request.form.get("belief-time-measured-instantly") @@ -132,6 +156,11 @@ def combined_sensor_data_description(request: Request, schema): MultiDictProxy: A proxy object wrapping the merged data from path parameters, URL and/or uploaded json. """ + if request.method == "POST": + _enforce_request_size_limit( + request, "FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES" + ) + # combine data data = MultiDict(request.view_args) data.update(request.args) # Url (GET) diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index fa7b0b1120..ad504245ba 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -15,7 +15,6 @@ from marshmallow import fields, Schema, ValidationError, validates_schema import marshmallow.validate as validate from rq.job import Job, JobStatus, NoSuchJobError -import timely_beliefs as tb from webargs.flaskparser import use_args, use_kwargs from sqlalchemy import delete, select, or_ @@ -33,12 +32,13 @@ GetSensorDataSchema, GetSensorDataQuerySchema, PostSensorDataSchema, + PostSensorDataRequestSchema, ) from flexmeasures.api.common.schemas.sensors import SensorId # noqa F401 from flexmeasures.api.common.schemas.users import AccountIdField from flexmeasures.api.common.utils.api_utils import ( job_status_description, - save_and_enqueue, + process_sensor_data_ingestion, ) from flexmeasures.auth.policy import check_access from flexmeasures.auth.decorators import permission_required_for_context @@ -55,6 +55,7 @@ SensorSchema, SensorIdField, SensorDataFileSchema, + SensorDataFileRequestSchema, ) from flexmeasures.data.schemas.times import ( AwareDateTimeField, @@ -497,19 +498,21 @@ def index( @route("/data/upload", methods=["POST"]) @use_args( - SensorDataFileSchema(), location="combined_sensor_data_upload", as_kwargs=True + SensorDataFileRequestSchema(), + location="combined_sensor_data_upload", + as_kwargs=True, ) @permission_required_for_context( "create-children", - ctx_arg_name="data", - ctx_loader=lambda data: data[0].sensor if data else None, - pass_ctx_to_loader=True, + ctx_arg_name="sensor", ) def upload_data( self, - data: list[tb.BeliefsDataFrame], + sensor: Sensor, + uploaded_files: list, filenames: list[str], unit: str | None = None, + belief_time_measured_instantly: bool | None = None, **kwargs, ): """ @@ -526,6 +529,8 @@ def upload_data( The resolution of the data has to match the sensor's required resolution, but FlexMeasures will attempt to upsample lower resolutions. The list of values may include null values. + The request body is limited by FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES + (3 MiB by default). security: - ApiKeyAuth: [] @@ -542,6 +547,8 @@ def upload_data( uploaded-files: contentType: application/octet-stream responses: + 202: + description: ACCEPTED 200: description: PROCESSED content: @@ -596,33 +603,52 @@ def upload_data( description: UNAUTHORIZED 403: description: INVALID_SENDER + 413: + description: PAYLOAD_TOO_LARGE 422: description: UNPROCESSABLE_ENTITY tags: - Sensors """ - sensor = data[0].sensor - AssetAuditLog.add_record( sensor.generic_asset, f"Data from {join_words_into_a_list(filenames)} uploaded to sensor '{sensor.name}': {sensor.id}", ) - response, code = save_and_enqueue(data) + files_for_job = [] + for file in uploaded_files: + files_for_job.append( + dict( + filename=file.filename, + content_type=file.content_type, + content=file.read(), + ) + ) + upload_data = { + "belief-time-measured-instantly": ( + "on" if belief_time_measured_instantly else "off" + ), + } + if unit is not None: + upload_data["unit"] = unit + response, code = process_sensor_data_ingestion( + sensor_id=sensor.id, + user_id=current_user.id, + uploaded_files=files_for_job, + upload_data=upload_data, + ) return response, code @route("//data", methods=["POST"]) @use_args( - PostSensorDataSchema(), + PostSensorDataRequestSchema(), location="combined_sensor_data_description", as_kwargs=True, ) @permission_required_for_context( "create-children", - ctx_arg_name="bdf", - ctx_loader=lambda bdf: bdf.sensor, - pass_ctx_to_loader=True, + ctx_arg_name="sensor", ) - def post_data(self, id: int, bdf: tb.BeliefsDataFrame): + def post_data(self, id: int, sensor: Sensor, sensor_data: dict): """ .. :quickref: Data; Post sensor data --- @@ -639,6 +665,8 @@ def post_data(self, id: int, bdf: tb.BeliefsDataFrame): The resolution of the data has to match the sensor's required resolution, but FlexMeasures will attempt to upsample lower resolutions. The list of values may include null values. + The request body is limited by FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES + (3 MiB by default). security: - ApiAuthKey: [] @@ -660,6 +688,8 @@ def post_data(self, id: int, bdf: tb.BeliefsDataFrame): "duration": "PT1H" "unit": "m³/h" responses: + 202: + description: ACCEPTED 200: description: PROCESSED 400: @@ -668,12 +698,18 @@ def post_data(self, id: int, bdf: tb.BeliefsDataFrame): description: UNAUTHORIZED 403: description: INVALID_SENDER + 413: + description: PAYLOAD_TOO_LARGE 422: description: UNPROCESSABLE_ENTITY tags: - Sensors """ - response, code = save_and_enqueue(bdf) + response, code = process_sensor_data_ingestion( + sensor_id=sensor.id, + user_id=current_user.id, + sensor_data=sensor_data, + ) return response, code @route("//data", methods=["GET"]) diff --git a/flexmeasures/api/v3_0/tests/test_sensor_data.py b/flexmeasures/api/v3_0/tests/test_sensor_data.py index 6cb3a14bf2..9d310ff0b7 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_data.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_data.py @@ -1,7 +1,7 @@ from __future__ import annotations from datetime import timedelta -from flask import url_for +from flask import current_app, url_for import pytest from sqlalchemy import event from sqlalchemy.engine import Engine @@ -11,6 +11,10 @@ from flexmeasures.api.v3_0.tests.utils import make_sensor_data_request_for_gas_sensor +def _fake_ingestion_worker(queue): + return [object()] + + @pytest.mark.parametrize( "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True ) @@ -289,6 +293,64 @@ def test_post_sensor_data_bad_auth( assert post_data_response.status_code == status_code +@pytest.mark.parametrize( + "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True +) +def test_post_sensor_data_returns_accepted_job( + client, + setup_api_test_data, + requesting_user, + monkeypatch, +): + monkeypatch.setattr( + "flexmeasures.api.common.utils.api_utils.Worker.all", + _fake_ingestion_worker, + ) + current_app.queues["ingestion"].empty() + post_data = make_sensor_data_request_for_gas_sensor() + sensor = setup_api_test_data["some gas sensor"] + + response = client.post( + url_for("SensorAPI:post_data", id=sensor.id), + json=post_data, + ) + + assert response.status_code == 202 + assert response.json["status"] == "ACCEPTED" + assert response.json["job_monitor_url"] == url_for( + "JobAPI:get_job_status", uuid=response.json["job_id"] + ) + job = current_app.queues["ingestion"].fetch_job(response.json["job_id"]) + assert job.kwargs["sensor_id"] == sensor.id + assert job.kwargs["sensor_data"] == post_data + assert "data" not in job.kwargs + + +@pytest.mark.parametrize( + "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True +) +def test_post_sensor_data_rejects_large_json( + client, + setup_api_test_data, + requesting_user, + monkeypatch, +): + monkeypatch.setitem( + current_app.config, + "FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES", + 1, + ) + sensor = setup_api_test_data["some gas sensor"] + + response = client.post( + url_for("SensorAPI:post_data", id=sensor.id), + json=make_sensor_data_request_for_gas_sensor(), + ) + + assert response.status_code == 413 + assert response.json["status"] == "PAYLOAD_TOO_LARGE" + + @pytest.mark.parametrize( "request_field, new_value, error_field, error_text", [ @@ -318,7 +380,13 @@ def test_post_invalid_sensor_data( error_field, error_text, requesting_user, + monkeypatch, ): + monkeypatch.setattr( + "flexmeasures.api.common.utils.api_utils.Worker.all", + _fake_ingestion_worker, + ) + current_app.queues["ingestion"].empty() post_data = make_sensor_data_request_for_gas_sensor() sensor = setup_api_test_data["some gas sensor"] post_data[request_field] = new_value @@ -333,6 +401,30 @@ def test_post_invalid_sensor_data( error_text in response.json["message"]["combined_sensor_data_description"][error_field][0] ) + assert current_app.queues["ingestion"].count == 0 + + +@pytest.mark.parametrize( + "requesting_user", ["test_supplier_user_4@seita.nl"], indirect=True +) +def test_post_sensor_data_rejects_unknown_sensor_before_queueing( + client, + requesting_user, + monkeypatch, +): + monkeypatch.setattr( + "flexmeasures.api.common.utils.api_utils.Worker.all", + _fake_ingestion_worker, + ) + current_app.queues["ingestion"].empty() + + response = client.post( + url_for("SensorAPI:post_data", id=999999), + json=make_sensor_data_request_for_gas_sensor(), + ) + + assert response.status_code == 404 + assert current_app.queues["ingestion"].count == 0 @pytest.mark.parametrize( diff --git a/flexmeasures/api/v3_0/tests/test_sensors_api.py b/flexmeasures/api/v3_0/tests/test_sensors_api.py index f22cc7f5c2..d8d033ca16 100644 --- a/flexmeasures/api/v3_0/tests/test_sensors_api.py +++ b/flexmeasures/api/v3_0/tests/test_sensors_api.py @@ -5,7 +5,7 @@ import io import json -from flask import url_for +from flask import current_app, url_for from sqlalchemy import select, func from flexmeasures.data.models.time_series import TimedBelief @@ -341,6 +341,107 @@ def test_upload_csv_file(client, db, setup_api_test_data, sensor_name, requestin ) +@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True) +def test_upload_csv_file_returns_accepted_job( + client, setup_api_test_data, requesting_user, monkeypatch +): + monkeypatch.setattr( + "flexmeasures.api.common.utils.api_utils.Worker.all", + lambda queue: [object()], + ) + current_app.queues["ingestion"].empty() + auth_token = get_auth_token(client, "test_admin_user@seita.nl", "testtest") + csv_content = """event_start,event_value +2022-12-16T05:11:00Z,4 +""" + sensor = setup_api_test_data["some gas sensor"] + file = (io.BytesIO(csv_content.encode("utf-8")), "test.csv") + + response = client.post( + url_for("SensorAPI:upload_data", id=sensor.id), + data={"uploaded-files": file}, + content_type="multipart/form-data", + headers={"Authorization": auth_token}, + ) + + assert response.status_code == 202 + assert response.json["status"] == "ACCEPTED" + assert response.json["job_monitor_url"] == url_for( + "JobAPI:get_job_status", uuid=response.json["job_id"] + ) + job = current_app.queues["ingestion"].fetch_job(response.json["job_id"]) + assert job.kwargs["sensor_id"] == sensor.id + assert job.kwargs["uploaded_files"][0]["filename"] == "test.csv" + assert job.kwargs["uploaded_files"][0]["content"] == csv_content.encode("utf-8") + assert "data" not in job.kwargs + + +@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True) +def test_upload_csv_file_rejects_large_upload( + client, setup_api_test_data, requesting_user, monkeypatch +): + monkeypatch.setitem( + current_app.config, + "FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES", + 1, + ) + auth_token = get_auth_token(client, "test_admin_user@seita.nl", "testtest") + sensor = setup_api_test_data["some gas sensor"] + file = ( + io.BytesIO(b"event_start,event_value\n2022-12-16T05:11:00Z,4\n"), + "test.csv", + ) + + response = client.post( + url_for("SensorAPI:upload_data", id=sensor.id), + data={"uploaded-files": file}, + content_type="multipart/form-data", + headers={"Authorization": auth_token}, + ) + + assert response.status_code == 413 + assert response.json["status"] == "PAYLOAD_TOO_LARGE" + + +@pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True) +def test_upload_csv_file_measured_instantly_with_resampling( + client, db, setup_api_test_data, requesting_user +): + """Regression test: uploading data with belief-time-measured-instantly=on and resampling + needed should complete with a 200 status and not trigger the O(N²) slow path in + resample_events that previously caused server hangs and OOM crashes. + + The "some gas sensor" has 10-minute resolution. We upload 5-minute data, triggering + downsampling. With belief_time_measured_instantly=True, each event gets a unique belief_time + which previously caused the slow track in resample_events with O(N²) memory usage. + """ + auth_token = get_auth_token(client, "test_admin_user@seita.nl", "testtest") + # 5-minute resolution data -> needs downsampling to sensor's 10-minute resolution + csv_content = """event_start,event_value +2022-12-16T05:00:00Z,10 +2022-12-16T05:05:00Z,20 +2022-12-16T05:10:00Z,30 +2022-12-16T05:15:00Z,40 +2022-12-16T05:20:00Z,50 +2022-12-16T05:25:00Z,60 +""" + sensor = setup_api_test_data["some gas sensor"] + file = (io.BytesIO(csv_content.encode("utf-8")), "test.csv") + + data = { + "uploaded-files": file, + "belief-time-measured-instantly": "on", + } + + response = client.post( + url_for("SensorAPI:upload_data", id=sensor.id), + data=data, + content_type="multipart/form-data", + headers={"Authorization": auth_token}, + ) + assert response.status_code == 200 + + @pytest.mark.parametrize("requesting_user", ["test_admin_user@seita.nl"], indirect=True) def test_upload_excel_file(client, requesting_user): import openpyxl @@ -647,9 +748,12 @@ def test_fetch_sensor_stats( assert record["Min value"] assert record["Max value"] if source == "Test Admin User (ID: 7)": - sum_values = 162.0 - count_values = 36 - mean_value = 4.5 + # 36 values from CSV/Excel uploads (upsampled from 1H to 10min) + # + 3 values from test_upload_csv_file_measured_instantly_with_resampling + # (downsampled from 5min to 10min: values 15, 35, 55) + sum_values = 267.0 + count_values = 39 + mean_value = 267.0 / 39 elif source == "Test Supplier User (ID: 6)": sum_values = 275.1 count_values = 3 diff --git a/flexmeasures/app.py b/flexmeasures/app.py index 538d503eed..c574f92129 100644 --- a/flexmeasures/app.py +++ b/flexmeasures/app.py @@ -110,6 +110,7 @@ def create( # noqa C901 app.queues = dict( forecasting=Queue(connection=redis_conn, name="forecasting"), scheduling=Queue(connection=redis_conn, name="scheduling"), + ingestion=Queue(connection=redis_conn, name="ingestion"), # reporting=Queue(connection=redis_conn, name="reporting"), # labelling=Queue(connection=redis_conn, name="labelling"), # alerting=Queue(connection=redis_conn, name="alerting"), diff --git a/flexmeasures/cli/jobs.py b/flexmeasures/cli/jobs.py index f14d3e2c82..452aac934e 100644 --- a/flexmeasures/cli/jobs.py +++ b/flexmeasures/cli/jobs.py @@ -291,7 +291,7 @@ def run_job(job_id: str): "--queue", default=None, required=True, - help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.", + help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling', 'ingestion' or 'forecasting|scheduling'.", ) @click.option( "--name", @@ -310,7 +310,7 @@ def run_job(job_id: str): ) def run_worker(queue: str, name: str | None, with_scheduler: bool): """ - Start a worker process for forecasting and/or scheduling jobs. + Start a worker process for forecasting, scheduling and/or ingestion jobs. We use the app context to find out which redis queues to use. """ diff --git a/flexmeasures/data/schemas/sensors.py b/flexmeasures/data/schemas/sensors.py index af73d93ae1..2ceeb2ac0c 100644 --- a/flexmeasures/data/schemas/sensors.py +++ b/flexmeasures/data/schemas/sensors.py @@ -32,12 +32,14 @@ from flexmeasures.data import ma, db from flexmeasures.data.models.generic_assets import GenericAsset from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.user import User from flexmeasures.data.schemas.utils import ( FMValidationError, MarshmallowClickMixin, with_appcontext_if_needed, convert_to_quantity, ) +from flexmeasures.data.services.data_sources import get_or_create_source from flexmeasures.utils.time_utils import get_timezone from flexmeasures.utils.unit_utils import ( is_valid_unit, @@ -606,6 +608,10 @@ class SensorDataFileDescriptionSchema(Schema): class SensorDataFileSchema(SensorDataFileDescriptionSchema): sensor = SensorIdField(data_key="id") + def __init__(self, *args, source_user: User | None = None, **kwargs): + super().__init__(*args, **kwargs) + self.source_user = source_user + _valid_content_types = { "text/csv", "text/plain", @@ -677,7 +683,7 @@ def post_load(self, fields, **kwargs): bdf = tb.read_csv( file, sensor, - source=current_user.data_source[0], + source=get_or_create_source(self.source_user or current_user), belief_time=( pd.Timestamp.utcnow() if not belief_time_measured_instantly @@ -728,9 +734,16 @@ def post_load(self, fields, **kwargs): is_stock_unit(from_unit) for is_stock_unit in known_stock_unit_validators ): - bdf = bdf.resample_events(sensor.event_resolution, method="sum") + bdf = bdf.resample_events( + sensor.event_resolution, + method="sum", + keep_only_most_recent_belief=True, + ) else: - bdf = bdf.resample_events(sensor.event_resolution) + bdf = bdf.resample_events( + sensor.event_resolution, + keep_only_most_recent_belief=True, + ) dfs.append(bdf) except Exception as e: error_message = ( @@ -747,6 +760,16 @@ def post_load(self, fields, **kwargs): return fields +class SensorDataFileRequestSchema(SensorDataFileSchema): + """Validate a sensor data upload without parsing or resampling its files.""" + + @post_load + def post_load(self, fields, **kwargs): + files: list[FileStorage] = fields["uploaded_files"] + fields["filenames"] = [file.filename for file in files] + return fields + + class QuantitySchema(Schema): """Represents a quantity string like '1 EUR/MWh'.""" diff --git a/flexmeasures/data/services/data_ingestion.py b/flexmeasures/data/services/data_ingestion.py new file mode 100644 index 0000000000..1b431caff0 --- /dev/null +++ b/flexmeasures/data/services/data_ingestion.py @@ -0,0 +1,143 @@ +""" +Logic around data ingestion (jobs) +""" + +from __future__ import annotations + +from io import BytesIO + +from flask import current_app +from rq.job import Job +from rq.job import NoSuchJobError +import timely_beliefs as tb +from werkzeug.datastructures import FileStorage + +from flexmeasures.data import db +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.user import User +from flexmeasures.data.utils import ( + SAVE_TO_DB_SUCCESS_WITH_CHANGES_STATUSES, + save_to_db, +) + + +def _get_ingestion_context(sensor_id: int, user_id: int) -> tuple[Sensor, User]: + sensor = db.session.get(Sensor, sensor_id) + if sensor is None: + raise ValueError(f"No such sensor: {sensor_id}") + user = db.session.get(User, user_id) + if user is None: + raise ValueError(f"No such user: {user_id}") + return sensor, user + + +def _load_json_sensor_data( + sensor_id: int, + user_id: int, + sensor_data: dict, +) -> tb.BeliefsDataFrame: + """Validate and transform raw JSON sensor data into a BeliefsDataFrame.""" + + from flexmeasures.api.common.schemas.sensor_data import PostSensorDataSchema + + _sensor, user = _get_ingestion_context(sensor_id, user_id) + payload = dict(sensor_data) + payload.pop("id", None) + payload["sensor"] = sensor_id + return PostSensorDataSchema(source_user=user).load(payload)["bdf"] + + +def _file_storage_from_payload(file_payload: dict) -> FileStorage: + stream = BytesIO(file_payload["content"]) + stream.name = file_payload["filename"] + return FileStorage( + stream=stream, + filename=file_payload["filename"], + content_type=file_payload["content_type"], + ) + + +def _load_uploaded_sensor_data( + sensor_id: int, + user_id: int, + uploaded_files: list[dict], + upload_data: dict, +) -> list[tb.BeliefsDataFrame]: + """Validate and transform raw uploaded files into BeliefsDataFrames.""" + + from flexmeasures.data.schemas.sensors import SensorDataFileSchema + + _sensor, user = _get_ingestion_context(sensor_id, user_id) + payload = dict(upload_data) + payload["id"] = sensor_id + payload["uploaded-files"] = [ + _file_storage_from_payload(file_payload) for file_payload in uploaded_files + ] + return SensorDataFileSchema(source_user=user).load(payload)["data"] + + +def add_beliefs_to_db_and_enqueue_forecasting_jobs( + data: tb.BeliefsDataFrame | list[tb.BeliefsDataFrame] | None = None, + sensor_id: int | None = None, + user_id: int | None = None, + sensor_data: dict | None = None, + uploaded_files: list[dict] | None = None, + upload_data: dict | None = None, + forecasting_jobs: list[Job] | None = None, + forecasting_job_ids: list[str] | None = None, + save_changed_beliefs_only: bool = True, +) -> str: + """Save sensor data to the database and optionally enqueue forecasting jobs. + + This function is intended to be called as an RQ job by an ingestion queue worker, + but can also be called directly (e.g. as a fallback when no workers are available). + + :param data: BeliefsDataFrame (or list thereof) to be saved. + :param sensor_id: Sensor ID for raw JSON or file ingestion. + :param user_id: User ID used to resolve the source of raw ingested data. + :param sensor_data: Raw JSON payload from the sensor data endpoint. + :param uploaded_files: Uploaded file contents and metadata. + :param upload_data: Raw form payload from the sensor data upload endpoint. + :param forecasting_jobs: Optional list of forecasting Jobs to enqueue after saving. + :param forecasting_job_ids: Optional list of forecasting Job ids to enqueue after saving. + :param save_changed_beliefs_only: If True, skip saving beliefs whose value hasn't changed. + :returns: Status string as returned by ``save_to_db``. + """ + if sensor_data is not None: + if sensor_id is None or user_id is None: + raise ValueError("Expected sensor_id and user_id for raw sensor data.") + data = _load_json_sensor_data(sensor_id, user_id, sensor_data) + elif uploaded_files is not None: + if sensor_id is None or user_id is None: + raise ValueError("Expected sensor_id and user_id for uploaded sensor data.") + data = _load_uploaded_sensor_data( + sensor_id, + user_id, + uploaded_files, + upload_data or {}, + ) + if data is None: + raise ValueError("Expected data, sensor_data, or uploaded_files.") + + status = save_to_db(data, save_changed_beliefs_only=save_changed_beliefs_only) + db.session.commit() + + # Only enqueue forecasting jobs upon successfully saving new data + if status in SAVE_TO_DB_SUCCESS_WITH_CHANGES_STATUSES: + if forecasting_jobs is not None: + for job in forecasting_jobs: + current_app.queues["forecasting"].enqueue_job(job) + if forecasting_job_ids is not None: + connection = current_app.queues["forecasting"].connection + for job_id in forecasting_job_ids: + try: + job = Job.fetch(job_id, connection=connection) + except NoSuchJobError: + current_app.logger.warning( + "Forecasting job %s no longer exists; skipping enqueue.", + job_id, + ) + continue + current_app.queues["forecasting"].enqueue_job(job) + + return status diff --git a/flexmeasures/data/services/time_series.py b/flexmeasures/data/services/time_series.py index 8b326ccd3d..bec17970da 100644 --- a/flexmeasures/data/services/time_series.py +++ b/flexmeasures/data/services/time_series.py @@ -145,8 +145,16 @@ def _drop_unchanged_beliefs_compared_to_db( source = bdf.lineage.sources[0] # unique source event_start = bdf.event_starts[0] # unique event_start belief_time = bdf.lineage.belief_times[0] # unique belief time + # Compare by ID rather than object identity: the candidate bdf may have been + # deserialized from an RQ job queue (pickled in a different process), so its + # DataSource objects are detached and won't be identical to the freshly-loaded + # ones in bdf_db even when they represent the same DB row. + # Also filter by event_start: bdf_db may contain beliefs for multiple event_starts, + # and we must not let a newer belief_time from a different event_start contaminate + # the most-recent-belief-time lookup for this candidate's event_start. bdf_db_from_source = bdf_db[ - (bdf_db.sources == source) & (bdf_db.event_starts == event_start) + (bdf_db.sources.map(lambda s: s.id) == source.id) + & (bdf_db.event_starts == event_start) ] if bdf_db_from_source.empty: return bdf @@ -161,21 +169,33 @@ def _drop_unchanged_beliefs_compared_to_db( previous_most_recent_beliefs = bdf_db_from_source[ bdf_db_from_source.belief_times == most_recent_bt ] - compare_fields = ["event_start", "source", "cumulative_probability", "event_value"] - a = bdf.reset_index().set_index(compare_fields) - b = previous_most_recent_beliefs.reset_index().set_index(compare_fields) - bdf = a.drop( - b.index, - errors="ignore", - axis=0, - ) + # Use source_id (integer) instead of source (object) for robust cross-session + # comparison. Detached ORM instances (for example after serialization boundaries + # or different session lifecycles) may represent the same DB row but still fail + # object-identity based comparison in pandas indices. + a_df = bdf.reset_index() + a_df["source_id"] = a_df["source"].map(lambda s: s.id) + b_df = previous_most_recent_beliefs.reset_index() + b_df["source_id"] = b_df["source"].map(lambda s: s.id) + + compare_fields = [ + "event_start", + "source_id", + "cumulative_probability", + "event_value", + ] + a = a_df.set_index(compare_fields) + b = b_df.set_index(compare_fields) + dropped = a.drop(b.index, errors="ignore", axis=0) # Keep whole probabilistic beliefs, not just the parts that changed - c = bdf.reset_index().set_index(["event_start", "source"]) - d = a.reset_index().set_index(["event_start", "source"]) + c = dropped.reset_index().set_index(["event_start", "source_id"]) + d = a_df.set_index(["event_start", "source_id"]) bdf = d[d.index.isin(c.index)] - bdf = bdf.reset_index().set_index( - ["event_start", "belief_time", "source", "cumulative_probability"] + bdf = ( + bdf.reset_index() + .drop(columns=["source_id"], errors="ignore") + .set_index(["event_start", "belief_time", "source", "cumulative_probability"]) ) return bdf diff --git a/flexmeasures/data/tests/test_data_ingestion.py b/flexmeasures/data/tests/test_data_ingestion.py new file mode 100644 index 0000000000..36ea7e5e4c --- /dev/null +++ b/flexmeasures/data/tests/test_data_ingestion.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from flexmeasures.data.services.data_ingestion import ( + add_beliefs_to_db_and_enqueue_forecasting_jobs, +) +from flexmeasures.data.utils import SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW +from flexmeasures.tests.utils import get_test_sensor + + +def test_ingestion_service_accepts_beliefs_data_frame(setup_beliefs, db): + sensor = get_test_sensor(db) + bdf = sensor.search_beliefs(source="ENTSO-E", most_recent_beliefs_only=False).iloc[ + :1 + ] + + status = add_beliefs_to_db_and_enqueue_forecasting_jobs( + data=bdf, + save_changed_beliefs_only=True, + ) + + assert status == SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW diff --git a/flexmeasures/data/utils.py b/flexmeasures/data/utils.py index 55c1ce2263..84d2c2c423 100644 --- a/flexmeasures/data/utils.py +++ b/flexmeasures/data/utils.py @@ -14,6 +14,22 @@ from flexmeasures.data.services.time_series import drop_unchanged_beliefs +SAVE_TO_DB_SUCCESS = "success" +SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED = ( + "success_with_unchanged_beliefs_skipped" +) +SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW = "success_but_nothing_new" +SAVE_TO_DB_SUCCESS_STATUSES = ( + SAVE_TO_DB_SUCCESS, + SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED, + SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW, +) +SAVE_TO_DB_SUCCESS_WITH_CHANGES_STATUSES = ( + SAVE_TO_DB_SUCCESS, + SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED, +) + + def save_to_session(objects: list[db.Model], overwrite: bool = False): """ Utility function to save to database, either efficiently with a bulk save, or inefficiently with a merge save. @@ -91,9 +107,9 @@ def save_to_db( :param save_changed_beliefs_only: if True, unchanged beliefs are skipped (updated beliefs are only stored if they represent changed beliefs) if False, all updated beliefs are stored :returns: status string, one of the following: - - 'success': all beliefs were saved - - 'success_with_unchanged_beliefs_skipped': not all beliefs represented a state change - - 'success_but_nothing_new': no beliefs represented a state change + - SAVE_TO_DB_SUCCESS: all beliefs were saved + - SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED: not all beliefs represented a state change + - SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW: no beliefs represented a state change """ # Convert to list @@ -102,7 +118,7 @@ def save_to_db( else: timed_values_list = data - status = "success" + status = SAVE_TO_DB_SUCCESS values_saved = 0 for timed_values in timed_values_list: @@ -124,7 +140,7 @@ def save_to_db( timed_values = drop_unchanged_beliefs(timed_values) len_after = len(timed_values) if len_after < len_before: - status = "success_with_unchanged_beliefs_skipped" + status = SAVE_TO_DB_SUCCESS_WITH_UNCHANGED_BELIEFS_SKIPPED # Work around bug in which groupby still introduces an index level, even though we asked it not to if None in timed_values.index.names: @@ -132,9 +148,10 @@ def save_to_db( if timed_values.empty: # No state changes among the beliefs + current_app.logger.info("No changes needing to be saved to DB.") continue - current_app.logger.info("SAVING TO DB...") + current_app.logger.info("SAVING DATA ...") TimedBelief.add_to_session( session=db.session, beliefs_data_frame=timed_values, @@ -144,11 +161,12 @@ def save_to_db( ), ) values_saved += len(timed_values) + current_app.logger.info(f"SAVED {len(timed_values)} values TO DB.") # Flush to bring up potential unique violations (due to attempting to replace beliefs) db.session.flush() if values_saved == 0: - status = "success_but_nothing_new" + status = SAVE_TO_DB_SUCCESS_BUT_NOTHING_NEW return status diff --git a/flexmeasures/ui/static/js/flexmeasures.js b/flexmeasures/ui/static/js/flexmeasures.js index 911f8a829a..c64d5f5d52 100644 --- a/flexmeasures/ui/static/js/flexmeasures.js +++ b/flexmeasures/ui/static/js/flexmeasures.js @@ -783,15 +783,19 @@ function loadSensorStats(sensor_id, event_start_time="", event_end_time="", fres }); } - // Notify the "Delete data" panel of the overall first/last event times - // across all sources so the "Select all data" link can populate the inputs. + // Notify the "Delete data" panel only when the stats cover all sensor data. + // The selected-duration stats should not redefine "all sensor data". const firstEventDates = Object.values(data) .map(d => new Date(d["First event start"])) .filter(d => !isNaN(d.getTime())); const lastEventDates = Object.values(data) .map(d => new Date(d["Last event end"])) .filter(d => !isNaN(d.getTime())); - if (firstEventDates.length > 0 && lastEventDates.length > 0) { + if ( + !toggleStatsCheckbox.checked + && firstEventDates.length > 0 + && lastEventDates.length > 0 + ) { document.dispatchEvent(new CustomEvent('sensorDataRangeAvailable', { detail: { firstEventStart: new Date(Math.min(...firstEventDates)), diff --git a/flexmeasures/ui/static/js/ui-utils.js b/flexmeasures/ui/static/js/ui-utils.js index fc7893ca24..314050d6c3 100644 --- a/flexmeasures/ui/static/js/ui-utils.js +++ b/flexmeasures/ui/static/js/ui-utils.js @@ -486,3 +486,100 @@ export function initDeleteAssetButton() { ); }); } + +/** + * Poll a background job until it reaches a terminal state (finished or failed). + * + * Calls GET /api/v3_0/jobs/ every `intervalMs` milliseconds and updates + * a toast notification as the status changes. Polling stops automatically on + * success, failure, or network error, and also when the optional AbortSignal + * fires (useful for component/page teardown). + * + * @param {string} jobUuid - UUID returned by the upload endpoint. + * @param {object} [options] + * @param {number} [options.intervalMs=3000] - Polling interval in ms. + * @param {string} [options.processingMessage="Processing…"] - Toast text while queued/started. + * @param {string} [options.successMessage="Job completed successfully."] - Toast text on finish. + * @param {string} [options.errorMessage] - Override toast text on failure (defaults to the + * server message when absent). + * @param {AbortSignal} [options.signal] - Abort polling externally (e.g. page unload). + * @param {function} [options.onStatus] - Called with the full response JSON on each poll. + * @param {function} [options.onFinished] - Called with the full response JSON on finish. + * @param {function} [options.onFailed] - Called with the full response JSON on failure. + * @returns {function} stopPolling - Call to cancel polling manually. + */ +export function pollJobStatus(jobUuid, options = {}) { + const { + intervalMs = 3000, + processingMessage = "Processing\u2026", + successMessage = "Job completed successfully.", + errorMessage = null, + signal = null, + onStatus = null, + onFinished = null, + onFailed = null, + } = options; + + const url = `${apiBasePath}/api/v3_0/jobs/${encodeURIComponent(jobUuid)}`; + + // Show an initial "processing" info toast right away. + showToast(processingMessage, "info"); + + let stopped = false; + + function stop() { + stopped = true; + clearInterval(intervalId); + } + + // Honour an external AbortSignal (e.g. page navigation / component unmount). + if (signal) { + signal.addEventListener("abort", stop, { once: true }); + } + + async function poll() { + if (stopped) return; + + let data; + try { + const response = await fetch(url, { credentials: "same-origin" }); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + data = await response.json(); + } catch (err) { + stop(); + showToast("Could not reach job status endpoint: " + err.message, "error"); + if (onFailed) onFailed(null); + return; + } + + const status = (data.status || "").toUpperCase(); + if (onStatus) onStatus(data); + + if (status === "FINISHED") { + stop(); + showToast(successMessage, "success"); + if (onFinished) onFinished(data); + } else if (status === "FAILED" || status === "STOPPED" || status === "CANCELED") { + stop(); + const msg = errorMessage || data.message || "Job failed."; + showToast(msg, "error"); + if (onFailed) onFailed(data); + } else if (status === "STARTED") { // This is the shown status when ingestion is in progress + // show a inprogress message + const inProgressMessage = data.message || "Job is in progress."; + showToast(inProgressMessage, "info"); + } else { + // QUEUED / STARTED / DEFERRED / SCHEDULED → keep polling, show status updates + const statusToast = data.message || `Job status: ${status}`; + console.log(`[pollJobStatus] Still processing: ${statusToast}`); + } + } + + // Poll immediately on first tick, then on each interval. + const intervalId = setInterval(poll, intervalMs); + poll(); + + return stop; +} diff --git a/flexmeasures/ui/static/openapi-specs.json b/flexmeasures/ui/static/openapi-specs.json index eaa82471b1..83b52f648a 100644 --- a/flexmeasures/ui/static/openapi-specs.json +++ b/flexmeasures/ui/static/openapi-specs.json @@ -484,7 +484,7 @@ }, "post": { "summary": "Post sensor data", - "description": "Send data values via JSON, where the duration and number of values determine the resolution.\n\nThe example request posts four values for a duration of one hour, where the first\nevent start is at the given start time, and subsequent events start in 15 minute intervals throughout the one hour duration.\n\nThe sensor is the one with ID=1.\nThe unit has to be convertible to the sensor's unit.\nThe resolution of the data has to match the sensor's required resolution, but\nFlexMeasures will attempt to upsample lower resolutions.\nThe list of values may include null values.\n", + "description": "Send data values via JSON, where the duration and number of values determine the resolution.\n\nThe example request posts four values for a duration of one hour, where the first\nevent start is at the given start time, and subsequent events start in 15 minute intervals throughout the one hour duration.\n\nThe sensor is the one with ID=1.\nThe unit has to be convertible to the sensor's unit.\nThe resolution of the data has to match the sensor's required resolution, but\nFlexMeasures will attempt to upsample lower resolutions.\nThe list of values may include null values.\nThe request body is limited by FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES\n(3 MiB by default).\n", "security": [ { "ApiAuthKey": [] @@ -526,6 +526,9 @@ } }, "responses": { + "202": { + "description": "ACCEPTED" + }, "200": { "description": "PROCESSED" }, @@ -538,6 +541,9 @@ "403": { "description": "INVALID_SENDER" }, + "413": { + "description": "PAYLOAD_TOO_LARGE" + }, "422": { "description": "UNPROCESSABLE_ENTITY" } @@ -1521,7 +1527,7 @@ "/api/v3_0/sensors/{id}/data/upload": { "post": { "summary": "Upload sensor data by file", - "description": "The file should have columns for a timestamp (event_start) and a value (event_value).\nThe timestamp should be in ISO 8601 format.\nThe value should be a numeric value.\n\nThe unit has to be convertible to the sensor's unit.\nThe resolution of the data has to match the sensor's required resolution, but\nFlexMeasures will attempt to upsample lower resolutions.\nThe list of values may include null values.\n", + "description": "The file should have columns for a timestamp (event_start) and a value (event_value).\nThe timestamp should be in ISO 8601 format.\nThe value should be a numeric value.\n\nThe unit has to be convertible to the sensor's unit.\nThe resolution of the data has to match the sensor's required resolution, but\nFlexMeasures will attempt to upsample lower resolutions.\nThe list of values may include null values.\nThe request body is limited by FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES\n(3 MiB by default).\n", "security": [ { "ApiKeyAuth": [] @@ -1552,6 +1558,9 @@ } }, "responses": { + "202": { + "description": "ACCEPTED" + }, "200": { "description": "PROCESSED", "content": { @@ -1623,6 +1632,9 @@ "403": { "description": "INVALID_SENDER" }, + "413": { + "description": "PAYLOAD_TOO_LARGE" + }, "422": { "description": "UNPROCESSABLE_ENTITY" } diff --git a/flexmeasures/ui/templates/includes/graphs.html b/flexmeasures/ui/templates/includes/graphs.html index c91e66a556..cde70a3344 100644 --- a/flexmeasures/ui/templates/includes/graphs.html +++ b/flexmeasures/ui/templates/includes/graphs.html @@ -23,7 +23,7 @@ // Import local js (the FM version is used for cache-busting, causing the browser to fetch the updated version from the server) import { convertToCSV } from "{{ url_for('flexmeasures_ui.static', filename='js/data-utils.js') }}?v={{ flexmeasures_version }}"; - import {apiBasePath} from "{{ url_for('flexmeasures_ui.static', filename='js/ui-utils.js') }}?v={{ flexmeasures_version }}"; + import { apiBasePath, pollJobStatus } from "{{ url_for('flexmeasures_ui.static', filename='js/ui-utils.js') }}?v={{ flexmeasures_version }}"; import { subtract, computeSimulationRanges, lastNMonths, encodeUrlQuery, getOffsetBetweenTimezonesForDate, toIsoStringWithOffset } from "{{ url_for('flexmeasures_ui.static', filename='js/daterange-utils.js') }}?v={{ flexmeasures_version }}"; import { partition, updateBeliefs, beliefTimedelta, setAbortableTimeout } from "{{ url_for('flexmeasures_ui.static', filename='js/replay-utils.js') }}?v={{ flexmeasures_version }}"; import { decompressChartData, checkDSTTransitions, checkSourceMasking } from "{{ url_for('flexmeasures_ui.static', filename='js/chart-data-utils.js') }}?v={{ flexmeasures_version }}"; @@ -161,12 +161,17 @@ } // Upload Logic - {% if active_page == "sensors" and user_can_update_sensor %} + {% if active_page == "sensors" and user_can_create_children_sensor %} let upload = document.querySelector('#submitUpload'); let uploadSpinner = document.querySelector("#spinner-upload-sensor-data"); + function finishUploadStatus() { + uploadSpinner.classList.add('d-none'); + upload.disabled = false; + } upload.addEventListener('click', function (e) { - upload.classList.add('d-none'); + upload.disabled = true; uploadSpinner.classList.remove('d-none'); + showToast("Uploading data...", "info"); var formData = new FormData(document.getElementById('uploadForm')); fetch(apiBasePath + '/api/v3_0/sensors/' + {{ sensor.id }} + '/data/upload', { method: 'POST', @@ -175,56 +180,73 @@ .then(async function (response) { if (response.ok) { const data = await response.json(); - showToast(data.message, "success"); - // Update date range - picker.setDateRange( - storeStartDate, - subtract(storeEndDate, 1), - ); - uploadSpinner.classList.add('d-none'); - upload.classList.remove('d-none'); - // Reload the chart so newly-uploaded data is immediately visible - document.dispatchEvent(new CustomEvent('newDataAvailable')); + if (response.status === 202 && data.job_id) { + // Track the background ingestion job and refresh when done. + pollJobStatus(data.job_id, { + processingMessage: "Upload received. Processing data in the background\u2026", + successMessage: "Data processed successfully.", + onFinished: () => { + finishUploadStatus(); + document.dispatchEvent(new CustomEvent('newDataAvailable')); + loadSensorStats({{ sensor.id }}, "", "", true); + }, + onFailed: () => { + finishUploadStatus(); + }, + }); + } else { + // No queued job: data was saved synchronously. + showToast(data.message, "success"); + // Update date range + picker.setDateRange( + storeStartDate, + subtract(storeEndDate, 1), + ); + document.dispatchEvent(new CustomEvent('newDataAvailable')); + finishUploadStatus(); + } } else { const data = await response.json(); - const messageKeys = Object.keys(data.message); - messageKeys.forEach(mKey => { - if (data.message[mKey] === undefined) { - showToast(data.message, "error"); - } - else { - for (const [key, value] of Object.entries(data.message[mKey])) { - if (typeof value === 'string') { - showToast(value, "error"); - } else if (Array.isArray(value)) { - value.forEach(element => { - showToast(element, "error"); - }); - } else if (typeof value === 'object' && value !== null) { - Object.values(value).forEach(element => { - if (Array.isArray(element)) { - element.forEach(item => showToast(item, "error")); - } else { + if (typeof data.message === "string") { + showToast(data.message, "error"); + } else { + const messageKeys = Object.keys(data.message); + messageKeys.forEach(mKey => { + if (data.message[mKey] === undefined) { + showToast(data.message, "error"); + } + else { + for (const [key, value] of Object.entries(data.message[mKey])) { + if (typeof value === 'string') { + showToast(value, "error"); + } else if (Array.isArray(value)) { + value.forEach(element => { showToast(element, "error"); - } - }); - } else { - // fallback for unexpected types - showToast(String(value), "error"); + }); + } else if (typeof value === 'object' && value !== null) { + Object.values(value).forEach(element => { + if (Array.isArray(element)) { + element.forEach(item => showToast(item, "error")); + } else { + showToast(element, "error"); + } + }); + } else { + // fallback for unexpected types + showToast(String(value), "error"); + } } } - } - }); - uploadSpinner.classList.add('d-none'); - upload.classList.remove('d-none'); + }); + } + finishUploadStatus(); } }) .catch(function (error) { // Network or unexpected error console.error("Upload error:", error); showToast("An error occurred: " + error.message, "error"); - uploadSpinner.classList.add('d-none'); - upload.classList.remove('d-none'); + finishUploadStatus(); }); }); {% endif %} @@ -353,8 +375,21 @@ {% endif %} if (timerangeVar in data) { - var start = new Date(data[timerangeVar].start); - var end = new Date(data[timerangeVar].end); + var fullDataStart = new Date(data[timerangeVar].start); + var fullDataEnd = new Date(data[timerangeVar].end); + {% if active_page == "sensors" %} + if (!isNaN(fullDataStart.getTime()) && !isNaN(fullDataEnd.getTime())) { + document.dispatchEvent(new CustomEvent('sensorDataRangeAvailable', { + detail: { + firstEventStart: fullDataStart, + lastEventEnd: fullDataEnd, + } + })); + } + {% endif %} + + var start = new Date(fullDataStart); + var end = new Date(fullDataEnd); end.setSeconds(end.getSeconds() - 1); // -1 second in case most recent event ends at midnight start.setHours(0, 0, 0, 0); // get start of first day end.setHours(0, 0, 0, 0); // get start of last day diff --git a/flexmeasures/ui/templates/includes/toasts.html b/flexmeasures/ui/templates/includes/toasts.html index d934f5fa0d..92c9ffc145 100644 --- a/flexmeasures/ui/templates/includes/toasts.html +++ b/flexmeasures/ui/templates/includes/toasts.html @@ -31,14 +31,6 @@ }); } - function showAllToasts() { - const toastElements = document.querySelectorAll(".toast"); - toastElements.forEach((toast) => { - const toastInstance = new bootstrap.Toast(toast); - toastInstance.show(); - }); - } - function maybeHideCloseToastBtn() { const remainingToasts = toastStack.querySelectorAll(".toast"); if (remainingToasts.length === 0) { @@ -55,27 +47,30 @@ * GLOBAL FUNCTION: showToast * Attached to window so it can be called from anywhere. */ - window.showToast = function(message, type, { highlightDuplicates = true, showDuplicateCount = true } = {}) { + window.showToast = function(message, type, { highlightDuplicates = true, showDuplicateCount = true, delay = null } = {}) { let colorClass; let colorStyle = ""; let title; - let delay; + let toastDelay; // Determine the type of toast if (type == "error") { - delay = 10000; + toastDelay = 10000; colorClass = "bg-danger"; title = "Error"; } else if (type == "success") { - delay = 2000; + toastDelay = 2000; colorClass = "bg-success"; title = "Success"; } else { - delay = 5000; + toastDelay = 5000; // JINJA VARIABLE WORKS HERE NOW: colorStyle = "background-color: {{ primary_color | default('#007bff') }};"; title = "Info"; } + if (delay !== null) { + toastDelay = delay; + } // Search for duplicate const existingToasts = toastStack.querySelectorAll(".toast"); @@ -107,7 +102,7 @@ const toast = document.createElement("div"); toast.classList.add("toast", "mb-1"); toast.setAttribute("data-bs-autohide", "true"); - toast.setAttribute("data-bs-delay", delay); + toast.setAttribute("data-bs-delay", toastDelay); toast.setAttribute("role", "alert"); toast.setAttribute("aria-live", "assertive"); toast.setAttribute("aria-atomic", "true"); @@ -118,14 +113,16 @@ ${title} -
- ${message} -
+
`; + const toastBody = toast.querySelector(".toast-body"); + toastBody.dataset.originalMessage = message; + toastBody.innerHTML = message; toastStack.insertAdjacentElement("afterbegin", toast); closeToastBtn.style.display = "block"; - showAllToasts(); + const toastInstance = new bootstrap.Toast(toast); + toastInstance.show(); // Cleanup listeners const handleClose = () => { @@ -139,4 +136,4 @@ }; })(); - \ No newline at end of file + diff --git a/flexmeasures/ui/templates/sensors/index.html b/flexmeasures/ui/templates/sensors/index.html index 3f23b2f1d7..81036d2a0d 100644 --- a/flexmeasures/ui/templates/sensors/index.html +++ b/flexmeasures/ui/templates/sensors/index.html @@ -50,7 +50,9 @@

Edit {{ sensor.name }}

+ {% endif %} + {% if user_can_create_children_sensor %}
Upload data
@@ -73,7 +75,12 @@

Upload {{ sensor.name }} data

- The resolution does not have to be the sensor resolution ― FlexMeasures will attempt to convert your data to the sensor resolution. Not all resolutions fit. What works best: Upsampling. - Data can have gaps, but FlexMeasures still needs to be able to guess the frequency. - Duplicates will be removed. -- For the unit of the data (e.g. kW), see help text on the unit selection below." +- For the unit of the data (e.g. kW), see help text on the unit selection below. +{% if config.FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES is not none -%} +- Uploads are limited to {{ "%.1f"|format(config.FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES / 1024 / 1024) }} MiB per request. +{%- else -%} +- This server has no FlexMeasures-specific upload size limit configured. +{%- endif %}" style="color: white; cursor: pointer;" > @@ -138,7 +145,7 @@

Upload {{ sensor.name }} data

style="margin-top: 20px; float: right; border: 1px solid var(--light-gray);"> Upload file -
+
Loading...
@@ -148,9 +155,6 @@

Upload {{ sensor.name }} data

- {% endif %} - - {% if user_can_create_children_sensor %}
Forecast
@@ -221,7 +225,7 @@

Upload {{ sensor.name }} data

- Select all data + Select all sensor data