
Move sensor data ingestion to job queue #2101

Open
Copilot wants to merge 53 commits into main from copilot/move-post-data-logic-to-job-queue

Conversation

Contributor

Copilot AI commented Apr 13, 2026

Description

  • Move sensor data ingestion (incl. resampling) to a dedicated ingestion job queue so large POSTs can be processed asynchronously when a worker is available
  • Keep a synchronous fallback when no ingestion queue or worker is available
  • Return 202 Accepted responses with a job_monitor_url and job_id when ingestion is queued
  • Keep unchanged-belief detection consistent between synchronous and queued ingestion paths
  • Add a configurable sensor data ingestion payload limit via FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES
  • Update API documentation, host documentation, configuration docs, and the OpenAPI spec for queued ingestion and payload limits
  • Improve sensor-page upload feedback with polling, longer-lived Toast notifications, and chart/stat refresh after processing finishes
  • Use constants for the three possible success status strings when saving beliefs
  • Add a changelog entry in documentation/changelog.rst

Look & Feel

  • API requests that post large amounts of sensor data can return quickly with 202 Accepted while ingestion is processed in the background when an ingestion worker is available
  • Accepted upload responses include a job status URL that API clients and the UI can use to monitor processing
  • Sensor-page uploads show immediate feedback, keep polling the background job, and refresh the chart once processing succeeds
  • If no ingestion worker is available, the existing synchronous path still works as a fallback
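The exact body of an accepted upload response is not spelled out above; based on the fields named in this PR (job_id and job_monitor_url), a 202 Accepted response presumably carries something like the following (any further fields, such as a human-readable message, are not confirmed by this PR):

```json
{
  "job_id": "<job UUID>",
  "job_monitor_url": "<absolute URL of the job status endpoint>"
}
```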

How to test

  1. Start FlexMeasures with Redis configured.
  2. Run a worker that handles the ingestion queue, for example:
    flexmeasures jobs run-worker --queue "forecasting|scheduling|ingestion"
  3. Upload a large CSV file to a sensor through the UI or POST data to the sensor data API and confirm the request returns 202 Accepted quickly with a job_monitor_url.
  4. Confirm the UI shows an accepted/processing Toast, keeps polling the job status endpoint, and refreshes the chart after the worker finishes.
  5. Repeat the same flow without an ingestion worker and confirm the synchronous fallback still works.
  6. Try a payload larger than FLEXMEASURES_MAX_SENSOR_DATA_INGESTION_BYTES and confirm it is rejected with 413 Payload Too Large.
  7. Run the relevant tests:
    • uv run pytest flexmeasures/api/v3_0/tests/test_sensor_data.py
    • uv run pytest flexmeasures/api/v3_0/tests/test_sensors_api.py
    • uv run pytest flexmeasures/data/tests/test_data_ingestion.py

Also, here is a small script to test the sensor data endpoint:

```python
import requests

# Replace with a valid auth token for your local FlexMeasures instance
token_localhost = "xxxxxxxxxxxxxxxxxxxxxxx"

res = requests.post(
    "http://localhost:5000/api/v3_0/sensors/123/data",
    headers={"Authorization": token_localhost},
    json={
        "values": [-11.28, -11.28, -20, -11.28],
        "start": "2026-04-20T13:00:00+02:00",
        "duration": "PT1H",
        "unit": "kWh",
    },
)
# Expect 202 Accepted when an ingestion worker is available,
# or the synchronous fallback's response otherwise.
print(res.status_code, res.text)
```
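When an ingestion worker is running, the request above should come back as 202 Accepted, and a client then needs to poll the job_monitor_url. A minimal, transport-agnostic polling helper might look like this (a sketch: the terminal status names follow RQ's job states, and the payload shape of the monitor endpoint is an assumption, not confirmed by this PR):

```python
import time


def poll_job(fetch_status, interval=2.0, timeout=60.0):
    """Poll a job-status callable until it reports a terminal state.

    `fetch_status` is any zero-argument callable returning a status string,
    e.g. one that GETs the job_monitor_url from a 202 response and extracts
    the job status. "finished"/"failed" are RQ's terminal job states.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status in ("finished", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError("job did not reach a terminal state in time")
```

With requests, `fetch_status` could be something like `lambda: requests.get(job_monitor_url, headers={"Authorization": token_localhost}).json().get("status")` — note the `status` key there is hypothetical.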

Further Improvements

  • Support larger ingestion payloads through temporary object/file storage instead of sending the full payload through Redis

Related Items


Sign-off

  • I agree to contribute to the project under Apache 2 License.
  • To the best of my knowledge, the proposed patch is not based on code under the GPL or another license that is incompatible with FlexMeasures.

Copilot AI changed the title [WIP] Move the logic behind POSTing data to job queue Move sensor data ingestion to job queue Apr 13, 2026
Copilot AI requested a review from joshuaunity April 13, 2026 13:38
@joshuaunity
Contributor

@copilot, add a proper step-by-step guide on how to test this

Contributor Author

Copilot AI commented Apr 16, 2026

@copilot, add a proper step-by-step guide on how to test this

Updated the PR description with a step-by-step testing guide covering both the queued path (with an ingestion worker) and the synchronous fallback path (no workers). See the "How to test" section below.

@joshuaunity
Contributor

joshuaunity commented Apr 16, 2026

This works from my tests

Response time went from 2.5+ seconds down to about 300 ms

The fallback (run synchronously if no available worker) also works nicely without any issues

Contributor

@joshuaunity joshuaunity left a comment


This works pretty well, and the code is clean and straightforward.

@nhoening, are there any additions you would like to make to this?

@nhoening
Contributor

This works pretty well, and the code is clean and straightforward.

@nhoening, are there any additions you would like to make to this?

So basically you are asking for an early review :)

I also like the code and that there is some documentation already, as well. And the fallback is good.

  • Documentation might not catch all the places that hosts should see when they get started. For instance, here we don't mention the new queue.
  • Create a changelog entry, plus an upgrade notice: it is recommended to add a worker to the ingestion queue, or to let existing workers handle it. @Flix6x how would you do this in our infra, for example?
  • The function api_utils.save_and_enqueue() is where the real logic happens - which other places use that util function, and are they affected?
  • There is a warning about "No workers connected", but we could also add a similar warning about "No ingestion queue configured".

@joshuaunity
Contributor

The function api_utils.save_and_enqueue() is where the real logic happens - which other places use that util function, and are they affected?

Yes, they are all affected and will use the queue if a worker is available.

One more place I found: the function post_data_deprecated in flexmeasures/api/v3_0/deprecated.py.

Signed-off-by: JDev <45713692+joshuaunity@users.noreply.github.com>
@Flix6x
Contributor

Flix6x commented Apr 17, 2026

Create a changelog entry, plus a notice for upgrading - it is recommended to add a worker to the ingestion queue or let existing workers handle it, as well. @Flix6x how would you do this in our infra, for example?

I guess by running flexmeasures jobs run-worker --queue "scheduling|ingestion|forecasting", and until I make that change I'd expect the ingestion to still happen within the request.

@joshuaunity joshuaunity added the enhancement New feature or request label Apr 17, 2026
Signed-off-by: joshuaunity <oghenerobojosh01@gmail.com>
nhoening added 2 commits May 13, 2026 23:47
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
@nhoening nhoening requested a review from Flix6x May 13, 2026 21:57
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
@nhoening nhoening added this to the 0.33.0 milestone May 13, 2026
@nhoening nhoening self-assigned this May 13, 2026
Flix6x added 3 commits May 14, 2026 15:31
…a-logic-to-job-queue

# Conflicts:
#	documentation/changelog.rst
…ard queues

Signed-off-by: F.N. Claessen <claessen@seita.nl>
Signed-off-by: F.N. Claessen <claessen@seita.nl>
@Flix6x
Contributor

Flix6x commented May 14, 2026

  • Auth works nicely! I verified that a regular user (no roles) cannot view an ingestion job in another account, even if it has the UUID.
  • The result key in the job also works great! I verified success, success_with_unchanged_beliefs_skipped, success_but_nothing_new all show up as expected.
  • Question for @joshuaunity: Ingestion jobs disappear very fast. Is that intentional?
  • I noticed an inconsistency with user_can_edit_sensor vs user_can_create_children_sensor. For a regular user (no roles) the upload form was hidden (causing an error in the browser console), and the forecast form was shown (but the button did nothing). I'm pushing a fix that lets regular users upload data and create forecasts, in accordance with our Sensor.__acl__: "Everyone in the account and its consultant can add beliefs.".
  • Question for @joshuaunity: The toast notifications appear quite crowded. Can we reduce that a little, and streamline it with the toast notifications for forecasting? This could be a follow-up. See images below (but ignore the colors; I was testing on different accounts having their own account colors).
[Screenshots: toasts shown when triggering an ingestion job (by uploading a file) vs. toasts shown when triggering a forecasting job]

Flix6x added 2 commits May 14, 2026 22:42
…ed to upload data and trigger forecasts

Signed-off-by: F.N. Claessen <claessen@seita.nl>
Signed-off-by: F.N. Claessen <claessen@seita.nl>
@nhoening
Contributor

I could take on the crowded toasts. I worked on that a bit with Codex in the end.

You don't want to get rid of that Success Toast with the link right? (Link could remain internal, indeed).

I do like that there are new in-progress Toasts every 2 seconds, which you only notice as the counter increases. It tells you that we are still checking ...

Signed-off-by: F.N. Claessen <claessen@seita.nl>
@Flix6x
Contributor

Flix6x commented May 14, 2026

You don't want to get rid of that Success Toast with the link right? (Link could remain internal, indeed).

My first thought was that I like the two ingestion toasts (of which only one is shown in my screenshot) that indicate success for:

  1. Job queued
  2. Job finished

In particular, I like that they show "Success", where the forecasting toast just says "Info".

But then, I think, isn't it a bit premature to claim success when the job is queued? From a user perspective, the action can only be considered a success when the job is actually finished. The forecasting toasts only show "Success" when the job is finished, so actually I prefer that. After all, you hit the button to get the job done (here: new data in the database), and nothing less should be claimed a success.

I do like that there are new in-progress Toasts every 2 seconds, which you only notice as the counter increases. It tells you that we are still checking ...

For the UI, the "Checking status..." followed by actual status updates (i.e. QUEUED, STARTED, etc.) is probably more suitable than a link to the JSON representation of the job itself.

The "Upload received. Processing data in the background" in particular seems redundant.

@Flix6x
Contributor

Flix6x commented May 14, 2026

@nhoening if you have an opinion about how long ingestion jobs should be kept around, let's hear it. :) I think we're just using an RQ default right now (500 seconds), unlike for the forecasting and scheduling queues, which we default to keep around for a full week.

@nhoening
Contributor

@nhoening if you have an opinion about how long ingestion jobs should be kept around, let's hear it. :) I think we're just using an RQ default right now (500 seconds), unlike for the forecasting and scheduling queues, which we default to keep around for a full week.

Shorter, for sure. Maybe a day.

nhoening added 2 commits May 14, 2026 23:20
…m:FlexMeasures/flexmeasures into copilot/move-post-data-logic-to-job-queue
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
Comment thread flexmeasures/data/schemas/sensors.py Outdated
Comment thread flexmeasures/utils/config_defaults.py Outdated
@nhoening
Contributor

You don't want to get rid of that Success Toast with the link right?

I meant actually, that that should probably go, sorry for being unclear.

@joshuaunity wrote a JS function pollJobStatus() (in ui-utils.js) which we could use in a unified way to poll the other jobs, too. But forecasting and scheduling jobs have these dedicated endpoints ...

nhoening added 2 commits May 14, 2026 23:56
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
@nhoening
Contributor

Scheduling/forecasting jobs live a day, their results 7 days:

```python
FLEXMEASURES_JOB_TTL: timedelta = timedelta(days=1)
FLEXMEASURES_PLANNING_TTL: timedelta = timedelta(days=7)
```

```python
ttl=int(
    current_app.config.get("FLEXMEASURES_JOB_TTL", timedelta(-1)).total_seconds()
),
result_ttl=int(
    current_app.config.get("FLEXMEASURES_PLANNING_TTL", timedelta(-1)).total_seconds()
),
```

Should we do just the same for ingestion? There might be more (smaller and less interesting) jobs for this.

We could simply reuse FLEXMEASURES_JOB_TTL for both right?
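Reusing FLEXMEASURES_JOB_TTL for both TTLs of ingestion jobs, as suggested, could be sketched roughly as follows (the function names and the job function path are illustrative placeholders, not actual FlexMeasures code; the enqueue call mirrors RQ's Queue.enqueue keyword arguments):

```python
from datetime import timedelta


def ingestion_job_ttl(config) -> int:
    # Reuse FLEXMEASURES_JOB_TTL (default: 1 day) for ingestion jobs,
    # covering both the queue TTL and the result TTL.
    return int(config.get("FLEXMEASURES_JOB_TTL", timedelta(days=1)).total_seconds())


def enqueue_ingestion_job(queue, config, payload):
    # `queue.enqueue` mirrors RQ's Queue.enqueue signature; the job
    # function path below is hypothetical.
    ttl = ingestion_job_ttl(config)
    return queue.enqueue(
        "flexmeasures.data.services.ingest_sensor_data",  # hypothetical
        payload,
        ttl=ttl,         # how long the job may sit in the queue
        result_ttl=ttl,  # keep the result around just as long
    )
```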

@nhoening
Contributor

nhoening commented May 14, 2026

@Flix6x here is one thing about those most-recent-only filters: If in the uploaded data we tick the measured-instantly checkbox, we effectively overwrite belief times in the data, so if there were several per event, we now have multiple beliefs for each event with identical belief time. Then the filter would make sense, right?

@Flix6x
Copy link
Copy Markdown
Contributor

Flix6x commented May 14, 2026

@Flix6x here is one thing about those most-recent-only filters: If in the uploaded data we tick the measured-instantly checkbox, we effectively overwrite belief times in the data, so if there were several per event, we now have multiple beliefs for each event with identical belief time. Then the filter would make sense, right?

Hmm, best to try it. My first thought is that the filter would still not be needed. A good test might be to post data with a 2-hour resolution to a 6-hour sensor, then hit the (1-hour frequency) replay. My hunch is that with the filter we'd see a realization once every 6 hours, and without the filter we'd see an extra two forecasts, 2 and 4 hours into the 6-hour event.

@Flix6x
Copy link
Copy Markdown
Contributor

Flix6x commented May 15, 2026

Actually, I shouldn't call them forecasts, but partial realizations. That is, I expect each 6-hour realization to be updated twice.


Labels

API Data enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Move the logic behind POSTing data to the job queue

4 participants