From 276d7c8c6dbc519451dcb15a75e19b293379df6a Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 26 Jun 2026 15:58:20 -0700 Subject: [PATCH 1/5] chore(main): enable async pipeline workers for all existing projects Turn on the `async_pipeline_workers` feature flag for every project that exists at deploy time, rolling out async ML processing (workers that pull tasks from the NATS queue instead of the synchronous push API) across the whole platform at once. The flag lives in the `feature_flags` JSONB column. The data migration reads each project's flags, sets the one boolean, and writes it back, leaving the other feature flags untouched. The reverse flips the flag back off for every project. New projects keep the model default of False until opted in separately. Co-Authored-By: Claude --- .../0094_enable_async_pipeline_workers.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 ami/main/migrations/0094_enable_async_pipeline_workers.py diff --git a/ami/main/migrations/0094_enable_async_pipeline_workers.py b/ami/main/migrations/0094_enable_async_pipeline_workers.py new file mode 100644 index 000000000..ac95b9766 --- /dev/null +++ b/ami/main/migrations/0094_enable_async_pipeline_workers.py @@ -0,0 +1,47 @@ +""" +Turn on the ``async_pipeline_workers`` feature flag for every existing project. + +This rolls out async ML processing (workers that pull tasks from the NATS queue +instead of the synchronous push API) to all projects at once. New projects keep +the model default of ``False`` until they are opted in separately; this migration +only updates rows that exist at deploy time. + +The flag lives inside the ``feature_flags`` JSONB column (a ``ProjectFeatureFlags`` +pydantic model). We read each project's flags through the historical model, set the +one boolean, and write it back. The reverse flips the flag back off for every +project — a blanket disable. It does not restore per-project values from before the +rollout, because the field default is ``False`` and this is the first global enable, +so no project is expected to have been ``True`` beforehand. +""" + +from django.db import migrations + + +def enable_async_pipeline_workers(apps, schema_editor): + Project = apps.get_model("main", "Project") + for project in Project.objects.all(): + flags = project.feature_flags + if not flags.async_pipeline_workers: + flags.async_pipeline_workers = True + project.feature_flags = flags + project.save(update_fields=["feature_flags"]) + + +def disable_async_pipeline_workers(apps, schema_editor): + Project = apps.get_model("main", "Project") + for project in Project.objects.all(): + flags = project.feature_flags + if flags.async_pipeline_workers: + flags.async_pipeline_workers = False + project.feature_flags = flags + project.save(update_fields=["feature_flags"]) + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0093_occurrence_project_score_index"), + ] + + operations = [ + migrations.RunPython(enable_async_pipeline_workers, disable_async_pipeline_workers), + ] From 8ee44eb8d4d3ff29efb4b51918f8217360b4ee78 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 26 Jun 2026 16:06:28 -0700 Subject: [PATCH 2/5] feat(main): make async pipeline workers the default for new projects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flip the `async_pipeline_workers` default in `ProjectFeatureFlags` to True so projects created from now on use the async processing service (psv2) — workers that pull tasks from the NATS queue — without an operator opting them in. No migration is required: the `feature_flags` field deconstructs by its schema class and default-factory references, neither of which changes when a default inside the pydantic model changes (`makemigrations --check` reports no changes). The companion data migration handles existing projects. Co-Authored-By: Claude --- ami/main/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/main/models.py b/ami/main/models.py index 8d65888bb..3662b4107 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -278,7 +278,7 @@ class ProjectFeatureFlags(pydantic.BaseModel): default_filters: bool = False # Whether to show default filters form in UI # Feature flag for jobs to reprocess all images in the project, even if already processed reprocess_all_images: bool = False - async_pipeline_workers: bool = False # Whether to use async pipeline workers that pull tasks from a queue + async_pipeline_workers: bool = True # Whether to use async pipeline workers that pull tasks from a queue def get_default_feature_flags() -> ProjectFeatureFlags: From bd4d88143cf4aff30db245d02818814d7c6168a6 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 26 Jun 2026 16:15:26 -0700 Subject: [PATCH 3/5] refactor(main): toggle async flag in place with jsonb_set Address CodeRabbit review on the data migration: replace the read/modify/save loop with a single DB-side `jsonb_set` UPDATE that toggles only the `async_pipeline_workers` key. Updating the one key server-side leaves the other feature flags untouched even if another process changes one of them during the deploy (the previous loop rewrote the whole JSONB value and could clobber a concurrent sibling change), and it runs as one statement instead of one save per row. A `WHERE ... IS DISTINCT FROM` guard skips rows already at the target value. Co-Authored-By: Claude --- .../0094_enable_async_pipeline_workers.py | 53 +++++++++++-------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/ami/main/migrations/0094_enable_async_pipeline_workers.py b/ami/main/migrations/0094_enable_async_pipeline_workers.py index ac95b9766..29418a982 100644 --- a/ami/main/migrations/0094_enable_async_pipeline_workers.py +++ b/ami/main/migrations/0094_enable_async_pipeline_workers.py @@ -3,38 +3,49 @@ This rolls out async ML processing (workers that pull tasks from the NATS queue instead of the synchronous push API) to all projects at once. New projects keep -the model default of ``False`` until they are opted in separately; this migration -only updates rows that exist at deploy time. +whatever the model default is at creation time; this migration only updates rows +that exist at deploy time. The flag lives inside the ``feature_flags`` JSONB column (a ``ProjectFeatureFlags`` -pydantic model). We read each project's flags through the historical model, set the -one boolean, and write it back. The reverse flips the flag back off for every -project — a blanket disable. It does not restore per-project values from before the -rollout, because the field default is ``False`` and this is the first global enable, -so no project is expected to have been ``True`` beforehand. +pydantic model). The update toggles only the one key server-side with ``jsonb_set`` +rather than reading each project's JSON into Python, mutating it, and writing the +whole value back. Doing it in place leaves the other feature flags untouched even +if another process changes one of them during the deploy, and it runs as a single +statement instead of one save per row. The reverse flips the flag back off for +every project — a blanket disable. It does not restore per-project values from +before the rollout, because the field default is ``False`` and this is the first +global enable, so no project is expected to have been ``True`` beforehand. """ from django.db import migrations -def enable_async_pipeline_workers(apps, schema_editor): +def _set_async_pipeline_workers(apps, schema_editor, *, enabled): Project = apps.get_model("main", "Project") - for project in Project.objects.all(): - flags = project.feature_flags - if not flags.async_pipeline_workers: - flags.async_pipeline_workers = True - project.feature_flags = flags - project.save(update_fields=["feature_flags"]) + table = schema_editor.connection.ops.quote_name(Project._meta.db_table) + with schema_editor.connection.cursor() as cursor: + cursor.execute( + f""" + UPDATE {table} + SET feature_flags = jsonb_set( + COALESCE(feature_flags, '{{}}'::jsonb), + '{{async_pipeline_workers}}', + to_jsonb(%s::boolean), + true + ) + WHERE COALESCE((feature_flags ->> 'async_pipeline_workers')::boolean, false) + IS DISTINCT FROM %s + """, + [enabled, enabled], + ) + + +def enable_async_pipeline_workers(apps, schema_editor): + _set_async_pipeline_workers(apps, schema_editor, enabled=True) def disable_async_pipeline_workers(apps, schema_editor): - Project = apps.get_model("main", "Project") - for project in Project.objects.all(): - flags = project.feature_flags - if flags.async_pipeline_workers: - flags.async_pipeline_workers = False - project.feature_flags = flags - project.save(update_fields=["feature_flags"]) + _set_async_pipeline_workers(apps, schema_editor, enabled=False) class Migration(migrations.Migration): From 1be120493fdc48da6167ca10712ac77f351527a9 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 26 Jun 2026 16:37:15 -0700 Subject: [PATCH 4/5] test(jobs): pin sync dispatch by disabling the flag explicitly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `test_ml_job_dispatch_mode_set_on_creation` asserted that an ML job on a default project dispatches via sync_api, which relied on `async_pipeline_workers` defaulting to False. Now that the default is True, the sync branch must set the flag off explicitly to exercise that path — the async branch already sets it on. Pins both transitions instead of leaning on the default. Co-Authored-By: Claude --- ami/jobs/tests/test_jobs.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index 7a7f8d5ff..4aab76437 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -1215,7 +1215,10 @@ def test_dispatch_mode_filtering(self): def test_ml_job_dispatch_mode_set_on_creation(self): """Test that ML jobs get dispatch_mode set based on project feature flags at creation time.""" - # Without async flag, ML job should default to sync_api + # With the async flag disabled, an ML job is dispatched via sync_api. + self.project.feature_flags.async_pipeline_workers = False + self.project.save() + sync_job = Job.objects.create( job_type_key=MLJob.key, project=self.project, From 29cf61917b91267df2033c38bd5fa141466d643c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 26 Jun 2026 17:11:35 -0700 Subject: [PATCH 5/5] docs(main): correct reverse-migration docstring on prior flag state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reverse is a blanket disable; some projects may have had the flag enabled individually before this rollout, so the docstring no longer claims no project was True beforehand — it states the reverse returns every project to the off state rather than to its prior value. Co-Authored-By: Claude --- ami/main/migrations/0094_enable_async_pipeline_workers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ami/main/migrations/0094_enable_async_pipeline_workers.py b/ami/main/migrations/0094_enable_async_pipeline_workers.py index 29418a982..109df3451 100644 --- a/ami/main/migrations/0094_enable_async_pipeline_workers.py +++ b/ami/main/migrations/0094_enable_async_pipeline_workers.py @@ -13,8 +13,9 @@ if another process changes one of them during the deploy, and it runs as a single statement instead of one save per row. The reverse flips the flag back off for every project — a blanket disable. It does not restore per-project values from -before the rollout, because the field default is ``False`` and this is the first -global enable, so no project is expected to have been ``True`` beforehand. +before the rollout: some projects may have had the flag enabled individually +beforehand, so the reverse returns every project to the off state rather than to +its prior value. """ from django.db import migrations