Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/core/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field, HttpUrl

# --- Core Data Models for the Financial Research Agent System ---
# This module implements the "pure Pydantic implementation for multi-agent systems"
# required to standardize data flow and enhance commercial reliability.

class ResearchGoal(BaseModel):
"""
Defines the structured input request received from the Gradio UI or the MCP client.
This serves as the initial contract for the entire agent workflow.
"""
query: str = Field(..., description="The high-level financial research question or task.")
investment_target: Optional[str] = Field(None, description="Specific company, sector, or asset under investigation.")
time_horizon: str = Field("Next 12 months", description="The required time frame for the analysis (e.g., '6 months', 'long-term').")
required_format: str = Field("Comprehensive Report", description="The desired output format (e.g., 'Summary', 'Detailed Analysis', 'Presentation Slides').")

class ToolUsage(BaseModel):
"""
Details of a specific tool utilized during a research step (e.g., using a data acquisition tool like Akshare or Baostock, which were relevant in the ModelScope context).
"""
tool_name: str = Field(..., description="The name of the external tool or function used.")
arguments: Dict[str, Any] = Field(..., description="The arguments passed to the tool.")
result_summary: str = Field(..., description="A summary of the information retrieved or action taken by the tool.")

class AnalysisStep(BaseModel):
"""
Represents an intermediate step in the multi-agent research process (the iterative search-and-judge loops).
"""
agent_id: str = Field(..., description="Identifier of the agent responsible for this step.")
action: str = Field(..., description="Description of the agent's action (e.g., 'Searching market data', 'Synthesizing conflicting reports').")
tools_used: List[ToolUsage] = Field(default_factory=list, description="List of specific tool calls made during this step.")
reasoning: str = Field(..., description="The rationale for the agent's action.")

class FinancialAnalysisResult(BaseModel):
"""
Defines the final, structured output (the monetizable product) delivered by the agent system.
This structure ensures the output is professional and dependable for enterprise users.
"""
summary: str = Field(..., description="A concise executive summary of the findings.")
key_recommendation: str = Field(..., description="The primary investment or business recommendation based on the research.")
analysis_steps: List[AnalysisStep] = Field(default_factory=list, description="A verifiable trace of all steps taken by the agents.")
data_sources: List[str] = Field(default_factory=list, description="List of reliable sources used, including links or references.")
confidence_score: float = Field(..., description="A quantitative score (0.0 to 1.0) reflecting the system's confidence in the recommendation.")

class MCPToolDefinition(BaseModel):
"""
Schema for defining a tool that is exposed via the Model Context Protocol (MCP) server endpoint.
This facilitates integration with external clients like Claude Desktop.
"""
name: str = Field(..., description="The name of the tool exposed via MCP.")
description: str = Field(..., description="A brief description of what the tool does.")
endpoint_url: HttpUrl = Field(..., description="The API endpoint URL for the tool.")
143 changes: 143 additions & 0 deletions tests/core/test_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import unittest
import os
import sys
from pydantic import ValidationError

# Add the src directory to the path so we can import the new schemas module
# Note: This is necessary because src/core is a new folder structure
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

# Import the newly created Pydantic models
# These schemas are foundational for the 'pure Pydantic implementation' for multi-agent systems
from src.core.schemas import (
ResearchGoal,
ToolUsage,
AnalysisStep,
FinancialAnalysisResult,
MCPToolDefinition
)

class TestAgentDataSchemas(unittest.TestCase):
"""
Tests the integrity of the core Pydantic data models used for agent communication
and output validation, ensuring data reliability for enterprise use.
"""

def test_research_goal_successful_validation(self):
"""Should validate a ResearchGoal with all fields provided."""
goal_data = {
"query": "Analyze Qwen's market position for the next quarter.",
"investment_target": "Qwen-LLM",
"time_horizon": "Q3 2025",
"required_format": "Detailed Analysis"
}
try:
ResearchGoal(**goal_data)
except ValidationError as e:
self.fail(f"ResearchGoal validation failed unexpectedly: {e}")

def test_research_goal_missing_required_field(self):
"""Should fail validation if the 'query' field is missing."""
invalid_data = {
"investment_target": "Alibaba stock",
"time_horizon": "6 months"
}
with self.assertRaises(ValidationError) as cm:
ResearchGoal(**invalid_data)

# Check that the error pertains to the missing 'query' field
self.assertIn('query', str(cm.exception))

def test_tool_usage_successful_validation(self):
"""Should validate a ToolUsage instance."""
tool_data = {
"tool_name": "AkshareDataGetter",
"arguments": {"stock_code": "600000.SH", "period": "weekly"},
"result_summary": "Retrieved 52 weeks of trading data."
}
try:
ToolUsage(**tool_data)
except ValidationError as e:
self.fail(f"ToolUsage validation failed unexpectedly: {e}")

def test_analysis_step_with_nested_tool_usage(self):
"""Should validate an AnalysisStep that includes ToolUsage records."""
tool_data = ToolUsage(
tool_name="BaostockAPI",
arguments={"query": "financial reports"},
result_summary="Acquired 2024 earnings report."
).model_dump() # Use model_dump() for Pydantic V2

step_data = {
"agent_id": "DataGatherer-A",
"action": "Acquiring Q4 2024 reports.",
"tools_used": [tool_data],
"reasoning": "Need current financials for valuation model."
}

try:
AnalysisStep(**step_data)
except ValidationError as e:
self.fail(f"AnalysisStep validation failed unexpectedly: {e}")

def test_financial_analysis_result_successful_validation(self):
"""Should validate the final report structure, including float confidence score."""
valid_result = {
"summary": "Qwen LLM market share is growing rapidly in Asia.",
"key_recommendation": "Strong Buy signal.",
"analysis_steps": [], # Optional list, can be empty
"data_sources": ["ModelScope.cn", "Official Press Release"],
"confidence_score": 0.85
}
try:
result = FinancialAnalysisResult(**valid_result)
self.assertIsInstance(result.confidence_score, float)
except ValidationError as e:
self.fail(f"FinancialAnalysisResult validation failed unexpectedly: {e}")

def test_financial_analysis_result_invalid_confidence_score_type(self):
"""
Should fail if confidence_score is not a valid number (e.g., a string).
(Updated assertion for Pydantic V2 error message)
"""
invalid_result = {
"summary": "Test",
"key_recommendation": "Hold",
"analysis_steps": [],
"data_sources": [],
"confidence_score": "high" # Should be float
}
with self.assertRaises(ValidationError) as cm:
FinancialAnalysisResult(**invalid_result)
# V2 error message often contains 'unable to parse string as a number'
self.assertIn('unable to parse string as a number', str(cm.exception))

def test_mcp_tool_definition_successful_validation(self):
"""Should validate the MCP definition, especially the HttpUrl field."""
# The application exposes an MCP server endpoint
mcp_data = {
"name": "TheDeterminatorsSearch",
"description": "Performs deep financial research.",
"endpoint_url": "http://localhost:7860/gradio_api/mcp/"
}
try:
MCPToolDefinition(**mcp_data)
except ValidationError as e:
self.fail(f"MCPToolDefinition validation failed unexpectedly: {e}")

def test_mcp_tool_definition_invalid_url(self):
"""Should fail if the endpoint_url is not a valid URL format."""
invalid_mcp_data = {
"name": "InvalidTool",
"description": "Test",
"endpoint_url": "not a url"
}
with self.assertRaises(ValidationError) as cm:
MCPToolDefinition(**invalid_mcp_data)

# Pydantic V2 URL validation error
self.assertIn('url_parsing', str(cm.exception))

if __name__ == '__main__':
unittest.main()

Loading