Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5b301b7
feat(assessment): Implement L1 pipeline with topic relevance and dupl…
vprashrex May 27, 2026
b412829
feat(export): Expand output columns to include topic relevance and du…
vprashrex May 28, 2026
c12ac18
feat(post-processing): Implement post-processing configuration for as…
vprashrex May 31, 2026
c1791d5
feat(assessment): Enhance attachment handling in L1 pipeline with mix…
vprashrex Jun 2, 2026
97651d2
Merge branch 'main' into feat/assessment-pipeline-l1
vprashrex Jun 2, 2026
98acf86
feat(tests): update assessment run status to 'l2_processing' and refa…
vprashrex Jun 2, 2026
0addb71
refactor(tests): streamline patching of run_assessment_run in TestSta…
vprashrex Jun 2, 2026
ad8e29f
feat(tests): add comprehensive tests for L1 duplicate detection and p…
vprashrex Jun 2, 2026
e020717
feat: implement prefilter pipeline with topic relevance and duplicate…
vprashrex Jun 2, 2026
4a4e4f8
Refactor assessment tests and add new functionality
vprashrex Jun 4, 2026
bb30f88
feat: refactor assessment prefilter configuration and enhance pipelin…
vprashrex Jun 4, 2026
e89f1f2
feat: enhance error handling in assessment pipeline and improve attac…
vprashrex Jun 4, 2026
87ee6a5
feat: add error handling for deterministic failures in assessment eva…
vprashrex Jun 4, 2026
61798b6
feat: update assessment prefilter constants for provider and model co…
vprashrex Jun 4, 2026
d4d88a2
feat: enhance assessment tests with additional attachment handling an…
vprashrex Jun 4, 2026
827547d
feat: improve test readability by formatting patch calls in assessmen…
vprashrex Jun 4, 2026
8bd54a7
feat: add commented alternative model and duplicate store configurati…
vprashrex Jun 4, 2026
26c4230
Merge branch 'main' into feat/assessment-pipeline-l1
vprashrex Jun 4, 2026
7345d0b
Merge branch 'main' into feat/assessment-pipeline-l1
vprashrex Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Add L1 pipeline columns to assessment_run

Revision ID: 064
Revises: 063
Create Date: 2026-05-27 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

