-
Notifications
You must be signed in to change notification settings - Fork 175
Expand file tree
/
Copy pathresearch_manager.py
More file actions
74 lines (62 loc) · 2.44 KB
/
research_manager.py
File metadata and controls
74 lines (62 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from __future__ import annotations
import asyncio
from agents import Runner, custom_span, gen_trace_id, trace
import temporalio.workflow
from tests.contrib.openai_agents.research_agents.planner_agent import (
WebSearchItem,
WebSearchPlan,
new_planner_agent,
)
from tests.contrib.openai_agents.research_agents.search_agent import new_search_agent
from tests.contrib.openai_agents.research_agents.writer_agent import (
ReportData,
new_writer_agent,
)
class ResearchManager:
    """Coordinate the planner, search, and writer agents for one query.

    The pipeline has three stages: ask the planner agent which web searches
    to run, run every planned search concurrently, then pass the collected
    search outputs to the writer agent to produce the final report.
    """

    def __init__(self) -> None:
        # One agent per pipeline stage, built by the project's factories.
        self.search_agent = new_search_agent()
        self.planner_agent = new_planner_agent()
        self.writer_agent = new_writer_agent()

    async def run(self, query: str) -> str:
        """Run the full research pipeline for *query*.

        Args:
            query: The user's research question.

        Returns:
            The ``markdown_report`` attribute of the writer agent's report.
        """
        trace_id = gen_trace_id()
        # All three stages run inside a single "Research trace".
        with trace("Research trace", trace_id=trace_id):
            search_plan = await self._plan_searches(query)
            search_results = await self._perform_searches(search_plan)
            report = await self._write_report(query, search_results)
        return report.markdown_report

    async def _plan_searches(self, query: str) -> WebSearchPlan:
        """Ask the planner agent which searches to run for *query*."""
        result = await Runner.run(self.planner_agent, f"Query: {query}")
        return result.final_output_as(WebSearchPlan)

    async def _perform_searches(self, search_plan: WebSearchPlan) -> list[str]:
        """Run every search in *search_plan* concurrently.

        Args:
            search_plan: Plan whose ``searches`` items are each passed to
                :meth:`_search`.

        Returns:
            The non-``None`` search outputs, appended in the order the
            ``as_completed`` iterator yields them (so not necessarily the
            order of ``search_plan.searches``).

        Raises:
            Exception: Anything raised by an individual search propagates
                from the ``await`` on that search's task.
        """
        with custom_span("Search the web"):
            # Start all searches up front so they can overlap.
            tasks = [
                asyncio.create_task(self._search(item))
                for item in search_plan.searches
            ]
            results: list[str] = []
            # temporalio.workflow.as_completed is used instead of
            # asyncio.as_completed; see the Temporal SDK docs for the
            # workflow-specific guarantees it provides.
            for task in temporalio.workflow.as_completed(tasks):
                result = await task
                if result is not None:
                    results.append(result)
            return results

    async def _search(self, item: WebSearchItem) -> str | None:
        """Run one planned search and return the agent's output as text.

        Errors raised by the agent run are not handled here; they propagate
        unchanged to the caller. The ``str | None`` return annotation is kept
        for interface compatibility even though this implementation always
        returns a string.
        """
        prompt = f"Search term: {item.query}\nReason for searching: {item.reason}"
        result = await Runner.run(self.search_agent, prompt)
        return str(result.final_output)

    async def _write_report(self, query: str, search_results: list[str]) -> ReportData:
        """Ask the writer agent for a report on *query*.

        Args:
            query: The original research question.
            search_results: Search outputs; the list is interpolated into the
                prompt as-is (its ``str()`` form).

        Returns:
            The writer agent's final output, read as ``ReportData``.
        """
        prompt = f"Original query: {query}\nSummarized search results: {search_results}"
        result = await Runner.run(self.writer_agent, prompt)
        return result.final_output_as(ReportData)