#!/usr/bin/env python3 """Full-stack TX smoke test: localhost mock-hub → WS → agent → real Pluto. Same radio output as ``pluto_tx_smoke.py`` (continuous tone at 2 450.1 MHz), but drives the agent through the *real* WebSocket path instead of calling handlers in-process. Proves that the hub-driven path behaves identically: mock hub ── ws:// ──▶ WsClient.run() ──▶ Streamer.on_message └▶ Streamer.on_binary │ ▼ real Pluto This is the most rigorous check short of pointing the real ``ria-agent stream`` at a live ria-hub. If a tone appears on the spectrum analyzer here but *not* when ria-hub drives it, the fault is above the WS decoder (registration, capability gate, TX operator, hub's binary-frame publisher); everything downstream of ``ws.recv()`` is this script's code path. Usage:: python3 scripts/pluto_tx_ws_smoke.py # default 30s tone python3 scripts/pluto_tx_ws_smoke.py --identifier 192.168.3.1 python3 scripts/pluto_tx_ws_smoke.py --duration 0 # until Ctrl-C """ from __future__ import annotations import argparse import asyncio import json import logging import signal import sys import numpy as np import websockets from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.streamer import Streamer from ria_toolkit_oss.agent.ws_client import WsClient def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float) -> tuple[bytes, float]: n = np.arange(buffer_size, dtype=np.float64) phase = 2.0 * np.pi * tone_hz / sample_rate * n + phase_offset amp = 0.7 iq = (amp * (np.cos(phase) + 1j * np.sin(phase))).astype(np.complex64) interleaved = np.empty(buffer_size * 2, dtype=np.float32) interleaved[0::2] = iq.real interleaved[1::2] = iq.imag next_phase = (2.0 * np.pi * tone_hz / sample_rate * buffer_size + phase_offset) % (2.0 * np.pi) return interleaved.tobytes(), next_phase def _make_pluto_factory(identifier: str | None): def factory(device: str, _ident: str | None): if device != "pluto": raise ValueError(f"this script only drives pluto; got device={device!r}") from ria_toolkit_oss.sdr.pluto import Pluto return Pluto(identifier=identifier) return factory async def _mock_hub_handler(ws, args, stop: asyncio.Event): """Server side of the WS. Sends tx_start, streams IQ, then tx_stop.""" # Drain the first heartbeat so the log is clean; we don't need to gate on # it for a localhost smoke test. try: first = await asyncio.wait_for(ws.recv(), timeout=2.0) if isinstance(first, str): payload = json.loads(first) if payload.get("type") == "heartbeat": caps = payload.get("capabilities") print(f"[mock-hub] agent heartbeat: capabilities={caps} " f"tx_enabled={payload.get('tx_enabled')}") except asyncio.TimeoutError: print("[mock-hub] warning: no heartbeat received in first 2s") # Arm the agent's TX path. await ws.send( json.dumps( { "type": "tx_start", "app_id": "ws-smoke", "radio_config": { "device": "pluto", "identifier": args.identifier, "tx_sample_rate": int(args.sample_rate), "tx_center_frequency": int(args.frequency), "tx_gain": int(args.gain), "buffer_size": int(args.buffer_size), "underrun_policy": "repeat", }, } ) ) print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, " f"gain={args.gain} dB") # Producer: push IQ frames at a steady clip. Use a concurrent receiver so # tx_status frames show up in real time rather than being queued behind # the sends. phase = 0.0 buffer_dt = args.buffer_size / args.sample_rate async def receiver(): try: while True: msg = await ws.recv() if isinstance(msg, str): print(f"[mock-hub] ← {msg}") except (websockets.ConnectionClosed, asyncio.CancelledError): pass recv_task = asyncio.create_task(receiver()) try: deadline = None if args.duration <= 0 else (asyncio.get_event_loop().time() + args.duration) while not stop.is_set(): if deadline is not None and asyncio.get_event_loop().time() >= deadline: break frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase) try: await ws.send(frame) except websockets.ConnectionClosed: break # Slightly ahead of real-time; WS backpressure handles the rest. await asyncio.sleep(buffer_dt * 0.5) finally: try: await ws.send(json.dumps({"type": "tx_stop", "app_id": "ws-smoke"})) print("[mock-hub] sent tx_stop") except websockets.ConnectionClosed: pass # Give the agent a moment to emit `tx_status: done` before we tear down. await asyncio.sleep(0.3) recv_task.cancel() try: await recv_task except (asyncio.CancelledError, Exception): pass async def _run(args: argparse.Namespace) -> int: stop = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, stop.set) except NotImplementedError: pass # Start the mock hub on a local port. async def handler(ws): try: await _mock_hub_handler(ws, args, stop) finally: stop.set() server = await websockets.serve(handler, "127.0.0.1", 0) port = server.sockets[0].getsockname()[1] print(f"[mock-hub] listening on ws://127.0.0.1:{port}") # Run the agent — exactly as ``ria-agent stream`` would, just with a # different URL and an in-memory AgentConfig instead of one loaded from # ``~/.ria/agent.json``. client = WsClient( f"ws://127.0.0.1:{port}", token="", heartbeat_interval=5.0, reconnect_pause=0.5, ) streamer = Streamer( ws=client, sdr_factory=_make_pluto_factory(args.identifier), cfg=AgentConfig(tx_enabled=True, tx_max_gain_db=0.0), ) client_task = asyncio.create_task( client.run( on_message=streamer.on_message, heartbeat=streamer.build_heartbeat, on_binary=streamer.on_binary, ) ) try: await stop.wait() finally: client.stop() client_task.cancel() try: await client_task except (asyncio.CancelledError, Exception): pass server.close() await server.wait_closed() print("Done.") return 0 def main() -> int: p = argparse.ArgumentParser( description="Full-stack TX smoke: localhost mock-hub → WS → agent → Pluto.", ) p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)") p.add_argument("--frequency", type=float, default=2_450_000_000.0, help="TX LO in Hz (default 2.45 GHz)") p.add_argument("--gain", type=float, default=0.0, help="TX gain in dB; Pluto range [-89, 0] (default 0)") p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)") p.add_argument("--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz (default 100 kHz)") p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)") p.add_argument( "--duration", type=float, default=30.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)" ) p.add_argument("--log-level", default="INFO") args = p.parse_args() logging.basicConfig( level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) try: return asyncio.run(_run(args)) except KeyboardInterrupt: return 130 if __name__ == "__main__": sys.exit(main())