#!/usr/bin/env python3 """Transmit a continuous tone through the agent's TX pipeline on a real Pluto. End-to-end smoke test for the Pluto + Streamer TX path. Drives the same ``Streamer`` the hub talks to, but in-process with a logging ``FakeWs`` so the script is self-contained — no hub required. Default: 100 kHz baseband tone × 2 450 MHz LO → carrier at 2 450.1 MHz, continuous until you Ctrl-C (or the ``--duration`` timer fires). A spectrum analyzer tuned to 2 450.1 MHz should show a clean CW spike as long as ``tx_status: transmitting`` prints. Usage:: python3 scripts/pluto_tx_smoke.py # auto-discover Pluto python3 scripts/pluto_tx_smoke.py --identifier 192.168.3.1 python3 scripts/pluto_tx_smoke.py --frequency 2.4e9 --gain -20 --duration 60 Flags map 1:1 onto the agent's ``radio_config``: --identifier Pluto IP or hostname (omitted → ip:pluto.local). --frequency TX LO in Hz. Default 2 450 MHz. --gain Pluto TX gain in dB. Pluto range is ``[-89, 0]``; more negative = more attenuation = less power. Default -30. --sample-rate Baseband sample rate. Default 1 MHz. --tone Baseband tone offset in Hz. Default 100 kHz; set 0 for DC (unmodulated carrier at exactly --frequency, but Pluto's LO leakage will dominate). --buffer-size Complex samples per WS frame. Default 4096. --duration Stop after this many seconds (0 = run until Ctrl-C). """ from __future__ import annotations import argparse import asyncio import logging import signal import sys import numpy as np from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.streamer import Streamer class LoggingFakeWs: """In-process stand-in for the hub's WebSocket. Prints every ``tx_status`` + ``error`` frame the Streamer emits so the operator can watch the lifecycle (armed → transmitting → done) on stdout. """ async def send_json(self, payload: dict) -> None: t = payload.get("type") if t == "tx_status": state = payload.get("state") msg = payload.get("message") tail = f" — {msg}" if msg else "" print(f"[tx_status] {state}{tail}") elif t == "error": print(f"[error] {payload.get('message')}") async def send_bytes(self, data: bytes) -> None: # Agent side won't send RX bytes in this script (no RX session). pass def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float = 0.0) -> tuple[bytes, float]: """Return ``(interleaved_float32_bytes, next_phase)`` for a sine tone. Emitting one continuous phase-coherent tone requires threading the phase across frames; the returned ``next_phase`` should be fed back as ``phase_offset`` on the next call so the sinusoid doesn't glitch at frame boundaries. Amplitude is 0.7 to leave some headroom below the [-1, 1] cap that ``_verify_sample_format`` polices elsewhere in the toolkit. """ n = np.arange(buffer_size, dtype=np.float64) phase = 2.0 * np.pi * tone_hz / sample_rate * n + phase_offset amp = 0.7 iq = amp * (np.cos(phase) + 1j * np.sin(phase)) iq = iq.astype(np.complex64) interleaved = np.empty(buffer_size * 2, dtype=np.float32) interleaved[0::2] = iq.real interleaved[1::2] = iq.imag next_phase = (2.0 * np.pi * tone_hz / sample_rate * buffer_size + phase_offset) % (2.0 * np.pi) return interleaved.tobytes(), next_phase def _make_pluto_factory(identifier: str | None): def factory(device: str, _ident: str | None): if device != "pluto": raise ValueError(f"this script only drives pluto; got device={device!r}") from ria_toolkit_oss.sdr.pluto import Pluto return Pluto(identifier=identifier) return factory async def _run(args: argparse.Namespace) -> int: ws = LoggingFakeWs() cfg = AgentConfig( tx_enabled=True, # Pluto's TX gain range is [-89, 0]. Cap at 0 so a fat-fingered # --gain=+5 still gets rejected at the agent boundary rather than # turned into mystery attenuation by Pluto's setter. tx_max_gain_db=0.0, tx_max_duration_s=float(args.duration) if args.duration > 0 else None, ) streamer = Streamer(ws=ws, sdr_factory=_make_pluto_factory(args.identifier), cfg=cfg) await streamer.on_message( { "type": "tx_start", "app_id": "smoke", "radio_config": { "device": "pluto", "identifier": args.identifier, "tx_sample_rate": int(args.sample_rate), "tx_center_frequency": int(args.frequency), "tx_gain": int(args.gain), "buffer_size": int(args.buffer_size), # "repeat" keeps the last buffer on the air if we ever stall, # so a continuous carrier stays up even when Python GC or # asyncio scheduling briefly pauses the producer. "underrun_policy": "repeat", }, } ) # Abort if tx_start was rejected by an interlock (no session → nothing to do). if streamer._tx is None: print("tx_start rejected — see [tx_status] line above for the reason.", file=sys.stderr) return 2 print(f"Transmitting at {args.frequency/1e6:.3f} MHz with " f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. " f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}.") # Arrange a clean shutdown on Ctrl-C. stop = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, stop.set) except NotImplementedError: # add_signal_handler is not available on Windows event loops. pass # Produce buffers at the nominal sample-rate pace. We deliberately stay # slightly ahead of the radio — queue is bounded at 8, so backpressure # flows naturally. phase = 0.0 buffer_dt = args.buffer_size / args.sample_rate # Aim for one buffer every ``buffer_dt * 0.5`` seconds so the queue stays # topped up. The queue's own backpressure keeps us from spinning. produce_interval = buffer_dt * 0.5 try: async def producer(): nonlocal phase while not stop.is_set(): frame, phase = _make_iq_frame( args.buffer_size, args.tone, args.sample_rate, phase ) await streamer.on_binary(frame) await asyncio.sleep(produce_interval) producer_task = asyncio.create_task(producer()) if args.duration > 0: try: await asyncio.wait_for(stop.wait(), timeout=args.duration) except asyncio.TimeoutError: pass else: await stop.wait() stop.set() producer_task.cancel() try: await producer_task except (asyncio.CancelledError, Exception): pass finally: await streamer.on_message({"type": "tx_stop", "app_id": "smoke"}) print("TX session closed.") return 0 def main() -> int: p = argparse.ArgumentParser( description="End-to-end TX smoke test: agent → Pluto continuous tone.", ) p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)") p.add_argument("--frequency", type=float, default=3_410_000_000.0, help="TX LO in Hz (default 2.45 GHz)") p.add_argument("--gain", type=float, default=-0.0, help="TX gain in dB; Pluto range [-89, 0] (default -30)") p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)") p.add_argument("--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)") p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)") p.add_argument("--duration", type=float, default=60.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)") p.add_argument("--log-level", default="INFO") args = p.parse_args() logging.basicConfig( level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) try: return asyncio.run(_run(args)) except KeyboardInterrupt: return 130 if __name__ == "__main__": sys.exit(main())