zfp-oss #27

Merged
benchinnery merged 15 commits from zfp-oss into main 2026-04-23 11:10:43 -04:00
19 changed files with 124 additions and 148 deletions
Showing only changes of commit 062a0e766f - Show all commits

View File

@ -66,8 +66,9 @@ class LoggingFakeWs:
pass pass
def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, def _make_iq_frame(
phase_offset: float = 0.0) -> tuple[bytes, float]: buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float = 0.0
) -> tuple[bytes, float]:
"""Return ``(interleaved_float32_bytes, next_phase)`` for a sine tone. """Return ``(interleaved_float32_bytes, next_phase)`` for a sine tone.
Emitting one continuous phase-coherent tone requires threading the phase Emitting one continuous phase-coherent tone requires threading the phase
@ -93,7 +94,9 @@ def _make_pluto_factory(identifier: str | None):
if device != "pluto": if device != "pluto":
raise ValueError(f"this script only drives pluto; got device={device!r}") raise ValueError(f"this script only drives pluto; got device={device!r}")
from ria_toolkit_oss.sdr.pluto import Pluto from ria_toolkit_oss.sdr.pluto import Pluto
return Pluto(identifier=identifier) return Pluto(identifier=identifier)
return factory return factory
@ -130,13 +133,14 @@ async def _run(args: argparse.Namespace) -> int:
# Abort if tx_start was rejected by an interlock (no session → nothing to do). # Abort if tx_start was rejected by an interlock (no session → nothing to do).
if streamer._tx is None: if streamer._tx is None:
print("tx_start rejected — see [tx_status] line above for the reason.", print("tx_start rejected — see [tx_status] line above for the reason.", file=sys.stderr)
file=sys.stderr)
return 2 return 2
print(f"Transmitting at {args.frequency/1e6:.3f} MHz with " print(
f"Transmitting at {args.frequency/1e6:.3f} MHz with "
f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. " f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. "
f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}.") f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}."
)
# Arrange a clean shutdown on Ctrl-C. # Arrange a clean shutdown on Ctrl-C.
stop = asyncio.Event() stop = asyncio.Event()
@ -157,12 +161,11 @@ async def _run(args: argparse.Namespace) -> int:
# topped up. The queue's own backpressure keeps us from spinning. # topped up. The queue's own backpressure keeps us from spinning.
produce_interval = buffer_dt * 0.5 produce_interval = buffer_dt * 0.5
try: try:
async def producer(): async def producer():
nonlocal phase nonlocal phase
while not stop.is_set(): while not stop.is_set():
frame, phase = _make_iq_frame( frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase)
args.buffer_size, args.tone, args.sample_rate, phase
)
await streamer.on_binary(frame) await streamer.on_binary(frame)
await asyncio.sleep(produce_interval) await asyncio.sleep(produce_interval)
@ -193,20 +196,17 @@ def main() -> int:
p = argparse.ArgumentParser( p = argparse.ArgumentParser(
description="End-to-end TX smoke test: agent → Pluto continuous tone.", description="End-to-end TX smoke test: agent → Pluto continuous tone.",
) )
p.add_argument("--identifier", default=None, p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)")
help="Pluto IP/hostname (default: auto-discover pluto.local)") p.add_argument("--frequency", type=float, default=3_410_000_000.0, help="TX LO in Hz (default 2.45 GHz)")
p.add_argument("--frequency", type=float, default=3_410_000_000.0, p.add_argument("--gain", type=float, default=-0.0, help="TX gain in dB; Pluto range [-89, 0] (default -30)")
help="TX LO in Hz (default 2.45 GHz)") p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)")
p.add_argument("--gain", type=float, default=-0.0, p.add_argument(
help="TX gain in dB; Pluto range [-89, 0] (default -30)") "--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)"
p.add_argument("--sample-rate", type=float, default=1_000_000.0, )
help="Baseband sample rate (default 1 Msps)") p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)")
p.add_argument("--tone", type=float, default=100_000.0, p.add_argument(
help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)") "--duration", type=float, default=60.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)"
p.add_argument("--buffer-size", type=int, default=4096, )
help="Complex samples per frame (default 4096)")
p.add_argument("--duration", type=float, default=60.0,
help="Seconds to transmit; 0 = run until Ctrl-C (default 30)")
p.add_argument("--log-level", default="INFO") p.add_argument("--log-level", default="INFO")
args = p.parse_args() args = p.parse_args()