revision = "064"
down_revision = "063"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the L1 pipeline bookkeeping columns to ``assessment_run``.

    Every column is added as nullable, so existing rows need no backfill.
    """
    # (column name, column type, column comment), listed in creation order.
    l1_columns = (
        (
            "l1_object_store_url",
            sa.String(),
            "S3 URL of stored L1 filter results JSON",
        ),
        (
            "l1_total_rows",
            sa.Integer(),
            "Total rows fed into L1 pipeline",
        ),
        (
            "l1_total_passed",
            sa.Integer(),
            "Rows that passed topic relevance and went to L2",
        ),
        (
            "l1_total_rejected",
            sa.Integer(),
            "Rows rejected by topic relevance, stopped at L1",
        ),
    )

    for column_name, column_type, column_comment in l1_columns:
        op.add_column(
            "assessment_run",
            sa.Column(
                column_name,
                column_type,
                nullable=True,
                comment=column_comment,
            ),
        )


def downgrade() -> None:
    """Drop the L1 pipeline columns from ``assessment_run``.

    Columns are removed in the reverse of the order ``upgrade`` adds them.
    """
    for column_name in (
        "l1_total_rejected",
        "l1_total_passed",
        "l1_total_rows",
        "l1_object_store_url",
    ):
        op.drop_column("assessment_run", column_name)
15 changes: 15 additions & 0 deletions backend/app/api/docs/assessment/update_post_processing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Save post-processing config for a single assessment run.

Stores the config inside the run's `input` JSON blob (key
`post_processing_config`). It is applied at export/preview time and never
re-runs the LLM, so it can be edited after the run completes.

The config has three optional sections:

- `computed_columns`: derived columns from formulas, e.g.
`{"name": "Total_Score", "formula": "@Novelty_score + @Usefulness_score"}`.
Formulas reference columns with `@` and support `+ - * /` and parentheses.
- `filter`: row filters combined with AND logic.
- `sort`: sort rules applied in priority order.

Pass `null` (or an empty body) to clear post-processing for the run.
42 changes: 40 additions & 2 deletions backend/app/api/routes/assessment/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from typing import Any, Literal

from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.api.deps import AuthContextDep, SessionDep
Expand All @@ -12,6 +12,7 @@
get_assessment_by_id,
get_assessment_run_by_id as get_run_by_id,
list_assessment_runs as list_runs,
update_run_post_processing_config,
)
from app.models.assessment import (
Assessment,
Expand All @@ -33,6 +34,7 @@
load_export_rows_for_run,
sort_export_rows,
)
from app.services.assessment.utils.post_processing import apply_post_processing
from app.utils import APIResponse, load_description

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,6 +67,10 @@ def _build_run_public(
total_items=run.total_items,
error_message=run.error_message,
input=run.input,
l1_total_rows=run.l1_total_rows,
l1_total_passed=run.l1_total_passed,
l1_total_rejected=run.l1_total_rejected,
post_processing_config=(run.input or {}).get("post_processing_config"),
inserted_at=run.inserted_at,
updated_at=run.updated_at,
)
Expand Down Expand Up @@ -212,12 +218,44 @@ def export_assessment_run_results(
)
)

post_processing_config = (run.input or {}).get("post_processing_config") or None
base_label = assessment.experiment_name if assessment else f"run_{run.id}"

if export_format != "json":
return build_export_response(
export_rows=export_rows,
export_format=export_format,
base_name=f"{base_label}_run_{run.id}_results",
post_processing_config=post_processing_config,
)

return APIResponse.success_response(data=build_json_export_rows(export_rows))
rows = build_json_export_rows(export_rows)
rows = apply_post_processing(rows, post_processing_config)
return APIResponse.success_response(data=rows)


@router.patch(
    "/runs/{run_id}/post-processing",
    description=load_description("assessment/update_post_processing.md"),
    response_model=APIResponse[AssessmentRunPublic],
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def update_post_processing(
    run_id: int,
    session: SessionDep,
    auth_context: AuthContextDep,
    config: dict[str, Any] | None = Body(default=None),
) -> APIResponse[AssessmentRunPublic]:
    """Save the post-processing config (computed columns, sort, filter) for a run.

    The run is looked up within the caller's organization and project.
    ``config`` defaults to ``None`` when the request carries no body and is
    handed to ``update_run_post_processing_config`` unchanged.

    Raises:
        HTTPException: 404 when no matching run is found.
    """
    # Scope the lookup to the authenticated organization and project.
    tenant_scope = {
        "organization_id": auth_context.organization_.id,
        "project_id": auth_context.project_.id,
    }
    existing_run = get_run_by_id(session=session, run_id=run_id, **tenant_scope)
    if existing_run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    updated_run = update_run_post_processing_config(
        session=session,
        run=existing_run,
        config=config,
    )
    public_run = _build_run_public(session, updated_run)
    return APIResponse.success_response(data=public_run)
24 changes: 24 additions & 0 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,30 @@ def run_tts_batch_submission(
)


@celery_app.task(
bind=True, queue="low_priority", priority=1, soft_time_limit=1800, time_limit=2100
)
def run_assessment_run(
Comment thread
vprashrex marked this conversation as resolved.
Outdated
self,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
**kwargs,
):
Comment on lines +237 to +244
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don't silently swallow extra task kwargs here.

run_assessment_pipeline never forwards **kwargs, so unexpected task arguments are accepted and dropped instead of failing fast. This segment also misses the required type annotations on the new function signature.

Suggested fix
 def run_assessment_pipeline(
-    self,
+    self: celery.Task,
     run_id: int,
     organization_id: int,
     project_id: int,
     trace_id: str,
-    **kwargs,
-):
+) -> None:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def run_assessment_pipeline(
self,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
**kwargs,
):
def run_assessment_pipeline(
self: celery.Task,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
) -> None:
🧰 Tools
🪛 Ruff (0.15.15)

[warning] 243-243: Unused function argument: kwargs

(ARG001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/celery/tasks/job_execution.py` around lines 237 - 244, The
function run_assessment_pipeline currently accepts but silently drops **kwargs
and lacks type annotations; update its signature to include proper type hints
(e.g., run_id: int, organization_id: int, project_id: int, trace_id: str,
**kwargs: Any) and add a return type (likely -> None). Then either forward
**kwargs into the downstream pipeline invocation inside run_assessment_pipeline
(so extra task args are preserved) or explicitly validate/raise (TypeError) for
unexpected keys if they must not be accepted; ensure you import Any from typing
and apply the change where run_assessment_pipeline is defined.

from app.services.assessment.tasks import execute_assessment_run

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_assessment_run(
run_id=run_id,
organization_id=organization_id,
project_id=project_id,
),
)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_tts_result_processing")
def run_tts_result_processing(
Expand Down
7 changes: 7 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ def AWS_S3_BUCKET(self) -> str:
DOC_TRANSFORMATION_PENDING_THRESHOLD_MINUTES: int = 30
PENDING_JOB_QUERY_TIMEOUT_MS: int = 1000

# Assessment
ASSESSMENT_L1_GEMINI_MODEL: str = "gemini-3.1-flash-lite"
ASSESSMENT_L1_CONCURRENT_WORKERS: int = 8
ASSESSMENT_L1_DUPLICATE_STORE_NAME: str = (
Comment thread
vprashrex marked this conversation as resolved.
Outdated
"fileSearchStores/inquilabcorpus-782mxjcwisaz"
)

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
Expand Down
4 changes: 4 additions & 0 deletions backend/app/crud/assessment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
list_assessment_runs,
list_assessments,
recompute_assessment_status,
update_assessment_run_l1_stats,
update_assessment_run_status,
update_run_post_processing_config,
)
from app.crud.assessment.dataset import (
create_assessment_dataset,
Expand Down Expand Up @@ -42,5 +44,7 @@
"list_assessment_datasets",
"list_assessments",
"recompute_assessment_status",
"update_assessment_run_l1_stats",
"update_assessment_run_status",
"update_run_post_processing_config",
]
90 changes: 23 additions & 67 deletions backend/app/crud/assessment/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@
normalize_llm_text,
)
from app.services.assessment.utils.attachments import (
build_gemini_attachment_parts,
resolve_attachment_values,
resolve_image_mime_and_payload,
split_attachment_urls,
split_data_url,
to_direct_attachment_url,
)
from app.services.llm.providers.registry import LLMProvider

Expand Down Expand Up @@ -161,6 +158,7 @@ def build_openai_jsonl(
attachments: list[AssessmentAttachment],
prompt_template: str | None,
openai_params: dict,
row_indices: list[int] | None = None,
) -> list[dict[str, Any]]:
"""Build OpenAI batch JSONL data from dataset rows.

