Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 32 additions & 22 deletions deeptutor/agents/research/agents/reporting_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,29 +1606,39 @@ async def _write_section_with_subsections(
self._get_mode_contract("section"),
)

# TODO Implement retry logic for LLM calls when JSON parsing or post-processing fails (e.g., malformed output, schema violations).
_chunks: list[str] = []
async for _c in self.stream_llm(
filled,
system_prompt,
stage="write_section_with_subsections",
trace_meta=self._build_trace_meta("Write section"),
):
_chunks.append(_c)
resp = "".join(_chunks)
data = extract_json_from_text(resp)
max_retries = 3
last_error = None
for attempt in range(max_retries):
_chunks: list[str] = []
async for _c in self.stream_llm(
filled,
system_prompt,
stage="write_section_with_subsections",
trace_meta=self._build_trace_meta(f"Write section (Attempt {attempt+1})"),
):
_chunks.append(_c)
resp = "".join(_chunks)
data = extract_json_from_text(resp)

try:
obj = ensure_json_dict(data)
ensure_keys(obj, ["section_content"])
content = obj.get("section_content", "")
if isinstance(content, str) and content.strip():
return content
raise ValueError("LLM returned empty or invalid section_content field")
except Exception as e:
raise ValueError(
f"Unable to parse LLM returned section content: {e!s}. Report generation failed."
)
try:
obj = ensure_json_dict(data)
ensure_keys(obj, ["section_content"])
content = obj.get("section_content", "")
if isinstance(content, str) and content.strip():
return content
raise ValueError("LLM returned empty or invalid section_content field")
except Exception as e:
last_error = e
if attempt < max_retries - 1:
# Append error feedback to prompt for retry
feedback = f"\n\nERROR IN PREVIOUS ATTEMPT: Your output failed validation: {e!s}. Please provide strict valid JSON with 'section_content'."
if feedback not in filled:
filled += feedback
continue

raise ValueError(
f"Unable to parse LLM returned section content after {max_retries} attempts: {last_error!s}. Report generation failed."
)

