From fce66be45b268d21f8d84a2d3486f145bf5bba51 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 10 Jun 2026 20:41:25 -0700 Subject: [PATCH] fix: split large result POSTs so wide-taxonomy batches fit under proxy limits Wide-taxonomy pipelines (e.g. the global moths model with ~29k classes) emit roughly 2 MB per detection because each classification carries full-length labels, scores, and logits arrays. A single processed batch of two dozen images with several detections each therefore serialized to 110-140 MB, which reverse proxies rejected with HTTP 413 even after the body limit was raised to 512 MB. The results for one batch were already scoped to the current batch only (no accumulation across batches), so the size came purely from payload width times detection count. This change splits the results for a batch across multiple POST requests, each kept at or below a configurable byte cap, so no single request body exceeds the proxy limit. - Add chunk_results_by_size() and make post_batch_results() serialize each result once, greedily pack them into byte-bounded chunks, and POST each chunk; it now returns True only if every chunk succeeds. A single result that exceeds the cap on its own is sent alone (and logged) rather than dropped. - Add AMI_ANTENNA_RESULT_POST_MAX_BYTES setting (default 25 MB) and thread it through ResultPoster to post_batch_results. - Add tests asserting each POST body stays under the cap, no results are dropped, and the unsplit baseline would have exceeded the cap. Co-Authored-By: Claude Opus 4.8 (1M context) --- trapdata/antenna/client.py | 131 ++++++++++-- trapdata/antenna/result_posting.py | 12 +- .../antenna/tests/test_result_chunking.py | 195 ++++++++++++++++++ trapdata/antenna/worker.py | 5 +- trapdata/settings.py | 17 ++ 5 files changed, 341 insertions(+), 19 deletions(-) create mode 100644 trapdata/antenna/tests/test_result_chunking.py diff --git a/trapdata/antenna/client.py b/trapdata/antenna/client.py index 5e8cde6..3c1b3e0 100644 --- a/trapdata/antenna/client.py +++ b/trapdata/antenna/client.py @@ -1,5 +1,6 @@ """Antenna API client for fetching jobs and posting results.""" +import json import socket import requests @@ -14,6 +15,12 @@ from trapdata.api.utils import get_http_session from trapdata.common.logs import logger +# Default maximum size (bytes) of a single result POST body. Used when a caller +# does not pass an explicit limit (e.g. settings.antenna_result_post_max_bytes). +# Chosen to stay well under the reverse-proxy request-body limit (commonly 100 MB) +# while still allowing several wide-taxonomy detections per request. +DEFAULT_RESULT_POST_MAX_BYTES = 25 * 1024 * 1024 + def get_full_service_name(service_name: str) -> str: """Build full service name with hostname. @@ -71,41 +78,133 @@ def get_jobs( return [] +def _result_json_size(result_json: dict) -> int: + """Approximate the serialized byte size of one result entry. + + Uses a compact JSON encoding (no extra whitespace) so the estimate tracks + what ``requests`` actually sends. The few bytes of array/comma framing that + join entries in the final payload are ignored; they are negligible next to + the per-result content for wide-taxonomy classifiers. + """ + return len(json.dumps(result_json, separators=(",", ":")).encode("utf-8")) + + +def chunk_results_by_size( + results_json: list[dict], + max_bytes: int, +) -> list[list[dict]]: + """Greedily pack already-serialized result dicts into byte-bounded chunks. + + Each returned chunk, once wrapped in the ``{"results": [...]}`` envelope, is + intended to stay at or below ``max_bytes``. A single result that exceeds + ``max_bytes`` on its own is emitted as its own chunk (it cannot be split + further without changing the per-image API contract), and a warning is + logged so the oversize case is visible. + + Args: + results_json: Result entries already converted to JSON-compatible dicts. + max_bytes: Target maximum size in bytes for one POST body. + + Returns: + A list of chunks, where each chunk is a list of result dicts. Returns an + empty list when given no results. + """ + if not results_json: + return [] + if max_bytes <= 0: + # Defensive: a non-positive cap would otherwise loop forever. Fall back + # to posting everything in a single chunk. + return [list(results_json)] + + # Account for the constant envelope overhead of ``{"results":[]}``. + envelope_overhead = len(b'{"results":[]}') + budget = max(1, max_bytes - envelope_overhead) + + chunks: list[list[dict]] = [] + current: list[dict] = [] + current_size = 0 + + for result_json in results_json: + size = _result_json_size(result_json) + if size > budget: + logger.warning( + f"Single result entry is {size} bytes, larger than the " + f"{max_bytes}-byte POST limit; sending it in its own request. " + "It may still be rejected by the server or proxy." + ) + # +1 accounts for the comma joining this entry to the previous one. + added_size = size + (1 if current else 0) + if current and current_size + added_size > budget: + chunks.append(current) + current = [] + current_size = 0 + added_size = size + current.append(result_json) + current_size += added_size + + if current: + chunks.append(current) + return chunks + + def post_batch_results( base_url: str, auth_token: str, job_id: int, results: list[AntennaTaskResult], + max_bytes: int = DEFAULT_RESULT_POST_MAX_BYTES, ) -> bool: """ - Post batch results back to the API. + Post batch results back to the API, splitting large batches across requests. + + The results for one processed batch are serialized once and packed into one + or more POST bodies that each stay at or below ``max_bytes``. This prevents a + single dense, wide-taxonomy batch from producing a request body large enough + to be rejected by the reverse proxy (HTTP 413). Args: base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2") auth_token: API authentication token job_id: Job ID results: List of AntennaTaskResult objects + max_bytes: Maximum size in bytes of a single POST body. Returns: - True if successful, False otherwise + True only if every chunk was posted successfully, False otherwise. """ + if not results: + return True + url = f"{base_url.rstrip('/')}/jobs/{job_id}/result/" - payload = AntennaTaskResults(results=results) + # Serialize each result once, then group into byte-bounded chunks so we never + # pay the serialization cost twice. + results_json = [ + AntennaTaskResults(results=[r]).model_dump(mode="json")["results"][0] + for r in results + ] + chunks = chunk_results_by_size(results_json, max_bytes) + + all_ok = True with get_http_session(auth_token) as session: - try: - response = session.post( - url, json=payload.model_dump(mode="json"), timeout=60 - ) - response.raise_for_status() - result = AntennaResultPostResponse.model_validate(response.json()) - logger.debug( - f"Posted {len(results)} results to job {job_id}: {result.results_queued} queued" - ) - return True - except requests.RequestException as e: - logger.error(f"Failed to post results to {url}: {e}") - return False + for chunk_idx, chunk in enumerate(chunks): + payload = {"results": chunk} + try: + response = session.post(url, json=payload, timeout=60) + response.raise_for_status() + result = AntennaResultPostResponse.model_validate(response.json()) + logger.debug( + f"Posted chunk {chunk_idx + 1}/{len(chunks)} " + f"({len(chunk)} results) to job {job_id}: " + f"{result.results_queued} queued" + ) + except requests.RequestException as e: + logger.error( + f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} " + f"to {url}: {e}" + ) + all_ok = False + return all_ok def get_user_projects(base_url: str, auth_token: str) -> list[dict]: diff --git a/trapdata/antenna/result_posting.py b/trapdata/antenna/result_posting.py index cd76737..c287fa3 100644 --- a/trapdata/antenna/result_posting.py +++ b/trapdata/antenna/result_posting.py @@ -24,7 +24,7 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from dataclasses import dataclass -from trapdata.antenna.client import post_batch_results +from trapdata.antenna.client import DEFAULT_RESULT_POST_MAX_BYTES, post_batch_results from trapdata.common.logs import logger @@ -69,9 +69,11 @@ def __init__( self, max_pending: int = 5, future_timeout: float = 30.0, + max_post_bytes: int = DEFAULT_RESULT_POST_MAX_BYTES, ): self.max_pending = max_pending self.future_timeout = future_timeout # Timeout for individual future waits + self.max_post_bytes = max_post_bytes # Per-POST body size cap (bytes) self.executor = ThreadPoolExecutor( max_workers=2, thread_name_prefix="result_poster" ) @@ -167,7 +169,13 @@ def _post_with_timing( True if successful, False otherwise """ try: - success = post_batch_results(base_url, auth_token, job_id, results) + success = post_batch_results( + base_url, + auth_token, + job_id, + results, + max_bytes=self.max_post_bytes, + ) elapsed_time = time.time() - start_time with self._metrics_lock: diff --git a/trapdata/antenna/tests/test_result_chunking.py b/trapdata/antenna/tests/test_result_chunking.py new file mode 100644 index 0000000..a80b8a9 --- /dev/null +++ b/trapdata/antenna/tests/test_result_chunking.py @@ -0,0 +1,195 @@ +"""Tests for byte-bounded chunking of result POSTs to the Antenna API. + +These tests reproduce the production failure mode where a single result POST for +a wide-taxonomy pipeline (e.g. the global moths model, ~29k classes) grew to +100+ MB and was rejected by the reverse proxy with HTTP 413. The fix splits the +results for one processed batch across multiple POSTs so each request body stays +under a configurable size cap. +""" + +import datetime +import json +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from trapdata.antenna.client import chunk_results_by_size, post_batch_results +from trapdata.antenna.schemas import AntennaTaskResult, AntennaTaskResults +from trapdata.api.schemas import ( + AlgorithmReference, + BoundingBox, + ClassificationResponse, + DetectionResponse, + PipelineResultsResponse, + SourceImageResponse, +) + + +def _make_detection(image_id: str, num_classes: int) -> DetectionResponse: + """Build a DetectionResponse carrying full-width classification arrays. + + This mirrors what the global moths classifier emits: one classification with + labels/scores/logits arrays each of length ``num_classes``. + """ + labels = [f"Genus_species_{i:06d}" for i in range(num_classes)] + scores = [1.0 / num_classes] * num_classes + logits = [float(i) for i in range(num_classes)] + return DetectionResponse( + source_image_id=image_id, + bbox=BoundingBox(x1=0, y1=0, x2=100, y2=100), + algorithm=AlgorithmReference(name="Object Detector", key="detector"), + timestamp=datetime.datetime(2024, 1, 1, 0, 0, 0), + classifications=[ + ClassificationResponse( + classification=labels[0], + labels=labels, + scores=scores, + logits=logits, + algorithm=AlgorithmReference( + name="Global Species Classifier", key="global_moths_2024" + ), + timestamp=datetime.datetime(2024, 1, 1, 0, 0, 0), + ) + ], + ) + + +def _make_result( + image_id: str, num_classes: int, detections_per_image: int = 1 +) -> AntennaTaskResult: + """Build one AntennaTaskResult for a single image with N wide detections.""" + detections = [ + _make_detection(image_id, num_classes) for _ in range(detections_per_image) + ] + return AntennaTaskResult( + reply_subject=f"reply.{image_id}", + result=PipelineResultsResponse( + pipeline="global_moths_2024", + source_images=[ + SourceImageResponse(id=image_id, url=f"http://x/{image_id}") + ], + detections=detections, + total_time=1.0, + ), + ) + + +def _serialized_body_size(results: list[AntennaTaskResult]) -> int: + """Size in bytes of the JSON body actually sent for a list of results.""" + payload = AntennaTaskResults(results=results).model_dump(mode="json") + return len(json.dumps(payload).encode("utf-8")) + + +class TestChunkResultsBySize(TestCase): + """Unit tests for the greedy byte-bounded packer.""" + + def test_empty_results_yields_no_chunks(self): + self.assertEqual(chunk_results_by_size([], max_bytes=1000), []) + + def test_each_chunk_stays_under_cap(self): + # ~29k-class detections: each result is ~2 MB. A cap of 5 MB should pack + # at most ~2 results per chunk. + num_classes = 29_000 + results = [_make_result(f"img{i}", num_classes) for i in range(8)] + results_json = AntennaTaskResults(results=results).model_dump(mode="json")[ + "results" + ] + + max_bytes = 5 * 1024 * 1024 + chunks = chunk_results_by_size(results_json, max_bytes=max_bytes) + + self.assertGreater(len(chunks), 1, "expected the batch to be split") + for chunk in chunks: + body_size = len(json.dumps({"results": chunk}).encode("utf-8")) + self.assertLessEqual( + body_size, + max_bytes, + f"chunk body {body_size} exceeded cap {max_bytes}", + ) + + def test_no_results_dropped(self): + num_classes = 1_000 + results = [_make_result(f"img{i}", num_classes) for i in range(10)] + results_json = AntennaTaskResults(results=results).model_dump(mode="json")[ + "results" + ] + chunks = chunk_results_by_size(results_json, max_bytes=500_000) + total = sum(len(c) for c in chunks) + self.assertEqual(total, len(results)) + + def test_oversize_single_result_gets_own_chunk(self): + # One result larger than the cap cannot be split below one image; it must + # still be emitted (in its own chunk) rather than dropped. + num_classes = 29_000 + results = [_make_result("big", num_classes, detections_per_image=4)] + results_json = AntennaTaskResults(results=results).model_dump(mode="json")[ + "results" + ] + chunks = chunk_results_by_size(results_json, max_bytes=1 * 1024 * 1024) + self.assertEqual(len(chunks), 1) + self.assertEqual(len(chunks[0]), 1) + + +class TestPostBatchResultsChunking(TestCase): + """post_batch_results must split a large batch across multiple POST bodies.""" + + def test_large_batch_split_into_multiple_under_cap_posts(self): + num_classes = 29_000 + # 8 images, each a single wide detection (~2 MB) -> ~15 MB total. + results = [_make_result(f"img{i}", num_classes) for i in range(8)] + + max_bytes = 4 * 1024 * 1024 + posted_bodies: list[int] = [] + + fake_response = MagicMock() + fake_response.raise_for_status.return_value = None + fake_response.json.return_value = { + "status": "accepted", + "job_id": 1, + "results_queued": 0, + } + + def fake_post(url, json=None, timeout=None): + body_size = len(__import__("json").dumps(json).encode("utf-8")) + posted_bodies.append(body_size) + return fake_response + + fake_session = MagicMock() + fake_session.post.side_effect = fake_post + fake_session.__enter__.return_value = fake_session + fake_session.__exit__.return_value = False + + with patch( + "trapdata.antenna.client.get_http_session", return_value=fake_session + ): + ok = post_batch_results( + base_url="http://x/api/v2", + auth_token="t", + job_id=1, + results=results, + max_bytes=max_bytes, + ) + + self.assertTrue(ok) + self.assertGreater(len(posted_bodies), 1, "expected multiple POSTs") + for size in posted_bodies: + self.assertLessEqual(size, max_bytes, f"POST body {size} exceeded cap") + + def test_unsplit_baseline_would_exceed_cap(self): + # Sanity check that the un-chunked body really is over the cap, so the + # test above is exercising a real split, not a no-op. + num_classes = 29_000 + results = [_make_result(f"img{i}", num_classes) for i in range(8)] + body_size = _serialized_body_size(results) + self.assertGreater(body_size, 4 * 1024 * 1024) + + def test_empty_results_is_noop_success(self): + with patch("trapdata.antenna.client.get_http_session") as get_session: + ok = post_batch_results( + base_url="http://x/api/v2", + auth_token="t", + job_id=1, + results=[], + max_bytes=1000, + ) + self.assertTrue(ok) + get_session.assert_not_called() diff --git a/trapdata/antenna/worker.py b/trapdata/antenna/worker.py index 2b7e1db..832deb9 100644 --- a/trapdata/antenna/worker.py +++ b/trapdata/antenna/worker.py @@ -465,7 +465,10 @@ def _process_job( if not classifier: classifier = classifier_class(source_images=[], detections=[]) detector = APIMothDetector([]) - result_poster = ResultPoster(max_pending=MAX_PENDING_POSTS) + result_poster = ResultPoster( + max_pending=MAX_PENDING_POSTS, + max_post_bytes=settings.antenna_result_post_max_bytes, + ) if use_binary_filter: binary_filter = MothClassifierBinary( diff --git a/trapdata/settings.py b/trapdata/settings.py index b07e043..b08d70b 100644 --- a/trapdata/settings.py +++ b/trapdata/settings.py @@ -42,6 +42,14 @@ class Settings(BaseSettings): antenna_api_auth_token: str = "" antenna_service_name: str = "AMI Data Companion" antenna_api_batch_size: int = 24 + # Maximum size (in bytes) of a single result POST body to the Antenna API. + # The results for one processed batch are split across multiple POSTs so that + # no single request exceeds this limit. Wide-taxonomy classifiers (e.g. the + # global moths model with ~29k classes) emit ~2 MB per detection because each + # classification carries full-length labels/scores/logits arrays, so a dense + # batch can otherwise produce a 100+ MB body that reverse proxies reject (413). + # Default 25 MB leaves headroom under common proxy limits (typically 100 MB). + antenna_result_post_max_bytes: int = 25 * 1024 * 1024 @pydantic.field_validator("image_base_path", "user_data_path") def validate_path(cls, v): @@ -170,6 +178,15 @@ class Config: "kivy_type": "numeric", "kivy_section": "antenna", }, + "antenna_result_post_max_bytes": { + "title": "Antenna Result POST Max Bytes", + "description": ( + "Maximum size in bytes of a single result POST body; results " + "for a batch are split across multiple POSTs to stay under it" + ), + "kivy_type": "numeric", + "kivy_section": "antenna", + }, "antenna_service_name": { "title": "Antenna Service Name", "description": "Name for the processing service registration (hostname will be added automatically)",