Expand All @@ -173,8 +171,11 @@ def build_openai_jsonl(
}
"""
jsonl_data = []
# Memoize per-item type probes across all rows in this build.
type_cache: dict[str, str] = {}

for idx, row in enumerate(rows):
for i, row in enumerate(rows):
idx = row_indices[i] if row_indices is not None else i
Comment thread
vprashrex marked this conversation as resolved.
Outdated
# Build input array
input_parts: list[dict[str, Any]] = []

Expand All @@ -186,7 +187,7 @@ def build_openai_jsonl(
# Attachments
for att in attachments:
cell_value = row.get(att.column, "")
input_parts.extend(resolve_attachment_values(cell_value, att))
input_parts.extend(resolve_attachment_values(cell_value, att, type_cache))
Comment thread
vprashrex marked this conversation as resolved.
Outdated

if not input_parts:
logger.warning("[build_openai_jsonl] Skipping empty row | idx=%s", idx)
Expand Down Expand Up @@ -219,6 +220,7 @@ def build_google_jsonl(
attachments: list[AssessmentAttachment],
prompt_template: str | None,
google_params: dict,
row_indices: list[int] | None = None,
) -> list[dict[str, Any]]:
"""Build Google (Gemini) batch JSONL data from dataset rows.

Expand All @@ -229,8 +231,11 @@ def build_google_jsonl(
}
"""
jsonl_data = []
# Memoize per-item type probes across all rows in this build.
type_cache: dict[str, str] = {}

for idx, row in enumerate(rows):
for i, row in enumerate(rows):
idx = row_indices[i] if row_indices is not None else i
parts: list[dict[str, Any]] = []

# Text prompt
Expand All @@ -240,64 +245,8 @@ def build_google_jsonl(

# Attachments (Gemini uses file_data for inline content)
for att in attachments:
cell_value = row.get(att.column, "").strip()
if not cell_value:
continue

cell_values = (
split_attachment_urls(cell_value)
if att.format == "url"
else [cell_value]
)

for item_value in cell_values:
normalized_value = (
to_direct_attachment_url(item_value, att.type)
if att.format == "url"
else item_value
)
if att.type == "image":
mime_type, payload = resolve_image_mime_and_payload(
normalized_value,
att.format,
)
if att.format == "url":
parts.append(
{
"fileData": {
"mimeType": mime_type,
"fileUri": normalized_value,
}
}
)
else:
parts.append(
{
"inlineData": {
"mimeType": mime_type,
"data": payload,
}
}
)
elif att.type == "pdf":
if att.format == "url":
parts.append(
{
"fileData": {
"mimeType": "application/pdf",
"fileUri": normalized_value,
}
}
)
else:
parts.append(
{
"inlineData": {
"mimeType": "application/pdf",
"data": split_data_url(normalized_value)[1],
}
}
)
cell_value = row.get(att.column, "")
parts.extend(build_gemini_attachment_parts(cell_value, att, type_cache))

if not parts:
logger.warning("[build_google_jsonl] Skipping empty row | idx=%s", idx)
Expand Down Expand Up @@ -349,6 +298,8 @@ def submit_assessment_batch(
assessment_input: dict[str, Any],
organization_id: int,
project_id: int,
preloaded_rows: list[dict[str, str]] | None = None,
row_indices: list[int] | None = None,
) -> BatchJob:
"""Build JSONL and submit a batch for one assessment run.

Expand All @@ -371,8 +322,11 @@ def submit_assessment_batch(
output_schema = assessment_input.get("output_schema")
attachments = [AssessmentAttachment(**a) for a in attachments_raw]

# Load dataset rows
rows = _load_dataset_rows(session, dataset)
# Use preloaded rows (post-L1 filtered) if provided, else load from dataset.
if preloaded_rows is not None:
rows = preloaded_rows
else:
rows = _load_dataset_rows(session, dataset)
if not rows:
raise ValueError(f"Dataset {dataset.id} has no rows")

Expand Down Expand Up @@ -412,6 +366,7 @@ def submit_assessment_batch(
attachments=attachments,
prompt_template=prompt_template,
openai_params=mapped_params,
row_indices=row_indices,
)

# Get OpenAI client and submit
Expand Down Expand Up @@ -452,6 +407,7 @@ def submit_assessment_batch(
attachments=attachments,
prompt_template=prompt_template,
google_params=mapped_params,
row_indices=row_indices,
)

# Get Gemini client and submit
Expand Down
Loading
Loading