Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions workers/executor/executors/legacy_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,12 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
)
step = 1

extraction_metrics: dict = {}
try:
# ---- Step 1: Extract ----
if not skip_extraction:
step += 1
extraction_start = time.monotonic()
extract_ctx = ExecutionContext(
executor_name=context.executor_name,
operation=Operation.EXTRACT.value,
Expand All @@ -640,6 +642,9 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
return _failure(extract_result)
_absorb(extract_result)
extracted_text = extract_result.data.get(IKeys.EXTRACTED_TEXT, "")
extraction_metrics = {
"extraction": {"time_taken(s)": time.monotonic() - extraction_start}
}
Comment on lines +645 to +647

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid top-level metrics key collision with user prompt names

"extraction" is currently injected as a normal top-level metrics key (Line [645]), but top-level metrics keys are also used for output/prompt names. If a user defines an output named "extraction", the merge at Line [804] will silently co-mingle unrelated metrics under the same key.

Use a reserved namespace for pipeline-level metrics (e.g., metrics["_pipeline"]["extraction"]) or enforce/reserve "extraction" as a disallowed output name before merge.

Also applies to: 804-811

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/executor/executors/legacy_executor.py` around lines 645 - 647, The
current extraction_metrics dict uses a top-level key "extraction" which can
collide with user-defined output/prompt names when merged later; update this to
use a reserved pipeline namespace (e.g., set extraction_metrics = {"_pipeline":
{"extraction": {"time_taken(s)": ...}}}) and then merge into the main metrics
map so pipeline-level metrics live under metrics["_pipeline"]; adjust the merge
logic where metrics and extraction_metrics are combined (the existing merge
around the metrics variable at lines ~804-811) to preserve the "_pipeline"
namespace, or alternatively add a pre-merge check to disallow user outputs named
"extraction" if you prefer the blocking approach. Ensure you update references
to extraction_metrics and the merge operation accordingly.


# ---- Step 2: Summarize (if enabled) ----
if is_summarization:
Expand Down Expand Up @@ -700,6 +705,7 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
source_file_name=source_file_name,
extracted_text=extracted_text,
index_metrics=index_metrics,
extraction_metrics=extraction_metrics,
)

output_map = structured_output.get(PSKeys.OUTPUT, {}) or {}
Expand Down Expand Up @@ -787,17 +793,21 @@ def _finalize_pipeline_result(
source_file_name: str,
extracted_text: str,
index_metrics: dict,
extraction_metrics: dict | None = None,
) -> None:
"""Populate metadata/metrics in structured_output after pipeline completion."""
if "metadata" not in structured_output:
structured_output["metadata"] = {}
structured_output["metadata"]["file_name"] = source_file_name
if extracted_text:
structured_output["metadata"]["extracted_text"] = extracted_text
if index_metrics:
new_metrics = self._merge_pipeline_metrics(
index_metrics or {}, extraction_metrics or {}
)
if new_metrics:
existing_metrics = structured_output.get("metrics", {})
structured_output["metrics"] = self._merge_pipeline_metrics(
existing_metrics, index_metrics
existing_metrics, new_metrics
)

def _run_pipeline_summarize(
Expand Down
Loading