Add core agent A2A adapter#1456
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces an A2A (Agent-to-Agent) protocol adapter for the core agent service, including Pydantic schemas, discovery and message-sending endpoints, and comprehensive unit tests. The review feedback focuses on improving robustness and error handling: specifically, wrapping SSE payload JSON parsing in a try-except block to prevent crashes, safely parsing the max_loop_count integer and providing a fallback UUID for uid, and breaking early from the SSE stream processing loop upon encountering an error.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _parse_sse_payload(chunk: str) -> dict[str, Any] | None: | ||
| for line in chunk.splitlines(): | ||
| if not line.startswith("data:"): | ||
| continue | ||
| payload = line.removeprefix("data:").strip() | ||
| if not payload or payload == "[DONE]": | ||
| return None | ||
| return json.loads(payload) | ||
| return None |
There was a problem hiding this comment.
Parsing the SSE payload with json.loads without error handling can cause the request to crash with a 500 Internal Server Error if the stream contains malformed JSON or non-JSON data. Wrap the parsing logic in a try-except block to handle json.JSONDecodeError gracefully.
| def _parse_sse_payload(chunk: str) -> dict[str, Any] | None: | |
| for line in chunk.splitlines(): | |
| if not line.startswith("data:"): | |
| continue | |
| payload = line.removeprefix("data:").strip() | |
| if not payload or payload == "[DONE]": | |
| return None | |
| return json.loads(payload) | |
| return None | |
| def _parse_sse_payload(chunk: str) -> dict[str, Any] | None: | |
| for line in chunk.splitlines(): | |
| if not line.startswith("data:"): | |
| continue | |
| payload = line.removeprefix("data:").strip() | |
| if not payload or payload == "[DONE]": | |
| return None | |
| try: | |
| return json.loads(payload) | |
| except json.JSONDecodeError: | |
| return None | |
| return None |
There was a problem hiding this comment.
Fixed in f901f33: _parse_sse_payload now catches malformed JSON and ignores non-dict payloads instead of letting the request fail. Added regression coverage in test_collect_completion_text_ignores_malformed_sse_payload.
There was a problem hiding this comment.
Follow-up after branch rewrite: the malformed SSE JSON fix is now in current public head 178eab5. _parse_sse_payload catches JSON decode errors and ignores non-dict payloads; regression coverage is test_collect_completion_text_ignores_malformed_sse_payload. Verified locally with PYTHONPATH=.. uv run --python 3.12 python -m pytest tests/test_a2a.py -q -> 8 passed.
| max_loop_count = metadata.get("max_loop_count", os.getenv("A2A_MAX_LOOP_COUNT", 5)) | ||
|
|
||
| return CustomCompletionInputs( | ||
| uid=( | ||
| _metadata_string(metadata, "uid") | ||
| or _metadata_string(request.message.metadata, "uid") | ||
| or request.message.context_id | ||
| or request.message.message_id | ||
| )[:64], | ||
| messages=[LLMMessage(role="user", content=text)], | ||
| stream=False, | ||
| meta_data={ | ||
| "caller": "a2a_http_json", | ||
| "caller_sid": request.message.message_id, | ||
| "workflow_id": _metadata_string(metadata, "workflow_id"), | ||
| "run_id": request.message.task_id, | ||
| "node_id": _metadata_string(metadata, "node_id"), | ||
| }, | ||
| model_config=model_config, | ||
| instruction=_metadata_mapping(metadata, "instruction"), | ||
| plugin=_metadata_mapping(metadata, "plugin"), | ||
| max_loop_count=int(max_loop_count), | ||
| ) |
There was a problem hiding this comment.
Parsing max_loop_count directly using int() without validation can raise a ValueError or TypeError if the environment variable or metadata contains an invalid integer representation. Additionally, if uid is empty, it can cause tracing issues with Span. Robustly parse max_loop_count with a fallback and ensure uid has a fallback UUID. When using os.getenv, default values should be provided as strings to prevent type issues.
try:
max_loop_count = int(metadata.get("max_loop_count") or os.getenv("A2A_MAX_LOOP_COUNT", "5"))
except (ValueError, TypeError):
max_loop_count = 5
return CustomCompletionInputs(
uid=(
_metadata_string(metadata, "uid")
or _metadata_string(request.message.metadata, "uid")
or request.message.context_id
or request.message.message_id
or str(uuid.uuid4())
)[:64],
messages=[LLMMessage(role="user", content=text)],
stream=False,
meta_data={
"caller": "a2a_http_json",
"caller_sid": request.message.message_id,
"workflow_id": _metadata_string(metadata, "workflow_id"),
"run_id": request.message.task_id,
"node_id": _metadata_string(metadata, "node_id"),
},
model_config=model_config,
instruction=_metadata_mapping(metadata, "instruction"),
plugin=_metadata_mapping(metadata, "plugin"),
max_loop_count=max_loop_count,
)References
- When using os.getenv in Python, default values should be provided as strings, not integers, to prevent TypeError if the environment variable is not set.
There was a problem hiding this comment.
Fixed in f901f33: max loop count parsing now falls back safely when metadata/env values are invalid, and blank request identifiers now get a UUID fallback. Added regression coverage for invalid loop counts and missing IDs.
There was a problem hiding this comment.
Follow-up after branch rewrite: the invalid max loop count and blank request ID fixes are now in current public head 178eab5. Invalid metadata/env loop counts fall back safely, and blank request identifiers get a UUID fallback; regression coverage covers invalid loop count and missing IDs. Verified locally with PYTHONPATH=.. uv run --python 3.12 python -m pytest tests/test_a2a.py -q -> 8 passed.
| if payload.get("code", 0) != 0: | ||
| error_message = str(payload.get("message") or "A2A agent execution failed") |
There was a problem hiding this comment.
When an error code is received in the SSE stream, the loop should break early to prevent unnecessary processing of subsequent chunks and potential secondary errors.
| if payload.get("code", 0) != 0: | |
| error_message = str(payload.get("message") or "A2A agent execution failed") | |
| if payload.get("code", 0) != 0: | |
| error_message = str(payload.get("message") or "A2A agent execution failed") | |
| break |
There was a problem hiding this comment.
Fixed in f901f33: stream collection now breaks immediately after an error payload, so later chunks are not processed after the error. Added regression coverage in test_collect_completion_text_stops_after_error_code.
There was a problem hiding this comment.
Follow-up after branch rewrite: the stream error handling fix is now in current public head 178eab5. Stream collection breaks immediately after an error payload so later chunks are not processed; regression coverage is test_collect_completion_text_stops_after_error_code. Verified locally with PYTHONPATH=.. uv run --python 3.12 python -m pytest tests/test_a2a.py -q -> 8 passed.
0f6143c to
fcce485
Compare
Signed-off-by: Rajesh Digambar Bagul <102693488+Rajesh270712@users.noreply.github.com>
Signed-off-by: Rajesh Digambar Bagul <102693488+Rajesh270712@users.noreply.github.com>
f901f33 to
178eab5
Compare
Summary
Adds a focused A2A adapter for the core agent service.
Related Issue
Fixes #709.
Problem
Issue #709 asks for Astron Agent to expose an A2A-compatible core agent surface so external A2A clients can discover the agent and send messages/tasks. The existing service did not expose an A2A discovery card or
message:sendHTTP adapter forcore/agent.Change
/.well-known/agent-card.jsonand/agent/v1/a2a/agent-card.jsonwith HTTP+JSON interface metadata, capabilities, skills, text I/O modes, and the existingx-consumer-usernameheader auth shape./agent/v1/a2a/message:sendto map A2A text parts into the existingCustomChatCompletionrunner and return A2A task envelopes for completed, submitted, and failed states.Tests
Validation run for this branch:
git diff --check HEAD~1..HEADpython3 -m compileall core/agent/api/v1/a2a.py core/agent/api/schemas/a2a.py core/agent/api/router.py core/agent/main.py core/agent/tests/test_a2a.pyPYTHONPATH=core .venv/bin/python -m pytest core/agent/tests/test_a2a.py -q-> 4 passedPYTHONPATH=core .venv/bin/python -m pytest core/agent/tests/test_a2a.py core/agent/tests/test_router_and_schemas.py core/agent/tests/test_workflow_agent.py core/agent/tests/test_main.py -q-> 20 passedPYTHONPATH=core .venv/bin/python -m pytest core/agent/tests -q-> 178 passed.venv/bin/python -m black --check core/agent/api/v1/a2a.py core/agent/api/schemas/a2a.py core/agent/api/router.py core/agent/main.py core/agent/tests/test_a2a.py.venv/bin/python -m isort --check-only --profile black core/agent/api/v1/a2a.py core/agent/api/schemas/a2a.py core/agent/api/router.py core/agent/main.py core/agent/tests/test_a2a.pyTestClientsmoke for/.well-known/agent-card.json,/agent/v1/a2a/agent-card.json, and authenticated/agent/v1/a2a/message:sendRisk Notes
A2A_MODEL_*environment defaults.Maintainer Attention
Please confirm whether the endpoint placement, auth declaration, and A2A schema shape match the direction you want for
core/agent.