View File

@ -41,8 +41,7 @@ from ria_toolkit_oss.agent.streamer import Streamer
from ria_toolkit_oss.agent.ws_client import WsClient from ria_toolkit_oss.agent.ws_client import WsClient
def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float) -> tuple[bytes, float]:
phase_offset: float) -> tuple[bytes, float]:
n = np.arange(buffer_size, dtype=np.float64) n = np.arange(buffer_size, dtype=np.float64)
phase = 2.0 * np.pi * tone_hz / sample_rate * n + phase_offset phase = 2.0 * np.pi * tone_hz / sample_rate * n + phase_offset
amp = 0.7 amp = 0.7
@ -59,7 +58,9 @@ def _make_pluto_factory(identifier: str | None):
if device != "pluto": if device != "pluto":
raise ValueError(f"this script only drives pluto; got device={device!r}") raise ValueError(f"this script only drives pluto; got device={device!r}")
from ria_toolkit_oss.sdr.pluto import Pluto from ria_toolkit_oss.sdr.pluto import Pluto
return Pluto(identifier=identifier) return Pluto(identifier=identifier)
return factory return factory
@ -73,13 +74,14 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event):
payload = json.loads(first) payload = json.loads(first)
if payload.get("type") == "heartbeat": if payload.get("type") == "heartbeat":
caps = payload.get("capabilities") caps = payload.get("capabilities")
print(f"[mock-hub] agent heartbeat: capabilities={caps} " print(f"[mock-hub] agent heartbeat: capabilities={caps} " f"tx_enabled={payload.get('tx_enabled')}")
f"tx_enabled={payload.get('tx_enabled')}")
except asyncio.TimeoutError: except asyncio.TimeoutError:
print("[mock-hub] warning: no heartbeat received in first 2s") print("[mock-hub] warning: no heartbeat received in first 2s")
# Arm the agent's TX path. # Arm the agent's TX path.
await ws.send(json.dumps({ await ws.send(
json.dumps(
{
"type": "tx_start", "type": "tx_start",
"app_id": "ws-smoke", "app_id": "ws-smoke",
"radio_config": { "radio_config": {
@ -91,9 +93,10 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event):
"buffer_size": int(args.buffer_size), "buffer_size": int(args.buffer_size),
"underrun_policy": "repeat", "underrun_policy": "repeat",
}, },
})) }
print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, " )
f"gain={args.gain} dB") )
print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, " f"gain={args.gain} dB")
# Producer: push IQ frames at a steady clip. Use a concurrent receiver so # Producer: push IQ frames at a steady clip. Use a concurrent receiver so
# tx_status frames show up in real time rather than being queued behind # tx_status frames show up in real time rather than being queued behind
@ -112,15 +115,11 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event):
recv_task = asyncio.create_task(receiver()) recv_task = asyncio.create_task(receiver())
try: try:
deadline = None if args.duration <= 0 else ( deadline = None if args.duration <= 0 else (asyncio.get_event_loop().time() + args.duration)
asyncio.get_event_loop().time() + args.duration
)
while not stop.is_set(): while not stop.is_set():
if deadline is not None and asyncio.get_event_loop().time() >= deadline: if deadline is not None and asyncio.get_event_loop().time() >= deadline:
break break
frame, phase = _make_iq_frame( frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase)
args.buffer_size, args.tone, args.sample_rate, phase
)
try: try:
await ws.send(frame) await ws.send(frame)
except websockets.ConnectionClosed: except websockets.ConnectionClosed:
@ -204,20 +203,15 @@ def main() -> int:
p = argparse.ArgumentParser( p = argparse.ArgumentParser(
description="Full-stack TX smoke: localhost mock-hub → WS → agent → Pluto.", description="Full-stack TX smoke: localhost mock-hub → WS → agent → Pluto.",
) )
p.add_argument("--identifier", default=None, p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)")
help="Pluto IP/hostname (default: auto-discover pluto.local)") p.add_argument("--frequency", type=float, default=2_450_000_000.0, help="TX LO in Hz (default 2.45 GHz)")
p.add_argument("--frequency", type=float, default=2_450_000_000.0, p.add_argument("--gain", type=float, default=0.0, help="TX gain in dB; Pluto range [-89, 0] (default 0)")
help="TX LO in Hz (default 2.45 GHz)") p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)")
p.add_argument("--gain", type=float, default=0.0, p.add_argument("--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz (default 100 kHz)")
help="TX gain in dB; Pluto range [-89, 0] (default 0)") p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)")
p.add_argument("--sample-rate", type=float, default=1_000_000.0, p.add_argument(
help="Baseband sample rate (default 1 Msps)") "--duration", type=float, default=30.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)"
p.add_argument("--tone", type=float, default=100_000.0, )
help="Baseband tone offset in Hz (default 100 kHz)")
p.add_argument("--buffer-size", type=int, default=4096,
help="Complex samples per frame (default 4096)")
p.add_argument("--duration", type=float, default=30.0,
help="Seconds to transmit; 0 = run until Ctrl-C (default 30)")
p.add_argument("--log-level", default="INFO") p.add_argument("--log-level", default="INFO")
args = p.parse_args() args = p.parse_args()

