"""End-to-end: local websockets server drives a Streamer's TX path.""" from __future__ import annotations import asyncio import json import time import numpy as np import websockets from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.streamer import Streamer from ria_toolkit_oss.agent.ws_client import WsClient from ria_toolkit_oss.sdr.mock import MockSDR class RecordingMockSDR(MockSDR): def __init__(self, buffer_size: int): super().__init__(buffer_size=buffer_size) self.tx_produced: list[np.ndarray] = [] def _stream_tx(self, callback): self._enable_tx = True self._tx_initialized = True while self._enable_tx: result = callback(self.rx_buffer_size) self.tx_produced.append(np.asarray(result).copy()) time.sleep(0.005) def _iq_frame(samples: np.ndarray) -> bytes: interleaved = np.empty(samples.size * 2, dtype=np.float32) interleaved[0::2] = samples.real interleaved[1::2] = samples.imag return interleaved.tobytes() def test_server_tx_start_binary_stop_cycle_over_real_ws(): BUF = 16 sdr = RecordingMockSDR(buffer_size=BUF) marker = np.arange(BUF, dtype=np.complex64) + 1 async def scenario(): control_frames: list[dict] = [] done = asyncio.Event() async def server_handler(ws): try: # Drain initial heartbeat. first = await asyncio.wait_for(ws.recv(), timeout=2.0) control_frames.append(json.loads(first)) await ws.send( json.dumps( { "type": "tx_start", "app_id": "tx-app", "radio_config": { "device": "mock", "buffer_size": BUF, "tx_sample_rate": 1_000_000, "tx_center_frequency": 2.45e9, "tx_gain": -20, "underrun_policy": "zero", }, } ) ) # Push a few binary IQ frames. for _ in range(3): await ws.send(_iq_frame(marker)) # Wait for at least "armed" + "transmitting" statuses. for _ in range(100): msg = await asyncio.wait_for(ws.recv(), timeout=2.0) if isinstance(msg, str): control_frames.append(json.loads(msg)) if any(f.get("type") == "tx_status" and f.get("state") == "transmitting" for f in control_frames): break await ws.send(json.dumps({"type": "tx_stop", "app_id": "tx-app"})) # Drain trailing statuses. try: while True: msg = await asyncio.wait_for(ws.recv(), timeout=0.5) if isinstance(msg, str): control_frames.append(json.loads(msg)) except (asyncio.TimeoutError, Exception): pass finally: done.set() server = await websockets.serve(server_handler, "127.0.0.1", 0) port = server.sockets[0].getsockname()[1] try: client = WsClient( f"ws://127.0.0.1:{port}", token="", heartbeat_interval=10.0, reconnect_pause=0.05, ) streamer = Streamer( ws=client, sdr_factory=lambda d, i: sdr, cfg=AgentConfig(tx_enabled=True), ) task = asyncio.create_task( client.run( on_message=streamer.on_message, heartbeat=streamer.build_heartbeat, on_binary=streamer.on_binary, ) ) await asyncio.wait_for(done.wait(), timeout=5.0) client.stop() task.cancel() try: await task except (asyncio.CancelledError, Exception): pass finally: server.close() await server.wait_closed() return control_frames, streamer controls, streamer = asyncio.run(scenario()) # Heartbeat reached the server. assert any(f.get("type") == "heartbeat" for f in controls) # tx_status lifecycle: armed → transmitting → done. tx_states = [f["state"] for f in controls if f.get("type") == "tx_status"] assert tx_states[0] == "armed" assert "transmitting" in tx_states assert tx_states[-1] == "done" # TX callback saw our marker buffer at least once. assert any(np.array_equal(b, marker) for b in sdr.tx_produced) # Session cleared. assert streamer._tx is None