5 changes: 5 additions & 0 deletions docs/content/releases/os_upgrading/2.59.md
@@ -88,4 +88,9 @@ As announced in DefectDojo 2.57.0, the Stub Findings feature has been removed. T

Any requests to this endpoint will now return a 404 Not Found error. The Stub Findings UI is no longer available.

## Configuration change in Watson Search Indexing

In [PR 14881](https://github.com/DefectDojo/django-DefectDojo/pull/14881), we optimized the way the Django Watson search index is updated during imports and reimports. There is now a single configuration setting to manage this behavior: `DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`. The default value should work fine for most instances.
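
For illustration, a minimal sketch of lowering the default (1000) via the environment; the variable name comes from `settings.dist.py` in this PR, and the value 500 is only an example:

    # Hypothetical override - must be set before Django settings load.
    # settings.dist.py reads this into WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE.
    import os
    os.environ["DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE"] = "500"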


For more information, check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.59.0).
3 changes: 3 additions & 0 deletions dojo/apps.py
@@ -106,6 +106,9 @@ def ready(self):
register_django_pghistory_models()
configure_audit_system()

from dojo.middleware import install_intermediate_flush_hook # noqa: PLC0415
install_intermediate_flush_hook()


def get_model_fields_with_extra(model, extra_fields=()):
return get_model_fields(get_model_default_fields(model), extra_fields)
3 changes: 3 additions & 0 deletions dojo/celery_dispatch.py
@@ -62,6 +62,9 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur
- Inject `async_user_id` if missing.
- Capture and inject pghistory context if available.
- Respect `sync=True` (foreground execution) and user `block_execution`.
- Respect `force_async=True` (background execution even when the caller
would otherwise run synchronously, e.g. user has `block_execution`).
`force_async` wins over `sync` and `block_execution`.
- Support `countdown=<seconds>` for async dispatch.

Returns:
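A usage sketch of `force_async` (mirroring the dispatch call in dojo/middleware.py below; the model name and pk values are illustrative):

    from dojo.celery_dispatch import dojo_dispatch_task
    from dojo.tasks import update_watson_search_index_for_model

    # Runs on celery even for users with block_execution=True.
    dojo_dispatch_task(update_watson_search_index_for_model, "dojo.finding", [101, 102], force_async=True)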
5 changes: 5 additions & 0 deletions dojo/decorators.py
@@ -58,6 +58,11 @@ def get_tasks(self):
def we_want_async(*args, func=None, **kwargs):
from dojo.utils import get_current_user # noqa: PLC0415 circular import

force_async = kwargs.get("force_async", False)
if force_async:
logger.debug("dojo_async_task %s: running task in the background as force_async=True has been found as kwarg", func)
return True

sync = kwargs.get("sync", False)
if sync:
logger.debug("dojo_async_task %s: running task in the foreground as sync=True has been found as kwarg", func)
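A minimal sketch of the resulting precedence (kwarg names from the checks above; `my_task` is hypothetical, and the `block_execution` fallback lives in the truncated remainder of this function):

    we_want_async(func=my_task, force_async=True, sync=True)  # True: force_async wins over sync
    we_want_async(func=my_task, sync=True)                    # foreground: sync short-circuits
    we_want_async(func=my_task)                               # falls through to the user's block_execution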
128 changes: 71 additions & 57 deletions dojo/middleware.py
@@ -243,69 +243,83 @@ class AsyncSearchContextMiddleware(SearchContextMiddleware):
"""

def _close_search_context(self, request):
"""Override watson's close behavior to trigger async updates when above threshold."""
"""Override watson's close behavior to always dispatch index updates asynchronously."""
if search_context_manager.is_active():
from django.conf import settings # noqa: PLC0415 circular import

# Extract tasks and check if we should trigger async update
captured_tasks = self._extract_tasks_for_async()

# Get total number of instances across all model types
total_instances = sum(len(pk_list) for pk_list in captured_tasks.values())
threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_THRESHOLD", 100)

# only needed when at least one model instance is updated
if total_instances > 0:
# If threshold is below 0, async updating is disabled
if threshold < 0:
logger.debug(f"AsyncSearchContextMiddleware: Async updating disabled (threshold={threshold}), using synchronous update")
elif total_instances > threshold:
logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances > {threshold} threshold, triggering async update")
self._trigger_async_index_update(captured_tasks)
# Invalidate to prevent synchronous index update by super()._close_search_context()
search_context_manager.invalidate()
else:
logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances <= {threshold} threshold, using synchronous update")
# Let watson handle synchronous update for small numbers
objects, _is_invalid = search_context_manager._stack[-1]
_drain_search_context_to_async(objects, source="AsyncSearchContextMiddleware")

# The set is now empty (or was already empty); watson's `end()` will
# bulk-save an empty iterator and short-circuit. No need to invalidate.
super()._close_search_context(request)

def _extract_tasks_for_async(self):
"""Extract tasks from the search context and group by model type for async processing."""
current_tasks, _is_invalid = search_context_manager._stack[-1]

# Group by model type for efficient batch processing
model_groups = {}
for _engine, obj in current_tasks:
model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
if model_key not in model_groups:
model_groups[model_key] = []
model_groups[model_key].append(obj.pk)

# Log what we extracted per model type
for model_key, pk_list in model_groups.items():
logger.debug(f"AsyncSearchContextMiddleware: Extracted {len(pk_list)} {model_key} instances for async indexing")
def _drain_search_context_to_async(objects, source):
"""
Group `objects` ({(engine, obj), ...}) by model, dispatch one
force_async celery task per WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE-sized
batch, and `set.discard()` the drained entries from `objects` in place.

`objects` is the `set` inside `search_context_manager._stack[-1][0]`.
Mutating it in place is safe because watson's `_stack` is `threading.local`
and callers (request close + the wrapped `add_to_context`) hold the
active reference.
"""
if not objects:
return

from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
from dojo.tasks import update_watson_search_index_for_model # noqa: PLC0415 circular import

# Snapshot before grouping so we don't iterate while mutating.
snapshot = list(objects)
model_groups = {}
for _engine, obj in snapshot:
model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
model_groups.setdefault(model_key, []).append(obj.pk)

batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
for model_name, pk_list in model_groups.items():
batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]
# force_async=True keeps indexing off the request path even for users
# with block_execution=True — index updates are slow and never need
# to be synchronous from the user's perspective.
for i, batch in enumerate(batches, 1):
logger.debug(f"{source}: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch, force_async=True)

for entry in snapshot:
objects.discard(entry)
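

# Batching sketch (illustrative numbers): 2,500 pks with batch_size=1000
# dispatch three tasks of 1000, 1000 and 500 pks, matching the slicing above:
#   >>> pk_list = list(range(2500)); batch_size = 1000
#   >>> [len(pk_list[i:i + batch_size]) for i in range(0, len(pk_list), batch_size)]
#   [1000, 1000, 500]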


def install_intermediate_flush_hook():
"""
Wrap `watson.search.search_context_manager.add_to_context` with a
size-based flush. Once the per-request set reaches
`WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`, drain it into async tasks
and clear it in place. Bounds memory on long-running requests
(large imports) and starts celery batches earlier instead of
dispatching all at end-of-request.

Idempotent — safe to call multiple times.
Setting WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE to 0 or below disables
the hook at runtime.
"""
cls = search_context_manager.__class__
if getattr(cls, "_dd_intermediate_flush_installed", False):
return

return model_groups
original_add = cls.add_to_context

def _trigger_async_index_update(self, model_groups):
"""Trigger async tasks to update search indexes, chunking large lists into batches of settings.WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE."""
if not model_groups:
def add_to_context_with_flush(self, engine, obj):
original_add(self, engine, obj)
threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
if threshold <= 0 or not self._stack:
return
objects, is_invalid = self._stack[-1]
if is_invalid or len(objects) < threshold:
return
_drain_search_context_to_async(objects, source="AsyncSearchContextMiddleware[intermediate]")

# Import here to avoid circular import
from django.conf import settings # noqa: PLC0415 circular import

from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
from dojo.tasks import update_watson_search_index_for_model # noqa: PLC0415 circular import

# Create tasks per model type, chunking large lists into configurable batches
for model_name, pk_list in model_groups.items():
# Chunk into batches using configurable batch size (compatible with Python 3.11)
batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]

# Create tasks for each batch and log each one
for i, batch in enumerate(batches, 1):
logger.debug(f"AsyncSearchContextMiddleware: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch)
cls.add_to_context = add_to_context_with_flush
cls._dd_intermediate_flush_installed = True
logger.debug("AsyncSearchContextMiddleware: intermediate flush hook installed on %s", cls.__name__)
14 changes: 11 additions & 3 deletions dojo/settings/settings.dist.py
@@ -115,9 +115,17 @@
DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000),
# Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5)
DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1),
# Minimum number of updated model instances before search index updates are performed asynchronously. Set to -1 to disable async updates.
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=(int, 10),
# Batch size for async watson search-index update tasks. Also doubles as
# the per-request intermediate-flush threshold: once the in-memory watson
# context reaches this many pending objects mid-request,
# AsyncSearchContextMiddleware flushes them to async celery tasks instead
# of waiting for end-of-request. Set to 0 (or negative) to disable the
# intermediate flush.
DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=(int, 1000),
# When True, the async watson indexer auto-derives select_related/prefetch_related
# paths from each adapter's `fields`/`store` to avoid N+1 queries during indexing.
# Falls back to a plain queryset on any error (logged).
DD_WATSON_INDEX_PREFETCH_ENABLED=(bool, True),
DD_FOOTER_VERSION=(str, ""),
# models should be passed to celery by ID, default is False (for now)
DD_DATABASE_ENGINE=(str, "django.db.backends.postgresql"),
@@ -869,8 +877,8 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
CELERY_IMPORTS = ("dojo.tools.tool_issue_updater", )

# Watson async index update settings
WATSON_ASYNC_INDEX_UPDATE_THRESHOLD = env("DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD")
WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE = env("DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE")
WATSON_INDEX_PREFETCH_ENABLED = env("DD_WATSON_INDEX_PREFETCH_ENABLED")

# Celery beat scheduled tasks
CELERY_BEAT_SCHEDULE = {
9 changes: 7 additions & 2 deletions dojo/tasks.py
@@ -182,6 +182,8 @@ def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
"""
from watson.search import SearchContextManager, default_search_engine # noqa: PLC0415 circular import

from dojo.utils_watson_prefetch import build_indexing_queryset # noqa: PLC0415 circular import

logger.debug(f"Starting async watson index update for {len(pk_list)} {model_name} instances")

try:
@@ -194,8 +196,11 @@ def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
app_label, model_name = model_name.split(".")
model_class = apps.get_model(app_label, model_name)

# Bulk load instances and add them to the context
instances = model_class.objects.filter(pk__in=pk_list)
# Bulk load instances and add them to the context. The queryset auto-derives
# select_related/prefetch_related from the adapter's fields/store paths to
# avoid N+1 queries during indexing. Disable via DD_WATSON_INDEX_PREFETCH_ENABLED=False.
adapter = engine.get_adapter(model_class)
instances = build_indexing_queryset(model_class, pk_list, adapter)
instances_added = 0
instances_skipped = 0

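A call sketch for the task itself (pk values are illustrative; the "app_label.model_name" format matches the split() above):

    # Normally dispatched via dojo_dispatch_task (see dojo/middleware.py);
    # shown as a direct call for clarity.
    update_watson_search_index_for_model("dojo.finding", [101, 102, 103])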
133 changes: 133 additions & 0 deletions dojo/utils_watson_prefetch.py
@@ -0,0 +1,133 @@
"""
Query prefetch helper for the async watson search indexer.

Watson's `SearchAdapter._resolve_field` walks `__`-separated relation paths via
per-instance `getattr`, which triggers one query per FK hop per object during
indexing. For deep adapter `fields`/`store` paths (e.g.
`finding__test__engagement__product__name`) on a 1000-row batch this means
thousands of extra queries.

`build_indexing_queryset` introspects the adapter paths against the model's
`_meta`, classifies each prefix as FK chain (`select_related`) or M2M / reverse
(`prefetch_related`), and applies them in a single query plan. On any failure
the caller is expected to fall back to the plain queryset — watson still works
correctly, just slower.

Toggle: ``settings.WATSON_INDEX_PREFETCH_ENABLED`` (default True).
"""

import logging

from django.core.exceptions import FieldDoesNotExist

logger = logging.getLogger(__name__)


def _classify_path(model, prefix):
"""
Walk a `__`-separated relation prefix against `model._meta`.

Returns
-------
"select" | "prefetch" | None
- "select": pure FK / OneToOne chain (safe for select_related).
- "prefetch": chain contains a many-to-many or reverse-many leg.
- None: unresolvable (callable on adapter, GenericForeignKey, typo, etc.) —
caller should drop this path.

"""
is_multi = False
current = model
for part in prefix.split("__"):
try:
field = current._meta.get_field(part)
except FieldDoesNotExist:
return None
if getattr(field, "many_to_many", False) or getattr(field, "one_to_many", False):
is_multi = True
related = getattr(field, "related_model", None)
if related is None:
# Reached a concrete field (e.g. CharField) — chain ends here. The
# caller passes the prefix without the leaf, so this should be rare.
return "prefetch" if is_multi else "select"
current = related
return "prefetch" if is_multi else "select"


def derive_relation_paths(model, adapter):
"""
Inspect adapter `fields` + `store` and return ``(select_paths, prefetch_paths)``.

Each entry is a relation prefix suitable for passing to
`QuerySet.select_related` / `QuerySet.prefetch_related`. Paths that cannot
be resolved against ``model._meta`` are dropped (watson will resolve them
at indexing time the slow way).
"""
select_paths = set()
prefetch_paths = set()

raw_paths = tuple(getattr(adapter, "fields", ()) or ()) + tuple(getattr(adapter, "store", ()) or ())
for path in raw_paths:
if "__" not in path:
continue
prefix = path.rsplit("__", 1)[0]
classification = _classify_path(model, prefix)
if classification == "select":
select_paths.add(prefix)
elif classification == "prefetch":
prefetch_paths.add(prefix)
# None: drop silently — adapter property/GFK, watson handles at runtime.

return select_paths, prefetch_paths
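

# Sketch (hypothetical adapter): fields=("title", "test__engagement__name") and
# store=("tags__name",) would yield select_paths={"test__engagement"} and
# prefetch_paths={"tags"}, assuming test/engagement are FKs and tags is M2M.
# Single-segment paths like "title" are skipped ("__" not in path).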


def build_indexing_queryset(model, pk_list, adapter):
"""
Build the queryset used by the async watson indexer.

Applies `select_related` / `prefetch_related` derived from the adapter when
``settings.WATSON_INDEX_PREFETCH_ENABLED`` is True (default). On any error
we log loudly and return the plain queryset so indexing still succeeds.
"""
from django.conf import settings # noqa: PLC0415 -- settings access at call time

base_qs = model.objects.filter(pk__in=pk_list)

if not getattr(settings, "WATSON_INDEX_PREFETCH_ENABLED", True):
logger.debug(
"WATSON_INDEX_PREFETCH_ENABLED=False, indexing %s with plain queryset",
model.__name__,
)
return base_qs

try:
select_paths, prefetch_paths = derive_relation_paths(model, adapter)
except Exception:
logger.exception(
"Watson prefetch path derivation failed for %s — falling back to plain queryset",
model.__name__,
)
return base_qs

if not select_paths and not prefetch_paths:
return base_qs

try:
qs = base_qs
if select_paths:
qs = qs.select_related(*select_paths)
if prefetch_paths:
qs = qs.prefetch_related(*prefetch_paths)
logger.debug(
"Watson indexing %s with select_related=%s prefetch_related=%s",
model.__name__, sorted(select_paths), sorted(prefetch_paths),
)
except Exception:
logger.exception(
"Watson prefetch application failed for %s (select=%s prefetch=%s) — "
"falling back to plain queryset",
model.__name__, sorted(select_paths), sorted(prefetch_paths),
)
return base_qs
else:
return qs
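

# Usage sketch, mirroring dojo/tasks.py above:
#   adapter = engine.get_adapter(model_class)
#   qs = build_indexing_queryset(model_class, pk_list, adapter)
# On any failure the plain pk__in queryset is returned, so indexing still
# succeeds via watson's slower per-instance resolution.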