From 00107b5968c0400c2b3a38b556176df003eb6d07 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 14:09:25 -0700 Subject: [PATCH 01/12] athenad and webrtcd updates --- system/athena/athenad.py | 33 +++++++- system/webrtc/webrtcd.py | 164 ++++++++++++++++++++++++++++++--------- 2 files changed, 160 insertions(+), 37 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index b52ef21ba63702..7dc7e8c24967cc 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -28,7 +28,7 @@ create_connection) import cereal.messaging as messaging -from cereal import log +from cereal import car, log from cereal.services import SERVICE_LIST from openpilot.common.api import Api, get_key_pair from openpilot.common.utils import CallbackReader, get_upload_stream @@ -44,6 +44,7 @@ ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) LOCAL_PORT_WHITELIST = {22, } # SSH +WEBRTCD_PORT = 5001 LOG_ATTR_NAME = 'user.upload' LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder) @@ -536,6 +537,16 @@ def getSshAuthorizedKeys() -> str: def getGithubUsername() -> str: return cast(str, Params().get("GithubUsername") or "") + +@dispatcher.add_method +def getNotCar() -> bool: + cp_bytes = Params().get("CarParamsPersistent") + if cp_bytes is not None: + with car.CarParams.from_bytes(cp_bytes) as CP: + return CP.notCar + return False + + @dispatcher.add_method def getSimInfo(): return HARDWARE.get_sim_info() @@ -557,6 +568,26 @@ def getNetworks(): return HARDWARE.get_networks() +@dispatcher.add_method +def startJoystickStream(sdp: str) -> dict: + from openpilot.system.webrtc.webrtcd import StreamRequestBody + body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "soundRequest", "livestreamCameraSwitch"], ["carState"]) + try: + resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", + json=asdict(body), timeout=10) + if not resp.ok: + try: + error_body = resp.json() + raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}")) + except ValueError: + resp.raise_for_status() + return resp.json() + except requests.ConnectTimeout: + raise Exception("webrtc took too long to respond. is it on?") from None + except requests.ConnectionError: + raise Exception("webrtc is not running. turn on comma body ignition.") from None + + @dispatcher.add_method def takeSnapshot() -> str | dict[str, str] | None: from openpilot.system.camerad.snapshot import jpeg_write, snapshot diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d2c90cafb5b2e6..3152f27edc1f0b 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 - import argparse import asyncio +import contextlib import json -import uuid import logging +import uuid from dataclasses import dataclass, field from typing import Any, TYPE_CHECKING @@ -21,6 +21,8 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log +INITIAL_CAMERA = "driver" +REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: def __init__(self, sm: messaging.SubMaster): @@ -83,11 +85,16 @@ def start(self): assert self.task is None self.task = asyncio.create_task(self.run()) - def stop(self): - if self.task is None or self.task.done(): + async def stop(self): + if self.task is None: return - self.task.cancel() + task = self.task self.task = None + if task.done(): + return + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task async def run(self): from aiortc.exceptions import InvalidStateError @@ -122,14 +129,11 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder - from teleoprtc.info import parse_info_from_offer - config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" - for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) + self.video_track = LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack() + builder.add_video_stream(INITIAL_CAMERA, self.video_track) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -145,27 +149,38 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) self.run_task: asyncio.Task | None = None + self._cleanup_lock = asyncio.Lock() + self._cleanup_done = False self.logger = logging.getLogger("webrtcd") - self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s", - self.identifier, cameras, incoming_services, outgoing_services) + self.logger.info( + "New stream session (%s), cameras %s, incoming services %s, outgoing services %s", + self.identifier, cameras, incoming_services, outgoing_services, + ) def start(self): self.run_task = asyncio.create_task(self.run()) - def stop(self): - if self.run_task.done(): - return - self.run_task.cancel() + async def stop(self): + if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task(): + self.run_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self.run_task self.run_task = None - asyncio.run(self.post_run_cleanup()) + await self.post_run_cleanup() async def get_answer(self): return await self.stream.start() - async def message_handler(self, message: bytes): - assert self.incoming_bridge is not None + def message_handler(self, message: bytes): try: - self.incoming_bridge.send(message) + payload = json.loads(message) if isinstance(message, (bytes, str)) else None + if not isinstance(payload, dict): + raise ValueError + + if self.incoming_bridge is not None: + self.incoming_bridge.send(message) + except ValueError: + self.logger.warning("Ignoring malformed request: %s", payload) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -175,24 +190,32 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - self.stream.set_message_handler(self.message_handler) + # set camera to default + self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) self.outgoing_bridge_runner.start() + self.logger.info("Stream session (%s) connected", self.identifier) await self.stream.wait_for_disconnection() - await self.post_run_cleanup() self.logger.info("Stream session (%s) ended", self.identifier) except Exception: self.logger.exception("Stream session failure") + finally: + await self.post_run_cleanup() async def post_run_cleanup(self): - await self.stream.stop() - if self.outgoing_bridge is not None: - self.outgoing_bridge_runner.stop() + async with self._cleanup_lock: + if self._cleanup_done: + return + self._cleanup_done = True + if self.outgoing_bridge_runner is not None: + await self.outgoing_bridge_runner.stop() + await self.stream.stop() @dataclass @@ -202,19 +225,84 @@ class StreamRequestBody: bridge_services_in: list[str] = field(default_factory=list) bridge_services_out: list[str] = field(default_factory=list) +def _add_cors_headers(_, response: 'web.Response'): + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Headers"] = "Content-Type" + response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS" + response.headers["Access-Control-Allow-Private-Network"] = "true" -async def get_stream(request: 'web.Request'): - stream_dict, debug_mode = request.app['streams'], request.app['debug'] - raw_body = await request.json() - body = StreamRequestBody(**raw_body) - session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) - answer = await session.get_answer() - session.start() +@web.middleware +async def cors_middleware(request: 'web.Request', handler): + try: + response = await handler(request) + except web.HTTPException as ex: + _add_cors_headers(request, ex) + raise + _add_cors_headers(request, response) + return response + + +async def stream_options(request: 'web.Request'): + response = web.Response() + _add_cors_headers(request, response) + return response + + +def _validate_sdp_video_codecs(sdp: str): + import aiortc.sdp + desc = aiortc.sdp.SessionDescription.parse(sdp) + required_mime = f"video/{REQUIRED_VIDEO_CODEC}" + for m in desc.media: + if m.kind != "video": + continue + offered_mimes = {c.mimeType for c in m.rtp.codecs} + if required_mime not in offered_mimes: + raise web.HTTPBadRequest( + text=json.dumps({"error": "unsupported_codec", "message": f"Frontend must offer {REQUIRED_VIDEO_CODEC} via setCodecPreferences()"}), + content_type="application/json", + ) - stream_dict[session.identifier] = session - return web.json_response({"sdp": answer.sdp, "type": answer.type}) +async def get_stream(request: 'web.Request'): + logger = logging.getLogger("webrtcd") + try: + stream_dict, debug_mode = request.app['streams'], request.app['debug'] + + raw_body = await request.json() + body = StreamRequestBody(**raw_body) + _validate_sdp_video_codecs(body.sdp) + + async with request.app['stream_lock']: + # Fully disconnect any other active stream before starting the replacement. + for sid, s in list(stream_dict.items()): + if s.run_task and not s.run_task.done(): + try: + ch = s.stream.get_messaging_channel() + ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) + except Exception: + pass + await s.stop() + del stream_dict[sid] + + session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + try: + answer = await session.get_answer() + except Exception: + await session.stop() + raise + session.start() + + stream_dict[session.identifier] = session + + response = web.json_response({"sdp": answer.sdp, "type": answer.type}) + _add_cors_headers(request, response) + return response + except web.HTTPException: + raise + except Exception: + logger.exception("Error in /stream handler") + raise async def get_schema(request: 'web.Request'): @@ -224,6 +312,7 @@ async def get_schema(request: 'web.Request'): schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services} return web.json_response(schema_dict) + async def post_notify(request: 'web.Request'): try: payload = await request.json() @@ -239,9 +328,10 @@ async def post_notify(request: 'web.Request'): return web.Response(status=200, text="OK") + async def on_shutdown(app: 'web.Application'): for session in app['streams'].values(): - session.stop() + await session.stop() del app['streams'] @@ -251,11 +341,13 @@ def webrtcd_thread(host: str, port: int, debug: bool): logging.getLogger("WebRTCStream").setLevel(logging_level) logging.getLogger("webrtcd").setLevel(logging_level) - app = web.Application() + app = web.Application(middlewares=[cors_middleware]) app['streams'] = dict() + app['stream_lock'] = asyncio.Lock() app['debug'] = debug app.on_shutdown.append(on_shutdown) + app.router.add_route("OPTIONS", "/stream", stream_options) app.router.add_post("/stream", get_stream) app.router.add_post("/notify", post_notify) app.router.add_get("/schema", get_schema) From fe36daf49e070a070cce91c89b91f7b8308cf7e4 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 15:53:58 -0700 Subject: [PATCH 02/12] remove feature stream services from webrtcd split --- system/athena/athenad.py | 2 +- system/webrtc/webrtcd.py | 22 ++++++++-------------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 7dc7e8c24967cc..6b925fc2d24df3 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,7 +571,7 @@ def getNetworks(): @dispatcher.add_method def startJoystickStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "soundRequest", "livestreamCameraSwitch"], ["carState"]) + body = StreamRequestBody(sdp, ["driver"], ["testJoystick"], ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 3152f27edc1f0b..5f66e62bb85fde 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -21,7 +21,6 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log -INITIAL_CAMERA = "driver" REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: @@ -129,11 +128,14 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder + from teleoprtc.info import parse_info_from_offer + config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - self.video_track = LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack() - builder.add_video_stream(INITIAL_CAMERA, self.video_track) + assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" + for cam in cameras: + builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -172,15 +174,9 @@ async def get_answer(self): return await self.stream.start() def message_handler(self, message: bytes): + assert self.incoming_bridge is not None try: - payload = json.loads(message) if isinstance(message, (bytes, str)) else None - if not isinstance(payload, dict): - raise ValueError - - if self.incoming_bridge is not None: - self.incoming_bridge.send(message) - except ValueError: - self.logger.warning("Ignoring malformed request: %s", payload) + self.incoming_bridge.send(message) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -190,9 +186,7 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - # set camera to default - self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) - self.stream.set_message_handler(self.message_handler) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) From 49af84c8b57219b29fd4ed41f132e08f37445e3f Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 17:07:48 -0700 Subject: [PATCH 03/12] remove cors --- system/webrtc/webrtcd.py | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 5f66e62bb85fde..76bd86bc3ad6f5 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -219,27 +219,9 @@ class StreamRequestBody: bridge_services_in: list[str] = field(default_factory=list) bridge_services_out: list[str] = field(default_factory=list) -def _add_cors_headers(_, response: 'web.Response'): - response.headers["Access-Control-Allow-Origin"] = "*" - response.headers["Access-Control-Allow-Headers"] = "Content-Type" - response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS" - response.headers["Access-Control-Allow-Private-Network"] = "true" - - -@web.middleware -async def cors_middleware(request: 'web.Request', handler): - try: - response = await handler(request) - except web.HTTPException as ex: - _add_cors_headers(request, ex) - raise - _add_cors_headers(request, response) - return response - async def stream_options(request: 'web.Request'): response = web.Response() - _add_cors_headers(request, response) return response @@ -290,7 +272,6 @@ async def get_stream(request: 'web.Request'): stream_dict[session.identifier] = session response = web.json_response({"sdp": answer.sdp, "type": answer.type}) - _add_cors_headers(request, response) return response except web.HTTPException: raise @@ -335,13 +316,12 @@ def webrtcd_thread(host: str, port: int, debug: bool): logging.getLogger("WebRTCStream").setLevel(logging_level) logging.getLogger("webrtcd").setLevel(logging_level) - app = web.Application(middlewares=[cors_middleware]) + app = web.Application() app['streams'] = dict() app['stream_lock'] = asyncio.Lock() app['debug'] = debug app.on_shutdown.append(on_shutdown) - app.router.add_route("OPTIONS", "/stream", stream_options) app.router.add_post("/stream", get_stream) app.router.add_post("/notify", post_notify) app.router.add_get("/schema", get_schema) From ab7795ee0f3d50f78f1262b8703fd79184d00e0b Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 17:15:17 -0700 Subject: [PATCH 04/12] clean --- system/webrtc/webrtcd.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 76bd86bc3ad6f5..d0fe7b349f0680 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -21,7 +21,6 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log -REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: def __init__(self, sm: messaging.SubMaster): @@ -225,21 +224,6 @@ async def stream_options(request: 'web.Request'): return response -def _validate_sdp_video_codecs(sdp: str): - import aiortc.sdp - desc = aiortc.sdp.SessionDescription.parse(sdp) - required_mime = f"video/{REQUIRED_VIDEO_CODEC}" - for m in desc.media: - if m.kind != "video": - continue - offered_mimes = {c.mimeType for c in m.rtp.codecs} - if required_mime not in offered_mimes: - raise web.HTTPBadRequest( - text=json.dumps({"error": "unsupported_codec", "message": f"Frontend must offer {REQUIRED_VIDEO_CODEC} via setCodecPreferences()"}), - content_type="application/json", - ) - - async def get_stream(request: 'web.Request'): logger = logging.getLogger("webrtcd") try: @@ -247,7 +231,6 @@ async def get_stream(request: 'web.Request'): raw_body = await request.json() body = StreamRequestBody(**raw_body) - _validate_sdp_video_codecs(body.sdp) async with request.app['stream_lock']: # Fully disconnect any other active stream before starting the replacement. @@ -264,6 +247,12 @@ async def get_stream(request: 'web.Request'): session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) try: answer = await session.get_answer() + except ValueError as e: + await session.stop() + raise web.HTTPBadRequest( + text=json.dumps({"error": "invalid_sdp", "message": str(e)}), + content_type="application/json", + ) from e except Exception: await session.stop() raise From d3ff632c7410fa1e51755e1d26af5b99c40e64cc Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 17:16:17 -0700 Subject: [PATCH 05/12] remove unused --- system/webrtc/webrtcd.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d0fe7b349f0680..dd1fa18194fadb 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -219,11 +219,6 @@ class StreamRequestBody: bridge_services_out: list[str] = field(default_factory=list) -async def stream_options(request: 'web.Request'): - response = web.Response() - return response - - async def get_stream(request: 'web.Request'): logger = logging.getLogger("webrtcd") try: From 503b085a3da3bb85bec2cfd96ee4b9832e9563fb Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 19:10:05 -0700 Subject: [PATCH 06/12] remove extra try except --- system/webrtc/webrtcd.py | 78 ++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index dd1fa18194fadb..d90c9e65b3cc54 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -190,7 +190,6 @@ async def run(self): channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) self.outgoing_bridge_runner.start() - self.logger.info("Stream session (%s) connected", self.identifier) await self.stream.wait_for_disconnection() @@ -220,48 +219,41 @@ class StreamRequestBody: async def get_stream(request: 'web.Request'): - logger = logging.getLogger("webrtcd") - try: - stream_dict, debug_mode = request.app['streams'], request.app['debug'] - - raw_body = await request.json() - body = StreamRequestBody(**raw_body) - - async with request.app['stream_lock']: - # Fully disconnect any other active stream before starting the replacement. - for sid, s in list(stream_dict.items()): - if s.run_task and not s.run_task.done(): - try: - ch = s.stream.get_messaging_channel() - ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) - except Exception: - pass - await s.stop() - del stream_dict[sid] - - session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) - try: - answer = await session.get_answer() - except ValueError as e: - await session.stop() - raise web.HTTPBadRequest( - text=json.dumps({"error": "invalid_sdp", "message": str(e)}), - content_type="application/json", - ) from e - except Exception: - await session.stop() - raise - session.start() - - stream_dict[session.identifier] = session - - response = web.json_response({"sdp": answer.sdp, "type": answer.type}) - return response - except web.HTTPException: - raise - except Exception: - logger.exception("Error in /stream handler") - raise + stream_dict, debug_mode = request.app['streams'], request.app['debug'] + + raw_body = await request.json() + body = StreamRequestBody(**raw_body) + + async with request.app['stream_lock']: + # Fully disconnect any other active stream before starting the replacement. + for sid, s in list(stream_dict.items()): + if s.run_task and not s.run_task.done(): + try: + ch = s.stream.get_messaging_channel() + ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) + except Exception: + pass + await s.stop() + del stream_dict[sid] + + session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + try: + answer = await session.get_answer() + except ValueError as e: + await session.stop() + raise web.HTTPBadRequest( + text=json.dumps({"error": "invalid_sdp", "message": str(e)}), + content_type="application/json", + ) from e + except Exception: + await session.stop() + raise + session.start() + + stream_dict[session.identifier] = session + + response = web.json_response({"sdp": answer.sdp, "type": answer.type}) + return response async def get_schema(request: 'web.Request'): From e860431630a7908c2a5c90433728696c86a5a843 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 09:39:58 -0700 Subject: [PATCH 07/12] add back exception trace --- system/athena/athenad.py | 8 ++++---- system/webrtc/webrtcd.py | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 6b925fc2d24df3..5e21550a4e582b 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -582,10 +582,10 @@ def startJoystickStream(sdp: str) -> dict: except ValueError: resp.raise_for_status() return resp.json() - except requests.ConnectTimeout: - raise Exception("webrtc took too long to respond. is it on?") from None - except requests.ConnectionError: - raise Exception("webrtc is not running. turn on comma body ignition.") from None + except requests.ConnectTimeout as e: + raise Exception("webrtc took too long to respond. is it on?") from e + except requests.ConnectionError as e: + raise Exception("webrtc is not running. turn on comma body ignition.") from e @dispatcher.add_method diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d90c9e65b3cc54..4f8c38fe718936 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 + import argparse import asyncio import contextlib import json -import logging import uuid +import logging from dataclasses import dataclass, field from typing import Any, TYPE_CHECKING From cc0a8c02069e9993e9a343488e86b00ffa8836d8 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 16:26:39 -0700 Subject: [PATCH 08/12] clean diff --- system/webrtc/webrtcd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 4f8c38fe718936..eb39e82bb7553e 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -221,7 +221,6 @@ class StreamRequestBody: async def get_stream(request: 'web.Request'): stream_dict, debug_mode = request.app['streams'], request.app['debug'] - raw_body = await request.json() body = StreamRequestBody(**raw_body) From 0f38abee5d9e6f03452631516b625f70d6645781 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 18 May 2026 19:03:11 -0700 Subject: [PATCH 09/12] clean diff --- system/webrtc/webrtcd.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index eb39e82bb7553e..7a51d45034e42b 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -252,8 +252,7 @@ async def get_stream(request: 'web.Request'): stream_dict[session.identifier] = session - response = web.json_response({"sdp": answer.sdp, "type": answer.type}) - return response + return web.json_response({"sdp": answer.sdp, "type": answer.type}) async def get_schema(request: 'web.Request'): From 1c32d41b34854a8028107c4af5f9facdb22dee2c Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 11:07:36 -0700 Subject: [PATCH 10/12] add testJoystick only on body --- system/athena/athenad.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 5e21550a4e582b..fc1edd712a2d21 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -569,9 +569,18 @@ def getNetworks(): @dispatcher.add_method -def startJoystickStream(sdp: str) -> dict: +def startStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - body = StreamRequestBody(sdp, ["driver"], ["testJoystick"], ["carState"]) + bridge_services_in = ["livestreamCameraSwitch"] + + # get live car params to avoid stale notCar edge case + cp_bytes = Params().get("CarParams") + if cp_bytes is not None: + with car.CarParams.from_bytes(cp_bytes) as CP: + if CP.notCar: + bridge_services_in.append("testJoystick") + + body = StreamRequestBody(sdp, "driver", bridge_services_in, ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) From be391a1f1cb399acfd5054dbf8e23063728a661d Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 11:15:09 -0700 Subject: [PATCH 11/12] fix camera list --- system/athena/athenad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index fc1edd712a2d21..a97429e808ba7e 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -580,7 +580,7 @@ def startStream(sdp: str) -> dict: if CP.notCar: bridge_services_in.append("testJoystick") - body = StreamRequestBody(sdp, "driver", bridge_services_in, ["carState"]) + body = StreamRequestBody(sdp, ["driver"], bridge_services_in, ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) From 2170f6be02236d19db5b40472ab16b0349886d0e Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 11:18:45 -0700 Subject: [PATCH 12/12] remove reference to future service --- system/athena/athenad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index a97429e808ba7e..2bff7886ddd03e 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,7 +571,7 @@ def getNetworks(): @dispatcher.add_method def startStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - bridge_services_in = ["livestreamCameraSwitch"] + bridge_services_in = [] # get live car params to avoid stale notCar edge case cp_bytes = Params().get("CarParams")