ria-toolkit-oss/tests/agent/test_integration_tx.py
ben 22b035dbee
Some checks failed
Build Sphinx Docs Set / Build Docs (pull_request) Has been cancelled
Test with tox / Test with tox (3.10) (pull_request) Has been cancelled
Test with tox / Test with tox (3.11) (pull_request) Has been cancelled
Test with tox / Test with tox (3.12) (pull_request) Has been cancelled
Build Project / Build Project (3.12) (pull_request) Has been cancelled
Build Project / Build Project (3.11) (pull_request) Has been cancelled
Build Project / Build Project (3.10) (pull_request) Has been cancelled
format fixes
2026-04-20 13:51:15 -04:00

142 lines
4.9 KiB
Python

"""End-to-end: local websockets server drives a Streamer's TX path."""
from __future__ import annotations
import asyncio
import json
import time
import numpy as np
import websockets
from ria_toolkit_oss.agent.config import AgentConfig
from ria_toolkit_oss.agent.streamer import Streamer
from ria_toolkit_oss.agent.ws_client import WsClient
from ria_toolkit_oss.sdr.mock import MockSDR
class RecordingMockSDR(MockSDR):
def __init__(self, buffer_size: int):
super().__init__(buffer_size=buffer_size)
self.tx_produced: list[np.ndarray] = []
def _stream_tx(self, callback):
self._enable_tx = True
self._tx_initialized = True
while self._enable_tx:
result = callback(self.rx_buffer_size)
self.tx_produced.append(np.asarray(result).copy())
time.sleep(0.005)
def _iq_frame(samples: np.ndarray) -> bytes:
interleaved = np.empty(samples.size * 2, dtype=np.float32)
interleaved[0::2] = samples.real
interleaved[1::2] = samples.imag
return interleaved.tobytes()
def test_server_tx_start_binary_stop_cycle_over_real_ws():
BUF = 16
sdr = RecordingMockSDR(buffer_size=BUF)
marker = np.arange(BUF, dtype=np.complex64) + 1
async def scenario():
control_frames: list[dict] = []
done = asyncio.Event()
async def server_handler(ws):
try:
# Drain initial heartbeat.
first = await asyncio.wait_for(ws.recv(), timeout=2.0)
control_frames.append(json.loads(first))
await ws.send(
json.dumps(
{
"type": "tx_start",
"app_id": "tx-app",
"radio_config": {
"device": "mock",
"buffer_size": BUF,
"tx_sample_rate": 1_000_000,
"tx_center_frequency": 2.45e9,
"tx_gain": -20,
"underrun_policy": "zero",
},
}
)
)
# Push a few binary IQ frames.
for _ in range(3):
await ws.send(_iq_frame(marker))
# Wait for at least "armed" + "transmitting" statuses.
for _ in range(100):
msg = await asyncio.wait_for(ws.recv(), timeout=2.0)
if isinstance(msg, str):
control_frames.append(json.loads(msg))
if any(f.get("type") == "tx_status" and f.get("state") == "transmitting" for f in control_frames):
break
await ws.send(json.dumps({"type": "tx_stop", "app_id": "tx-app"}))
# Drain trailing statuses.
try:
while True:
msg = await asyncio.wait_for(ws.recv(), timeout=0.5)
if isinstance(msg, str):
control_frames.append(json.loads(msg))
except (asyncio.TimeoutError, Exception):
pass
finally:
done.set()
server = await websockets.serve(server_handler, "127.0.0.1", 0)
port = server.sockets[0].getsockname()[1]
try:
client = WsClient(
f"ws://127.0.0.1:{port}",
token="",
heartbeat_interval=10.0,
reconnect_pause=0.05,
)
streamer = Streamer(
ws=client,
sdr_factory=lambda d, i: sdr,
cfg=AgentConfig(tx_enabled=True),
)
task = asyncio.create_task(
client.run(
on_message=streamer.on_message,
heartbeat=streamer.build_heartbeat,
on_binary=streamer.on_binary,
)
)
await asyncio.wait_for(done.wait(), timeout=5.0)
client.stop()
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
finally:
server.close()
await server.wait_closed()
return control_frames, streamer
controls, streamer = asyncio.run(scenario())
# Heartbeat reached the server.
assert any(f.get("type") == "heartbeat" for f in controls)
# tx_status lifecycle: armed → transmitting → done.
tx_states = [f["state"] for f in controls if f.get("type") == "tx_status"]
assert tx_states[0] == "armed"
assert "transmitting" in tx_states
assert tx_states[-1] == "done"
# TX callback saw our marker buffer at least once.
assert any(np.array_equal(b, marker) for b in sdr.tx_produced)
# Session cleared.
assert streamer._tx is None