Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1eabb00
default file size and addding documentation
nishika26 Apr 17, 2026
7f5d86f
default file size and addding documentation
nishika26 Apr 17, 2026
fa853c3
Merge branch 'main' into bug/file_size_default
nishika26 Apr 17, 2026
8e3d29d
coderabbit reviews
nishika26 Apr 17, 2026
bed1d1a
test cases failing
nishika26 Apr 17, 2026
d02bac8
changing the logic
nishika26 Apr 17, 2026
8b7556c
fixing test cases
nishika26 Apr 17, 2026
ca6c17e
Merge branch 'main' into enhancement/collection_batching
nishika26 May 5, 2026
baaeac2
adding alembic file
nishika26 May 5, 2026
47525e4
adding logic to the pr
nishika26 May 5, 2026
0720a17
Merge branch 'main' into enhancement/collection_batching
nishika26 May 5, 2026
2a2e268
pushing few changes
nishika26 May 6, 2026
eb38c73
Merge branch 'main' into enhancement/collection_batching
nishika26 May 7, 2026
0d7f4a1
adding test cases
nishika26 May 9, 2026
fb4e874
Merge branch 'main' into enhancement/collection_batching
nishika26 May 10, 2026
290d5f0
fixing test cases
nishika26 May 10, 2026
9f95a76
increasing test cases
nishika26 May 10, 2026
cb97654
incrreading test coverage
nishika26 May 10, 2026
fd37d14
coderabbit reviews
nishika26 May 11, 2026
510912c
removing assistant
nishika26 May 13, 2026
954f1ed
Merge branch 'main' into enhancement/collection_batching
nishika26 May 13, 2026
4a492da
fixing test cases
nishika26 May 14, 2026
305e3ef
Merge branch 'main' into enhancement/collection_batching
nishika26 May 14, 2026
979f756
fixing alembic migration
nishika26 May 14, 2026
0e027c2
test cases fix
nishika26 May 14, 2026
3cc2312
Merge branch 'main' into enhancement/collection_batching
nishika26 May 15, 2026
a8878cb
left docs and adding openai file deletion
nishika26 May 15, 2026
8f83837
Merge branch 'main' into enhancement/collection_batching
vprashrex May 28, 2026
45c8a5a
fix(openai): improve error handling for failed file uploads in vector…
vprashrex May 29, 2026
dee64d4
refactor(tests): replace get_assistant_collection with get_vector_sto…
vprashrex May 29, 2026
afe321c
refactor(collection): remove llm_service_id from CollectionPublic model
vprashrex May 29, 2026
02207cb
fix(openai): enhance error logging for batch attachment failures and …
vprashrex May 30, 2026
9440735
refactor(docs): simplify description for Collections in OpenAPI schema
vprashrex Jun 1, 2026
a88a2ab
Merge branch 'main' into enhancement/collection_batching
Prajna1999 Jun 3, 2026
f657f08
Merge branch 'main' into enhancement/collection_batching
vprashrex Jun 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""add batch tracking to collection_jobs

Revision ID: 055
Revises: 054
Create Date: 2026-04-13

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "055"
down_revision = "054"
branch_labels = None
depends_on = None


def upgrade():
    """Add batch-tracking columns to ``collection_jobs`` and a provider file ID to ``document``.

    Every column is added as nullable, so existing rows need no backfill.
    """
    # (table, column name, column type, column comment), in creation order.
    new_columns = (
        (
            "collection_jobs",
            "total_batches",
            sa.Integer(),
            "Total number of batches the documents are split into",
        ),
        (
            "collection_jobs",
            "current_batch_number",
            sa.Integer(),
            "Which batch is currently being processed (1-indexed)",
        ),
        (
            "collection_jobs",
            "documents_uploaded",
            sa.JSON(),
            "List of document IDs successfully uploaded so far",
        ),
        (
            "document",
            "openai_file_id",
            sa.String(),
            "File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading",
        ),
    )

    for table_name, column_name, column_type, column_comment in new_columns:
        op.add_column(
            table_name,
            sa.Column(
                column_name,
                column_type,
                nullable=True,
                comment=column_comment,
            ),
        )


def downgrade():
    """Drop the four columns that ``upgrade`` added in this revision."""
    # (table, column) pairs, dropped in the same order they are listed.
    dropped_columns = (
        ("collection_jobs", "total_batches"),
        ("collection_jobs", "current_batch_number"),
        ("collection_jobs", "documents_uploaded"),
        ("document", "openai_file_id"),
    )
    for table_name, column_name in dropped_columns:
        op.drop_column(table_name, column_name)
2 changes: 1 addition & 1 deletion backend/app/api/docs/documents/upload.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Upload a document to Kaapi.

