Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions backend/app/alembic/versions/064_add_run_mode_to_evaluation_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""add run_mode column and unique run-name constraint to evaluation_run

Revision ID: 064
Revises: 063
Create Date: 2026-05-20 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "064"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increase the revision to 65 .. since revision 64 already exist

down_revision = "063"
branch_labels = None
depends_on = None

disable_per_migration_transaction = True

_UNIQUE_INDEX = "uq_evaluation_run_org_project_run_name"
_UNIQUE_CONSTRAINT = "uq_evaluation_run_org_project_run_name"


def upgrade():
# 1. Add run_mode as nullable first so existing rows are backfilled by the
# server default, then tighten to NOT NULL. The server default is left
# in place as a safety net.
with op.get_context().autocommit_block():
op.add_column(
"evaluation_run",
sa.Column(
"run_mode",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: can it be enum ? ['batch', 'fast_mode'] .. just suggestion

sa.String(length=10),
nullable=True,
server_default=sa.text("'batch'"),
comment="Execution mode: batch or fast",
),
)
op.execute("ALTER TABLE evaluation_run ALTER COLUMN run_mode SET NOT NULL")

# 2. Dedupe existing rows before adding the unique constraint.
# Keep the lowest-id row for each (organization_id, project_id,
# run_name) tuple and remove the rest.
with op.get_context().autocommit_block():
op.execute(
"""
DELETE FROM evaluation_run
WHERE id IN (
SELECT id
FROM (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY organization_id, project_id, run_name
ORDER BY id ASC
) AS rn
FROM evaluation_run
) sub
WHERE rn > 1
)
"""
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# 3. Build the unique index CONCURRENTLY so the scan does not take an
# AccessExclusiveLock, then attach it as a named constraint via
# ADD CONSTRAINT ... USING INDEX (brief catalog-only lock).
with op.get_context().autocommit_block():
op.execute(
f"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "
f'"{_UNIQUE_INDEX}" '
f"ON evaluation_run (organization_id, project_id, run_name)"
)
op.execute(
f"ALTER TABLE evaluation_run "
f'ADD CONSTRAINT "{_UNIQUE_CONSTRAINT}" '
f'UNIQUE USING INDEX "{_UNIQUE_INDEX}"'
)


def downgrade():
# Reverse in opposite order to upgrade().
op.execute(
f"ALTER TABLE evaluation_run "
f'DROP CONSTRAINT IF EXISTS "{_UNIQUE_CONSTRAINT}"'
)
with op.get_context().autocommit_block():
op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{_UNIQUE_INDEX}"')
op.drop_column("evaluation_run", "run_mode")
42 changes: 34 additions & 8 deletions backend/app/api/docs/evaluation/create_evaluation.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
Start an evaluation run using the OpenAI Batch API.
Start an evaluation run against a stored dataset.

Evaluations allow you to systematically test LLM configurations against
predefined datasets with automatic progress tracking and result collection.
Two execution modes are supported via the optional `run_mode` field:

* `batch` (default) — submits the work to the OpenAI Batch API. Cost-efficient
for large datasets; turnaround can take up to 24 hours.
* `fast` — runs the evaluation synchronously through the OpenAI Responses API
and returns results within seconds-to-minutes. Restricted to text
evaluations on datasets with at most `EVAL_FAST_MAX_UNIQUE_ROWS` unique rows.

**Key Features:**
* Fetches dataset items from Langfuse and creates a batch processing job via the OpenAI Batch API
* Asynchronous processing with automatic progress tracking (checks every 60s)
* Fetches dataset items from Langfuse and creates a job (batch or fast)
* Uses a stored config (created via `/configs`) to define the provider parameters
* Stores results for comparison and analysis
* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results
* Same scoring semantics across both modes — cosine similarity, Langfuse traces,
and optional LLM-as-Judge correctness
* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results;
the response carries `run_mode` so clients can tell the two paths apart

## Example
## Example (batch — default)

```json
{
Expand All @@ -20,3 +26,23 @@ predefined datasets with automatic progress tracking and result collection.
"config_version": 1
}
```

## Example (fast)

```json
{
"dataset_id": 123,
"experiment_name": "may19-temp0.2-gpt4o-fast",
"config_id": "f54f0d67-4817-4103-9fdf-b74b3d46733e",
"config_version": 1,
"run_mode": "fast"
}
```

## Fast-mode error responses

| Status | Code | When |
| --- | --- | --- |
| 422 | `config_type_unsupported` | Resolved config is not a text-evaluation config |
| 422 | `dataset_too_large_for_fast` | Dataset exceeds `EVAL_FAST_MAX_UNIQUE_ROWS` unique rows |
| 409 | `run_name_already_exists` | A run with the same `experiment_name` already exists for this (organization, project) |
Comment thread
coderabbitai[bot] marked this conversation as resolved.
9 changes: 8 additions & 1 deletion backend/app/api/docs/evaluation/list_datasets.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
List all datasets for the current organization and project.

Returns a paginated list of datasets ordered by most recent first. Each dataset includes metadata (ID, name, item counts, duplication factor), Langfuse integration details, and object store URL.
Returns a paginated list of datasets ordered by most recent first. Each dataset includes metadata (ID, name, item counts, duplication factor), Langfuse integration details, object store URL, and an `eligible_for_fast` flag that is `true` when the dataset's unique-row count is within `EVAL_FAST_MAX_UNIQUE_ROWS` (and so can be used with `run_mode="fast"` on `POST /evaluations`).

## Query parameters

| Parameter | Description |
| --- | --- |
| `limit` / `offset` | Pagination (default 50 / 0; max limit 100) |
| `eligible_for` | If set to `fast`, the response is filtered to only datasets where `eligible_for_fast` is `true` |
32 changes: 27 additions & 5 deletions backend/app/api/routes/evaluations/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
from app.core.cloud import get_cloud_storage
from app.crud.evaluations import (
get_dataset_by_id,
)
from app.crud.evaluations import (
list_datasets as list_evaluation_datasets,
)
from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud
from app.models.evaluation import DatasetUploadResponse, EvaluationDataset
from app.services.evaluations import (
upload_dataset as upload_evaluation_dataset,
is_dataset_fast_eligible,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This oneliner check needn't be wrapped around a function

validate_csv_file,
)
from app.services.evaluations import (
upload_dataset as upload_evaluation_dataset,
)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please combine these into a single import block

from app.utils import (
APIResponse,
load_description,
Expand All @@ -39,13 +44,15 @@ def _dataset_to_response(
dataset: EvaluationDataset, signed_url: str | None = None
) -> DatasetUploadResponse:
"""Convert a dataset model to a DatasetUploadResponse."""
original_items = dataset.dataset_metadata.get("original_items_count", 0)
return DatasetUploadResponse(
dataset_id=dataset.id,
dataset_name=dataset.name,
description=dataset.description,
total_items=dataset.dataset_metadata.get("total_items_count", 0),
original_items=dataset.dataset_metadata.get("original_items_count", 0),
original_items=original_items,
duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1),
eligible_for_fast=is_dataset_fast_eligible(original_items_count=original_items),
langfuse_dataset_id=dataset.langfuse_dataset_id,
object_store_url=dataset.object_store_url,
signed_url=signed_url,
Expand Down Expand Up @@ -104,6 +111,15 @@ def list_datasets(
default=50, ge=1, le=100, description="Maximum number of datasets to return"
),
offset: int = Query(default=0, ge=0, description="Number of datasets to skip"),
eligible_for: str
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is taking only one enum type . So instead of typecasting to bool in line no 131, we can have a single queryParam eligible_for_fast to mirror the DatasetUploadResponse

| None = Query(
default=None,
description=(
"If 'fast', return only datasets eligible for run_mode='fast' "
"(unique-row count within EVAL_FAST_MAX_UNIQUE_ROWS)."
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: description not required, the queryParam is self-explanatory

),
enum=["fast"],
),
) -> APIResponse[list[DatasetUploadResponse]]:
"""List evaluation datasets."""
datasets = list_evaluation_datasets(
Expand All @@ -114,9 +130,15 @@ def list_datasets(
offset=offset,
)

return APIResponse.success_response(
data=[_dataset_to_response(dataset) for dataset in datasets]
)
dataset_responses = [_dataset_to_response(dataset) for dataset in datasets]
if eligible_for == "fast":
dataset_responses = [
dataset_response
for dataset_response in dataset_responses
if dataset_response.eligible_for_fast
]

return APIResponse.success_response(data=dataset_responses)


@router.get(
Expand Down
30 changes: 28 additions & 2 deletions backend/app/api/routes/evaluations/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from uuid import UUID

from asgi_correlation_id import correlation_id
from fastapi import (
APIRouter,
Body,
Expand All @@ -12,13 +13,14 @@
)

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud
from app.crud.evaluations.core import group_traces_by_question_id
from app.models.evaluation import EvaluationRunPublic
from app.api.permissions import Permission, require_permission
from app.models.evaluation import EvaluationRunPublic, RunModeEnum
from app.services.evaluations import (
get_evaluation_with_scores,
start_evaluation,
validate_and_start_fast_evaluation,
)
from app.utils import (
APIResponse,
Expand All @@ -45,8 +47,32 @@ def evaluate(
),
config_id: UUID = Body(..., description="Stored config ID"),
config_version: int = Body(..., ge=1, description="Stored config version"),
run_mode: RunModeEnum = Body(
default=RunModeEnum.BATCH,
description="Execution mode: 'batch' (default) or 'fast'",
),
) -> APIResponse[EvaluationRunPublic]:
"""Start an evaluation run."""
logger.info(
f"[evaluate] Starting evaluation | run_mode={run_mode.value} | "
f"experiment_name={experiment_name} | dataset_id={dataset_id} | "
f"org_id={auth_context.organization_.id} | "
f"project_id={auth_context.project_.id}"
)

if run_mode == RunModeEnum.FAST:
eval_run = validate_and_start_fast_evaluation(
session=session,
dataset_id=dataset_id,
run_name=experiment_name,
config_id=config_id,
config_version=config_version,
organization_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
trace_id=correlation_id.get() or "N/A",
)
return APIResponse.success_response(data=eval_run)

eval_run = start_evaluation(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: method name can be changed to validate_and_start_batch_evaluation for consistency

session=session,
dataset_id=dataset_id,
Expand Down
7 changes: 7 additions & 0 deletions backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def initialize_worker(**_) -> None:
include=[
"app.celery.tasks.job_execution",
"app.celery.tasks.notifications",
"app.celery.tasks.evaluation_fast",
],
)

Expand All @@ -186,6 +187,12 @@ def initialize_worker(**_) -> None:
routing_key="low",
queue_arguments={"x-max-priority": 1},
),
Queue(
"evaluations",
exchange=default_exchange,
routing_key="evaluations",
queue_arguments={"x-max-priority": 6},
),
Queue("cron", exchange=default_exchange, routing_key="cron"),
Queue("default", exchange=default_exchange, routing_key="default"),
),
Expand Down
51 changes: 51 additions & 0 deletions backend/app/celery/tasks/evaluation_fast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Celery task for the synchronous (fast) text-evaluation pipeline.

This module hosts the single orchestrator task per fast evaluation run. The
heavy lifting lives in `app/services/evaluations/fast.py`; this task is a thin
shim that sets the correlation id, attaches the OTel parent context, and
delegates.

See `Fast Evaluation SRD.md` for the design (queue, retries, idempotency).
"""

import logging

from celery import current_task

from app.celery.celery_app import celery_app
from app.celery.tasks.job_execution import _run_with_otel_parent, _set_trace
from app.celery.utils import gevent_timeout
from app.core.config import settings

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, queue="evaluations", priority=6)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_evaluation_fast")
def run_evaluation_fast(
self, eval_run_id: int, trace_id: str = "N/A", **kwargs
) -> None:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""Run the fast evaluation pipeline for one EvaluationRun.

Idempotency: each stage is skipped on retry when its `batch_job` marker is
already set on the EvaluationRun, so Celery redelivery never re-calls
OpenAI for work that already succeeded.

Args:
eval_run_id: ID of the EvaluationRun (run_mode="fast").
trace_id: Correlation id from the enqueueing request, propagated into
the worker for log correlation.
"""
from app.services.evaluations.fast import execute_fast_evaluation

_set_trace(trace_id)
logger.info(
f"[run_evaluation_fast] Starting fast evaluation task | "
f"eval_run_id={eval_run_id} | task_id={current_task.request.id}"
)

return _run_with_otel_parent(
self,
lambda: execute_fast_evaluation(eval_run_id=eval_run_id),
)
16 changes: 16 additions & 0 deletions backend/app/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ def start_tts_result_processing(
return task_id


def start_fast_evaluation(eval_run_id: int, trace_id: str = "N/A") -> str:
"""Enqueue the run_evaluation_fast orchestrator task for one EvaluationRun."""
from app.celery.tasks.evaluation_fast import run_evaluation_fast

task_id = _enqueue_with_trace_context(
run_evaluation_fast,
eval_run_id=eval_run_id,
trace_id=trace_id,
)
logger.info(
f"[start_fast_evaluation] Enqueued fast eval | "
f"eval_run_id={eval_run_id} | task_id={task_id}"
)
return task_id


def get_task_status(task_id: str) -> Dict[str, Any]:
result = AsyncResult(task_id)
return {
Expand Down
6 changes: 6 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ def AWS_S3_BUCKET(self) -> str:
DOC_TRANSFORMATION_PENDING_THRESHOLD_MINUTES: int = 30
PENDING_JOB_QUERY_TIMEOUT_MS: int = 1000

# Fast evaluation (run_mode="fast") configuration.
# See "Fast Evaluation SRD.md" for the full design rationale.
EVAL_FAST_MAX_UNIQUE_ROWS: int = 10
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three env variables could be inside a constants.py or something similar cause ideally .env should contain variables related to infra/devops.

EVAL_FAST_FAILURE_THRESHOLD: float = 0.5
EVAL_FAST_API_CONCURRENCY: int = 10

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
Expand Down
Loading
Loading