"""Binary-frame delivery on the hub → agent WebSocket. Named to match the test matrix in ``Agent TX Streaming Handoff.md`` §A7. Exercises: - Binary frames are forwarded to an ``on_binary`` coroutine when supplied. - Binary frames are silently dropped (no crash) when ``on_binary`` is omitted, preserving the pre-TX behavior for RX-only deployments. """ from __future__ import annotations import asyncio import json import websockets from ria_toolkit_oss.agent.ws_client import WsClient async def _open_server(handler): server = await websockets.serve(handler, "127.0.0.1", 0) port = server.sockets[0].getsockname()[1] return server, port def test_binary_frame_forwarded_to_handler(): payload = bytes(range(128)) async def scenario(): received: list[bytes] = [] done = asyncio.Event() async def handler(ws): await ws.send(payload) done.set() try: await ws.wait_closed() except Exception: pass server, port = await _open_server(handler) try: client = WsClient( f"ws://127.0.0.1:{port}", token="", heartbeat_interval=10.0, reconnect_pause=0.05, ) async def on_bin(data): received.append(data) task = asyncio.create_task( client.run( on_message=lambda _m: asyncio.sleep(0), heartbeat=lambda: {"type": "heartbeat"}, on_binary=on_bin, ) ) for _ in range(50): if received: break await asyncio.sleep(0.02) client.stop() task.cancel() try: await task except (asyncio.CancelledError, Exception): pass finally: server.close() await server.wait_closed() return received received = asyncio.run(scenario()) assert received == [payload] def test_binary_frame_dropped_when_no_handler(): async def scenario(): crashes: list[Exception] = [] async def handler(ws): await ws.send(b"\x00\x01\x02\x03") await ws.send(json.dumps({"type": "ping"})) try: await ws.wait_closed() except Exception: pass messages: list[dict] = [] server, port = await _open_server(handler) try: client = WsClient( f"ws://127.0.0.1:{port}", token="", heartbeat_interval=10.0, reconnect_pause=0.05, ) async def on_msg(m): messages.append(m) task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})) for _ in range(50): if messages: break await asyncio.sleep(0.02) client.stop() task.cancel() try: await task except (asyncio.CancelledError, Exception) as exc: crashes.append(exc) finally: server.close() await server.wait_closed() return messages, crashes messages, _ = asyncio.run(scenario()) assert messages and messages[0] == {"type": "ping"} def test_on_binary_exception_does_not_kill_connection(): """A buggy ``on_binary`` raises mid-stream; the WS loop keeps accepting frames.""" async def scenario(): delivered_binary = 0 delivered_control: list[dict] = [] async def handler(ws): await ws.send(b"\x10\x20\x30") await ws.send(b"\x40\x50\x60") await ws.send(json.dumps({"type": "ping"})) try: await ws.wait_closed() except Exception: pass server, port = await _open_server(handler) try: client = WsClient( f"ws://127.0.0.1:{port}", token="", heartbeat_interval=10.0, reconnect_pause=0.05, ) async def on_bin(data): nonlocal delivered_binary delivered_binary += 1 raise RuntimeError("handler broke") async def on_msg(m): delivered_control.append(m) task = asyncio.create_task( client.run( on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}, on_binary=on_bin, ) ) for _ in range(60): if delivered_control: break await asyncio.sleep(0.02) client.stop() task.cancel() try: await task except (asyncio.CancelledError, Exception): pass finally: server.close() await server.wait_closed() return delivered_binary, delivered_control bins, ctrls = asyncio.run(scenario()) # Both binary frames were delivered to the (crashing) handler. assert bins == 2 # The subsequent JSON frame still arrived — loop didn't die on the exceptions. assert ctrls and ctrls[0] == {"type": "ping"}