Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1eabb00
default file size and addding documentation
nishika26 Apr 17, 2026
7f5d86f
default file size and addding documentation
nishika26 Apr 17, 2026
fa853c3
Merge branch 'main' into bug/file_size_default
nishika26 Apr 17, 2026
8e3d29d
coderabbit reviews
nishika26 Apr 17, 2026
bed1d1a
test cases failing
nishika26 Apr 17, 2026
d02bac8
changing the logic
nishika26 Apr 17, 2026
8b7556c
fixing test cases
nishika26 Apr 17, 2026
ca6c17e
Merge branch 'main' into enhancement/collection_batching
nishika26 May 5, 2026
baaeac2
adding alembic file
nishika26 May 5, 2026
47525e4
adding logic to the pr
nishika26 May 5, 2026
0720a17
Merge branch 'main' into enhancement/collection_batching
nishika26 May 5, 2026
2a2e268
pushing few changes
nishika26 May 6, 2026
eb38c73
Merge branch 'main' into enhancement/collection_batching
nishika26 May 7, 2026
0d7f4a1
adding test cases
nishika26 May 9, 2026
fb4e874
Merge branch 'main' into enhancement/collection_batching
nishika26 May 10, 2026
290d5f0
fixing test cases
nishika26 May 10, 2026
9f95a76
increasing test cases
nishika26 May 10, 2026
cb97654
incrreading test coverage
nishika26 May 10, 2026
fd37d14
coderabbit reviews
nishika26 May 11, 2026
510912c
removing assistant
nishika26 May 13, 2026
954f1ed
Merge branch 'main' into enhancement/collection_batching
nishika26 May 13, 2026
4a492da
fixing test cases
nishika26 May 14, 2026
305e3ef
Merge branch 'main' into enhancement/collection_batching
nishika26 May 14, 2026
979f756
fixing alembic migration
nishika26 May 14, 2026
0e027c2
test cases fix
nishika26 May 14, 2026
3cc2312
Merge branch 'main' into enhancement/collection_batching
nishika26 May 15, 2026
a8878cb
left docs and adding openai file deletion
nishika26 May 15, 2026
8f83837
Merge branch 'main' into enhancement/collection_batching
vprashrex May 28, 2026
45c8a5a
fix(openai): improve error handling for failed file uploads in vector…
vprashrex May 29, 2026
dee64d4
refactor(tests): replace get_assistant_collection with get_vector_sto…
vprashrex May 29, 2026
afe321c
refactor(collection): remove llm_service_id from CollectionPublic model
vprashrex May 29, 2026
02207cb
fix(openai): enhance error logging for batch attachment failures and …
vprashrex May 30, 2026
9440735
refactor(docs): simplify description for Collections in OpenAPI schema
vprashrex Jun 1, 2026
a88a2ab
Merge branch 'main' into enhancement/collection_batching
Prajna1999 Jun 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""add batch tracking to collection_jobs

Revision ID: 058
Revises: 057
Create Date: 2026-04-13

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "058"
down_revision = "057"
branch_labels = None
depends_on = None


def upgrade():
    """Add three nullable columns to ``collection_jobs`` and one to ``document``.

    Every column is added with ``nullable=True`` and a column comment; the
    columns are added in the order listed below.
    """
    # (table, column name, column type, column comment)
    column_specs = (
        (
            "collection_jobs",
            "total_batches",
            sa.Integer(),
            "Total number of batches the documents are split into",
        ),
        (
            "collection_jobs",
            "current_batch_number",
            sa.Integer(),
            "Which batch is currently being processed (1-indexed)",
        ),
        (
            "collection_jobs",
            "documents_uploaded",
            sa.JSON(),
            "List of document IDs successfully uploaded so far",
        ),
        (
            "document",
            "openai_file_id",
            sa.String(),
            "File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading",
        ),
    )

    for table_name, column_name, column_type, column_comment in column_specs:
        new_column = sa.Column(
            column_name,
            column_type,
            nullable=True,
            comment=column_comment,
        )
        op.add_column(table_name, new_column)


