From 00107b5968c0400c2b3a38b556176df003eb6d07 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 14:09:25 -0700 Subject: [PATCH 01/32] athenad and webrtcd updates --- system/athena/athenad.py | 33 +++++++- system/webrtc/webrtcd.py | 164 ++++++++++++++++++++++++++++++--------- 2 files changed, 160 insertions(+), 37 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index b52ef21ba63702..7dc7e8c24967cc 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -28,7 +28,7 @@ create_connection) import cereal.messaging as messaging -from cereal import log +from cereal import car, log from cereal.services import SERVICE_LIST from openpilot.common.api import Api, get_key_pair from openpilot.common.utils import CallbackReader, get_upload_stream @@ -44,6 +44,7 @@ ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) LOCAL_PORT_WHITELIST = {22, } # SSH +WEBRTCD_PORT = 5001 LOG_ATTR_NAME = 'user.upload' LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder) @@ -536,6 +537,16 @@ def getSshAuthorizedKeys() -> str: def getGithubUsername() -> str: return cast(str, Params().get("GithubUsername") or "") + +@dispatcher.add_method +def getNotCar() -> bool: + cp_bytes = Params().get("CarParamsPersistent") + if cp_bytes is not None: + with car.CarParams.from_bytes(cp_bytes) as CP: + return CP.notCar + return False + + @dispatcher.add_method def getSimInfo(): return HARDWARE.get_sim_info() @@ -557,6 +568,26 @@ def getNetworks(): return HARDWARE.get_networks() +@dispatcher.add_method +def startJoystickStream(sdp: str) -> dict: + from openpilot.system.webrtc.webrtcd import StreamRequestBody + body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "soundRequest", "livestreamCameraSwitch"], ["carState"]) + try: + resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", + json=asdict(body), timeout=10) + if not resp.ok: + try: + error_body = resp.json() + raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}")) + except ValueError: + resp.raise_for_status() + return resp.json() + except requests.ConnectTimeout: + raise Exception("webrtc took too long to respond. is it on?") from None + except requests.ConnectionError: + raise Exception("webrtc is not running. turn on comma body ignition.") from None + + @dispatcher.add_method def takeSnapshot() -> str | dict[str, str] | None: from openpilot.system.camerad.snapshot import jpeg_write, snapshot diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d2c90cafb5b2e6..3152f27edc1f0b 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 - import argparse import asyncio +import contextlib import json -import uuid import logging +import uuid from dataclasses import dataclass, field from typing import Any, TYPE_CHECKING @@ -21,6 +21,8 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log +INITIAL_CAMERA = "driver" +REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: def __init__(self, sm: messaging.SubMaster): @@ -83,11 +85,16 @@ def start(self): assert self.task is None self.task = asyncio.create_task(self.run()) - def stop(self): - if self.task is None or self.task.done(): + async def stop(self): + if self.task is None: return - self.task.cancel() + task = self.task self.task = None + if task.done(): + return + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task async def run(self): from aiortc.exceptions import InvalidStateError @@ -122,14 +129,11 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder - from teleoprtc.info import parse_info_from_offer - config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" - for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) + self.video_track = LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack() + builder.add_video_stream(INITIAL_CAMERA, self.video_track) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -145,27 +149,38 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) self.run_task: asyncio.Task | None = None + self._cleanup_lock = asyncio.Lock() + self._cleanup_done = False self.logger = logging.getLogger("webrtcd") - self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s", - self.identifier, cameras, incoming_services, outgoing_services) + self.logger.info( + "New stream session (%s), cameras %s, incoming services %s, outgoing services %s", + self.identifier, cameras, incoming_services, outgoing_services, + ) def start(self): self.run_task = asyncio.create_task(self.run()) - def stop(self): - if self.run_task.done(): - return - self.run_task.cancel() + async def stop(self): + if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task(): + self.run_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self.run_task self.run_task = None - asyncio.run(self.post_run_cleanup()) + await self.post_run_cleanup() async def get_answer(self): return await self.stream.start() - async def message_handler(self, message: bytes): - assert self.incoming_bridge is not None + def message_handler(self, message: bytes): try: - self.incoming_bridge.send(message) + payload = json.loads(message) if isinstance(message, (bytes, str)) else None + if not isinstance(payload, dict): + raise ValueError + + if self.incoming_bridge is not None: + self.incoming_bridge.send(message) + except ValueError: + self.logger.warning("Ignoring malformed request: %s", payload) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -175,24 +190,32 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - self.stream.set_message_handler(self.message_handler) + # set camera to default + self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) self.outgoing_bridge_runner.start() + self.logger.info("Stream session (%s) connected", self.identifier) await self.stream.wait_for_disconnection() - await self.post_run_cleanup() self.logger.info("Stream session (%s) ended", self.identifier) except Exception: self.logger.exception("Stream session failure") + finally: + await self.post_run_cleanup() async def post_run_cleanup(self): - await self.stream.stop() - if self.outgoing_bridge is not None: - self.outgoing_bridge_runner.stop() + async with self._cleanup_lock: + if self._cleanup_done: + return + self._cleanup_done = True + if self.outgoing_bridge_runner is not None: + await self.outgoing_bridge_runner.stop() + await self.stream.stop() @dataclass @@ -202,19 +225,84 @@ class StreamRequestBody: bridge_services_in: list[str] = field(default_factory=list) bridge_services_out: list[str] = field(default_factory=list) +def _add_cors_headers(_, response: 'web.Response'): + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Headers"] = "Content-Type" + response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS" + response.headers["Access-Control-Allow-Private-Network"] = "true" -async def get_stream(request: 'web.Request'): - stream_dict, debug_mode = request.app['streams'], request.app['debug'] - raw_body = await request.json() - body = StreamRequestBody(**raw_body) - session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) - answer = await session.get_answer() - session.start() +@web.middleware +async def cors_middleware(request: 'web.Request', handler): + try: + response = await handler(request) + except web.HTTPException as ex: + _add_cors_headers(request, ex) + raise + _add_cors_headers(request, response) + return response + + +async def stream_options(request: 'web.Request'): + response = web.Response() + _add_cors_headers(request, response) + return response + + +def _validate_sdp_video_codecs(sdp: str): + import aiortc.sdp + desc = aiortc.sdp.SessionDescription.parse(sdp) + required_mime = f"video/{REQUIRED_VIDEO_CODEC}" + for m in desc.media: + if m.kind != "video": + continue + offered_mimes = {c.mimeType for c in m.rtp.codecs} + if required_mime not in offered_mimes: + raise web.HTTPBadRequest( + text=json.dumps({"error": "unsupported_codec", "message": f"Frontend must offer {REQUIRED_VIDEO_CODEC} via setCodecPreferences()"}), + content_type="application/json", + ) - stream_dict[session.identifier] = session - return web.json_response({"sdp": answer.sdp, "type": answer.type}) +async def get_stream(request: 'web.Request'): + logger = logging.getLogger("webrtcd") + try: + stream_dict, debug_mode = request.app['streams'], request.app['debug'] + + raw_body = await request.json() + body = StreamRequestBody(**raw_body) + _validate_sdp_video_codecs(body.sdp) + + async with request.app['stream_lock']: + # Fully disconnect any other active stream before starting the replacement. + for sid, s in list(stream_dict.items()): + if s.run_task and not s.run_task.done(): + try: + ch = s.stream.get_messaging_channel() + ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) + except Exception: + pass + await s.stop() + del stream_dict[sid] + + session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + try: + answer = await session.get_answer() + except Exception: + await session.stop() + raise + session.start() + + stream_dict[session.identifier] = session + + response = web.json_response({"sdp": answer.sdp, "type": answer.type}) + _add_cors_headers(request, response) + return response + except web.HTTPException: + raise + except Exception: + logger.exception("Error in /stream handler") + raise async def get_schema(request: 'web.Request'): @@ -224,6 +312,7 @@ async def get_schema(request: 'web.Request'): schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services} return web.json_response(schema_dict) + async def post_notify(request: 'web.Request'): try: payload = await request.json() @@ -239,9 +328,10 @@ async def post_notify(request: 'web.Request'): return web.Response(status=200, text="OK") + async def on_shutdown(app: 'web.Application'): for session in app['streams'].values(): - session.stop() + await session.stop() del app['streams'] @@ -251,11 +341,13 @@ def webrtcd_thread(host: str, port: int, debug: bool): logging.getLogger("WebRTCStream").setLevel(logging_level) logging.getLogger("webrtcd").setLevel(logging_level) - app = web.Application() + app = web.Application(middlewares=[cors_middleware]) app['streams'] = dict() + app['stream_lock'] = asyncio.Lock() app['debug'] = debug app.on_shutdown.append(on_shutdown) + app.router.add_route("OPTIONS", "/stream", stream_options) app.router.add_post("/stream", get_stream) app.router.add_post("/notify", post_notify) app.router.add_get("/schema", get_schema) From fe36daf49e070a070cce91c89b91f7b8308cf7e4 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 15:53:58 -0700 Subject: [PATCH 02/32] remove feature stream services from webrtcd split --- system/athena/athenad.py | 2 +- system/webrtc/webrtcd.py | 22 ++++++++-------------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 7dc7e8c24967cc..6b925fc2d24df3 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,7 +571,7 @@ def getNetworks(): @dispatcher.add_method def startJoystickStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "soundRequest", "livestreamCameraSwitch"], ["carState"]) + body = StreamRequestBody(sdp, ["driver"], ["testJoystick"], ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 3152f27edc1f0b..5f66e62bb85fde 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -21,7 +21,6 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log -INITIAL_CAMERA = "driver" REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: @@ -129,11 +128,14 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder + from teleoprtc.info import parse_info_from_offer + config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - self.video_track = LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack() - builder.add_video_stream(INITIAL_CAMERA, self.video_track) + assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" + for cam in cameras: + builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -172,15 +174,9 @@ async def get_answer(self): return await self.stream.start() def message_handler(self, message: bytes): + assert self.incoming_bridge is not None try: - payload = json.loads(message) if isinstance(message, (bytes, str)) else None - if not isinstance(payload, dict): - raise ValueError - - if self.incoming_bridge is not None: - self.incoming_bridge.send(message) - except ValueError: - self.logger.warning("Ignoring malformed request: %s", payload) + self.incoming_bridge.send(message) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -190,9 +186,7 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - # set camera to default - self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) - self.stream.set_message_handler(self.message_handler) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) From 8fddb97e10bd42bf2abbfd2fcb3fd9d498e001ee Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 14:09:24 -0700 Subject: [PATCH 03/32] stream encoder thread --- cereal/log.capnp | 12 +++++ cereal/services.py | 2 + system/loggerd/encoderd.cc | 57 +++++++++++++++++++++- system/loggerd/loggerd.h | 31 +++--------- system/webrtc/device/video.py | 20 +++----- system/webrtc/tests/test_stream_session.py | 2 +- 6 files changed, 85 insertions(+), 39 deletions(-) diff --git a/cereal/log.capnp b/cereal/log.capnp index 73a34a2ea8c6f1..45a6b4431b61dd 100644 --- a/cereal/log.capnp +++ b/cereal/log.capnp @@ -2427,6 +2427,15 @@ struct AudioFeedback { blockNum @1 :UInt16; } +struct LiveStreamCamera { + camera @0 :CameraType; + + enum CameraType { + driver @0; + wideRoad @1; + } +} + struct Touch { sec @0 :Int64; usec @1 :Int64; @@ -2536,6 +2545,9 @@ struct Event { livestreamWideRoadEncodeData @121 :EncodeData; livestreamDriverEncodeData @122 :EncodeData; + livestreamCameraEncodeData @152 :EncodeData; + livestreamCameraSwitch @153 :LiveStreamCamera; + # *********** Custom: reserved for forks *********** # DO change the name of the field diff --git a/cereal/services.py b/cereal/services.py index c2d38d852db133..d3dc8d4fa7e1a5 100755 --- a/cereal/services.py +++ b/cereal/services.py @@ -77,6 +77,7 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] "rawAudioData": (False, 20.), "bookmarkButton": (True, 0., 1), "audioFeedback": (True, 0., 1), + "livestreamCameraSwitch": (False, 0.), "roadEncodeData": (False, 20., None, QueueSize.BIG), "driverEncodeData": (False, 20., None, QueueSize.BIG), "wideRoadEncodeData": (False, 20., None, QueueSize.BIG), @@ -92,6 +93,7 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] "livestreamWideRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM), + "livestreamCameraEncodeData": (False, 20., None, QueueSize.MEDIUM), "customReservedRawData0": (True, 0.), } SERVICE_LIST = {name: Service(*vals) for diff --git a/system/loggerd/encoderd.cc b/system/loggerd/encoderd.cc index 9d4b81a3f90230..b00b7b66aed33a 100644 --- a/system/loggerd/encoderd.cc +++ b/system/loggerd/encoderd.cc @@ -151,6 +151,61 @@ void encoderd_thread(const LogCameraInfo (&cameras)[N]) { } } +template +void stream_encoderd_thread(const LogCameraInfo (&cameras)[N]) { + while (!do_exit) { + if (!VisionIpcClient::getAvailableStreams("camerad", false).empty()) break; + util::sleep_for(100); + } + + SubMaster sm({"livestreamCameraSwitch"}); + const LogCameraInfo *active_cam = &cameras[0]; + + while (!do_exit) { + VisionIpcClient vipc_client("camerad", active_cam->stream_type, false); + if (!vipc_client.connect(false)) { + util::sleep_for(5); + continue; + } + + // init encoder + const VisionBuf &buf_info = vipc_client.buffers[0]; + LOGW("stream encoder init %zux%zu", buf_info.width, buf_info.height); + assert(buf_info.width > 0 && buf_info.height > 0); + auto encoder = std::make_unique(active_cam->encoder_infos[0], buf_info.width, buf_info.height); + encoder->encoder_open(); + + while (!do_exit) { + sm.update(0); + + // Switch camera if the request differs from the current one + if (sm.updated("livestreamCameraSwitch")) { + auto requested = sm["livestreamCameraSwitch"].getLivestreamCameraSwitch().getCamera(); + VisionStreamType requested_stream = requested == cereal::LiveStreamCamera::CameraType::DRIVER + ? VISION_STREAM_DRIVER : VISION_STREAM_WIDE_ROAD; + if (requested_stream != active_cam->stream_type) { + LOGW("stream encoder switching camera"); + auto it = std::find_if(std::begin(cameras), std::end(cameras), + [requested_stream](const auto &cam) { return cam.stream_type == requested_stream; }); + if (it != std::end(cameras)) active_cam = &(*it); + break; // reinit encoder with new camera selection + } + } + + // encode frame + VisionIpcBufExtra extra; + VisionBuf *buf = vipc_client.recv(&extra); + if (buf == nullptr) continue; + if (buf->get_frame_id() != extra.frame_id) continue; + if (encoder->encode_frame(buf, &extra) == -1) { + LOGE("stream encoder: failed to encode frame. frame_id: %d", extra.frame_id); + } + } + + encoder->encoder_close(); + } +} + int main(int argc, char* argv[]) { if (!Hardware::PC()) { int ret; @@ -162,7 +217,7 @@ int main(int argc, char* argv[]) { if (argc > 1) { std::string arg1(argv[1]); if (arg1 == "--stream") { - encoderd_thread(stream_cameras_logged); + stream_encoderd_thread(stream_cameras_logged); } else { LOGE("Argument '%s' is not supported", arg1.c_str()); } diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 6aa0c8be40b96f..804550d80c772c 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -47,8 +47,8 @@ struct EncoderSettings { } static EncoderSettings StreamEncoderSettings() { - int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000; - return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15}; + int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 4'000'000; + return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5}; } }; @@ -100,28 +100,13 @@ const EncoderInfo main_driver_encoder_info = { INIT_ENCODE_FUNCTIONS(DriverEncode), }; -const EncoderInfo stream_road_encoder_info = { - .publish_name = "livestreamRoadEncodeData", - //.thumbnail_name = "thumbnail", - .record = false, - .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, - INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode), -}; - -const EncoderInfo stream_wide_road_encoder_info = { - .publish_name = "livestreamWideRoadEncodeData", +const EncoderInfo stream_encoder_info = { + .publish_name = "livestreamCameraEncodeData", .record = false, .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode), }; -const EncoderInfo stream_driver_encoder_info = { - .publish_name = "livestreamDriverEncodeData", - .record = false, - .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, - INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode), -}; - const EncoderInfo qcam_encoder_info = { .publish_name = "qRoadEncodeData", .filename = "qcamera.ts", @@ -153,20 +138,20 @@ const LogCameraInfo driver_camera_info{ const LogCameraInfo stream_road_camera_info{ .thread_name = "road_cam_encoder", .stream_type = VISION_STREAM_ROAD, - .encoder_infos = {stream_road_encoder_info} + .encoder_infos = {stream_encoder_info} }; const LogCameraInfo stream_wide_road_camera_info{ .thread_name = "wide_road_cam_encoder", .stream_type = VISION_STREAM_WIDE_ROAD, - .encoder_infos = {stream_wide_road_encoder_info} + .encoder_infos = {stream_encoder_info} }; const LogCameraInfo stream_driver_camera_info{ .thread_name = "driver_cam_encoder", .stream_type = VISION_STREAM_DRIVER, - .encoder_infos = {stream_driver_encoder_info} + .encoder_infos = {stream_encoder_info} }; const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; -const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; +const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info}; diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index 50feab4f4a910d..3ca55a140f6f03 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -9,18 +9,11 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack): - camera_to_sock_mapping = { - "driver": "livestreamDriverEncodeData", - "wideRoad": "livestreamWideRoadEncodeData", - "road": "livestreamRoadEncodeData", - } - def __init__(self, camera_type: str): dt = DT_DMON if camera_type == "driver" else DT_MDL super().__init__(camera_type, dt) - self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) - self._pts = 0 + self._sock = messaging.sub_sock("livestreamCameraEncodeData", conflate=True) self._t0_ns = time.monotonic_ns() async def recv(self): @@ -30,15 +23,14 @@ async def recv(self): break await asyncio.sleep(0.005) - evta = getattr(msg, msg.which()) + encode_data = getattr(msg, msg.which()) - packet = av.Packet(evta.header + evta.data) + packet = av.Packet(encode_data.header + encode_data.data) packet.time_base = self._time_base - self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 - packet.pts = self._pts - self.log_debug("track sending frame %d", self._pts) - + pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 + packet.pts = pts + self.log_debug("track sending frame %d", pts) return packet def codec_preference(self) -> str | None: diff --git a/system/webrtc/tests/test_stream_session.py b/system/webrtc/tests/test_stream_session.py index f44d217d58ced6..9730f9e16e3d07 100644 --- a/system/webrtc/tests/test_stream_session.py +++ b/system/webrtc/tests/test_stream_session.py @@ -67,7 +67,7 @@ def test_incoming_proxy(self, mocker): mocked_pubmaster.reset_mock() def test_livestream_track(self, mocker): - fake_msg = messaging.new_message("livestreamDriverEncodeData") + fake_msg = messaging.new_message("livestreamCameraEncodeData") config = {"receive.return_value": fake_msg.to_bytes()} mocker.patch("msgq.SubSocket", spec=True, **config) From 429679654f1ebaf71c92f111f32aa68c79d512eb Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 15:15:26 -0700 Subject: [PATCH 04/32] reduce diff --- system/webrtc/device/video.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index 3ca55a140f6f03..691f3f618736c2 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -14,6 +14,7 @@ def __init__(self, camera_type: str): super().__init__(camera_type, dt) self._sock = messaging.sub_sock("livestreamCameraEncodeData", conflate=True) + self._pts = 0 self._t0_ns = time.monotonic_ns() async def recv(self): @@ -23,14 +24,15 @@ async def recv(self): break await asyncio.sleep(0.005) - encode_data = getattr(msg, msg.which()) + evta = getattr(msg, msg.which()) - packet = av.Packet(encode_data.header + encode_data.data) + packet = av.Packet(evta.header + evta.data) packet.time_base = self._time_base - pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 - packet.pts = pts - self.log_debug("track sending frame %d", pts) + self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 + packet.pts = self._pts + self.log_debug("track sending frame %d", self._pts) + return packet def codec_preference(self) -> str | None: From 65f2332d1e97521d6195d75e45152bc3030dcceb Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 16:04:58 -0700 Subject: [PATCH 05/32] wire webrtc to livestream camera encoder --- system/webrtc/webrtcd.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 5f66e62bb85fde..822c55fa567a13 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -21,6 +21,7 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log +INITIAL_CAMERA = "driver" REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: @@ -128,14 +129,10 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder - from teleoprtc.info import parse_info_from_offer - config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" - for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) + builder.add_video_stream(INITIAL_CAMERA, LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack()) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -186,6 +183,8 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) + if "livestreamCameraSwitch" in self.incoming_bridge_services: + self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() From 8c59f83f10070e984585a6b104a2a93145b0e5a3 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 16:17:07 -0700 Subject: [PATCH 06/32] request livestream camera switch service --- system/athena/athenad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 6b925fc2d24df3..34c2a766a338a0 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,7 +571,7 @@ def getNetworks(): @dispatcher.add_method def startJoystickStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - body = StreamRequestBody(sdp, ["driver"], ["testJoystick"], ["carState"]) + body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "livestreamCameraSwitch"], ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) From 3499ac6fb65a9e5bd112b4b2d11a8bded826a536 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 14:09:25 -0700 Subject: [PATCH 07/32] add frame timing headers --- system/webrtc/device/video.py | 27 ++++++++++++++++++++++++--- system/webrtc/webrtcd.py | 31 +++++++++++++++++++++++++++---- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index 50feab4f4a910d..35036d7b505973 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -1,4 +1,5 @@ import asyncio +import struct import time import av @@ -7,6 +8,13 @@ from cereal import messaging from openpilot.common.realtime import DT_MDL, DT_DMON +# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages +TIMING_SEI_UUID = bytes([ + 0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e, + 0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, +]) +_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID + class LiveStreamVideoStreamTrack(TiciVideoStreamTrack): camera_to_sock_mapping = { @@ -22,6 +30,21 @@ def __init__(self, camera_type: str): self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) self._pts = 0 self._t0_ns = time.monotonic_ns() + self.timing_sei_enabled = False + + def _build_frame_data(self, msg) -> bytes: + encode_data = getattr(msg, msg.which()) + if not self.timing_sei_enabled: + return encode_data.header + encode_data.data + + idx = encode_data.idx + sei_nal = _SEI_PREFIX + struct.pack('>4d', + (idx.timestampEof - idx.timestampSof) / 1e6, + (msg.logMonoTime - idx.timestampEof) / 1e6, + (time.monotonic_ns() - msg.logMonoTime) / 1e6, + time.time() * 1000, # noqa: TID251 + ) + b'\x80' + return encode_data.header + sei_nal + encode_data.data async def recv(self): while True: @@ -30,9 +53,7 @@ async def recv(self): break await asyncio.sleep(0.005) - evta = getattr(msg, msg.which()) - - packet = av.Packet(evta.header + evta.data) + packet = av.Packet(self._build_frame_data(msg)) packet.time_base = self._time_base self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 5f66e62bb85fde..176fb6e3dc2948 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import time import argparse import asyncio import contextlib @@ -132,10 +133,13 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) + self.video_tracks = [] assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) + track = LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack() + self.video_tracks.append(track) + builder.add_video_stream(cam, track) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -174,9 +178,28 @@ async def get_answer(self): return await self.stream.start() def message_handler(self, message: bytes): - assert self.incoming_bridge is not None try: - self.incoming_bridge.send(message) + payload = json.loads(message) if isinstance(message, (bytes, str)) else None + if isinstance(payload, dict): + msg_type = payload.get("type") + + if msg_type == "clockSync": + data = payload.get("data", {}) + pong = json.dumps({"type": "clockSync", "data": { + "action": "pong", "browserSendTime": data.get("browserSendTime"), "deviceTime": time.time() * 1000, # noqa: TID251 + }}) + self.stream.get_messaging_channel().send(pong) + return + + if msg_type == "enableTimingSei": + enabled = bool(payload.get("data", {}).get("enabled")) + for track in self.video_tracks: + if hasattr(track, 'timing_sei_enabled'): + track.timing_sei_enabled = enabled + return + + if self.incoming_bridge is not None: + self.incoming_bridge.send(message) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -186,7 +209,7 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - self.stream.set_message_handler(self.message_handler) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) From 9ae526b5e4e9f45409a1fd8fe3a0356b63b4de7d Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 16:48:46 -0700 Subject: [PATCH 08/32] rfctr --- system/webrtc/webrtcd.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 176fb6e3dc2948..7e44ba6851e4d2 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -198,8 +198,7 @@ def message_handler(self, message: bytes): track.timing_sei_enabled = enabled return - if self.incoming_bridge is not None: - self.incoming_bridge.send(message) + self.incoming_bridge.send(message) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -209,7 +208,7 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - self.stream.set_message_handler(self.message_handler) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) From 753a6a17f591d48553f9e9d276e36893d354f65f Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 16:49:33 -0700 Subject: [PATCH 09/32] diff --- system/webrtc/webrtcd.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 7e44ba6851e4d2..e684771c571a7f 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -137,9 +137,7 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" for cam in cameras: - track = LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack() - self.video_tracks.append(track) - builder.add_video_stream(cam, track) + builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) From 48c56fa7f7577c5aace4187da0436c808ddbf5e4 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 16:49:51 -0700 Subject: [PATCH 10/32] clean --- system/webrtc/webrtcd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index e684771c571a7f..25dcadfc0c91df 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -137,7 +137,7 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack())) + builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) From 6deb216f33600f1ce67acd89bcb88961bd98fe4e Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 16:57:03 -0700 Subject: [PATCH 11/32] remove camera list in favour of init camera field --- system/athena/athenad.py | 2 +- system/webrtc/webrtcd.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 34c2a766a338a0..b414bff40dfe09 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,7 +571,7 @@ def getNetworks(): @dispatcher.add_method def startJoystickStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "livestreamCameraSwitch"], ["carState"]) + body = StreamRequestBody(sdp, "driver", ["testJoystick", "livestreamCameraSwitch"], ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 822c55fa567a13..f83ec0f0072fd1 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -21,7 +21,6 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log -INITIAL_CAMERA = "driver" REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: @@ -125,17 +124,18 @@ async def add_services_if_needed(self, services): class StreamSession: shared_pub_master = DynamicPubMaster([]) - def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False): + def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False): from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder builder = WebRTCAnswerBuilder(sdp) - builder.add_video_stream(INITIAL_CAMERA, LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack()) + builder.add_video_stream(init_camera, LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) + self.init_camera = init_camera self.incoming_bridge: CerealIncomingMessageProxy | None = None self.incoming_bridge_services = incoming_services @@ -152,8 +152,8 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o self._cleanup_done = False self.logger = logging.getLogger("webrtcd") self.logger.info( - "New stream session (%s), cameras %s, incoming services %s, outgoing services %s", - self.identifier, cameras, incoming_services, outgoing_services, + "New stream session (%s), init camera %s, incoming services %s, outgoing services %s", + self.identifier, init_camera, incoming_services, outgoing_services, ) def start(self): @@ -184,7 +184,7 @@ async def run(self): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) if "livestreamCameraSwitch" in self.incoming_bridge_services: - self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) + self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": self.init_camera}}).encode()) self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() @@ -214,7 +214,7 @@ async def post_run_cleanup(self): @dataclass class StreamRequestBody: sdp: str - cameras: list[str] + initCamera: str bridge_services_in: list[str] = field(default_factory=list) bridge_services_out: list[str] = field(default_factory=list) @@ -278,7 +278,7 @@ async def get_stream(request: 'web.Request'): await s.stop() del stream_dict[sid] - session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + session = StreamSession(body.sdp, body.initCamera, body.bridge_services_in, body.bridge_services_out, debug_mode) try: answer = await session.get_answer() except Exception: From 49af84c8b57219b29fd4ed41f132e08f37445e3f Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 17:07:48 -0700 Subject: [PATCH 12/32] remove cors --- system/webrtc/webrtcd.py | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 5f66e62bb85fde..76bd86bc3ad6f5 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -219,27 +219,9 @@ class StreamRequestBody: bridge_services_in: list[str] = field(default_factory=list) bridge_services_out: list[str] = field(default_factory=list) -def _add_cors_headers(_, response: 'web.Response'): - response.headers["Access-Control-Allow-Origin"] = "*" - response.headers["Access-Control-Allow-Headers"] = "Content-Type" - response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS" - response.headers["Access-Control-Allow-Private-Network"] = "true" - - -@web.middleware -async def cors_middleware(request: 'web.Request', handler): - try: - response = await handler(request) - except web.HTTPException as ex: - _add_cors_headers(request, ex) - raise - _add_cors_headers(request, response) - return response - async def stream_options(request: 'web.Request'): response = web.Response() - _add_cors_headers(request, response) return response @@ -290,7 +272,6 @@ async def get_stream(request: 'web.Request'): stream_dict[session.identifier] = session response = web.json_response({"sdp": answer.sdp, "type": answer.type}) - _add_cors_headers(request, response) return response except web.HTTPException: raise @@ -335,13 +316,12 @@ def webrtcd_thread(host: str, port: int, debug: bool): logging.getLogger("WebRTCStream").setLevel(logging_level) logging.getLogger("webrtcd").setLevel(logging_level) - app = web.Application(middlewares=[cors_middleware]) + app = web.Application() app['streams'] = dict() app['stream_lock'] = asyncio.Lock() app['debug'] = debug app.on_shutdown.append(on_shutdown) - app.router.add_route("OPTIONS", "/stream", stream_options) app.router.add_post("/stream", get_stream) app.router.add_post("/notify", post_notify) app.router.add_get("/schema", get_schema) From ab7795ee0f3d50f78f1262b8703fd79184d00e0b Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 17:15:17 -0700 Subject: [PATCH 13/32] clean --- system/webrtc/webrtcd.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 76bd86bc3ad6f5..d0fe7b349f0680 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -21,7 +21,6 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log -REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: def __init__(self, sm: messaging.SubMaster): @@ -225,21 +224,6 @@ async def stream_options(request: 'web.Request'): return response -def _validate_sdp_video_codecs(sdp: str): - import aiortc.sdp - desc = aiortc.sdp.SessionDescription.parse(sdp) - required_mime = f"video/{REQUIRED_VIDEO_CODEC}" - for m in desc.media: - if m.kind != "video": - continue - offered_mimes = {c.mimeType for c in m.rtp.codecs} - if required_mime not in offered_mimes: - raise web.HTTPBadRequest( - text=json.dumps({"error": "unsupported_codec", "message": f"Frontend must offer {REQUIRED_VIDEO_CODEC} via setCodecPreferences()"}), - content_type="application/json", - ) - - async def get_stream(request: 'web.Request'): logger = logging.getLogger("webrtcd") try: @@ -247,7 +231,6 @@ async def get_stream(request: 'web.Request'): raw_body = await request.json() body = StreamRequestBody(**raw_body) - _validate_sdp_video_codecs(body.sdp) async with request.app['stream_lock']: # Fully disconnect any other active stream before starting the replacement. @@ -264,6 +247,12 @@ async def get_stream(request: 'web.Request'): session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) try: answer = await session.get_answer() + except ValueError as e: + await session.stop() + raise web.HTTPBadRequest( + text=json.dumps({"error": "invalid_sdp", "message": str(e)}), + content_type="application/json", + ) from e except Exception: await session.stop() raise From d3ff632c7410fa1e51755e1d26af5b99c40e64cc Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 17:16:17 -0700 Subject: [PATCH 14/32] remove unused --- system/webrtc/webrtcd.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d0fe7b349f0680..dd1fa18194fadb 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -219,11 +219,6 @@ class StreamRequestBody: bridge_services_out: list[str] = field(default_factory=list) -async def stream_options(request: 'web.Request'): - response = web.Response() - return response - - async def get_stream(request: 'web.Request'): logger = logging.getLogger("webrtcd") try: From 503b085a3da3bb85bec2cfd96ee4b9832e9563fb Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 19:10:05 -0700 Subject: [PATCH 15/32] remove extra try except --- system/webrtc/webrtcd.py | 78 ++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index dd1fa18194fadb..d90c9e65b3cc54 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -190,7 +190,6 @@ async def run(self): channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) self.outgoing_bridge_runner.start() - self.logger.info("Stream session (%s) connected", self.identifier) await self.stream.wait_for_disconnection() @@ -220,48 +219,41 @@ class StreamRequestBody: async def get_stream(request: 'web.Request'): - logger = logging.getLogger("webrtcd") - try: - stream_dict, debug_mode = request.app['streams'], request.app['debug'] - - raw_body = await request.json() - body = StreamRequestBody(**raw_body) - - async with request.app['stream_lock']: - # Fully disconnect any other active stream before starting the replacement. - for sid, s in list(stream_dict.items()): - if s.run_task and not s.run_task.done(): - try: - ch = s.stream.get_messaging_channel() - ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) - except Exception: - pass - await s.stop() - del stream_dict[sid] - - session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) - try: - answer = await session.get_answer() - except ValueError as e: - await session.stop() - raise web.HTTPBadRequest( - text=json.dumps({"error": "invalid_sdp", "message": str(e)}), - content_type="application/json", - ) from e - except Exception: - await session.stop() - raise - session.start() - - stream_dict[session.identifier] = session - - response = web.json_response({"sdp": answer.sdp, "type": answer.type}) - return response - except web.HTTPException: - raise - except Exception: - logger.exception("Error in /stream handler") - raise + stream_dict, debug_mode = request.app['streams'], request.app['debug'] + + raw_body = await request.json() + body = StreamRequestBody(**raw_body) + + async with request.app['stream_lock']: + # Fully disconnect any other active stream before starting the replacement. + for sid, s in list(stream_dict.items()): + if s.run_task and not s.run_task.done(): + try: + ch = s.stream.get_messaging_channel() + ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) + except Exception: + pass + await s.stop() + del stream_dict[sid] + + session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + try: + answer = await session.get_answer() + except ValueError as e: + await session.stop() + raise web.HTTPBadRequest( + text=json.dumps({"error": "invalid_sdp", "message": str(e)}), + content_type="application/json", + ) from e + except Exception: + await session.stop() + raise + session.start() + + stream_dict[session.identifier] = session + + response = web.json_response({"sdp": answer.sdp, "type": answer.type}) + return response async def get_schema(request: 'web.Request'): From dfe118238ab30c629e68d6ac4e07ab7d89f11f08 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 11 May 2026 19:31:06 -0700 Subject: [PATCH 16/32] remove video_tracks unused --- system/webrtc/webrtcd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 25dcadfc0c91df..08b26c47b585f0 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -133,7 +133,6 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - self.video_tracks = [] assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" for cam in cameras: From e860431630a7908c2a5c90433728696c86a5a843 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 09:39:58 -0700 Subject: [PATCH 17/32] add back exception trace --- system/athena/athenad.py | 8 ++++---- system/webrtc/webrtcd.py | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 6b925fc2d24df3..5e21550a4e582b 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -582,10 +582,10 @@ def startJoystickStream(sdp: str) -> dict: except ValueError: resp.raise_for_status() return resp.json() - except requests.ConnectTimeout: - raise Exception("webrtc took too long to respond. is it on?") from None - except requests.ConnectionError: - raise Exception("webrtc is not running. turn on comma body ignition.") from None + except requests.ConnectTimeout as e: + raise Exception("webrtc took too long to respond. is it on?") from e + except requests.ConnectionError as e: + raise Exception("webrtc is not running. turn on comma body ignition.") from e @dispatcher.add_method diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d90c9e65b3cc54..4f8c38fe718936 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 + import argparse import asyncio import contextlib import json -import logging import uuid +import logging from dataclasses import dataclass, field from typing import Any, TYPE_CHECKING From ea25eac5866cb43ec59beb57458f999f05b5ae8d Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 11:38:57 -0700 Subject: [PATCH 18/32] add stream road camera info to stream cameras --- system/loggerd/loggerd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 804550d80c772c..8c32dde89eb98b 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -154,4 +154,4 @@ const LogCameraInfo stream_driver_camera_info{ }; const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; -const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info}; +const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; From 6427d17d9d9579794f1ce3c316a9dc44fcdfccaf Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 11:40:08 -0700 Subject: [PATCH 19/32] fix --- system/loggerd/loggerd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 8c32dde89eb98b..fe09d98d9460da 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -154,4 +154,4 @@ const LogCameraInfo stream_driver_camera_info{ }; const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; -const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; +const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info, stream_road_camera_info}; From d736aea4c2839e68cb800a7579a3bf0ebf367517 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 11:42:59 -0700 Subject: [PATCH 20/32] add back assert --- system/webrtc/webrtcd.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 08b26c47b585f0..783ea52473fa98 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 + import time import argparse import asyncio @@ -175,6 +176,7 @@ async def get_answer(self): return await self.stream.start() def message_handler(self, message: bytes): + assert self.incoming_bridge is not None try: payload = json.loads(message) if isinstance(message, (bytes, str)) else None if isinstance(payload, dict): From cc0a8c02069e9993e9a343488e86b00ffa8836d8 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Tue, 12 May 2026 16:26:39 -0700 Subject: [PATCH 21/32] clean diff --- system/webrtc/webrtcd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 4f8c38fe718936..eb39e82bb7553e 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -221,7 +221,6 @@ class StreamRequestBody: async def get_stream(request: 'web.Request'): stream_dict, debug_mode = request.app['streams'], request.app['debug'] - raw_body = await request.json() body = StreamRequestBody(**raw_body) From 0f38abee5d9e6f03452631516b625f70d6645781 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Mon, 18 May 2026 19:03:11 -0700 Subject: [PATCH 22/32] clean diff --- system/webrtc/webrtcd.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index eb39e82bb7553e..7a51d45034e42b 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -252,8 +252,7 @@ async def get_stream(request: 'web.Request'): stream_dict[session.identifier] = session - response = web.json_response({"sdp": answer.sdp, "type": answer.type}) - return response + return web.json_response({"sdp": answer.sdp, "type": answer.type}) async def get_schema(request: 'web.Request'): From 1c32d41b34854a8028107c4af5f9facdb22dee2c Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 11:07:36 -0700 Subject: [PATCH 23/32] add testJoystick only on body --- system/athena/athenad.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 5e21550a4e582b..fc1edd712a2d21 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -569,9 +569,18 @@ def getNetworks(): @dispatcher.add_method -def startJoystickStream(sdp: str) -> dict: +def startStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - body = StreamRequestBody(sdp, ["driver"], ["testJoystick"], ["carState"]) + bridge_services_in = ["livestreamCameraSwitch"] + + # get live car params to avoid stale notCar edge case + cp_bytes = Params().get("CarParams") + if cp_bytes is not None: + with car.CarParams.from_bytes(cp_bytes) as CP: + if CP.notCar: + bridge_services_in.append("testJoystick") + + body = StreamRequestBody(sdp, "driver", bridge_services_in, ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) From be391a1f1cb399acfd5054dbf8e23063728a661d Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 11:15:09 -0700 Subject: [PATCH 24/32] fix camera list --- system/athena/athenad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index fc1edd712a2d21..a97429e808ba7e 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -580,7 +580,7 @@ def startStream(sdp: str) -> dict: if CP.notCar: bridge_services_in.append("testJoystick") - body = StreamRequestBody(sdp, "driver", bridge_services_in, ["carState"]) + body = StreamRequestBody(sdp, ["driver"], bridge_services_in, ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) From 2170f6be02236d19db5b40472ab16b0349886d0e Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 11:18:45 -0700 Subject: [PATCH 25/32] remove reference to future service --- system/athena/athenad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index a97429e808ba7e..2bff7886ddd03e 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,7 +571,7 @@ def getNetworks(): @dispatcher.add_method def startStream(sdp: str) -> dict: from openpilot.system.webrtc.webrtcd import StreamRequestBody - bridge_services_in = ["livestreamCameraSwitch"] + bridge_services_in = [] # get live car params to avoid stale notCar edge case cp_bytes = Params().get("CarParams") From 08071fe27a748fdfa519ceaae45627573c26a6d4 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 13:08:40 -0700 Subject: [PATCH 26/32] video_tracks list add back --- system/webrtc/webrtcd.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index ed57950710627a..4d976d2043eac4 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -134,9 +134,12 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) + self.video_tracks = [] assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) + video_track = LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack() + builder.add_video_stream(cam, video_track) + self.video_tracks.append(video_track) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) From 96b76fb054c7006b4c5f97b4126c9391e49366a0 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 18:36:07 -0700 Subject: [PATCH 27/32] encode all cameras and swap in video track in webrtc --- cereal/log.capnp | 12 ----- cereal/services.py | 2 - system/athena/athenad.py | 2 +- system/loggerd/encoderd.cc | 57 +--------------------- system/loggerd/loggerd.h | 25 ++++++++-- system/webrtc/device/video.py | 14 +++++- system/webrtc/tests/test_stream_session.py | 2 +- system/webrtc/webrtcd.py | 9 ++-- 8 files changed, 42 insertions(+), 81 deletions(-) diff --git a/cereal/log.capnp b/cereal/log.capnp index 5b4315b7a05da2..d12cd6cdc831ae 100644 --- a/cereal/log.capnp +++ b/cereal/log.capnp @@ -2428,15 +2428,6 @@ struct AudioFeedback { blockNum @1 :UInt16; } -struct LiveStreamCamera { - camera @0 :CameraType; - - enum CameraType { - driver @0; - wideRoad @1; - } -} - struct Touch { sec @0 :Int64; usec @1 :Int64; @@ -2546,9 +2537,6 @@ struct Event { livestreamWideRoadEncodeData @121 :EncodeData; livestreamDriverEncodeData @122 :EncodeData; - livestreamCameraEncodeData @152 :EncodeData; - livestreamCameraSwitch @153 :LiveStreamCamera; - # *********** Custom: reserved for forks *********** # DO change the name of the field diff --git a/cereal/services.py b/cereal/services.py index d3dc8d4fa7e1a5..c2d38d852db133 100755 --- a/cereal/services.py +++ b/cereal/services.py @@ -77,7 +77,6 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] "rawAudioData": (False, 20.), "bookmarkButton": (True, 0., 1), "audioFeedback": (True, 0., 1), - "livestreamCameraSwitch": (False, 0.), "roadEncodeData": (False, 20., None, QueueSize.BIG), "driverEncodeData": (False, 20., None, QueueSize.BIG), "wideRoadEncodeData": (False, 20., None, QueueSize.BIG), @@ -93,7 +92,6 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] "livestreamWideRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM), - "livestreamCameraEncodeData": (False, 20., None, QueueSize.MEDIUM), "customReservedRawData0": (True, 0.), } SERVICE_LIST = {name: Service(*vals) for diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 8f6407c747988d..6bccaae63eb424 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -580,7 +580,7 @@ def startStream(sdp: str) -> dict: if CP.notCar: bridge_services_in.append("testJoystick") - body = StreamRequestBody(sdp, "driver", bridge_services_in, ["carState"]) + body = StreamRequestBody(sdp, "wideRoad", bridge_services_in, ["carState"]) try: resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", json=asdict(body), timeout=10) diff --git a/system/loggerd/encoderd.cc b/system/loggerd/encoderd.cc index b00b7b66aed33a..9d4b81a3f90230 100644 --- a/system/loggerd/encoderd.cc +++ b/system/loggerd/encoderd.cc @@ -151,61 +151,6 @@ void encoderd_thread(const LogCameraInfo (&cameras)[N]) { } } -template -void stream_encoderd_thread(const LogCameraInfo (&cameras)[N]) { - while (!do_exit) { - if (!VisionIpcClient::getAvailableStreams("camerad", false).empty()) break; - util::sleep_for(100); - } - - SubMaster sm({"livestreamCameraSwitch"}); - const LogCameraInfo *active_cam = &cameras[0]; - - while (!do_exit) { - VisionIpcClient vipc_client("camerad", active_cam->stream_type, false); - if (!vipc_client.connect(false)) { - util::sleep_for(5); - continue; - } - - // init encoder - const VisionBuf &buf_info = vipc_client.buffers[0]; - LOGW("stream encoder init %zux%zu", buf_info.width, buf_info.height); - assert(buf_info.width > 0 && buf_info.height > 0); - auto encoder = std::make_unique(active_cam->encoder_infos[0], buf_info.width, buf_info.height); - encoder->encoder_open(); - - while (!do_exit) { - sm.update(0); - - // Switch camera if the request differs from the current one - if (sm.updated("livestreamCameraSwitch")) { - auto requested = sm["livestreamCameraSwitch"].getLivestreamCameraSwitch().getCamera(); - VisionStreamType requested_stream = requested == cereal::LiveStreamCamera::CameraType::DRIVER - ? VISION_STREAM_DRIVER : VISION_STREAM_WIDE_ROAD; - if (requested_stream != active_cam->stream_type) { - LOGW("stream encoder switching camera"); - auto it = std::find_if(std::begin(cameras), std::end(cameras), - [requested_stream](const auto &cam) { return cam.stream_type == requested_stream; }); - if (it != std::end(cameras)) active_cam = &(*it); - break; // reinit encoder with new camera selection - } - } - - // encode frame - VisionIpcBufExtra extra; - VisionBuf *buf = vipc_client.recv(&extra); - if (buf == nullptr) continue; - if (buf->get_frame_id() != extra.frame_id) continue; - if (encoder->encode_frame(buf, &extra) == -1) { - LOGE("stream encoder: failed to encode frame. frame_id: %d", extra.frame_id); - } - } - - encoder->encoder_close(); - } -} - int main(int argc, char* argv[]) { if (!Hardware::PC()) { int ret; @@ -217,7 +162,7 @@ int main(int argc, char* argv[]) { if (argc > 1) { std::string arg1(argv[1]); if (arg1 == "--stream") { - stream_encoderd_thread(stream_cameras_logged); + encoderd_thread(stream_cameras_logged); } else { LOGE("Argument '%s' is not supported", arg1.c_str()); } diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index fe09d98d9460da..9d0744d0d7ebf3 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -100,13 +100,28 @@ const EncoderInfo main_driver_encoder_info = { INIT_ENCODE_FUNCTIONS(DriverEncode), }; -const EncoderInfo stream_encoder_info = { - .publish_name = "livestreamCameraEncodeData", +const EncoderInfo stream_road_encoder_info = { + .publish_name = "livestreamRoadEncodeData", + //.thumbnail_name = "thumbnail", + .record = false, + .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, + INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode), +}; + +const EncoderInfo stream_wide_road_encoder_info = { + .publish_name = "livestreamWideRoadEncodeData", .record = false, .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode), }; +const EncoderInfo stream_driver_encoder_info = { + .publish_name = "livestreamDriverEncodeData", + .record = false, + .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, + INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode), +}; + const EncoderInfo qcam_encoder_info = { .publish_name = "qRoadEncodeData", .filename = "qcamera.ts", @@ -138,19 +153,19 @@ const LogCameraInfo driver_camera_info{ const LogCameraInfo stream_road_camera_info{ .thread_name = "road_cam_encoder", .stream_type = VISION_STREAM_ROAD, - .encoder_infos = {stream_encoder_info} + .encoder_infos = {stream_road_encoder_info} }; const LogCameraInfo stream_wide_road_camera_info{ .thread_name = "wide_road_cam_encoder", .stream_type = VISION_STREAM_WIDE_ROAD, - .encoder_infos = {stream_encoder_info} + .encoder_infos = {stream_wide_road_encoder_info} }; const LogCameraInfo stream_driver_camera_info{ .thread_name = "driver_cam_encoder", .stream_type = VISION_STREAM_DRIVER, - .encoder_infos = {stream_encoder_info} + .encoder_infos = {stream_driver_encoder_info} }; const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index 691f3f618736c2..b46d0222030794 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -9,14 +9,26 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack): + camera_to_sock_mapping = { + "driver": "livestreamDriverEncodeData", + "wideRoad": "livestreamWideRoadEncodeData", + "road": "livestreamRoadEncodeData", + } + def __init__(self, camera_type: str): dt = DT_DMON if camera_type == "driver" else DT_MDL super().__init__(camera_type, dt) - self._sock = messaging.sub_sock("livestreamCameraEncodeData", conflate=True) + self._sock = self._make_sock(camera_type) self._pts = 0 self._t0_ns = time.monotonic_ns() + def _make_sock(self, camera_type: str) -> messaging.SubSocket: + return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) + + def switch_camera(self, camera_type: str) -> None: + self._sock = self._make_sock(camera_type) + async def recv(self): while True: msg = messaging.recv_one_or_none(self._sock) diff --git a/system/webrtc/tests/test_stream_session.py b/system/webrtc/tests/test_stream_session.py index 9730f9e16e3d07..f44d217d58ced6 100644 --- a/system/webrtc/tests/test_stream_session.py +++ b/system/webrtc/tests/test_stream_session.py @@ -67,7 +67,7 @@ def test_incoming_proxy(self, mocker): mocked_pubmaster.reset_mock() def test_livestream_track(self, mocker): - fake_msg = messaging.new_message("livestreamCameraEncodeData") + fake_msg = messaging.new_message("livestreamDriverEncodeData") config = {"receive.return_value": fake_msg.to_bytes()} mocker.patch("msgq.SubSocket", spec=True, **config) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d4d52aebf07386..59b733420a03c7 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -131,7 +131,8 @@ def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], out builder = WebRTCAnswerBuilder(sdp) - builder.add_video_stream(init_camera, LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack()) + self.video_track = LiveStreamVideoStreamTrack(init_camera) if not debug_mode else VideoStreamTrack() + builder.add_video_stream(init_camera, self.video_track) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -173,6 +174,10 @@ async def get_answer(self): def message_handler(self, message: bytes): assert self.incoming_bridge is not None try: + msg_json = json.loads(message) + if msg_json.get("type") == "livestreamCameraSwitch" and hasattr(self.video_track, "switch_camera"): + self.video_track.switch_camera(msg_json["data"]["camera"]) + return self.incoming_bridge.send(message) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -183,8 +188,6 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - if "livestreamCameraSwitch" in self.incoming_bridge_services: - self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": self.init_camera}}).encode()) self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() From fc3bdac47fffb2284824eaae1438a344d2c232b8 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 19:04:52 -0700 Subject: [PATCH 28/32] clean --- system/loggerd/loggerd.h | 2 +- system/webrtc/webrtcd.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 9d0744d0d7ebf3..01bce2c9ea7e7b 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -169,4 +169,4 @@ const LogCameraInfo stream_driver_camera_info{ }; const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; -const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info, stream_road_camera_info}; +const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 59b733420a03c7..91d35556d7b55c 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -136,7 +136,6 @@ def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], out self.stream = builder.stream() self.identifier = str(uuid.uuid4()) - self.init_camera = init_camera self.incoming_bridge: CerealIncomingMessageProxy | None = None self.incoming_bridge_services = incoming_services From 2cdd6f65a7ee331a984d167cd70ee08c90fd5546 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 19:15:59 -0700 Subject: [PATCH 29/32] explicitly gate bridge send --- system/webrtc/webrtcd.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 91d35556d7b55c..1caf3ccf5120c3 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -142,7 +142,7 @@ def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], out self.outgoing_bridge: CerealOutgoingMessageProxy | None = None self.outgoing_bridge_runner: CerealProxyRunner | None = None if len(incoming_services) > 0: - self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master) + self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master, incoming_services) if len(outgoing_services) > 0: self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) @@ -177,6 +177,9 @@ def message_handler(self, message: bytes): if msg_json.get("type") == "livestreamCameraSwitch" and hasattr(self.video_track, "switch_camera"): self.video_track.switch_camera(msg_json["data"]["camera"]) return + + if msg_json.get("type") not in self.incoming_bridge_services: + return self.incoming_bridge.send(message) except Exception: self.logger.exception("Cereal incoming proxy failure") From 207fefe3c4ce3de0829b1dd8ceceea6bfa087b49 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Thu, 21 May 2026 19:22:13 -0700 Subject: [PATCH 30/32] clean leftover --- system/webrtc/webrtcd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 1caf3ccf5120c3..1239a1759cdea2 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -142,7 +142,7 @@ def __init__(self, sdp: str, init_camera: str, incoming_services: list[str], out self.outgoing_bridge: CerealOutgoingMessageProxy | None = None self.outgoing_bridge_runner: CerealProxyRunner | None = None if len(incoming_services) > 0: - self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master, incoming_services) + self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master) if len(outgoing_services) > 0: self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) From 1d2c44f74a6d21ce070a47f97331853492f39bcf Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Fri, 22 May 2026 10:46:44 -0700 Subject: [PATCH 31/32] rearrange video.py --- system/webrtc/device/video.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index 3ba4a93e9c2256..8a07c5b9be9c92 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -32,6 +32,12 @@ def __init__(self, camera_type: str): self._t0_ns = time.monotonic_ns() self.timing_sei_enabled = False + def _make_sock(self, camera_type: str) -> messaging.SubSocket: + return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) + + def switch_camera(self, camera_type: str) -> None: + self._sock = self._make_sock(camera_type) + def _build_frame_data(self, msg) -> bytes: encode_data = getattr(msg, msg.which()) if not self.timing_sei_enabled: @@ -46,12 +52,6 @@ def _build_frame_data(self, msg) -> bytes: ) + b'\x80' return encode_data.header + sei_nal + encode_data.data - def _make_sock(self, camera_type: str) -> messaging.SubSocket: - return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) - - def switch_camera(self, camera_type: str) -> None: - self._sock = self._make_sock(camera_type) - async def recv(self): while True: msg = messaging.recv_one_or_none(self._sock) From 33c83cdf520d2fbdb144a6af45d4995172dee4c9 Mon Sep 17 00:00:00 2001 From: stefpi <19478336+stefpi@users.noreply.github.com> Date: Fri, 22 May 2026 19:23:43 -0700 Subject: [PATCH 32/32] fix lint --- system/webrtc/device/video.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index e790a58f994ba1..8a07c5b9be9c92 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -52,12 +52,6 @@ def _build_frame_data(self, msg) -> bytes: ) + b'\x80' return encode_data.header + sei_nal + encode_data.data - def _make_sock(self, camera_type: str) -> messaging.SubSocket: - return messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) - - def switch_camera(self, camera_type: str) -> None: - self._sock = self._make_sock(camera_type) - async def recv(self): while True: msg = messaging.recv_one_or_none(self._sock)