ria-toolkit-oss/tests/agent/test_disconnect.py
2026-04-13 11:48:15 -04:00

82 lines
2.3 KiB
Python

"""SdrDisconnectedError translation + streamer handling."""
from __future__ import annotations
import asyncio
from ria_toolkit_oss.agent.streamer import Streamer
from ria_toolkit_oss.sdr import SdrDisconnectedError
from ria_toolkit_oss.sdr.sdr import translate_disconnect
def test_translate_disconnect_usb_message():
    """A RuntimeError carrying a libiio I/O-error message becomes SdrDisconnectedError."""
    usb_failure = RuntimeError("libiio: Input/output error (errno 5)")
    translated = translate_disconnect(usb_failure)
    # Only the resulting type is checked; the message of the new error is not.
    assert isinstance(translated, SdrDisconnectedError)
def test_translate_disconnect_enodev_oserror():
    """An OSError with errno 19 ("No such device") becomes SdrDisconnectedError."""
    missing_device = OSError(19, "No such device")
    translated = translate_disconnect(missing_device)
    assert isinstance(translated, SdrDisconnectedError)
def test_translate_disconnect_passes_through_unrelated():
    """An error that is not a disconnect is handed back as the very same object."""
    unrelated = ValueError("bad sample rate")
    returned = translate_disconnect(unrelated)
    # Identity, not equality: the exception must not be wrapped or copied.
    assert returned is unrelated
def test_translate_disconnect_preserves_sdr_disconnected():
    """An exception that is already SdrDisconnectedError is returned unchanged."""
    already_typed = SdrDisconnectedError("already typed")
    returned = translate_disconnect(already_typed)
    # Identity check: no re-wrapping of an already-translated error.
    assert returned is already_typed
class _FlakySdr:
    """Stub SDR whose every ``rx()`` call raises SdrDisconnectedError.

    ``close()`` only records that it was invoked, so a test can check that
    the device was released after the failure.
    """

    def __init__(self) -> None:
        # Flipped to True by close(); starts out open.
        self.closed = False

    def rx(self, n):
        """Raise SdrDisconnectedError unconditionally; ``n`` is ignored."""
        raise SdrDisconnectedError("usb gone")

    def close(self):
        """Mark this stub as closed."""
        self.closed = True
class _Ws:
    """In-memory websocket stand-in that records every frame it is asked to send."""

    def __init__(self):
        # Separate logs for JSON payloads and raw byte frames, in send order.
        self.json_sent, self.bytes_sent = [], []

    async def send_json(self, p):
        """Record the JSON payload ``p`` instead of transmitting it."""
        self.json_sent.append(p)

    async def send_bytes(self, b):
        """Record the binary frame ``b`` instead of transmitting it."""
        self.bytes_sent.append(b)
def test_streamer_reports_disconnected_and_ends_capture():
    """A disconnect raised by rx() closes the SDR and produces an error frame.

    The streamer is started against a stub SDR whose rx() always raises
    SdrDisconnectedError. The test then checks that the stub was closed and
    that the last ``"error"`` JSON frame sent mentions "disconnected".
    """

    async def scenario():
        socket = _Ws()
        radio = _FlakySdr()

        def factory(d, i):
            # Always hand back the same stub so the test can inspect it later.
            return radio

        streamer = Streamer(ws=socket, sdr_factory=factory)
        start_message = {
            "type": "start",
            "app_id": "a",
            "radio_config": {"device": "fake", "buffer_size": 8},
        }
        await streamer.on_message(start_message)

        # Poll (at most 50 sleeps of 10 ms) until the capture task has finished.
        for _attempt in range(50):
            task = streamer._capture_task
            if task and task.done():
                break
            await asyncio.sleep(0.01)
        return socket, radio, streamer

    ws, sdr, streamer = asyncio.run(scenario())

    assert sdr.closed
    errors = [frame for frame in ws.json_sent if frame.get("type") == "error"]
    assert errors, "expected an error frame"
    last_message = errors[-1]["message"]
    assert "disconnected" in last_message.lower()