def downgrade():
    """Drop the four columns this revision adds, in the order listed below."""
    # (table, column name)
    dropped_columns = (
        ("collection_jobs", "total_batches"),
        ("collection_jobs", "current_batch_number"),
        ("collection_jobs", "documents_uploaded"),
        ("document", "openai_file_id"),
    )

    for table_name, column_name in dropped_columns:
        op.drop_column(table_name, column_name)
2 changes: 1 addition & 1 deletion backend/app/api/docs/documents/upload.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Upload a document to Kaapi.

- If only a file is provided, the document will be uploaded and stored, and its ID will be returned.
- If only a file is provided, the document will be uploaded and stored, and its ID will be returned. The maximum file size allowed for upload is 25 MB.
- If a target format is specified, a transformation job will also be created to transform the document into the target format in the background. The response will include both the uploaded document details and information about the transformation job.
- If a callback URL is provided, you will receive a notification at that URL once the document transformation job is completed.

Expand Down
30 changes: 24 additions & 6 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging
from typing import Any

import celery
from asgi_correlation_id import correlation_id
from celery import current_task
from opentelemetry import context as otel_context
Expand Down Expand Up @@ -133,16 +131,36 @@ def run_doctransform_job(self, project_id: int, job_id: str, trace_id: str, **kw


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_create_collection_job")
def run_create_collection_job(
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_collection_setup_job")
def run_collection_setup_job(
self, project_id: int, job_id: str, trace_id: str, **kwargs
):
from app.services.collections.create_collection import execute_job
from app.services.collections.create_collection import execute_setup_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
lambda: execute_setup_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_collection_batch_job")
def run_collection_batch_job(
self, project_id: int, job_id: str, trace_id: str, **kwargs
):
from app.services.collections.create_collection import execute_batch_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_batch_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
Expand Down
26 changes: 22 additions & 4 deletions backend/app/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,38 @@ def start_doctransform_job(
return task_id


def start_create_collection_job(
def start_collection_setup_job(
project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
from app.celery.tasks.job_execution import run_create_collection_job
from app.celery.tasks.job_execution import run_collection_setup_job

task_id = _enqueue_with_trace_context(
run_create_collection_job,
run_collection_setup_job,
project_id=project_id,
job_id=job_id,
trace_id=trace_id,
**kwargs,
)
logger.info(
f"[start_create_collection_job] Started job {job_id} with Celery task {task_id}"
f"[start_collection_setup_job] Started job {job_id} with Celery task {task_id}"
)
return task_id


def start_collection_batch_job(
    project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
    """Enqueue ``run_collection_batch_job`` for a collection job.

    Args:
        project_id: Forwarded to the task as ``project_id``.
        job_id: Forwarded to the task as ``job_id``; also written to the log.
        trace_id: Forwarded to the task as ``trace_id``. Defaults to ``"N/A"``.
        **kwargs: Extra keyword arguments forwarded unchanged to the task.

    Returns:
        The task ID returned by ``_enqueue_with_trace_context``.
    """
    # Function-scope import kept as in the original.
    # NOTE(review): presumably this avoids a circular import — confirm.
    from app.celery.tasks.job_execution import run_collection_batch_job

    task_id = _enqueue_with_trace_context(
        run_collection_batch_job,
        project_id=project_id,
        job_id=job_id,
        trace_id=trace_id,
        **kwargs,
    )
    # The log tag names this function; it previously carried the setup job's
    # tag, which made batch enqueues look like setup enqueues in the logs.
    logger.info(
        f"[start_collection_batch_job] Started job {job_id} with Celery task {task_id}"
    )
    return task_id

Expand Down
54 changes: 22 additions & 32 deletions backend/app/crud/rag/open_ai.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import json
import logging
import functools as ft
from io import BytesIO
from typing import Iterable
import time

from openai import OpenAI, OpenAIError
from pydantic import BaseModel

from app.core.cloud import CloudStorage
from app.models import Document

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,11 +76,6 @@ def clean(self, resource):

class VectorStoreCleaner(ResourceCleaner):
def clean(self, resource):
logger.info(
f"[VectorStoreCleaner.clean] Starting vector store cleanup | {{'vector_store_id': '{resource}'}}"
)
for i in vs_ls(self.client, resource):
self.client.files.delete(i.id)
logger.info(
f"[VectorStoreCleaner.clean] Deleting vector store | {{'vector_store_id': '{resource}'}}"
)
Expand Down Expand Up @@ -118,36 +111,33 @@ def read(self, vector_store_id: str):
def update(
self,
vector_store_id: str,
storage: CloudStorage,
documents: Iterable[Document],
):
for docs in documents:
files = []
for d in docs:
# Get file bytes and wrap in BytesIO for OpenAI API
content = storage.get(d.object_store_url)
f_obj = BytesIO(content)
f_obj.name = d.fname
files.append(f_obj)
docs: list[Document],
) -> None:
if not docs:
return

logger.info(
f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}"
)
req = self.client.vector_stores.file_batches.upload_and_poll(
try:
batch = self.client.vector_stores.file_batches.upload_and_poll(
vector_store_id=vector_store_id,
files=files,
files=[],
file_ids=[doc.openai_file_id for doc in docs],
Comment thread
nishika26 marked this conversation as resolved.
)
logger.info(
f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
f"[OpenAIVectorStoreCrud.update] Batch complete | "
f"{{'vector_store_id': '{vector_store_id}', "
f"'completed': {batch.file_counts.completed}, 'failed': {batch.file_counts.failed}}}"
)
if req.file_counts.completed != req.file_counts.total:
error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed"
logger.error(
f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
if batch.file_counts.failed > 0:
logger.warning(
f"[OpenAIVectorStoreCrud.update] Batch had failures | "
f"{{'vector_store_id': '{vector_store_id}', 'failed_count': {batch.file_counts.failed}}}"
)
raise InterruptedError(error_msg)

yield from docs
except OpenAIError as err:
logger.error(
f"[OpenAIVectorStoreCrud.update] Batch attach failed | "
f"{{'vector_store_id': '{vector_store_id}', 'error': '{str(err)}'}}",
exc_info=True,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def delete(self, vector_store_id: str, retries: int = 3):
if retries < 1:
Expand Down
27 changes: 26 additions & 1 deletion backend/app/models/collection_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,29 @@ class CollectionJob(SQLModel, table=True):
documents: list[str] | None = Field(
default=None,
sa_column=Column(
JSON, nullable=True, comment="List of documents given to make collection"
JSON, nullable=True, comment="List of document IDs given to make collection"
),
)
total_batches: int | None = Field(
default=None,
nullable=True,
sa_column_kwargs={
"comment": "Total number of batches the documents are split into"
},
)
current_batch_number: int | None = Field(
default=None,
nullable=True,
sa_column_kwargs={
"comment": "Which batch is currently being processed (1-indexed)"
},
)
documents_uploaded: list[str] | None = Field(
default=None,
sa_column=Column(
JSON,
nullable=True,
comment="List of document IDs successfully uploaded so far",
),
)

Expand Down Expand Up @@ -139,6 +161,9 @@ class CollectionJobUpdate(SQLModel):
collection_id: UUID | None = None
total_size_mb: float | None = None
trace_id: str | None = None
total_batches: int | None = None
current_batch_number: int | None = None
documents_uploaded: list[str] | None = None


##Response models
Expand Down
5 changes: 5 additions & 0 deletions backend/app/models/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class Document(DocumentBase, table=True):
description="The size of the document in kilobytes",
sa_column_kwargs={"comment": "Size of the document in kilobytes (KB)"},
)
openai_file_id: str | None = Field(
default=None,
nullable=True,
sa_column_kwargs={"comment": "File ID assigned by OpenAI (avoid re-uploading)"},
)

# Foreign keys
source_document_id: UUID | None = Field(
Expand Down
Loading
Loading