View File

@ -22,6 +22,7 @@ import os
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from pathlib import Path from pathlib import Path
def _resolve_default_path() -> Path: def _resolve_default_path() -> Path:
return Path(os.environ.get("RIA_AGENT_CONFIG", str(Path.home() / ".ria" / "agent.json"))) return Path(os.environ.get("RIA_AGENT_CONFIG", str(Path.home() / ".ria" / "agent.json")))

View File

@ -46,9 +46,7 @@ def heartbeat_payload(
if c.tx_max_duration_s is not None: if c.tx_max_duration_s is not None:
payload["tx_max_duration_s"] = float(c.tx_max_duration_s) payload["tx_max_duration_s"] = float(c.tx_max_duration_s)
if c.tx_allowed_freq_ranges: if c.tx_allowed_freq_ranges:
payload["tx_allowed_freq_ranges"] = [ payload["tx_allowed_freq_ranges"] = [[float(lo), float(hi)] for lo, hi in c.tx_allowed_freq_ranges]
[float(lo), float(hi)] for lo, hi in c.tx_allowed_freq_ranges
]
if app_id: if app_id:
payload["app_id"] = app_id payload["app_id"] = app_id
if sessions: if sessions:

View File

@ -270,9 +270,7 @@ class Streamer:
) )
self._rx = session self._rx = session
await self._send_status("streaming", app_id) await self._send_status("streaming", app_id)
session.task = asyncio.create_task( session.task = asyncio.create_task(self._capture_loop(session), name="ria-streamer-capture")
self._capture_loop(session), name="ria-streamer-capture"
)
async def _handle_rx_stop(self, msg: dict) -> None: async def _handle_rx_stop(self, msg: dict) -> None:
session = self._rx session = self._rx
@ -310,9 +308,7 @@ class Streamer:
logger.warning("Applying configure failed: %s", exc) logger.warning("Applying configure failed: %s", exc)
try: try:
samples = await loop.run_in_executor( samples = await loop.run_in_executor(None, session.sdr.rx, session.buffer_size)
None, session.sdr.rx, session.buffer_size
)
except Exception as exc: except Exception as exc:
from ria_toolkit_oss.sdr import SdrDisconnectedError from ria_toolkit_oss.sdr import SdrDisconnectedError
@ -342,7 +338,7 @@ class Streamer:
# ================================================================== # ==================================================================
# TX # TX
async def _handle_tx_start(self, msg: dict) -> None: async def _handle_tx_start(self, msg: dict) -> None: # noqa: C901
app_id = msg.get("app_id") or "" app_id = msg.get("app_id") or ""
radio_config = dict(msg.get("radio_config") or {}) radio_config = dict(msg.get("radio_config") or {})
@ -383,9 +379,7 @@ class Streamer:
buffer_size = int(radio_config.pop("buffer_size", _DEFAULT_BUFFER_SIZE)) buffer_size = int(radio_config.pop("buffer_size", _DEFAULT_BUFFER_SIZE))
underrun_policy = str(radio_config.pop("underrun_policy", "pause")) underrun_policy = str(radio_config.pop("underrun_policy", "pause"))
if underrun_policy not in ("pause", "zero", "repeat"): if underrun_policy not in ("pause", "zero", "repeat"):
await self._send_tx_status( await self._send_tx_status(app_id, "error", f"invalid underrun_policy {underrun_policy!r}")
app_id, "error", f"invalid underrun_policy {underrun_policy!r}"
)
return return
if not device: if not device:
await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device") await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device")
@ -404,15 +398,10 @@ class Streamer:
# manifest bug and we want it surfaced immediately, not papered # manifest bug and we want it surfaced immediately, not papered
# over with stale radio state. # over with stale radio state.
if hasattr(sdr, "init_tx"): if hasattr(sdr, "init_tx"):
init_args = { init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")}
k: radio_config.get(f"tx_{k}")
for k in ("sample_rate", "center_frequency", "gain")
}
missing = [f"tx_{k}" for k, v in init_args.items() if v is None] missing = [f"tx_{k}" for k, v in init_args.items() if v is None]
if missing: if missing:
raise ValueError( raise ValueError(f"tx_start missing required radio_config keys: {missing}")
f"tx_start missing required radio_config keys: {missing}"
)
sdr.init_tx( sdr.init_tx(
sample_rate=init_args["sample_rate"], sample_rate=init_args["sample_rate"],
center_frequency=init_args["center_frequency"], center_frequency=init_args["center_frequency"],
@ -498,9 +487,8 @@ class Streamer:
return _silence(n) return _silence(n)
# Max-duration watchdog. # Max-duration watchdog.
if ( if session.max_duration_s is not None and (time.monotonic() - session.started_at) >= float(
session.max_duration_s is not None session.max_duration_s
and (time.monotonic() - session.started_at) >= float(session.max_duration_s)
): ):
session.stop_event.set() session.stop_event.set()
try: try:
@ -528,7 +516,7 @@ class Streamer:
if arr.size < 2 or arr.size % 2 != 0: if arr.size < 2 or arr.size % 2 != 0:
logger.warning("Malformed TX frame: %d floats (must be non-zero even count)", arr.size) logger.warning("Malformed TX frame: %d floats (must be non-zero even count)", arr.size)
return self._underrun_fill(session, n) return self._underrun_fill(session, n)
samples = (arr[0::2].astype(np.complex64) + 1j * arr[1::2].astype(np.complex64)) samples = arr[0::2].astype(np.complex64) + 1j * arr[1::2].astype(np.complex64)
if samples.size < n: if samples.size < n:
out = np.zeros(n, dtype=np.complex64) out = np.zeros(n, dtype=np.complex64)
out[: samples.size] = samples out[: samples.size] = samples
@ -747,6 +735,7 @@ def _default_sdr_factory(device: str, identifier: str | None):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Top-level entry # Top-level entry
async def run_streamer(ws_url: str, token: str, *, cfg: AgentConfig | None = None) -> None: async def run_streamer(ws_url: str, token: str, *, cfg: AgentConfig | None = None) -> None:
"""Connect to *ws_url* and run the streamer loop until cancelled.""" """Connect to *ws_url* and run the streamer loop until cancelled."""
ws = WsClient(ws_url, token) ws = WsClient(ws_url, token)

View File

@ -37,7 +37,7 @@ def _engine(cfg: _config.AppConfig, sudo_override: bool = False) -> list[str]:
for exe in ("docker", "podman"): for exe in ("docker", "podman"):
if shutil.which(exe): if shutil.which(exe):
use_sudo = sudo_override or cfg.sudo use_sudo = sudo_override or cfg.sudo
return (["sudo", exe] if use_sudo else [exe]) return ["sudo", exe] if use_sudo else [exe]
print("error: neither 'docker' nor 'podman' found on PATH", file=sys.stderr) print("error: neither 'docker' nor 'podman' found on PATH", file=sys.stderr)
sys.exit(2) sys.exit(2)
@ -96,7 +96,9 @@ def _hardware_flags(labels: dict, no_gpu: bool, no_usb: bool, no_host_net: bool)
if _gpu_available(): if _gpu_available():
flags += ["--gpus", "all"] flags += ["--gpus", "all"]
else: else:
notes.append("image wants GPU but no NVIDIA runtime detected — skipping --gpus (use --force-gpu to override)") notes.append(
"image wants GPU but no NVIDIA runtime detected — skipping --gpus (use --force-gpu to override)"
)
if hw_items & {"pluto", "rtlsdr", "hackrf", "bladerf"} and not no_usb: if hw_items & {"pluto", "rtlsdr", "hackrf", "bladerf"} and not no_usb:
flags += ["--device", "/dev/bus/usb"] flags += ["--device", "/dev/bus/usb"]

View File

@ -15,8 +15,13 @@ __all__ = [
] ]
from .mock import MockSDR from .mock import MockSDR
from .sdr import SDR, SDRError, SdrDisconnectedError, SDRParameterError, translate_disconnect # noqa: F401 from .sdr import ( # noqa: F401
SDR,
SdrDisconnectedError,
SDRError,
SDRParameterError,
translate_disconnect,
)
_DRIVER_CANDIDATES: tuple[tuple[str, str, str], ...] = ( _DRIVER_CANDIDATES: tuple[tuple[str, str, str], ...] = (
("mock", "ria_toolkit_oss.sdr.mock", "MockSDR"), ("mock", "ria_toolkit_oss.sdr.mock", "MockSDR"),

View File

@ -8,7 +8,12 @@ import adi
import numpy as np import numpy as np
from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError, translate_disconnect from ria_toolkit_oss.sdr.sdr import (
SDR,
SDRError,
SDRParameterError,
translate_disconnect,
)
class Pluto(SDR): class Pluto(SDR):

View File

@ -26,9 +26,11 @@ class _FakeResp:
def _run_register(argv: list[str], cfg_path) -> int: def _run_register(argv: list[str], cfg_path) -> int:
fake_resp = _FakeResp({"agent_id": "agent-1", "token": "tok-abc"}) fake_resp = _FakeResp({"agent_id": "agent-1", "token": "tok-abc"})
with patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), \ with (
patch("urllib.request.urlopen", return_value=fake_resp), \ patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False),
patch.object(sys, "argv", ["ria-agent", *argv]): patch("urllib.request.urlopen", return_value=fake_resp),
patch.object(sys, "argv", ["ria-agent", *argv]),
):
try: try:
agent_cli.main() agent_cli.main()
except SystemExit as exc: except SystemExit as exc:
@ -96,9 +98,11 @@ def test_stream_allow_tx_does_not_persist(tmp_path):
captured["cfg"] = cfg captured["cfg"] = cfg
return None return None
with patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), \ with (
patch("ria_toolkit_oss.agent.streamer.run_streamer", new=_fake_run_streamer), \ patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False),
patch.object(sys, "argv", ["ria-agent", "stream", "--allow-tx"]): patch("ria_toolkit_oss.agent.streamer.run_streamer", new=_fake_run_streamer),
patch.object(sys, "argv", ["ria-agent", "stream", "--allow-tx"]),
):
try: try:
agent_cli.main() agent_cli.main()
except SystemExit: except SystemExit:

View File

@ -70,9 +70,7 @@ def test_server_start_stream_stop_cycle_over_real_ws():
reconnect_pause=0.05, reconnect_pause=0.05,
) )
streamer = Streamer(ws=client, sdr_factory=lambda d, i: MockSDR(buffer_size=32, seed=0)) streamer = Streamer(ws=client, sdr_factory=lambda d, i: MockSDR(buffer_size=32, seed=0))
task = asyncio.create_task( task = asyncio.create_task(client.run(on_message=streamer.on_message, heartbeat=streamer.build_heartbeat))
client.run(on_message=streamer.on_message, heartbeat=streamer.build_heartbeat)
)
await asyncio.wait_for(ready.wait(), timeout=3.0) await asyncio.wait_for(ready.wait(), timeout=3.0)
await asyncio.wait_for(stopped.wait(), timeout=3.0) await asyncio.wait_for(stopped.wait(), timeout=3.0)
client.stop() client.stop()

View File

@ -77,10 +77,7 @@ def test_server_tx_start_binary_stop_cycle_over_real_ws():
msg = await asyncio.wait_for(ws.recv(), timeout=2.0) msg = await asyncio.wait_for(ws.recv(), timeout=2.0)
if isinstance(msg, str): if isinstance(msg, str):
control_frames.append(json.loads(msg)) control_frames.append(json.loads(msg))
if any( if any(f.get("type") == "tx_status" and f.get("state") == "transmitting" for f in control_frames):
f.get("type") == "tx_status" and f.get("state") == "transmitting"
for f in control_frames
):
break break
await ws.send(json.dumps({"type": "tx_stop", "app_id": "tx-app"})) await ws.send(json.dumps({"type": "tx_stop", "app_id": "tx-app"}))

