diff --git a/docs/content/releases/os_upgrading/2.59.md b/docs/content/releases/os_upgrading/2.59.md index 0ce83f4a1bc..a36e9e88b65 100644 --- a/docs/content/releases/os_upgrading/2.59.md +++ b/docs/content/releases/os_upgrading/2.59.md @@ -88,4 +88,9 @@ As announced in DefectDojo 2.57.0, the Stub Findings feature has been removed. T Any requests to this endpoint will now return a 404 Not Found error. The Stub Findings UI is no longer available. +## Configuration change in Watson Search Indexing + +In [PR 14881](https://github.com/DefectDojo/django-DefectDojo/pull/14881) we optimized the way the Django Watson search index is updated during imports and reimports. There is now a single configuration setting to manage the threshold: `DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`. The default value should work fine for most instances. + + For more information, check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.59.0). diff --git a/dojo/apps.py b/dojo/apps.py index 4b1af1ef192..41c36b8a83b 100644 --- a/dojo/apps.py +++ b/dojo/apps.py @@ -106,6 +106,9 @@ def ready(self): register_django_pghistory_models() configure_audit_system() + from dojo.middleware import install_intermediate_flush_hook # noqa: PLC0415 + install_intermediate_flush_hook() + def get_model_fields_with_extra(model, extra_fields=()): return get_model_fields(get_model_default_fields(model), extra_fields) diff --git a/dojo/celery_dispatch.py b/dojo/celery_dispatch.py index fde94336ec4..23e230f6a05 100644 --- a/dojo/celery_dispatch.py +++ b/dojo/celery_dispatch.py @@ -62,6 +62,9 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur - Inject `async_user_id` if missing. - Capture and inject pghistory context if available. - Respect `sync=True` (foreground execution) and user `block_execution`. + - Respect `force_async=True` (background execution even when the caller + would otherwise run synchronously, e.g. user has `block_execution`). 
+ `force_async` wins over `sync` and `block_execution`. - Support `countdown=` for async dispatch. Returns: diff --git a/dojo/decorators.py b/dojo/decorators.py index bc3b898a3e1..296bafa306a 100644 --- a/dojo/decorators.py +++ b/dojo/decorators.py @@ -58,6 +58,11 @@ def get_tasks(self): def we_want_async(*args, func=None, **kwargs): from dojo.utils import get_current_user # noqa: PLC0415 circular import + force_async = kwargs.get("force_async", False) + if force_async: + logger.debug("dojo_async_task %s: running task in the background as force_async=True has been found as kwarg", func) + return True + sync = kwargs.get("sync", False) if sync: logger.debug("dojo_async_task %s: running task in the foreground as sync=True has been found as kwarg", func) diff --git a/dojo/middleware.py b/dojo/middleware.py index f3939d68c48..a576244312a 100644 --- a/dojo/middleware.py +++ b/dojo/middleware.py @@ -243,69 +243,83 @@ class AsyncSearchContextMiddleware(SearchContextMiddleware): """ def _close_search_context(self, request): - """Override watson's close behavior to trigger async updates when above threshold.""" + """Override watson's close behavior to always dispatch index updates asynchronously.""" if search_context_manager.is_active(): - from django.conf import settings # noqa: PLC0415 circular import - - # Extract tasks and check if we should trigger async update - captured_tasks = self._extract_tasks_for_async() - - # Get total number of instances across all model types - total_instances = sum(len(pk_list) for pk_list in captured_tasks.values()) - threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_THRESHOLD", 100) - - # only needed when at least one model instance is updated - if total_instances > 0: - # If threshold is below 0, async updating is disabled - if threshold < 0: - logger.debug(f"AsyncSearchContextMiddleware: Async updating disabled (threshold={threshold}), using synchronous update") - elif total_instances > threshold: - 
logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances > {threshold} threshold, triggering async update") - self._trigger_async_index_update(captured_tasks) - # Invalidate to prevent synchronous index update by super()._close_search_context() - search_context_manager.invalidate() - else: - logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances <= {threshold} threshold, using synchronous update") - # Let watson handle synchronous update for small numbers + objects, _is_invalid = search_context_manager._stack[-1] + _drain_search_context_to_async(objects, source="AsyncSearchContextMiddleware") + # The set is now empty (or was already empty); watson's `end()` will + # bulk-save an empty iterator and short-circuit. No need to invalidate. super()._close_search_context(request) - def _extract_tasks_for_async(self): - """Extract tasks from the search context and group by model type for async processing.""" - current_tasks, _is_invalid = search_context_manager._stack[-1] - - # Group by model type for efficient batch processing - model_groups = {} - for _engine, obj in current_tasks: - model_key = f"{obj._meta.app_label}.{obj._meta.model_name}" - if model_key not in model_groups: - model_groups[model_key] = [] - model_groups[model_key].append(obj.pk) - # Log what we extracted per model type - for model_key, pk_list in model_groups.items(): - logger.debug(f"AsyncSearchContextMiddleware: Extracted {len(pk_list)} {model_key} instances for async indexing") +def _drain_search_context_to_async(objects, source): + """ + Group `objects` ({(engine, obj), ...}) by model, dispatch one + force_async celery task per WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE-sized + batch, and `set.discard()` the drained entries from `objects` in place. + + `objects` is the `set` inside `search_context_manager._stack[-1][0]`. 
+ Mutating it in place is safe because watson's `_stack` is `threading.local` + and callers (request close + the wrapped `add_to_context`) hold the + active reference. + """ + if not objects: + return + + from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import + from dojo.tasks import update_watson_search_index_for_model # noqa: PLC0415 circular import + + # Snapshot before grouping so we don't iterate while mutating. + snapshot = list(objects) + model_groups = {} + for _engine, obj in snapshot: + model_key = f"{obj._meta.app_label}.{obj._meta.model_name}" + model_groups.setdefault(model_key, []).append(obj.pk) + + batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000) + for model_name, pk_list in model_groups.items(): + batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)] + # force_async=True keeps indexing off the request path even for users + # with block_execution=True — index updates are slow and never need + # to be synchronous from the user's perspective. + for i, batch in enumerate(batches, 1): + logger.debug(f"{source}: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances") + dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch, force_async=True) + + for entry in snapshot: + objects.discard(entry) + + +def install_intermediate_flush_hook(): + """ + Wrap `watson.search.search_context_manager.add_to_context` with a + size-based flush. Once the per-request set reaches + `WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`, drain it into async tasks + and clear it in place. Bounds memory on long-running requests + (large imports) and starts celery batches earlier instead of + dispatching all at end-of-request. + + Idempotent — safe to call multiple times. + Setting WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE to 0 or below disables + the hook at runtime. 
+ """ + cls = search_context_manager.__class__ + if getattr(cls, "_dd_intermediate_flush_installed", False): + return - return model_groups + original_add = cls.add_to_context - def _trigger_async_index_update(self, model_groups): - """Trigger async tasks to update search indexes, chunking large lists into batches of settings.WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE.""" - if not model_groups: + def add_to_context_with_flush(self, engine, obj): + original_add(self, engine, obj) + threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000) + if threshold <= 0 or not self._stack: return + objects, is_invalid = self._stack[-1] + if is_invalid or len(objects) < threshold: + return + _drain_search_context_to_async(objects, source="AsyncSearchContextMiddleware[intermediate]") - # Import here to avoid circular import - from django.conf import settings # noqa: PLC0415 circular import - - from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import - from dojo.tasks import update_watson_search_index_for_model # noqa: PLC0415 circular import - - # Create tasks per model type, chunking large lists into configurable batches - for model_name, pk_list in model_groups.items(): - # Chunk into batches using configurable batch size (compatible with Python 3.11) - batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000) - batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)] - - # Create tasks for each batch and log each one - for i, batch in enumerate(batches, 1): - logger.debug(f"AsyncSearchContextMiddleware: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances") - dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch) + cls.add_to_context = add_to_context_with_flush + cls._dd_intermediate_flush_installed = True + logger.debug("AsyncSearchContextMiddleware: intermediate flush hook installed on %s", cls.__name__) diff --git a/dojo/settings/settings.dist.py 
b/dojo/settings/settings.dist.py index 522ad54e7c0..eb75a76c1fd 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -115,9 +115,17 @@ DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000), # Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5) DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1), - # Minimum number of model updated instances before search index updates as performaed asynchronously. Set to -1 to disable async updates. - DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=(int, 10), + # Batch size for async watson search-index update tasks. Also doubles as + # the per-request intermediate-flush threshold: once the in-memory watson + # context reaches this many pending objects mid-request, + # AsyncSearchContextMiddleware flushes them to async celery tasks instead + # of waiting for end-of-request. Set to 0 (or negative) to disable the + # intermediate flush. DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=(int, 1000), + # When True, the async watson indexer auto-derives select_related/prefetch_related + # paths from each adapter's `fields`/`store` to avoid N+1 queries during indexing. + # Falls back to a plain queryset on any error (logged). 
+ DD_WATSON_INDEX_PREFETCH_ENABLED=(bool, True), DD_FOOTER_VERSION=(str, ""), # models should be passed to celery by ID, default is False (for now) DD_DATABASE_ENGINE=(str, "django.db.backends.postgresql"), @@ -869,8 +877,8 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param CELERY_IMPORTS = ("dojo.tools.tool_issue_updater", ) # Watson async index update settings -WATSON_ASYNC_INDEX_UPDATE_THRESHOLD = env("DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD") WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE = env("DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE") +WATSON_INDEX_PREFETCH_ENABLED = env("DD_WATSON_INDEX_PREFETCH_ENABLED") # Celery beat scheduled tasks CELERY_BEAT_SCHEDULE = { diff --git a/dojo/tasks.py b/dojo/tasks.py index c49c69ab57c..54c62427b13 100644 --- a/dojo/tasks.py +++ b/dojo/tasks.py @@ -182,6 +182,8 @@ def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs): """ from watson.search import SearchContextManager, default_search_engine # noqa: PLC0415 circular import + from dojo.utils_watson_prefetch import build_indexing_queryset # noqa: PLC0415 circular import + logger.debug(f"Starting async watson index update for {len(pk_list)} {model_name} instances") try: @@ -194,8 +196,11 @@ def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs): app_label, model_name = model_name.split(".") model_class = apps.get_model(app_label, model_name) - # Bulk load instances and add them to the context - instances = model_class.objects.filter(pk__in=pk_list) + # Bulk load instances and add them to the context. The queryset auto-derives + # select_related/prefetch_related from the adapter's fields/store paths to + # avoid N+1 queries during indexing. Disable via DD_WATSON_INDEX_PREFETCH_ENABLED=False. 
+ adapter = engine.get_adapter(model_class) + instances = build_indexing_queryset(model_class, pk_list, adapter) instances_added = 0 instances_skipped = 0 diff --git a/dojo/utils_watson_prefetch.py b/dojo/utils_watson_prefetch.py new file mode 100644 index 00000000000..95b2f4ca865 --- /dev/null +++ b/dojo/utils_watson_prefetch.py @@ -0,0 +1,133 @@ +""" +Query prefetch helper for the async watson search indexer. + +Watson's `SearchAdapter._resolve_field` walks `__`-separated relation paths via +per-instance `getattr`, which triggers one query per FK hop per object during +indexing. For deep adapter `fields`/`store` paths (e.g. +`finding__test__engagement__product__name`) on a 1000-row batch this means +thousands of extra queries. + +`build_prefetched_queryset` introspects the adapter paths against the model's +`_meta`, classifies each prefix as FK chain (`select_related`) or M2M / reverse +(`prefetch_related`), and applies them in a single query plan. On any failure +the caller is expected to fall back to the plain queryset — watson still works +correctly, just slower. + +Toggle: ``settings.WATSON_INDEX_PREFETCH_ENABLED`` (default True). +""" + +import logging + +from django.core.exceptions import FieldDoesNotExist + +logger = logging.getLogger(__name__) + + +def _classify_path(model, prefix): + """ + Walk a `__`-separated relation prefix against `model._meta`. + + Returns + ------- + "select" | "prefetch" | None + - "select": pure FK / OneToOne chain (safe for select_related). + - "prefetch": chain contains a many-to-many or reverse-many leg. + - None: unresolvable (callable on adapter, GenericForeignKey, typo, etc.) — + caller should drop this path. 
+ + """ + is_multi = False + current = model + for part in prefix.split("__"): + try: + field = current._meta.get_field(part) + except FieldDoesNotExist: + return None + if getattr(field, "many_to_many", False) or getattr(field, "one_to_many", False): + is_multi = True + related = getattr(field, "related_model", None) + if related is None: + # Reached a concrete field (e.g. CharField) — chain ends here. The + # caller passes the prefix without the leaf, so this should be rare. + return "prefetch" if is_multi else "select" + current = related + return "prefetch" if is_multi else "select" + + +def derive_relation_paths(model, adapter): + """ + Inspect adapter `fields` + `store` and return ``(select_paths, prefetch_paths)``. + + Each entry is a relation prefix suitable for passing to + `QuerySet.select_related` / `QuerySet.prefetch_related`. Paths that cannot + be resolved against ``model._meta`` are dropped (watson will resolve them + at indexing time the slow way). + """ + select_paths = set() + prefetch_paths = set() + + raw_paths = tuple(getattr(adapter, "fields", ()) or ()) + tuple(getattr(adapter, "store", ()) or ()) + for path in raw_paths: + if "__" not in path: + continue + prefix = path.rsplit("__", 1)[0] + classification = _classify_path(model, prefix) + if classification == "select": + select_paths.add(prefix) + elif classification == "prefetch": + prefetch_paths.add(prefix) + # None: drop silently — adapter property/GFK, watson handles at runtime. + + return select_paths, prefetch_paths + + +def build_indexing_queryset(model, pk_list, adapter): + """ + Build the queryset used by the async watson indexer. + + Applies `select_related` / `prefetch_related` derived from the adapter when + ``settings.WATSON_INDEX_PREFETCH_ENABLED`` is True (default). On any error + we log loudly and return the plain queryset so indexing still succeeds. 
+ """ + from django.conf import settings # noqa: PLC0415 -- settings access at call time + + base_qs = model.objects.filter(pk__in=pk_list) + + if not getattr(settings, "WATSON_INDEX_PREFETCH_ENABLED", True): + logger.debug( + "WATSON_INDEX_PREFETCH_ENABLED=False, indexing %s with plain queryset", + model.__name__, + ) + return base_qs + + try: + select_paths, prefetch_paths = derive_relation_paths(model, adapter) + except Exception: + logger.exception( + "Watson prefetch path derivation failed for %s — falling back to plain queryset", + model.__name__, + ) + return base_qs + + if not select_paths and not prefetch_paths: + return base_qs + + try: + qs = base_qs + if select_paths: + qs = qs.select_related(*select_paths) + if prefetch_paths: + qs = qs.prefetch_related(*prefetch_paths) + logger.debug( + "Watson indexing %s with select_related=%s prefetch_related=%s", + model.__name__, sorted(select_paths), sorted(prefetch_paths), + ) + except Exception: + logger.exception( + "Watson prefetch application failed for %s (select=%s prefetch=%s) — " + "falling back to plain queryset", + model.__name__, sorted(select_paths), sorted(prefetch_paths), + ) + return base_qs + else: + return qs diff --git a/unittests/test_celery_dispatch_force_async.py b/unittests/test_celery_dispatch_force_async.py new file mode 100644 index 00000000000..699d9bfa941 --- /dev/null +++ b/unittests/test_celery_dispatch_force_async.py @@ -0,0 +1,38 @@ +""" +Tests for the `force_async` kwarg on dojo_dispatch_task / we_want_async. + +`force_async=True` is for callers (e.g. the watson async indexer middleware) +that should always run their celery task in the background even when the +current user has `block_execution=True` or the caller passes `sync=True`. 
+""" + +from unittest.mock import patch + +from dojo.decorators import we_want_async + +from .dojo_test_case import DojoTestCase + + +class TestForceAsync(DojoTestCase): + + def test_force_async_true_overrides_sync(self): + """force_async=True wins even when sync=True is also present.""" + self.assertTrue(we_want_async(sync=True, force_async=True)) + + def test_force_async_true_overrides_block_execution(self): + """force_async=True ignores Dojo_User.wants_block_execution().""" + with patch("dojo.utils.get_current_user") as get_user, \ + patch("dojo.models.Dojo_User.wants_block_execution", return_value=True): + get_user.return_value = object() # any truthy non-None user + self.assertTrue(we_want_async(force_async=True)) + + def test_force_async_false_falls_through_to_normal_logic(self): + """force_async=False is the same as not passing it at all.""" + with patch("dojo.utils.get_current_user") as get_user, \ + patch("dojo.models.Dojo_User.wants_block_execution", return_value=True): + get_user.return_value = object() + self.assertFalse(we_want_async(force_async=False)) + + def test_sync_still_honoured_without_force_async(self): + """Existing sync=True behavior is unchanged.""" + self.assertFalse(we_want_async(sync=True)) diff --git a/unittests/test_tag_inheritance_perf.py b/unittests/test_tag_inheritance_perf.py index a0cfa3958ab..163acc71d3b 100644 --- a/unittests/test_tag_inheritance_perf.py +++ b/unittests/test_tag_inheritance_perf.py @@ -501,7 +501,13 @@ def test_baseline_zap_scan_reimport_no_change_v3(self): # to the full active-user set (legacy doesn't filter by RBAC role) but # the per-Alert ForeignKey.validate EXISTS probe is gone, netting -7 # against the pre-Track-B numbers. 
- EXPECTED_ZAP_IMPORT_V2 = 1378 - EXPECTED_ZAP_IMPORT_V3 = 1256 - EXPECTED_ZAP_REIMPORT_NO_CHANGE_V2 = 69 - EXPECTED_ZAP_REIMPORT_NO_CHANGE_V3 = 87 + # perf/watson-index-prefetch: -52 queries on both V2/V3 import paths from + # adapter-derived select_related/prefetch_related in the async watson + # indexer (executed inline under CELERY_TASK_ALWAYS_EAGER). + # Removal of WATSON_ASYNC_INDEX_UPDATE_THRESHOLD makes async dispatch + # unconditional; reimport no-change paths now go through the celery task + # path (+5 queries vs. the previous sync-under-threshold branch). + EXPECTED_ZAP_IMPORT_V2 = 1326 + EXPECTED_ZAP_IMPORT_V3 = 1204 + EXPECTED_ZAP_REIMPORT_NO_CHANGE_V2 = 74 + EXPECTED_ZAP_REIMPORT_NO_CHANGE_V3 = 92 diff --git a/unittests/test_watson_async_search_index.py b/unittests/test_watson_async_search_index.py index b4fca670e71..8edba606ac7 100644 --- a/unittests/test_watson_async_search_index.py +++ b/unittests/test_watson_async_search_index.py @@ -10,9 +10,10 @@ from .dojo_test_case import DojoAPITestCase +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) class TestWatsonAsyncSearchIndex(DojoAPITestCase): - """Test Watson search indexing works correctly for both sync and async updates.""" + """Test Watson search indexing dispatches async and finding becomes searchable.""" def setUp(self): """Set up test data and API client.""" @@ -89,25 +90,8 @@ def _import_and_check_watson_index(self, expected_message): return finding - def test_sync_watson_indexing_single_finding(self): - """Test that single finding import uses sync indexing and finding is searchable.""" - # Default threshold is 100, so single finding should use sync indexing - self._import_and_check_watson_index( - "Finding {finding_id} should be found in Watson search index", - ) - - @override_settings(WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=0) - def test_async_watson_indexing_single_finding(self): - """Test that with threshold=0, single finding uses async indexing and is searchable.""" - # With threshold=0, 
even single finding should trigger async indexing + def test_watson_indexing_single_finding(self): + """Single finding import dispatches async indexing and finding is searchable.""" self._import_and_check_watson_index( "Finding {finding_id} should be found in Watson search index after async update", ) - - @override_settings(WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=-1) - def test_disabled_async_watson_indexing(self): - """Test that with threshold=-1, async is disabled and sync indexing works.""" - # With threshold=-1, async should be completely disabled - self._import_and_check_watson_index( - "Finding {finding_id} should be found in Watson search index with sync update", - ) diff --git a/unittests/test_watson_index_prefetch.py b/unittests/test_watson_index_prefetch.py new file mode 100644 index 00000000000..c92cb7ac0d2 --- /dev/null +++ b/unittests/test_watson_index_prefetch.py @@ -0,0 +1,204 @@ +""" +Unit tests for dojo.utils_watson_prefetch — the helper that auto-applies +select_related/prefetch_related to the async watson indexer queryset. 
+""" + +import contextlib +from unittest.mock import patch + +from django.contrib.auth.models import User +from django.db import connection +from django.test import override_settings +from django.test.utils import CaptureQueriesContext +from django.utils import timezone +from watson.search import default_search_engine + +from dojo.models import ( + Endpoint, + Engagement, + Finding, + Product, + Product_Type, + Test, + Test_Type, + Vulnerability_Id, +) +from dojo.utils_watson_prefetch import ( + build_indexing_queryset, + derive_relation_paths, +) + +from .dojo_test_case import DojoTestCase + + +class TestDeriveRelationPaths(DojoTestCase): + + """Pure-Python introspection of adapter fields/store paths.""" + + def _adapter(self, model): + return default_search_engine.get_adapter(model) + + def test_product_paths(self): + """Product adapter stores `prod_type__name` — single FK hop, select_related.""" + select, prefetch = derive_relation_paths(Product, self._adapter(Product)) + self.assertIn("prod_type", select) + self.assertEqual(prefetch, set()) + + def test_finding_paths(self): + """Finding adapter has deep FK chains + jira_issue FK; all are select_related.""" + select, _ = derive_relation_paths(Finding, self._adapter(Finding)) + self.assertIn("test__engagement__product", select) + self.assertIn("jira_issue", select) + + def test_vulnerability_id_paths(self): + """Vulnerability_Id stores finding__test__engagement__product__name.""" + select, _ = derive_relation_paths(Vulnerability_Id, self._adapter(Vulnerability_Id)) + self.assertIn("finding__test__engagement__product", select) + + def test_endpoint_paths(self): + """Endpoint stores product__name — single FK hop.""" + select, _ = derive_relation_paths(Endpoint, self._adapter(Endpoint)) + self.assertIn("product", select) + + def test_unknown_path_dropped_silently(self): + """Adapter paths that don't resolve against _meta are dropped, not raised.""" + + class FakeAdapter: + fields = ("does_not_exist__nope",) + store 
= () + + select, prefetch = derive_relation_paths(Product, FakeAdapter()) + self.assertEqual(select, set()) + self.assertEqual(prefetch, set()) + + def test_plain_field_names_ignored(self): + """Field paths with no `__` (i.e. local CharField/TextField) are skipped.""" + + class FakeAdapter: + fields = ("name", "description") + store = () + + select, prefetch = derive_relation_paths(Product, FakeAdapter()) + self.assertEqual(select, set()) + self.assertEqual(prefetch, set()) + + +class TestBuildIndexingQueryset(DojoTestCase): + + """Behaviour of build_indexing_queryset under settings + error conditions.""" + + def test_prefetch_enabled_applies_select_related(self): + adapter = default_search_engine.get_adapter(Finding) + qs = build_indexing_queryset(Finding, [], adapter) + # select_related populates query.select_related as a dict + self.assertTrue(qs.query.select_related) + + @override_settings(WATSON_INDEX_PREFETCH_ENABLED=False) + def test_setting_disabled_returns_plain_queryset(self): + adapter = default_search_engine.get_adapter(Finding) + qs = build_indexing_queryset(Finding, [], adapter) + # Plain queryset has select_related == False (the default). 
+ self.assertFalse(qs.query.select_related) + + def test_falls_back_when_derivation_raises(self): + """If path derivation blows up we log + return the plain queryset.""" + adapter = default_search_engine.get_adapter(Finding) + with patch( + "dojo.utils_watson_prefetch.derive_relation_paths", + side_effect=RuntimeError("boom"), + ), self.assertLogs("dojo.utils_watson_prefetch", level="ERROR") as captured: + qs = build_indexing_queryset(Finding, [], adapter) + self.assertFalse(qs.query.select_related) + self.assertTrue(any("falling back" in msg for msg in captured.output)) + + def test_unresolved_paths_are_dropped(self): + """Adapter paths that don't classify produce a plain queryset, not an error.""" + adapter = default_search_engine.get_adapter(Finding) + with patch( + "dojo.utils_watson_prefetch.derive_relation_paths", + return_value=(set(), set()), + ): + qs = build_indexing_queryset(Finding, [], adapter) + self.assertFalse(qs.query.select_related) + + +class TestPrefetchReducesQueriesOnIndexerPath(DojoTestCase): + + """ + End-to-end query-count check: iterating the indexer queryset and walking the + adapter's FK chain (`finding.test.engagement.product.name`, `finding.jira_issue`) + must produce dramatically fewer queries with prefetch enabled than with the + plain queryset. Locks in the N+1 elimination claim that perf tests only + observe indirectly through the import path. 
+ """ + + N_FINDINGS = 5 + + @classmethod + def setUpTestData(cls): + now = timezone.now() + user, _ = User.objects.get_or_create(username="watson_prefetch_user", defaults={"is_active": True}) + pt, _ = Product_Type.objects.get_or_create(name="Watson Prefetch PT") + product = Product.objects.create(name="watson-prefetch-product", description="x", prod_type=pt) + eng = Engagement.objects.create(product=product, target_start=now, target_end=now) + tt, _ = Test_Type.objects.get_or_create(name="Watson Prefetch TT") + test = Test.objects.create(engagement=eng, test_type=tt, target_start=now, target_end=now) + cls.pk_list = [ + Finding.objects.create( + test=test, title=f"watson-prefetch-{i}", severity="Medium", reporter=user, + ).pk + for i in range(cls.N_FINDINGS) + ] + + def _walk_adapter_paths(self, finding): + # Touch the deep FK chains the Finding adapter resolves at index time. + # Without prefetch, each attribute hop is its own SELECT per finding. + # jira_issue is a reverse OneToOne — `_state.fields_cache` membership + # signals select_related populated it without triggering a fresh fetch. + _ = finding.test.engagement.product.name + # Access fires the query (or hits a populated cache); either way the + # query-count assertion captures the difference. 
+ with contextlib.suppress(finding.__class__.jira_issue.RelatedObjectDoesNotExist): + _ = finding.jira_issue + + def _count_queries_with(self, *, prefetch_enabled): + adapter = default_search_engine.get_adapter(Finding) + with override_settings(WATSON_INDEX_PREFETCH_ENABLED=prefetch_enabled): + qs = build_indexing_queryset(Finding, self.pk_list, adapter) + with CaptureQueriesContext(connection) as ctx: + for f in qs: + self._walk_adapter_paths(f) + return len(ctx.captured_queries) + + def test_prefetch_enabled_uses_fewer_queries_than_plain_qs(self): + with_prefetch = self._count_queries_with(prefetch_enabled=True) + without_prefetch = self._count_queries_with(prefetch_enabled=False) + + # Plain qs: 1 SELECT + N * (test + engagement + product + jira_issue) ≈ 1 + 4N + # Prefetched qs: 1 SELECT with joins (select_related collapses the FK chain) + # Concrete numbers for N=5: ~21 vs 1. Assert a healthy margin without + # pinning exact counts (watson registrations may add more paths later). + self.assertLess( + with_prefetch, without_prefetch, + f"prefetch should reduce queries (with={with_prefetch}, without={without_prefetch})", + ) + self.assertGreaterEqual( + without_prefetch - with_prefetch, self.N_FINDINGS, + f"prefetch should save at least N queries on N findings " + f"(saved {without_prefetch - with_prefetch}, N={self.N_FINDINGS})", + ) + + def test_prefetch_enabled_single_query_for_fk_chain(self): + """With prefetch on, the FK chain walk must not issue extra SELECTs.""" + adapter = default_search_engine.get_adapter(Finding) + qs = build_indexing_queryset(Finding, self.pk_list, adapter) + # Force evaluation first, then capture only the attribute-walk phase. 
+ findings = list(qs) + with CaptureQueriesContext(connection) as ctx: + for f in findings: + self._walk_adapter_paths(f) + self.assertEqual( + len(ctx.captured_queries), 0, + f"adapter FK walk should be query-free after prefetch; got {len(ctx.captured_queries)}: " + f"{[q['sql'] for q in ctx.captured_queries]}", + ) diff --git a/unittests/test_watson_intermediate_flush.py b/unittests/test_watson_intermediate_flush.py new file mode 100644 index 00000000000..624b516bf77 --- /dev/null +++ b/unittests/test_watson_intermediate_flush.py @@ -0,0 +1,121 @@ +""" +Unit tests for the AsyncSearchContextMiddleware intermediate-flush hook. + +The hook wraps `watson.search.search_context_manager.add_to_context` so that +once the per-request context reaches `WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`, +accumulated pks are drained to async celery tasks mid-request and discarded +from the in-memory set. +""" + +from unittest.mock import patch + +from django.test import override_settings +from watson.search import search_context_manager + +from dojo.middleware import ( + _drain_search_context_to_async, # noqa: PLC2701 -- internal helper under test + install_intermediate_flush_hook, +) +from dojo.models import Product, Product_Type + +from .dojo_test_case import DojoTestCase + + +class TestIntermediateFlushHook(DojoTestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + # Hook is installed at app startup via dojo.apps.ready(); ensure it's + # present in case test order isolates it. + install_intermediate_flush_hook() + + def setUp(self): + super().setUp() + self.prod_type = Product_Type.objects.create(name="Intermediate Flush PT") + # Pre-create some products to add to the watson context manually. 
+ self.products = [ + Product.objects.create( + name=f"intermediate-flush-prod-{i}", + description="intermediate flush fixture", + prod_type=self.prod_type, + ) + for i in range(5) + ] + + def _open_context(self): + if not search_context_manager.is_active(): + search_context_manager.start() + + def _close_context(self): + # Invalidate before end() so watson doesn't try to bulk-save against + # the test DB (we only care about the in-memory set bookkeeping). + if search_context_manager.is_active(): + search_context_manager.invalidate() + search_context_manager.end() + + def tearDown(self): + self._close_context() + super().tearDown() + + def test_drain_dispatches_and_discards(self): + """_drain_search_context_to_async dispatches per model and clears the set in place.""" + self._open_context() + objects = search_context_manager._stack[-1][0] + for p in self.products: + objects.add((object(), p)) + self.assertEqual(len(objects), len(self.products)) + + # `dojo_dispatch_task` is re-imported at call time inside the helper, + # so patch at its definition site. 
+ with patch("dojo.celery_dispatch.dojo_dispatch_task") as dispatch: + _drain_search_context_to_async(objects, source="test") + + self.assertEqual(dispatch.call_count, 1, "one batch dispatched (5 pks << batch size)") + _task, model_name, pk_list = dispatch.call_args.args + self.assertEqual(model_name, "dojo.product") + self.assertEqual(sorted(pk_list), sorted(p.pk for p in self.products)) + self.assertTrue(dispatch.call_args.kwargs.get("force_async")) + self.assertEqual(len(objects), 0, "drained entries must be discarded from the set") + + @override_settings(WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=3) + def test_add_to_context_triggers_drain_at_threshold(self): + """Wrapped add_to_context fires the drain exactly when len(objects) >= threshold.""" + self._open_context() + objects = search_context_manager._stack[-1][0] + engine_marker = object() + + with patch("dojo.middleware._drain_search_context_to_async") as drain: + for p in self.products[:2]: + search_context_manager.add_to_context(engine_marker, p) + drain.assert_not_called() + + search_context_manager.add_to_context(engine_marker, self.products[2]) + # Third add brings set size to 3 (== threshold) → drain triggers. 
+ drain.assert_called_once() + self.assertIs(drain.call_args.args[0], objects) + + @override_settings(WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=0) + def test_threshold_zero_disables_drain(self): + """Threshold <= 0 short-circuits the hook regardless of set size.""" + self._open_context() + engine_marker = object() + + with patch("dojo.middleware._drain_search_context_to_async") as drain: + for p in self.products: + search_context_manager.add_to_context(engine_marker, p) + drain.assert_not_called() + + @override_settings(WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=2) + def test_invalidated_context_skips_drain(self): + """If the search context is invalidated, add_to_context must not drain.""" + self._open_context() + search_context_manager.invalidate() + engine_marker = object() + + with patch("dojo.middleware._drain_search_context_to_async") as drain: + for p in self.products[:3]: + # add_to_context still records into the (now-invalid) set; the + # hook should detect the invalid flag and bail out. + search_context_manager.add_to_context(engine_marker, p) + drain.assert_not_called()