def _notify_progress(
self, callback: Callable[[dict[str, Any]], None] | None, status: str, **payload: Any
Expand Down
1 change: 1 addition & 0 deletions deeptutor/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def lifespan(app: FastAPI):
from deeptutor.events.event_bus import get_event_bus

event_bus = get_event_bus()
await event_bus.flush()
await event_bus.stop()
logger.info("EventBus stopped")
except Exception as e:
Expand Down
79 changes: 44 additions & 35 deletions deeptutor/api/routers/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,19 @@
from deeptutor.logging import get_logger
from deeptutor.services.config import PROJECT_ROOT, load_config_with_main

# Initialize logger with config
config = load_config_with_main("main.yaml", PROJECT_ROOT)
log_dir = config.get("paths", {}).get("user_log_dir") or config.get("logging", {}).get("log_dir")
logger = get_logger("Knowledge", level="INFO", log_dir=log_dir)
# Logger is created on first use rather than at import time, so a broken or
# missing config file cannot make importing this router module fail.
_router_logger = None

def get_router_logger():
    """Return the shared router logger, building it lazily on first call.

    The log directory is resolved from the main config when possible; any
    error while loading the config (missing file, bad YAML, etc.) is treated
    as "no log directory configured" and the logger falls back to defaults.
    """
    global _router_logger
    if _router_logger is not None:
        return _router_logger

    log_dir = None
    try:
        cfg = load_config_with_main("main.yaml", PROJECT_ROOT)
        # Prefer the per-user path; fall back to the logging section's dir.
        log_dir = (
            cfg.get("paths", {}).get("user_log_dir")
            or cfg.get("logging", {}).get("log_dir")
        )
    except Exception:
        # Best-effort: config unavailable here; log without a directory.
        log_dir = None

    _router_logger = get_logger("Knowledge", level="INFO", log_dir=log_dir)
    return _router_logger

router = APIRouter()

Expand Down Expand Up @@ -152,7 +161,7 @@ def _save_uploaded_files(
error_message = (
f"Validation failed for file '{original_filename}': {format_exception_message(e)}"
)
logger.error(error_message, exc_info=True)
get_router_logger().error(error_message, exc_info=True)
raise HTTPException(status_code=400, detail=error_message) from e

return uploaded_files, uploaded_file_paths
Expand All @@ -163,11 +172,11 @@ def _task_log(task_id: str, message: str, level: str = "info") -> None:
manager.ensure_task(task_id)
manager.emit_log(task_id, message)

log_method = getattr(logger, level, None)
log_method = getattr(get_router_logger(), level, None)
if callable(log_method):
log_method(f"[{task_id}] {message}")
else:
logger.info(f"[{task_id}] {message}")
get_router_logger().info(f"[{task_id}] {message}")


def _validate_registered_provider(raw_provider: str | None) -> str:
Expand Down Expand Up @@ -418,7 +427,7 @@ async def get_rag_providers():
providers = RAGService.list_providers()
return {"providers": providers}
except Exception as e:
logger.error(f"Error getting RAG providers: {e}")
get_router_logger().error(f"Error getting RAG providers: {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -431,7 +440,7 @@ async def get_all_kb_configs():
service = get_kb_config_service()
return service.get_all_configs()
except Exception as e:
logger.error(f"Error getting KB configs: {e}")
get_router_logger().error(f"Error getting KB configs: {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -445,7 +454,7 @@ async def get_kb_config(kb_name: str):
config = service.get_kb_config(kb_name)
return {"kb_name": kb_name, "config": config}
except Exception as e:
logger.error(f"Error getting config for KB '{kb_name}': {e}")
get_router_logger().error(f"Error getting config for KB '{kb_name}': {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -464,7 +473,7 @@ async def update_kb_config(kb_name: str, config: dict):
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating config for KB '{kb_name}': {e}")
get_router_logger().error(f"Error updating config for KB '{kb_name}': {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -478,7 +487,7 @@ async def sync_configs_from_metadata():
service.sync_all_from_metadata(_kb_base_dir)
return {"status": "success", "message": "Configurations synced from metadata files"}
except Exception as e:
logger.error(f"Error syncing configs: {e}")
get_router_logger().error(f"Error syncing configs: {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -490,7 +499,7 @@ async def get_default_kb():
default_kb = manager.get_default()
return {"default_kb": default_kb}
except Exception as e:
logger.error(f"Error getting default KB: {e}")
get_router_logger().error(f"Error getting default KB: {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -509,7 +518,7 @@ async def set_default_kb(kb_name: str):
except HTTPException:
raise
except Exception as e:
logger.error(f"Error setting default KB: {e}")
get_router_logger().error(f"Error setting default KB: {e}")
raise HTTPException(status_code=500, detail=str(e))


Expand All @@ -520,10 +529,10 @@ async def list_knowledge_bases():
manager = get_kb_manager()
kb_names = manager.list_knowledge_bases()

logger.debug(f"Found {len(kb_names)} knowledge bases: {kb_names}")
get_router_logger().debug(f"Found {len(kb_names)} knowledge bases: {kb_names}")

if not kb_names:
logger.debug("No knowledge bases found, returning empty list")
get_router_logger().debug("No knowledge bases found, returning empty list")
return []

result = []
Expand All @@ -532,7 +541,7 @@ async def list_knowledge_bases():
for name in kb_names:
try:
info = manager.get_info(name)
logger.debug(f"Successfully got info for KB '{name}': {info.get('statistics', {})}")
get_router_logger().debug(f"Successfully got info for KB '{name}': {info.get('statistics', {})}")
result.append(
KnowledgeBaseInfo(
name=info["name"],
Expand All @@ -545,11 +554,11 @@ async def list_knowledge_bases():
except Exception as e:
error_msg = f"Error getting info for KB '{name}': {e}"
errors.append(error_msg)
logger.warning(f"{error_msg}\n{traceback.format_exc()}")
get_router_logger().warning(f"{error_msg}\n{traceback.format_exc()}")
try:
kb_dir = manager.base_dir / name
if kb_dir.exists():
logger.debug(f"KB '{name}' directory exists, creating fallback info")
get_router_logger().debug(f"KB '{name}' directory exists, creating fallback info")
result.append(
KnowledgeBaseInfo(
name=name,
Expand All @@ -565,25 +574,25 @@ async def list_knowledge_bases():
)
)
except Exception as fallback_err:
logger.error(f"Fallback also failed for KB '{name}': {fallback_err}")
get_router_logger().error(f"Fallback also failed for KB '{name}': {fallback_err}")

if errors and not result:
error_detail = f"Failed to load knowledge bases. Errors: {'; '.join(errors)}"
logger.error(error_detail)
get_router_logger().error(error_detail)
raise HTTPException(status_code=500, detail=error_detail)

if errors:
logger.warning(
get_router_logger().warning(
f"Some KBs had errors, returning {len(result)} results. Errors: {errors}"
)

logger.debug(f"Returning {len(result)} knowledge bases")
get_router_logger().debug(f"Returning {len(result)} knowledge bases")
return result
except HTTPException:
raise
except Exception as e:
error_msg = f"Error listing knowledge bases: {e}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
get_router_logger().error(f"{error_msg}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Failed to list knowledge bases: {e!s}")


Expand All @@ -607,7 +616,7 @@ async def delete_knowledge_base(kb_name: str):
success = manager.delete_knowledge_base(kb_name, confirm=True)
if not success:
raise HTTPException(status_code=400, detail="Failed to delete knowledge base")
logger.info(f"KB '{kb_name}' deleted")
get_router_logger().info(f"KB '{kb_name}' deleted")
return {"message": f"Knowledge base '{kb_name}' deleted successfully"}
except ValueError:
raise HTTPException(status_code=404, detail=f"Knowledge base '{kb_name}' not found")
Expand Down Expand Up @@ -663,7 +672,7 @@ async def upload_files(
task_id = _build_unique_task_id("kb_upload", kb_name)
get_task_stream_manager().ensure_task(task_id)

logger.info(f"Uploading {len(uploaded_files)} files to KB '{kb_name}'")
get_router_logger().info(f"Uploading {len(uploaded_files)} files to KB '{kb_name}'")

background_tasks.add_task(
run_upload_processing_task,
Expand Down Expand Up @@ -704,7 +713,7 @@ async def create_knowledge_base(

rag_provider = _validate_registered_provider(rag_provider)

logger.info(f"Creating KB: {name}")
get_router_logger().info(f"Creating KB: {name}")
task_id = _build_unique_task_id("kb_init", name)
get_task_stream_manager().ensure_task(task_id)

Expand Down Expand Up @@ -743,7 +752,7 @@ async def create_knowledge_base(

manager = get_kb_manager()
if name not in manager.list_knowledge_bases():
logger.warning(f"KB {name} not found in config, registering manually")
get_router_logger().warning(f"KB {name} not found in config, registering manually")
initializer._register_to_config()

allowed_extensions = FileTypeRouter.get_extensions_for_provider(rag_provider)
Expand All @@ -760,7 +769,7 @@ async def create_knowledge_base(

background_tasks.add_task(run_initialization_task, initializer, task_id)

logger.success(f"KB '{name}' created, processing {len(uploaded_files)} files in background")
get_router_logger().success(f"KB '{name}' created, processing {len(uploaded_files)} files in background")

return {
"message": f"Knowledge base '{name}' created. Processing {len(uploaded_files)} files in background.",
Expand All @@ -772,8 +781,8 @@ async def create_knowledge_base(
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to create KB: {e}")
logger.debug(traceback.format_exc())
get_router_logger().error(f"Failed to create KB: {e}")
get_router_logger().debug(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))


Expand Down Expand Up @@ -912,7 +921,7 @@ async def websocket_progress(websocket: WebSocket, kb_name: str):
break

except Exception as e:
logger.debug(f"Progress WS error: {e}")
get_router_logger().debug(f"Progress WS error: {e}")
try:
await websocket.send_json({"type": "error", "message": str(e)})
except Exception:
Expand Down Expand Up @@ -941,7 +950,7 @@ async def link_folder(kb_name: str, request: LinkFolderRequest):
try:
manager = get_kb_manager()
folder_info = manager.link_folder(kb_name, request.folder_path)
logger.info(f"Linked folder '{request.folder_path}' to KB '{kb_name}'")
get_router_logger().info(f"Linked folder '{request.folder_path}' to KB '{kb_name}'")
return LinkedFolderInfo(**folder_info)
except ValueError as e:
error_msg = str(e)
Expand Down Expand Up @@ -973,7 +982,7 @@ async def unlink_folder(kb_name: str, folder_id: str):
success = manager.unlink_folder(kb_name, folder_id)
if not success:
raise HTTPException(status_code=404, detail=f"Folder '{folder_id}' not found")
logger.info(f"Unlinked folder '{folder_id}' from KB '{kb_name}'")
get_router_logger().info(f"Unlinked folder '{folder_id}' from KB '{kb_name}'")
return {"message": "Folder unlinked successfully", "folder_id": folder_id}
except ValueError:
raise HTTPException(status_code=404, detail=f"Knowledge base '{kb_name}' not found")
Expand Down Expand Up @@ -1011,7 +1020,7 @@ async def sync_folder(kb_name: str, folder_id: str, background_tasks: Background
if not files_to_process:
return {"message": "No new or modified files to sync", "files": [], "file_count": 0}

logger.info(
get_router_logger().info(
f"Syncing {len(files_to_process)} files from folder '{folder_path}' to KB '{kb_name}'"
)
task_id = _build_unique_task_id("kb_upload", f"{kb_name}_folder_{folder_id}")
Expand Down
Loading
Loading