View File

@ -30,7 +30,6 @@ from ria_toolkit_oss.agent.config import AgentConfig
from ria_toolkit_oss.agent.streamer import Streamer from ria_toolkit_oss.agent.streamer import Streamer
from ria_toolkit_oss.sdr.mock import MockSDR from ria_toolkit_oss.sdr.mock import MockSDR
_STRESS_S = float(os.environ.get("RIA_LOCK_STRESS_S", "2.0")) _STRESS_S = float(os.environ.get("RIA_LOCK_STRESS_S", "2.0"))
@ -156,18 +155,21 @@ def test_full_duplex_stays_healthy_over_stress_window():
s = Streamer(ws=ws, sdr_factory=lambda d, i: sdr, cfg=AgentConfig(tx_enabled=True)) s = Streamer(ws=ws, sdr_factory=lambda d, i: sdr, cfg=AgentConfig(tx_enabled=True))
await s.on_message( await s.on_message(
{"type": "start", "app_id": "app-1", {"type": "start", "app_id": "app-1", "radio_config": {"device": "mock", "buffer_size": BUF}}
"radio_config": {"device": "mock", "buffer_size": BUF}}
) )
await s.on_message( await s.on_message(
{"type": "tx_start", "app_id": "app-1", {
"type": "tx_start",
"app_id": "app-1",
"radio_config": { "radio_config": {
"device": "mock", "buffer_size": BUF, "device": "mock",
"buffer_size": BUF,
"tx_sample_rate": 1_000_000, "tx_sample_rate": 1_000_000,
"tx_center_frequency": 2.45e9, "tx_center_frequency": 2.45e9,
"tx_gain": -20, "tx_gain": -20,
"underrun_policy": "zero", "underrun_policy": "zero",
}} },
}
) )
marker = np.arange(BUF, dtype=np.complex64) + 1 marker = np.arange(BUF, dtype=np.complex64) + 1
@ -180,12 +182,10 @@ def test_full_duplex_stays_healthy_over_stress_window():
# which routes through the same setters the stress test above # which routes through the same setters the stress test above
# verifies. # verifies.
await s.on_message( await s.on_message(
{"type": "tx_configure", "app_id": "app-1", {"type": "tx_configure", "app_id": "app-1", "radio_config": {"tx_sample_rate": 1_000_000 + i}}
"radio_config": {"tx_sample_rate": 1_000_000 + i}}
) )
await s.on_message( await s.on_message(
{"type": "configure", "app_id": "app-1", {"type": "configure", "app_id": "app-1", "radio_config": {"sample_rate": 2_000_000 + i}}
"radio_config": {"sample_rate": 2_000_000 + i}}
) )
i += 1 i += 1
await asyncio.sleep(0.005) await asyncio.sleep(0.005)
@ -197,8 +197,7 @@ def test_full_duplex_stays_healthy_over_stress_window():
ws, s = asyncio.run(scenario()) ws, s = asyncio.run(scenario())
# No error frame leaked out. # No error frame leaked out.
errors = [m for m in ws.json_sent errors = [m for m in ws.json_sent if m.get("type") in ("error", "tx_status") and m.get("state") == "error"]
if m.get("type") in ("error", "tx_status") and m.get("state") == "error"]
assert errors == [], f"Unexpected error frames: {errors}" assert errors == [], f"Unexpected error frames: {errors}"
# RX produced IQ frames and TX's callback ran — heartbeat-level contention # RX produced IQ frames and TX's callback ran — heartbeat-level contention
# check: both setter paths were hit at least once during configure dispatch. # check: both setter paths were hit at least once during configure dispatch.