- If only a file is provided, the document will be uploaded and stored, and its ID will be returned.
- If only a file is provided, the document will be uploaded and stored, and its ID will be returned. The maximum file size allowed for upload is 25 MB.
- If a target format is specified, a transformation job will also be created to transform the document into the target format in the background. The response will include both the uploaded document details and information about the transformation job.
- If a callback URL is provided, you will receive a notification at that URL once the document transformation job is completed.

Expand Down
215 changes: 81 additions & 134 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

from asgi_correlation_id import correlation_id
from celery import current_task
from opentelemetry import context as otel_context
from opentelemetry import trace
from opentelemetry.propagate import extract

from app.celery.celery_app import celery_app
from app.celery.utils import gevent_timeout
from app.core.config import settings

logger = logging.getLogger(__name__)

Expand All @@ -16,60 +15,17 @@ def _set_trace(trace_id: str) -> None:
logger.info(f"[_set_trace] Set correlation ID: {trace_id}")


def _extract_parent_context(task_instance) -> otel_context.Context:
"""Extract OTel parent context from Celery headers if available.

Reads ``task_instance.request.headers``, copies its string-valued entries
(and the string-valued entries of a nested ``"otel"`` dict) into a flat
carrier, and returns the result of ``extract(carrier)``.
"""
# A missing or falsy ``headers`` attribute is treated as an empty dict.
headers = getattr(task_instance.request, "headers", None) or {}
carrier: dict[str, str] = {}

# Only string values are copied; keys are coerced to ``str``.
if isinstance(headers, dict):
for key, value in headers.items():
if isinstance(value, str):
carrier[str(key)] = value

# NOTE(review): indentation is not visible in this view; presumably this
# lookup sits inside the ``isinstance(headers, dict)`` branch — confirm.
# Entries copied here overwrite top-level entries that share the same key.
nested = headers.get("otel", {})
if isinstance(nested, dict):
for key, value in nested.items():
if isinstance(value, str):
carrier[str(key)] = value

return extract(carrier)


def _run_with_otel_parent(task_instance, fn):
    """Call ``fn`` under the extracted parent context, unless a span is already active.

    When Celery auto-instrumentation is active there is already a current
    `run/...` span. Re-attaching the extracted parent context in that case
    would make service spans siblings of `run/...` rather than its children,
    so the extracted context is attached only as a fallback when the current
    span context is not valid.

    Returns whatever ``fn()`` returns.
    """
    active_span_ctx = trace.get_current_span().get_span_context()
    if not (active_span_ctx and active_span_ctx.is_valid):
        # No usable active span: fall back to the context carried in the headers.
        extracted_ctx = _extract_parent_context(task_instance)
        attach_token = otel_context.attach(extracted_ctx)
        try:
            return fn()
        finally:
            # Always restore the previous context, even if fn() raises.
            otel_context.detach(attach_token)

    # A valid span is already current; run inside it unchanged.
    return fn()


@celery_app.task(bind=True, queue="high_priority", priority=9)
def run_llm_job(self, project_id: int, job_id: str, trace_id: str, **kwargs):
from app.services.llm.jobs import execute_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -78,15 +34,12 @@ def run_llm_chain_job(self, project_id: int, job_id: str, trace_id: str, **kwarg
from app.services.llm.jobs import execute_chain_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_chain_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_chain_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -95,15 +48,12 @@ def run_response_job(self, project_id: int, job_id: str, trace_id: str, **kwargs
from app.services.response.jobs import execute_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -112,34 +62,46 @@ def run_doctransform_job(self, project_id: int, job_id: str, trace_id: str, **kw
from app.services.doctransform.job import execute_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_create_collection_job")
def run_create_collection_job(
self, project_id: int, job_id: str, trace_id: str, **kwargs
):
from app.services.collections.create_collection import execute_job
from app.services.collections.create_collection import execute_setup_job

_set_trace(trace_id)
return execute_setup_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_collection_batch_job")
def run_collection_batch_job(
self, project_id: int, job_id: str, trace_id: str, **kwargs
):
from app.services.collections.create_collection import execute_batch_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_batch_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)
Comment thread
nishika26 marked this conversation as resolved.


Expand All @@ -150,15 +112,12 @@ def run_delete_collection_job(
from app.services.collections.delete_collection import execute_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -169,15 +128,12 @@ def run_stt_batch_submission(
from app.services.stt_evaluations.batch_job import execute_batch_submission

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_batch_submission(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_batch_submission(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -188,15 +144,12 @@ def run_stt_metric_computation(
from app.services.stt_evaluations.metric_job import execute_metric_computation

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_metric_computation(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_metric_computation(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -207,15 +160,12 @@ def run_tts_batch_submission(
from app.services.tts_evaluations.batch_job import execute_batch_submission

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_batch_submission(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_batch_submission(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)


Expand All @@ -228,13 +178,10 @@ def run_tts_result_processing(
)

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_tts_result_processing(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
return execute_tts_result_processing(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
)
Loading
Loading