View File

@ -121,9 +121,7 @@ def test_start_without_device_emits_error():
def test_configure_queues_update(): def test_configure_queues_update():
async def scenario(): async def scenario():
streamer = Streamer(ws=FakeWs(), sdr_factory=_factory) streamer = Streamer(ws=FakeWs(), sdr_factory=_factory)
await streamer.on_message( await streamer.on_message({"type": "configure", "app_id": "x", "radio_config": {"center_frequency": 915e6}})
{"type": "configure", "app_id": "x", "radio_config": {"center_frequency": 915e6}}
)
# Before start(), pending config lives on the standalone dict exposed via the _pending_config shim. # Before start(), pending config lives on the standalone dict exposed via the _pending_config shim.
return streamer._pending_config return streamer._pending_config

View File

@ -143,10 +143,7 @@ def test_rejects_duplicate_tx_session():
return ws return ws
ws = asyncio.run(scenario()) ws = asyncio.run(scenario())
errors = [ errors = [m for m in ws.json_sent if m.get("type") == "tx_status" and m.get("state") == "error"]
m for m in ws.json_sent
if m.get("type") == "tx_status" and m.get("state") == "error"
]
assert any("already active" in e.get("message", "") for e in errors) assert any("already active" in e.get("message", "") for e in errors)

View File

@ -70,10 +70,7 @@ def test_underrun_pause_stops_session_and_emits_status():
# Do not push any buffers. The callback underruns on first tick and # Do not push any buffers. The callback underruns on first tick and
# the watchdog should emit "underrun" and tear down. # the watchdog should emit "underrun" and tear down.
for _ in range(100): for _ in range(100):
if any( if any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent):
m.get("type") == "tx_status" and m.get("state") == "underrun"
for m in ws.json_sent
):
break break
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
for _ in range(50): for _ in range(50):
@ -103,9 +100,7 @@ def test_underrun_zero_keeps_session_alive():
ws, still_alive = asyncio.run(scenario()) ws, still_alive = asyncio.run(scenario())
# No underrun status emitted (policy absorbs it silently). # No underrun status emitted (policy absorbs it silently).
assert not any( assert not any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent)
m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent
)
assert still_alive assert still_alive
# All produced buffers are zero (no real data was pushed). # All produced buffers are zero (no real data was pushed).
assert sdr.tx_produced, "expected at least one TX callback invocation" assert sdr.tx_produced, "expected at least one TX callback invocation"
@ -129,9 +124,7 @@ def test_underrun_repeat_replays_last_buffer():
ws, sdr = asyncio.run(scenario()) ws, sdr = asyncio.run(scenario())
# No underrun status emitted. # No underrun status emitted.
assert not any( assert not any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent)
m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent
)
# At least two buffers equal to the marker — the real one and ≥1 repeat. # At least two buffers equal to the marker — the real one and ≥1 repeat.
matching = [b for b in sdr.tx_produced if np.array_equal(b, marker)] matching = [b for b in sdr.tx_produced if np.array_equal(b, marker)]
assert len(matching) >= 2, f"expected ≥2 buffers matching marker, got {len(matching)}" assert len(matching) >= 2, f"expected ≥2 buffers matching marker, got {len(matching)}"

View File

@ -142,9 +142,7 @@ def test_malformed_control_frame_does_not_crash():
async def on_msg(m): async def on_msg(m):
handled.append(m) handled.append(m)
task = asyncio.create_task( task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}))
client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})
)
for _ in range(50): for _ in range(50):
if handled: if handled:
break break

View File

@ -102,9 +102,7 @@ def test_binary_frame_dropped_when_no_handler():
async def on_msg(m): async def on_msg(m):
messages.append(m) messages.append(m)
task = asyncio.create_task( task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}))
client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})
)
for _ in range(50): for _ in range(50):
if messages: if messages:
break break