Compare commits

...

8 Commits

Author SHA1 Message Date
1ea865f5f8 fix(agent): open SDRs off the event loop so a slow USRP open can't drop the hub
rx/tx start handlers called the registry's blocking SDR open (and TX init)
directly on the asyncio loop. A USRP open shells out to uhd_find_devices and
loads its FPGA — several seconds — freezing the WebSocket keepalive long enough
for the hub to drop the agent and stop the app, with the agent terminal hung in
the blocking call. A Pluto opens fast enough to slip under the timeout, which is
why it worked where a USRP did not.

Run acquire/_apply_sdr_config (rx) and acquire/config/init_tx (tx) in a thread
via run_in_executor, keeping release/close-on-failure inside the thread. Also
build the heartbeat off the loop, since it can probe hardware (uhd_find_devices)
while idle and block the keepalive the same way.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 13:49:30 -04:00
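The effect described in this commit can be reproduced without any radio hardware. The sketch below is illustrative only: `blocking_open` and `keepalive` are hypothetical stand-ins (a `time.sleep` for the slow SDR open, a counter for the WebSocket keepalive), not functions from the agent.

```python
import asyncio
import time


def blocking_open(delay_s: float = 0.2) -> str:
    """Hypothetical stand-in for a slow, blocking SDR open."""
    time.sleep(delay_s)
    return "sdr-handle"


async def keepalive(counter: list[int], interval_s: float = 0.02) -> None:
    """Hypothetical stand-in for the WebSocket keepalive: counts its ticks."""
    while True:
        counter[0] += 1
        await asyncio.sleep(interval_s)


async def open_off_loop() -> tuple[str, int]:
    ticks = [0]
    task = asyncio.create_task(keepalive(ticks))
    loop = asyncio.get_running_loop()
    # The blocking call runs in the default thread pool, so the event loop
    # stays free to run the keepalive task while the open is in progress.
    handle = await loop.run_in_executor(None, blocking_open)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return handle, ticks[0]


handle, ticks = asyncio.run(open_off_loop())
```

Calling `blocking_open()` directly inside the coroutine would leave the counter at zero for the whole open, which is the keepalive starvation the commit message describes.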
54b66b64c4 USRP Race Fixed 2026-06-05 13:38:25 -04:00
b6b52bf3c9 feat(usrp): continuous-streaming rx() for gapless agent capture
The agent capture loop calls sdr.rx(buffer_size) per chunk. USRP inherited the
base rx() → record(), which issued start_cont/stop_cont and slept 0.1s EVERY
buffer. At 2.5 MSps that captured ~1.6 ms of IQ per ~100 ms — heavily gapped,
transient-laden, and zero-filled on timeout, which rendered as choppy/black
bands in the spectrogram.

USRP.rx() now keeps a single continuous stream running across calls:
- issues start_cont once (lazily, on first rx()),
- recv()s until the request is filled, carrying any over-read into a residual
  buffer so nothing is dropped between rx() boundaries (gapless),
- tolerates overflow (samples still valid), treats repeated timeouts as a
  disconnect, and stops the stream on close().

Hardware-free tests stub uhd and prove gaplessness, single start, overflow
handling, timeout->disconnect, and stop cleanup.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 10:54:23 -04:00
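The residual-buffer idea can be sketched in a few lines. This is a simplified model, not the driver code: `ContinuousReader` and `counting_source` are hypothetical names, plain integers stand in for IQ samples, and the real driver uses `np.complex64` arrays and UHD `recv()` calls instead.

```python
from typing import Callable


class ContinuousReader:
    """Hypothetical stand-in for a driver that keeps one stream running."""

    def __init__(self, recv_chunk: Callable[[], list[int]]) -> None:
        self._recv_chunk = recv_chunk   # returns whatever samples arrived
        self._residual: list[int] = []  # over-read carried into the next rx()

    def rx(self, num_samples: int) -> list[int]:
        buf = self._residual
        # Keep receiving until the request is filled.
        while len(buf) < num_samples:
            buf = buf + self._recv_chunk()
        # Anything past the request is saved instead of being dropped.
        self._residual = buf[num_samples:]
        return buf[:num_samples]


def counting_source(chunk_size: int) -> Callable[[], list[int]]:
    """Fake stream whose samples are consecutive integers, so gaps are visible."""
    next_value = 0

    def recv_chunk() -> list[int]:
        nonlocal next_value
        chunk = list(range(next_value, next_value + chunk_size))
        next_value += chunk_size
        return chunk

    return recv_chunk


reader = ContinuousReader(counting_source(chunk_size=7))
captured = reader.rx(10) + reader.rx(10) + reader.rx(10)
```

Because the chunk size (7) does not divide the request size (10), every call over-reads. The concatenated output is still a gap-free run of consecutive integers, which is the property the hardware-free tests are said to prove.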
bb1259fefc fix(agent): don't probe SDR hardware while a session is active
heartbeat detect_devices() shelled out to uhd_find_devices every 60s. When a
USB SDR (USRP B2x0) was mid-capture, that probe disrupted the live stream and
the device briefly vanished ("No UHD Devices Found"), killing the capture.

detect_devices() gains a probe flag; heartbeat_payload passes probe=False
whenever a session is active, returning the last good enumeration (or a
driver-only list) instead of touching the hardware.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 10:41:02 -04:00
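A reduced model of the probe flag makes the three cases easy to check. The names `detect`, `_probe_hardware`, and `_driver_only` are hypothetical simplifications for illustration, and the stub counts calls instead of running `uhd_find_devices`.

```python
import time
from typing import Optional

_PROBE_TTL_S = 60.0
_cache: Optional[tuple[float, list[dict]]] = None
probe_calls = 0


def _probe_hardware() -> list[dict]:
    """Stub standing in for a real enumeration such as uhd_find_devices."""
    global probe_calls
    probe_calls += 1
    return [{"device": "usrp", "identifier": None, "connected": True}]


def _driver_only() -> list[dict]:
    """Entries built from importable drivers alone; presence is unknown."""
    return [{"device": "usrp", "identifier": None, "connected": None}]


def detect(*, probe: bool = True) -> list[dict]:
    global _cache
    now = time.monotonic()
    if _cache is not None:
        ts, cached = _cache
        # With probe=False the cache is returned regardless of its age.
        if not probe or now - ts < _PROBE_TTL_S:
            return cached
    if not probe:
        return _driver_only()  # no cache yet, and the hardware is off limits
    devices = _probe_hardware()
    _cache = (now, devices)
    return devices


before_any_probe = detect(probe=False)  # driver-only list, hardware untouched
idle = detect(probe=True)               # real probe, result cached
mid_session = detect(probe=False)       # last good enumeration, no new probe
```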
6bead217a3 fix(agent): advertise USRP auto-select (identifier=None), not name=
The CLI get_sdr_device path can only open a USB USRP via auto-select: its
_create_device_dict matches the identifier against raw device-dict values,
but common.py prepends "addr="/"name=" before handing it over, so no prefixed
identifier ever matches (this is also why addr=192.168.3.1 failed to match the
B210). Advertising name=<name> was therefore unusable.

detect_devices() now advertises a single USRP entry with identifier=None
(auto-select the sole device), labelled with the serial(s) found. The hub
forwards None, so the agent opens USRP() and picks the attached B210.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 10:30:33 -04:00
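The mismatch is easy to see in miniature. The two helpers below are hypothetical stand-ins written for this illustration; they are not the CLI's `_create_device_dict` or `common.py` code, and the device values are example data.

```python
def matches_raw_values(identifier: str, device: dict[str, str]) -> bool:
    """Hypothetical matcher: compares the identifier to raw dict values."""
    return identifier in device.values()


def add_prefix(key: str, value: str) -> str:
    """Hypothetical stand-in for the prefixing step described above."""
    return f"{key}={value}"


b210 = {"name": "MyB210", "serial": "35D7CAD", "type": "b200"}

raw_match = matches_raw_values("MyB210", b210)
prefixed_match = matches_raw_values(add_prefix("name", "MyB210"), b210)
```

The raw value matches, but once a prefix is added the string `name=MyB210` is no longer equal to any value in the device dict. Under that model, auto-select is the only path that opens the device.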
bf64604bcf feat(agent): agent-owned device discovery, identifiers, and udev install
Make `ria-agent stream` work with any SDR the agent has drivers for, with
no per-device config in the hub:

- heartbeat advertises rich `hardware` entries {device, identifier, label,
  connected} via hardware.detect_devices(); USRP is enumerated into concrete
  instances (uhd_find_devices), others advertise driver-only entries. The
  identifier is chosen to round-trip through parse_ident (None=auto-select or
  name=...), so a device address is never a bare value.
- ship udev rules (Pluto/RTL-SDR/HackRF/USRP B2x0/bladeRF) + `ria-agent
  install-udev` so USB radios open without sudo — one privileged step, all
  inside the toolkit.
- streamer surfaces a "run: sudo ria-agent install-udev" hint on USB
  permission errors instead of the cryptic UHD message.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 10:03:14 -04:00
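The entry shape listed above can be written down as a typed dict. This is a sketch for orientation: `HardwareEntry` and `describe` are hypothetical names, and the agent itself builds plain dicts.

```python
from typing import Optional, TypedDict


class HardwareEntry(TypedDict):
    device: str                 # driver/device-type name, e.g. "usrp"
    identifier: Optional[str]   # addressing string, or None for auto-select
    label: str                  # text shown in the hub dropdown
    connected: Optional[bool]   # True, False, or None when unknown


def describe(entry: HardwareEntry) -> str:
    """Render one entry the way a picker might show it (illustrative only)."""
    states = {True: "connected", False: "driver only", None: "presence unknown"}
    target = entry["identifier"] or "auto-select"
    return f'{entry["label"]} [{target}, {states[entry["connected"]]}]'


entry: HardwareEntry = {
    "device": "usrp",
    "identifier": None,
    "label": "Ettus USRP (UHD) (35D7CAD)",
    "connected": True,
}
summary = describe(entry)
```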
d38276a533 fix(deps): promote websockets to a runtime dependency
`websockets` is used by `ria_toolkit_oss.agent.ws_client`, which is
imported as part of `ria-agent stream`. It was only declared in the
optional `[tool.poetry.group.agent]` poetry group, so a vanilla
`pip install ria-toolkit-oss` left `ria-agent stream` failing with
`ModuleNotFoundError: No module named 'websockets'`.

Moved into PEP 621 `[project].dependencies` with the same constraint
(`>=12.0,<14.0`). The duplicate in the optional poetry group is left
in place so `poetry install --with agent` remains self-sufficient.

poetry.lock: `websockets` now joins the `main` dependency group; the
generator-header bump (2.1.4 → 2.3.4) and the `jsonschema-specifications`
version-string normalization are auto-regenerated noise from the
locally-installed poetry version.

Note: `requests` has the same packaging gap (used by the legacy agent
path, declared only in the optional `agent` group); leaving that for a
follow-up since the immediate complaint is websockets.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 13:12:20 -04:00
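A packaging gap like this one can be caught before the first import fails. The helper below is a hypothetical sketch that is not part of the toolkit; it uses standard-library module names as stand-ins, and an agent could pass `"websockets"` and `"requests"` instead.

```python
import importlib.util


def missing_modules(names: list[str]) -> list[str]:
    """Return the top-level module names that cannot be found on this install."""
    return [name for name in names if importlib.util.find_spec(name) is None]


required = ["json", "asyncio"]  # stand-ins; substitute real runtime dependencies
missing = missing_modules(required)
if missing:
    print(f"error: missing runtime dependencies: {', '.join(missing)}")
```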
5b7f487a5f feat(agent): default ria-agent register --hub to https://riahub.ai
Most users register against the production hub, so requiring --hub on
every invocation was friction. Default the flag to https://riahub.ai
via a module-level DEFAULT_HUB_URL constant; explicit --hub still wins,
so dev and self-hosted setups (e.g. http://whitehorse:3005) keep working.

The legacy `ria-agent run` path keeps its own --hub handling unchanged
— it has a config-file fallback and existing operators rely on it.

Bumps version to 0.1.8.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 13:05:11 -04:00
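The default-with-override behaviour maps directly onto `argparse`. The parser below is a cut-down sketch, not the real CLI: only `--hub` and `DEFAULT_HUB_URL` come from the commit, while marking `--api-key` as required and the example key value are assumptions made for the demo.

```python
import argparse

DEFAULT_HUB_URL = "https://riahub.ai"


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command")
    p_reg = sub.add_parser("register")
    # No longer required: falls back to the production hub when omitted.
    p_reg.add_argument("--hub", default=DEFAULT_HUB_URL)
    # Assumption for this sketch: the key is mandatory.
    p_reg.add_argument("--api-key", dest="api_key", required=True)
    return parser


parser = build_parser()
default_args = parser.parse_args(["register", "--api-key", "ria_reg_example"])
override_args = parser.parse_args(
    ["register", "--hub", "http://whitehorse:3005", "--api-key", "ria_reg_example"]
)
```

An explicit `--hub` always wins because `argparse` only applies `default` when the flag is absent.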
16 changed files with 926 additions and 54 deletions


@@ -1,5 +1,17 @@
 # Changelog
+## [0.1.8] - 2026-06-01
+### Changed
+- **`ria-agent register --hub` now defaults to `https://riahub.ai`** — most users can run `ria-agent register --api-key ria_reg_...` without the `--hub` flag. Dev and self-hosted users keep the existing override (`--hub http://my-hub:3005`). The default lives in `ria_toolkit_oss.agent.cli.DEFAULT_HUB_URL`.
+### Fixed
+- **`websockets` is now a runtime dependency** — previously declared only in the optional `agent` poetry group, so a vanilla `pip install ria-toolkit-oss` left `ria-agent stream` failing with `ModuleNotFoundError: No module named 'websockets'`. Added to `[project].dependencies` with the same constraint (`>=12.0,<14.0`).
+---
 ## [0.1.7] - 2026-05-26
 ### Added


@@ -14,7 +14,7 @@ sys.path.insert(0, os.path.abspath(os.path.join('..', '..')))
 project = 'ria-toolkit-oss'
 copyright = '2026, Qoherent Inc'
 author = 'Qoherent Inc.'
-release = '0.1.7'
+release = '0.1.8'
 # -- General configuration ---------------------------------------------------
 # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

poetry.lock generated

@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
+# This file is automatically @generated by Poetry 2.3.4 and should not be changed by hand.
 [[package]]
 name = "alabaster"
@@ -1264,7 +1264,7 @@ files = [
 [package.dependencies]
 attrs = ">=22.2.0"
-jsonschema-specifications = ">=2023.03.6"
+jsonschema-specifications = ">=2023.3.6"
 referencing = ">=0.28.4"
 rpds-py = ">=0.25.0"
@@ -3613,7 +3613,7 @@ version = "13.1"
 description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)"
 optional = false
 python-versions = ">=3.8"
-groups = ["agent", "docs", "server", "test"]
+groups = ["main", "agent", "docs", "server", "test"]
 files = [
     {file = "websockets-13.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f48c749857f8fb598fb890a75f540e3221d0976ed0bf879cf3c7eef34151acee"},
     {file = "websockets-13.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c7e72ce6bda6fb9409cc1e8164dd41d7c91466fb599eb047cfda72fe758a34a7"},
@@ -3706,4 +3706,4 @@ files = [
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.10"
-content-hash = "66c9adf647316db90f963da05e8a83574378bfa4db2c69ce751446b5ee7c408c"
+content-hash = "17b45f12030cda8eabd0ecd10d51c98e5fa7d9b342952c1c4924b7425800cd0f"


@@ -1,6 +1,6 @@
 [project]
 name = "ria-toolkit-oss"
-version = "0.1.7"
+version = "0.1.8"
 description = "An open-source version of the RIA Toolkit, including the fundamental tools to get started developing, testing, and deploying radio intelligence applications"
 license = { text = "AGPL-3.0-only" }
 readme = "README.md"
@@ -50,7 +50,8 @@ dependencies = [
     "pyyaml (>=6.0.3,<7.0.0)",
     "click (>=8.1.0,<9.0.0)",
     "matplotlib (>=3.8.0,<4.0.0)",
-    "paramiko (>=3.5.1)"
+    "paramiko (>=3.5.1)",
+    "websockets (>=12.0,<14.0)"
 ]
 # [project.optional-dependencies] Commented out to prevent Tox tests from failing
@@ -77,6 +78,7 @@ packages = [
 ]
 include = [
     "**/*.so", # Required for Nuitkaification
+    "src/ria_toolkit_oss/agent/udev/*.rules", # Shipped SDR udev rules (ria-agent install-udev)
 ]
 [build-system]


@@ -5,11 +5,13 @@ Subcommands:
 - ``ria-agent run [legacy args]`` legacy long-poll NodeAgent (unchanged).
 - ``ria-agent stream`` new WebSocket-based IQ streamer.
 - ``ria-agent detect`` print SDR drivers whose modules import cleanly.
-- ``ria-agent register --hub URL --api-key KEY`` register with the hub
-  using a personal registration key (minted from **Settings RIA Agents**
-  on the hub, shown once at mint time) and save credentials (and optional
-  TX interlocks) to ``~/.ria/agent.json``. The hub also accepts the legacy
-  shared ``[wac] API_KEY`` for back-compat, but that path is deprecated.
+- ``ria-agent register --api-key KEY`` register with the production hub
+  (``https://riahub.ai`` by default; override with ``--hub URL`` for dev
+  or self-hosted) using a personal registration key (minted from
+  **Settings RIA Agents** on the hub, shown once at mint time) and save
+  credentials (and optional TX interlocks) to ``~/.ria/agent.json``. The
+  hub also accepts the legacy shared ``[wac] API_KEY`` for back-compat,
+  but that path is deprecated.
 Invoking ``ria-agent`` with no subcommand falls through to the legacy
 long-poll behavior for back-compatibility with existing deployments.
@@ -52,6 +54,10 @@ def _user_agent() -> str:
 # small DB lookup + insert; anything past this is a stuck hub, not a slow one.
 _REGISTER_TIMEOUT_S = 15
+# Production hub URL — used as the default for `ria-agent register` so most
+# users don't need to pass --hub. Dev / self-hosted users override explicitly.
+DEFAULT_HUB_URL = "https://riahub.ai"
 REGISTRATION_REASON_MESSAGES = {
     "invalid_key": (
@@ -199,6 +205,71 @@ def _cmd_stream(args: argparse.Namespace) -> int:
     return 0
+_UDEV_RULES_NAME = "90-ria-sdr.rules"
+def _cmd_install_udev(args: argparse.Namespace) -> int:
+    """Install the bundled SDR udev rules so USB radios open without sudo.
+    This is the one OS-level step needed for USB SDRs (B2x0 / RTL-SDR / HackRF /
+    bladeRF). It ships inside ria-toolkit-oss (no separate tool to install), but
+    writing to ``/etc/udev/rules.d`` and reloading udev requires root, so run it
+    once with sudo. Network radios (Pluto/USRP over IP) need nothing here.
+    """
+    import os
+    import shutil
+    import subprocess
+    from importlib.resources import files
+    try:
+        src = files("ria_toolkit_oss.agent").joinpath("udev", _UDEV_RULES_NAME)
+        rules_text = src.read_text()
+    except Exception as e:
+        print(f"error: bundled udev rules not found: {e}", file=sys.stderr)
+        return 1
+    dest_dir = args.dest
+    dest = os.path.join(dest_dir, _UDEV_RULES_NAME)
+    if os.geteuid() != 0:
+        print(
+            "error: installing udev rules requires root.\n"
+            f" run once: sudo {os.path.basename(sys.argv[0])} install-udev",
+            file=sys.stderr,
+        )
+        return 1
+    try:
+        os.makedirs(dest_dir, exist_ok=True)
+        with open(dest, "w") as f:
+            f.write(rules_text)
+        print(f"Installed udev rules -> {dest}")
+    except OSError as e:
+        print(f"error: failed to write {dest}: {e}", file=sys.stderr)
+        return 1
+    if not args.no_reload and shutil.which("udevadm"):
+        for cmd in (["udevadm", "control", "--reload-rules"], ["udevadm", "trigger"]):
+            try:
+                subprocess.run(cmd, check=True)
+            except Exception as e:
+                print(f"warning: '{' '.join(cmd)}' failed: {e}", file=sys.stderr)
+    # Add the invoking (pre-sudo) user to the access group so group-based rules
+    # apply even without a local logind session.
+    target_user = os.environ.get("SUDO_USER") or ""
+    if target_user and shutil.which("usermod"):
+        try:
+            subprocess.run(["usermod", "-aG", args.group, target_user], check=True)
+            print(f"Added user '{target_user}' to group '{args.group}'.")
+            print(f"Log out and back in (or run 'newgrp {args.group}') for the group to take effect.")
+        except Exception as e:
+            print(f"warning: could not add '{target_user}' to '{args.group}': {e}", file=sys.stderr)
+    print("Done. Unplug and replug your USB SDR, then run `ria-agent stream`.")
+    return 0
 def _derive_ws_url(hub_url: str, agent_id: str) -> str:
     if not hub_url:
         return ""
@@ -225,8 +296,35 @@ def main() -> None:
     sub.add_parser("run", help="Legacy long-poll agent (NodeAgent)")
     sub.add_parser("detect", help="List available SDR drivers")
+    p_udev = sub.add_parser(
+        "install-udev",
+        help="Install SDR udev rules so USB radios open without sudo (run once, with sudo)",
+    )
+    p_udev.add_argument(
+        "--dest",
+        default="/etc/udev/rules.d",
+        help="Directory to install the rules file into (default: /etc/udev/rules.d)",
+    )
+    p_udev.add_argument(
+        "--group",
+        default="plugdev",
+        help="Group granted device access; the invoking user is added to it (default: plugdev)",
+    )
+    p_udev.add_argument(
+        "--no-reload",
+        action="store_true",
+        help="Skip 'udevadm control --reload-rules' / 'udevadm trigger'",
+    )
     p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials")
-    p_reg.add_argument("--hub", required=True, help="RIA Hub URL (e.g. http://whitehorse:3005)")
+    p_reg.add_argument(
+        "--hub",
+        default=DEFAULT_HUB_URL,
+        help=(
+            f"RIA Hub URL (default: {DEFAULT_HUB_URL}). "
+            "Override for dev or self-hosted hubs, e.g. http://whitehorse:3005."
+        ),
+    )
     p_reg.add_argument(
         "--api-key",
         dest="api_key",
@@ -295,6 +393,8 @@ def main() -> None:
         return
     if args.command == "detect":
         sys.exit(_cmd_detect(args))
+    if args.command == "install-udev":
+        sys.exit(_cmd_install_udev(args))
     if args.command == "register":
         sys.exit(_cmd_register(args))
     if args.command == "stream":


@@ -1,17 +1,181 @@
-"""Hardware detection and heartbeat payload construction for the streamer."""
+"""Hardware detection and heartbeat payload construction for the streamer.
+The heartbeat advertises a ``hardware`` list the hub uses to populate the
+radio-device picker. Each entry is a dict::
+    {"device": "usrp", "identifier": "name=MyB210",
+     "label": "Ettus USRP B210 (35D7CAD)", "connected": True}
+- ``device``: driver/device-type name (``"usrp"``, ``"pluto"``, ...).
+- ``identifier``: the exact addressing string this driver wants, or ``None``
+  to let the driver auto-select the sole device of its type.
+  The hub forwards this verbatim in ``radio_config`` so the
+  identifier is always agent-owned, never derived from the
+  composer graph (which is what used to leak a Pluto IP into a
+  USRP open). It must round-trip through
+  ``ria_toolkit_oss_cli.common.parse_ident``: a bare value is
+  read as an IP address, so non-network devices use ``None`` or
+  ``name=<value>``.
+- ``label``: human-friendly text for the hub dropdown.
+- ``connected``: ``True`` when the device was physically enumerated,
+  ``False`` when only the driver is importable (no hardware
+  probed/found), ``None`` when presence is unknown.
+The hub tolerates plain string entries from older agents (see
+``_agent_device_names`` / ``hwName``), so this richer shape is backward
+compatible.
+"""
 from __future__ import annotations
+import logging
+import time
 from ria_toolkit_oss.sdr import detect_available
 from .config import AgentConfig
+logger = logging.getLogger(__name__)
+# Human-friendly names for the hub dropdown, keyed by device-type name.
+DEVICE_LABELS: dict[str, str] = {
+    "usrp": "Ettus USRP (UHD)",
+    "pluto": "ADALM-Pluto",
+    "rtlsdr": "RTL-SDR",
+    "hackrf": "HackRF One",
+    "blade": "BladeRF",
+    "thinkrf": "ThinkRF (RTSA)",
+    "mock": "Mock SDR (synthetic)",
+}
+# Enumeration can shell out (e.g. ``uhd_find_devices``), so cache results for a
+# short window rather than re-probing on every ~30s heartbeat. Hot-plug shows up
+# within one TTL.
+_PROBE_TTL_S = 60.0
+_probe_cache: tuple[float, list[dict]] | None = None
 def available_devices() -> list[str]:
     """Return a sorted list of device names whose driver modules import cleanly."""
     return sorted(detect_available().keys())
+def _label_for(device: str, suffix: str = "") -> str:
+    base = DEVICE_LABELS.get(device, device)
+    return f"{base} ({suffix})" if suffix else base
+def _enumerate_usrp() -> list[dict] | None:
+    """Probe for connected USRPs via ``uhd_find_devices``.
+    Returns a list of concrete device entries (``connected=True``), an empty
+    list when UHD ran but found nothing, or ``None`` when probing is not
+    possible (UHD/driver unavailable) so the caller can fall back to a
+    driver-only entry.
+    """
+    try:
+        from ria_toolkit_oss.sdr.usrp import _parse_uhd_find_devices
+    except Exception:
+        return None
+    try:
+        found = _parse_uhd_find_devices() or []
+    except Exception as exc:
+        logger.debug("USRP enumeration failed: %s", exc)
+        return None
+    if not found:
+        return []
+    # Addressing reality for the CLI get_sdr_device path: its USRP
+    # _create_device_dict matches the identifier against *raw* device values,
+    # but common.py prepends "addr="/"name=" before handing it over — so no
+    # prefixed identifier ever matches. The only reliable open for a USB USRP
+    # (B2x0) is auto-select (identifier=None → first device found). Networked
+    # USRPs addressed by IP would need a separate fix and aren't enumerated
+    # distinctly here. So advertise one auto-select entry, labelled with the
+    # serial(s) we saw so the operator still knows what's attached.
+    labels = [dev.get("serial") or dev.get("name") or dev.get("product") or "USRP" for dev in found]
+    return [
+        {
+            "device": "usrp",
+            "identifier": None,
+            "label": _label_for("usrp", ", ".join(labels)),
+            "connected": True,
+        }
+    ]
+# Device types we can cheaply enumerate into concrete instances. Anything not
+# listed is advertised as a single driver-only entry (presence unknown).
+_PROBERS = {
+    "usrp": _enumerate_usrp,
+}
+def _detect_devices_uncached() -> list[dict]:
+    out: list[dict] = []
+    for device in available_devices():
+        prober = _PROBERS.get(device)
+        if prober is not None:
+            probed = prober()
+            if probed:  # one or more concrete instances found
+                out.extend(probed)
+                continue
+            if probed == []:  # prober ran but found no hardware
+                out.append(
+                    {
+                        "device": device,
+                        "identifier": None,
+                        "label": _label_for(device),
+                        "connected": False,
+                    }
+                )
+                continue
+            # probed is None — couldn't probe; fall through to unknown entry.
+        out.append(
+            {
+                "device": device,
+                "identifier": None,
+                "label": _label_for(device),
+                "connected": None,
+            }
+        )
+    return out
+def _driver_only_devices() -> list[dict]:
+    """Hardware list from importable drivers alone — no device probing."""
+    return [{"device": d, "identifier": None, "label": _label_for(d), "connected": None} for d in available_devices()]
+def detect_devices(*, use_cache: bool = True, probe: bool = True) -> list[dict]:
+    """Return enriched ``hardware`` entries for the heartbeat.
+    Results are cached for ``_PROBE_TTL_S`` seconds because enumeration may shell
+    out to hardware tools (e.g. ``uhd_find_devices``). Pass ``use_cache=False``
+    to force a fresh probe.
+    ``probe=False`` MUST be used while a capture/transmit session is active:
+    probing a USB SDR (running ``uhd_find_devices``) while it is streaming
+    disrupts the live stream and makes the device briefly disappear. In that
+    case we return the last good enumeration if we have one, else a driver-only
+    list, never touching the hardware.
+    """
+    global _probe_cache
+    now = time.monotonic()
+    if use_cache and _probe_cache is not None:
+        ts, cached = _probe_cache
+        if not probe or (now - ts < _PROBE_TTL_S):
+            return cached
+    if not probe:
+        # No cache yet and we must not touch the hardware mid-stream.
+        return _driver_only_devices()
+    devices = _detect_devices_uncached()
+    _probe_cache = (now, devices)
+    return devices
 def heartbeat_payload(
     status: str = "idle",
     app_id: str | None = None,
@@ -30,9 +194,11 @@ def heartbeat_payload(
     if c.tx_enabled:
         capabilities.append("tx")
+    # Never probe the hardware while a session is active: running
+    # uhd_find_devices against a streaming USB SDR disrupts the live capture.
     payload: dict = {
         "type": "heartbeat",
-        "hardware": available_devices(),
+        "hardware": detect_devices(probe=not bool(sessions)),
         "status": status,
         "capabilities": capabilities,
         "tx_enabled": bool(c.tx_enabled),


@@ -249,12 +249,19 @@ class Streamer:
             await self._send_error(app_id, "start missing radio_config.device")
             return
+        # Open the SDR in a thread, never inline. The open is blocking and can be
+        # slow — a USRP shells out to uhd_find_devices and loads its FPGA, which
+        # takes seconds — and doing it on the event loop freezes the WebSocket
+        # keepalive long enough that the hub drops the agent and stops the app.
+        # (A Pluto opens fast enough to slip under the timeout, which is why it
+        # worked where a USRP hung.)
+        loop = asyncio.get_running_loop()
         try:
-            sdr, device_key = self._registry.acquire(device, identifier)
-            _apply_sdr_config(sdr, radio_config)
+            sdr, device_key = await loop.run_in_executor(None, self._registry.acquire, device, identifier)
+            await loop.run_in_executor(None, _apply_sdr_config, sdr, radio_config)
         except Exception as exc:
             logger.exception("Failed to open SDR %r", device)
-            await self._send_error(app_id, f"SDR init failed: {exc}")
+            await self._send_error(app_id, f"SDR init failed: {_friendly_sdr_error(device, exc)}")
             return
         # Inherit any pending config that was queued before start.
@@ -385,42 +392,51 @@ class Streamer:
             await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device")
             return
-        device_key: tuple[str, str | None] | None = None
-        sdr: Any = None
-        try:
-            sdr, device_key = self._registry.acquire(device, identifier)
-            _apply_sdr_config(sdr, radio_config)
-            # init_tx is mandatory for any driver that exposes it: drivers
-            # that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP,
-            # …) crash with a confusing "TX was not initialized" error 2 s
-            # later in the executor thread if we skip it. Treat the three
-            # required keys as a hard contract — a missing one is a hub-side
-            # manifest bug and we want it surfaced immediately, not papered
-            # over with stale radio state.
-            if hasattr(sdr, "init_tx"):
-                init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")}
-                missing = [f"tx_{k}" for k, v in init_args.items() if v is None]
-                if missing:
-                    raise ValueError(f"tx_start missing required radio_config keys: {missing}")
-                sdr.init_tx(
-                    sample_rate=init_args["sample_rate"],
-                    center_frequency=init_args["center_frequency"],
-                    gain=init_args["gain"],
-                    channel=radio_config.get("tx_channel", 0),
-                    gain_mode=radio_config.get("tx_gain_mode", "manual"),
-                )
-        except Exception as exc:
-            if device_key is not None:
-                if self._registry.release(device_key):
+        # Open + init the SDR in a thread, never inline — the open is blocking and
+        # slow on a USRP (uhd_find_devices + FPGA load), and freezing the event
+        # loop stalls the WebSocket keepalive until the hub drops us. Cleanup on
+        # failure (release/close) stays inside the thread so a partial open never
+        # leaks a device handle.
+        def _open_and_init_tx() -> tuple[Any, tuple[str, str | None]]:
+            sdr_local, key_local = self._registry.acquire(device, identifier)
+            try:
+                _apply_sdr_config(sdr_local, radio_config)
+                # init_tx is mandatory for any driver that exposes it: drivers
+                # that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP,
+                # …) crash with a confusing "TX was not initialized" error 2 s
+                # later in the executor thread if we skip it. Treat the three
+                # required keys as a hard contract — a missing one is a hub-side
+                # manifest bug and we want it surfaced immediately, not papered
+                # over with stale radio state.
+                if hasattr(sdr_local, "init_tx"):
+                    init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")}
+                    missing = [f"tx_{k}" for k, v in init_args.items() if v is None]
+                    if missing:
+                        raise ValueError(f"tx_start missing required radio_config keys: {missing}")
+                    sdr_local.init_tx(
+                        sample_rate=init_args["sample_rate"],
+                        center_frequency=init_args["center_frequency"],
+                        gain=init_args["gain"],
+                        channel=radio_config.get("tx_channel", 0),
+                        gain_mode=radio_config.get("tx_gain_mode", "manual"),
+                    )
+            except Exception:
+                if self._registry.release(key_local):
                     try:
-                        sdr.close()
+                        sdr_local.close()
                     except Exception:
                         pass
-            logger.exception("Failed to init TX on %r", device)
-            await self._send_tx_status(app_id, "error", f"tx init failed: {exc}")
-            return
+                raise
+            return sdr_local, key_local
         self._loop = asyncio.get_running_loop()
+        try:
+            sdr, device_key = await self._loop.run_in_executor(None, _open_and_init_tx)
+        except Exception as exc:
+            logger.exception("Failed to init TX on %r", device)
+            await self._send_tx_status(app_id, "error", f"tx init failed: {_friendly_sdr_error(device, exc)}")
+            return
         session = TxSession(
             app_id=app_id,
             sdr=sdr,
@@ -732,6 +748,25 @@ def _default_sdr_factory(device: str, identifier: str | None):
     return get_sdr_device(device, ident=identifier)
+def _friendly_sdr_error(device: str, exc: Exception) -> str:
+    """Add an actionable hint when an SDR open fails on USB permissions.
+    UHD/libusb surface this as 'insufficient permissions' / EACCES, which is
+    cryptic to operators. Point them at the one-time fix that ships with the
+    toolkit instead of leaving them to discover udev rules on their own.
+    """
+    text = str(exc).lower()
+    permission_markers = ("insufficient permissions", "permission denied", "eacces", "access denied")
+    is_perm = isinstance(exc, PermissionError) or any(m in text for m in permission_markers)
+    if is_perm:
+        return (
+            f"{exc}\n"
+            f"USB permission denied opening '{device}'. Run this once, then replug the device:\n"
+            f" sudo ria-agent install-udev"
+        )
+    return str(exc)
 # ---------------------------------------------------------------------------
 # Top-level entry


@@ -0,0 +1,34 @@
+# RIA Toolkit SDR udev rules
+#
+# Grants non-root access to the USB SDRs ria-agent can drive, so `ria-agent
+# stream` can open them without sudo. Installed by `ria-agent install-udev`.
+#
+# Access is granted two ways for portability:
+# - GROUP="plugdev", MODE="0660" — classic group-based access.
+# - TAG+="uaccess" — systemd-logind grants the active local
+# session user access dynamically.
+# A user in `plugdev` (or logged in locally) can open the device after replug.
+# ADALM-Pluto (Analog Devices)
+SUBSYSTEM=="usb", ATTRS{idVendor}=="0456", ATTRS{idProduct}=="b673", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+# RTL-SDR (Realtek RTL2832U)
+SUBSYSTEM=="usb", ATTRS{idVendor}=="0bda", ATTRS{idProduct}=="2832", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="0bda", ATTRS{idProduct}=="2838", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+# HackRF (Great Scott Gadgets)
+SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="6089", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="604b", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="cc15", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+# Ettus USRP B2x0 (UHD)
+SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0020", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0021", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0022", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+# USRP B2x0 in bootloader / uninitialized (Cypress FX3 / legacy Ettus VID)
+SUBSYSTEM=="usb", ATTRS{idVendor}=="fffe", ATTRS{idProduct}=="0002", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="04b4", ATTRS{idProduct}=="00f3", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+# Nuand bladeRF
+SUBSYSTEM=="usb", ATTRS{idVendor}=="2cf0", ATTRS{idProduct}=="5246", MODE="0660", GROUP="plugdev", TAG+="uaccess"
+SUBSYSTEM=="usb", ATTRS{idVendor}=="2cf0", ATTRS{idProduct}=="5250", MODE="0660", GROUP="plugdev", TAG+="uaccess"


@@ -119,9 +119,15 @@ class WsClient:
             await asyncio.sleep(self.reconnect_pause)

     async def _heartbeat_loop(self, heartbeat: HeartbeatBuilder) -> None:
+        loop = asyncio.get_running_loop()
         while True:
             try:
-                await self.send_json(heartbeat())
+                # Build off the event loop: a heartbeat can probe SDR hardware
+                # (e.g. uhd_find_devices on a USRP), which blocks for seconds and
+                # would otherwise freeze the WebSocket keepalive long enough for
+                # the hub to drop the agent.
+                payload = await loop.run_in_executor(None, heartbeat)
+                await self.send_json(payload)
             except Exception as exc:
                 logger.debug("Heartbeat send failed: %s", exc)
                 return
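
The hunk above moves the heartbeat build onto a worker thread. The following is a minimal standalone sketch of that pattern, not the agent's code: `slow_heartbeat` and `keepalive` are hypothetical stand-ins for a blocking heartbeat builder and for the WebSocket keepalive. It counts how often the keepalive gets to run while the blocking call is in flight.

```python
import asyncio
import time


def slow_heartbeat() -> dict:
    """Hypothetical heartbeat builder that blocks, as a hardware probe might."""
    time.sleep(0.3)  # simulates a blocking device enumeration
    return {"type": "heartbeat", "status": "idle"}


async def keepalive(ticks: list) -> None:
    """Hypothetical keepalive: it only makes progress if the loop is not blocked."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def build_off_loop() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    task = asyncio.create_task(keepalive(ticks))
    # The blocking call runs in the default thread pool, so the loop stays free.
    payload = await loop.run_in_executor(None, slow_heartbeat)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return payload, len(ticks)


def demo_executor() -> tuple:
    """Run the sketch and return (payload, number of keepalive ticks)."""
    return asyncio.run(build_off_loop())
```

If `slow_heartbeat()` were called directly inside the coroutine instead, the keepalive would record at most one tick for the whole 0.3 s, which is the stall the commit message describes.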

View File

@@ -7,7 +7,7 @@ import numpy as np
 import uhd

 from ria_toolkit_oss.data.recording import Recording
-from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError
+from ria_toolkit_oss.sdr.sdr import SDR, SdrDisconnectedError, SDRError, SDRParameterError


 class USRP(SDR):
@@ -32,6 +32,13 @@ class USRP(SDR):

         self._rx_initialized = False
         self._tx_initialized = False
+        # True once a continuous RX stream has been started (see rx()). Kept
+        # running across rx() calls so the agent streamer gets gapless capture
+        # instead of a start/stop per buffer.
+        self._rx_streaming = False
+        # Samples received past the end of one rx() request, carried into the
+        # next call so nothing is dropped between buffers.
+        self._rx_residual = np.empty(0, dtype=np.complex64)

     def init_rx(
         self,
@@ -65,7 +72,7 @@ class USRP(SDR):

         # build USRP object
         usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict)
-        self.usrp = uhd.usrp.MultiUSRP(usrp_args)
+        self.usrp = _open_multi_usrp(usrp_args)

         # check if channel arg is valid
         max_num_channels = self.usrp.get_rx_num_channels()
@@ -96,6 +103,8 @@ class USRP(SDR):
         # flag to prevent user from calling certain functions before this one.
         self._rx_initialized = True
         self._tx_initialized = False
+        self._rx_streaming = False  # (re)started lazily on the first rx() call
+        self._rx_residual = np.empty(0, dtype=np.complex64)

         return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
@@ -265,6 +274,97 @@ class USRP(SDR):

         return Recording(data=store_array[:, :num_samples], metadata=metadata)

+    def rx(self, num_samples: int) -> "np.ndarray":
+        """Return *num_samples* complex64 IQ samples from a continuous RX stream.
+
+        This is the interface the agent streamer's capture loop calls every
+        buffer. Unlike ``record()`` (a one-shot that issues ``start_cont`` /
+        ``stop_cont`` and sleeps each call), this keeps a single continuous
+        stream running across calls, so capture is gapless: no per-buffer
+        start/stop churn, transients, or zero-filled gaps that show up as black
+        bands in the spectrogram.
+
+        On the first call it auto-initializes RX (from ``sample_rate`` /
+        ``center_freq`` / ``gain`` set by the caller) and issues ``start_cont``
+        once. ``close()`` (or ``stop()``) stops the stream.
+        """
+        if not self._rx_initialized:
+            gain = self.gain if isinstance(self.gain, (int, float)) else 40.0
+            self.init_rx(
+                sample_rate=self.sample_rate,
+                center_frequency=self.center_freq,
+                gain=gain,
+                channel=0,
+            )
+
+        if not self._rx_streaming:
+            stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
+            stream_command.stream_now = True
+            self.rx_stream.issue_stream_cmd(stream_command)
+            self._enable_rx = True
+            self._rx_streaming = True
+            print("USRP Starting RX (continuous)...")
+
+        out = np.empty(num_samples, dtype=np.complex64)
+        filled = 0
+
+        # Drain any samples carried over from the previous call first.
+        if self._rx_residual.size:
+            take = min(self._rx_residual.size, num_samples)
+            out[:take] = self._rx_residual[:take]
+            self._rx_residual = self._rx_residual[take:]
+            filled = take
+
+        recv_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64)
+        consecutive_timeouts = 0
+        error_codes = uhd.types.RXMetadataErrorCode
+
+        while filled < num_samples:
+            n = self.rx_stream.recv(recv_buffer, self.metadata, self.timeout)
+            err = self.metadata.error_code
+
+            if err == error_codes.timeout:
+                consecutive_timeouts += 1
+                # A stalled stream is a disconnect, not a transient hiccup.
+                if consecutive_timeouts >= 5:
+                    self._rx_streaming = False
+                    raise SdrDisconnectedError("USRP RX timed out repeatedly — device may be disconnected")
+                continue
+            consecutive_timeouts = 0
+
+            # Overflow ("O") means the host fell behind and UHD dropped samples
+            # upstream; the samples we did get are still valid, so keep going.
+            if err not in (error_codes.none, error_codes.overflow):
+                self._rx_streaming = False
+                raise SDRError(f"USRP RX error: {err}")
+
+            if n <= 0:
+                continue
+
+            take = min(n, num_samples - filled)
+            out[filled : filled + take] = recv_buffer[0, :take]
+            filled += take
+            # Keep anything received past this request for the next call so the
+            # stream stays gapless across rx() boundaries.
+            if take < n:
+                self._rx_residual = recv_buffer[0, take:n].copy()
+
+        return out
+
+    def _stop_rx_stream(self) -> None:
+        """Issue stop_cont for the continuous RX stream, if running."""
+        if not self._rx_streaming:
+            return
+        self._enable_rx = False
+        try:
+            stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
+            stop_cmd.stream_now = True
+            self.rx_stream.issue_stream_cmd(stop_cmd)
+        except Exception:
+            pass
+        self._rx_streaming = False
+        self._rx_residual = np.empty(0, dtype=np.complex64)
+        print("USRP RX stopped.")
+
     def init_tx(
         self,
         sample_rate: int | float,
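
The residual carry-over in `rx()` can be looked at in isolation. The sketch below is a simplified illustration, not the driver code: plain Python lists stand in for NumPy arrays, and `ChunkedReader` and `make_ramp_source` are hypothetical names. The source hands out fixed-size chunks of a counting ramp, so a dropped or repeated sample would be visible immediately.

```python
from typing import Callable, List


class ChunkedReader:
    """Return exactly n samples per read() from a source that fills fixed-size chunks."""

    def __init__(self, recv: Callable[[List[complex]], int], chunk_size: int):
        self._recv = recv  # hypothetical callable: fills the buffer, returns a count
        self._chunk: List[complex] = [0j] * chunk_size
        self._residual: List[complex] = []

    def read(self, n: int) -> List[complex]:
        # Drain samples left over from the previous call first.
        out = self._residual[:n]
        self._residual = self._residual[n:]
        while len(out) < n:
            got = self._recv(self._chunk)
            take = min(got, n - len(out))
            out.extend(self._chunk[:take])
            # Slicing copies, so the over-read survives the next recv, which
            # overwrites the shared chunk buffer in place.
            self._residual = self._chunk[take:got]
        return out


def make_ramp_source() -> Callable[[List[complex]], int]:
    """Fake source whose samples count up from 0, so any gap is detectable."""
    state = {"next": 0}

    def recv(buf: List[complex]) -> int:
        for i in range(len(buf)):
            buf[i] = complex(state["next"] + i)
        state["next"] += len(buf)
        return len(buf)

    return recv
```

With a chunk size of 4, two `read(6)` calls consume three chunks: the second chunk is split 2/2, and its tail becomes the head of the second read. This is the same case the `test_rx_is_gapless_across_calls` test in this change exercises against the real method.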
@@ -294,7 +394,7 @@ class USRP(SDR):
         print(f"USRP TX Gain Mode = '{gain_mode}'")

         config_str = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict)
-        self.usrp = uhd.usrp.MultiUSRP(config_str)
+        self.usrp = _open_multi_usrp(config_str)

         # check if channel arg is valid
         max_num_channels = self.usrp.get_rx_num_channels()
@@ -371,6 +471,7 @@ class USRP(SDR):
         print(f"USRP TX Gain = {self.tx_gain}")

     def close(self):
+        self._stop_rx_stream()
         self._tx_initialized = False
         self._rx_initialized = False
         if hasattr(self, "rx_stream"):
@@ -462,6 +563,32 @@ class USRP(SDR):
         return {"center_frequency": True, "sample_rate": True, "gain": True}


+def _open_multi_usrp(usrp_args, *, attempts=4, settle_s=2.0):
+    """Construct a ``uhd.usrp.MultiUSRP``, retrying transient B200 USB states.
+
+    On USB USRPs (B200/B210) the ``uhd_find_devices`` enumeration that resolves
+    the device (see ``_create_device_dict``) runs immediately before the open and
+    can leave the FX3 USB controller mid-reset, so the first open fails with e.g.
+    ``RuntimeError: fx3 is in state 5``. The device settles once that
+    enumeration's USB handle is fully released (and the failed open itself nudges
+    the FX3 to reload firmware/FPGA), so we retry with a short backoff before
+    giving up. A non-transient error (bad args, genuinely absent device) is
+    re-raised immediately.
+    """
+    for attempt in range(1, attempts + 1):
+        try:
+            return uhd.usrp.MultiUSRP(usrp_args)
+        except RuntimeError as exc:
+            msg = str(exc).lower()
+            transient = ("fx3" in msg) or ("usb" in msg) or ("no devices found" in msg)
+            if not transient or attempt == attempts:
+                raise
+            print(
+                f"\033[93mUSRP open attempt {attempt}/{attempts} failed " f"({exc}); retrying in {settle_s}s…\033[0m"
+            )
+            time.sleep(settle_s)
+
+
 def _create_device_dict(identifier_value=None):
     """
     Get the dictionary of information corresponding to any unique identifier,
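
The retry logic in `_open_multi_usrp` does not depend on UHD. As a rough sketch under stated assumptions, the same shape can be written around any zero-argument callable. `retry_transient`, its `markers` tuple, and the `backoff` multiplier are hypothetical illustrations; the function in the diff uses a fixed `settle_s` delay and its own marker list.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def retry_transient(
    open_fn: Callable[[], T],
    *,
    attempts: int = 4,
    settle_s: float = 0.0,
    backoff: float = 1.0,  # 1.0 keeps the delay fixed; >1.0 grows it each retry
    markers: Sequence[str] = ("fx3", "usb"),
) -> T:
    """Call open_fn, retrying only RuntimeErrors whose message looks transient."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    delay = settle_s
    for attempt in range(1, attempts + 1):
        try:
            return open_fn()
        except RuntimeError as exc:
            msg = str(exc).lower()
            transient = any(marker in msg for marker in markers)
            # Fail fast on non-transient errors and on the final attempt.
            if not transient or attempt == attempts:
                raise
            time.sleep(delay)
            delay *= backoff
    raise AssertionError("unreachable: the loop either returns or raises")
```

Matching on message substrings is a coarse classifier. A marker as broad as `"usb"` also matches permission failures such as "USB open failed: insufficient permissions", which then get retried before surfacing, so a narrower marker list may be worth considering.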

View File

@@ -0,0 +1,36 @@
+"""Tests for `ria-agent install-udev` and the bundled udev rules."""
+
+from __future__ import annotations
+
+from importlib.resources import files
+from unittest.mock import patch
+
+from ria_toolkit_oss.agent import cli as agent_cli
+
+
+def test_bundled_udev_rules_present_and_cover_usb_sdrs():
+    text = files("ria_toolkit_oss.agent").joinpath("udev", "90-ria-sdr.rules").read_text()
+    # ADALM-Pluto, RTL-SDR, HackRF, and USRP B2x0 VIDs must be covered.
+    for vid in ("0456", "0bda", "1d50", "2500"):
+        assert vid in text
+
+
+def test_install_udev_requires_root(capsys):
+    args = agent_cli.argparse.Namespace(dest="/etc/udev/rules.d", group="plugdev", no_reload=True)
+    with patch("os.geteuid", return_value=1000):
+        rc = agent_cli._cmd_install_udev(args)
+    assert rc == 1
+    err = capsys.readouterr().err
+    assert "requires root" in err
+    assert "install-udev" in err
+
+
+def test_install_udev_writes_rules_when_root(tmp_path, monkeypatch, capsys):
+    args = agent_cli.argparse.Namespace(dest=str(tmp_path), group="plugdev", no_reload=True)
+    # No SUDO_USER and --no-reload → no subprocess calls; just the file write.
+    monkeypatch.delenv("SUDO_USER", raising=False)
+    with patch("os.geteuid", return_value=0):
+        rc = agent_cli._cmd_install_udev(args)
+    assert rc == 0
+    written = (tmp_path / "90-ria-sdr.rules").read_text()
+    assert "SUBSYSTEM" in written
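
The bundled-rules test above checks only that each vendor ID appears somewhere in the file, so a four-digit match inside a product ID or a comment would also pass. A stricter check could parse the vendor/product pairs out of the rule lines. The sketch below is one possible way to do that; `covered_ids`, `RULE_RE`, and `SAMPLE_RULES` are hypothetical names, and the sample text copies two rules from the rules file shown earlier.

```python
import re

# Matches the idVendor / idProduct pair on a single udev rule line.
RULE_RE = re.compile(
    r'ATTRS\{idVendor\}=="([0-9a-fA-F]{4})".*?ATTRS\{idProduct\}=="([0-9a-fA-F]{4})"'
)


def covered_ids(rules_text: str) -> set:
    """Return the set of (vendor_id, product_id) pairs named by the rules."""
    ids = set()
    for line in rules_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        match = RULE_RE.search(line)
        if match:
            ids.add((match.group(1).lower(), match.group(2).lower()))
    return ids


SAMPLE_RULES = """\
# ADALM-Pluto (Analog Devices)
SUBSYSTEM=="usb", ATTRS{idVendor}=="0456", ATTRS{idProduct}=="b673", MODE="0660", GROUP="plugdev", TAG+="uaccess"
# Ettus USRP B2x0 (UHD)
SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0020", MODE="0660", GROUP="plugdev", TAG+="uaccess"
"""
```

A test built on this could assert that specific pairs such as `("2500", "0020")` are present, rather than relying on substring matches.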

View File

@@ -140,3 +140,59 @@ def test_register_surfaces_reason_on_http_error(tmp_path, capsys):
     assert "Settings → RIA Agents" in captured.err
     # Config must NOT be written on failure.
     assert not cfg_path.exists()
+
+
+def test_default_hub_url_is_production():
+    """Lock in the constant so a future typo doesn't silently redirect users."""
+    assert agent_cli.DEFAULT_HUB_URL == "https://riahub.ai"
+
+
+def test_register_defaults_hub_to_production(tmp_path):
+    """Omitting --hub uses the production hub URL constant."""
+    cfg_path = tmp_path / "agent.json"
+    captured: dict = {}
+
+    def _fake_urlopen(req, *args, **kwargs):
+        captured["url"] = req.full_url
+        raise urllib.error.HTTPError(
+            url=req.full_url, code=403, msg="", hdrs=None,  # type: ignore[arg-type]
+            fp=BytesIO(_structured("invalid_key")),
+        )
+
+    with (
+        patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False),
+        patch("urllib.request.urlopen", side_effect=_fake_urlopen),
+        patch.object(sys, "argv", ["ria-agent", "register", "--api-key", "ria_reg_x"]),
+    ):
+        with pytest.raises(SystemExit):
+            agent_cli.main()
+
+    assert captured["url"] == f"{agent_cli.DEFAULT_HUB_URL}/screens/agents/register"
+
+
+def test_register_hub_override_wins_over_default(tmp_path):
+    """Explicit --hub still wins; default is only a fallback."""
+    cfg_path = tmp_path / "agent.json"
+    captured: dict = {}
+
+    def _fake_urlopen(req, *args, **kwargs):
+        captured["url"] = req.full_url
+        raise urllib.error.HTTPError(
+            url=req.full_url, code=403, msg="", hdrs=None,  # type: ignore[arg-type]
+            fp=BytesIO(_structured("invalid_key")),
+        )
+
+    with (
+        patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False),
+        patch("urllib.request.urlopen", side_effect=_fake_urlopen),
+        patch.object(
+            sys,
+            "argv",
+            ["ria-agent", "register", "--hub", "http://whitehorse:3005", "--api-key", "ria_reg_x"],
+        ),
+    ):
+        with pytest.raises(SystemExit):
+            agent_cli.main()
+
+    assert captured["url"] == "http://whitehorse:3005/screens/agents/register"
+    assert agent_cli.DEFAULT_HUB_URL not in captured["url"]

View File

@@ -17,11 +17,16 @@ def test_available_devices_sorted_list():
     assert "mock" in devices


+def _device_names(hardware_list):
+    return {e["device"] for e in hardware_list}
+
+
 def test_heartbeat_payload_shape():
     p = hardware.heartbeat_payload()
     assert p["type"] == "heartbeat"
     assert p["status"] == "idle"
-    assert "mock" in p["hardware"]
+    # hardware is now a list of rich dict entries.
+    assert "mock" in _device_names(p["hardware"])
     assert "app_id" not in p
     # New fields, default shape
     assert p["capabilities"] == ["rx"]
@@ -32,6 +37,53 @@ def test_heartbeat_payload_shape():
     assert p2["app_id"] == "abc"


+def test_detect_devices_entry_shape():
+    devices = hardware.detect_devices(use_cache=False)
+    assert isinstance(devices, list)
+    for entry in devices:
+        assert set(entry) >= {"device", "identifier", "label", "connected"}
+        assert isinstance(entry["device"], str)
+        # identifier round-trips through parse_ident: None or a string.
+        assert entry["identifier"] is None or isinstance(entry["identifier"], str)
+    mock = next(e for e in devices if e["device"] == "mock")
+    assert mock["label"]  # has a human label
+
+
+def test_detect_devices_cache():
+    a = hardware.detect_devices(use_cache=False)
+    b = hardware.detect_devices(use_cache=True)
+    assert _device_names(a) == _device_names(b)
+
+
+def test_detect_devices_probe_false_never_touches_hardware(monkeypatch):
+    # probe=False must not run the hardware enumerators (uhd_find_devices etc.),
+    # which would disrupt an active USB capture.
+    def boom():
+        raise AssertionError("hardware must not be probed when probe=False")
+
+    monkeypatch.setattr(hardware, "_detect_devices_uncached", boom)
+    monkeypatch.setattr(hardware, "_probe_cache", None)
+    devices = hardware.detect_devices(probe=False, use_cache=False)
+    assert all(e.get("connected") is None for e in devices)  # driver-only
+
+
+def test_heartbeat_disables_probe_during_active_session(monkeypatch):
+    seen = {}
+
+    def fake_detect(**kw):
+        seen.clear()
+        seen.update(kw)
+        return []
+
+    monkeypatch.setattr(hardware, "detect_devices", fake_detect)
+
+    hardware.heartbeat_payload(sessions={"rx": {"app_id": "a", "state": "streaming"}})
+    assert seen.get("probe") is False  # streaming → no hardware probe
+
+    hardware.heartbeat_payload(sessions=None)
+    assert seen.get("probe") is True  # idle → probe allowed
+
+
 def test_heartbeat_payload_tx_capability_from_cfg():
     from ria_toolkit_oss.agent.config import AgentConfig

View File

@@ -9,11 +9,24 @@ import numpy as np
 from ria_toolkit_oss.agent.streamer import (
     Streamer,
     _apply_sdr_config,
+    _friendly_sdr_error,
     _samples_to_interleaved_float32,
 )
 from ria_toolkit_oss.sdr.mock import MockSDR


+def test_friendly_sdr_error_adds_udev_hint_on_permission():
+    msg = _friendly_sdr_error("usrp", RuntimeError("USB open failed: insufficient permissions."))
+    assert "install-udev" in msg
+    assert "insufficient permissions" in msg
+
+
+def test_friendly_sdr_error_passes_through_other_errors():
+    msg = _friendly_sdr_error("usrp", RuntimeError("No USRP device found for identifier 'name=x'"))
+    assert "install-udev" not in msg
+    assert "No USRP device found" in msg
+
+
 class FakeWs:
     def __init__(self):
         self.json_sent: list[dict] = []
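
The two `_friendly_sdr_error` tests pin down behaviour without showing the helper itself, which is not part of this excerpt. Purely as an assumption about what could satisfy them, a helper of this kind might look like the following. `friendly_sdr_error_sketch` is a hypothetical name, and the message wording and permission heuristic are invented for illustration; only the `ria-agent install-udev` command name comes from the tests above.

```python
def friendly_sdr_error_sketch(device: str, exc: Exception) -> str:
    """Hypothetical helper: keep the original error, add a hint for permission failures."""
    text = str(exc)
    message = f"Failed to open {device}: {text}"
    # Assumed heuristic: any mention of permissions points at missing udev rules.
    if "permission" in text.lower():
        message += " Hint: run `sudo ria-agent install-udev`, then replug the device."
    return message
```

Keeping the original exception text in the message is what lets the second test assert that unrelated errors pass through unchanged.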

View File

@@ -0,0 +1,91 @@
+"""Hardware-free tests for _open_multi_usrp's transient-FX3 retry.
+
+On B200/B210 the `uhd_find_devices` enumeration that runs right before opening
+can leave the FX3 USB controller mid-reset, so the first MultiUSRP open fails
+with "fx3 is in state 5". _open_multi_usrp retries transient USB states with a
+short settle; a non-transient error is re-raised immediately.
+"""
+
+from __future__ import annotations
+
+import sys
+import types
+
+import pytest
+
+
+@pytest.fixture
+def usrp_mod(monkeypatch):
+    """Import the usrp module against a stub `uhd`, with time.sleep neutered."""
+    saved_uhd = sys.modules.get("uhd")
+    saved_usrp = sys.modules.get("ria_toolkit_oss.sdr.usrp")
+
+    uhd = types.ModuleType("uhd")
+    uhd.usrp = types.SimpleNamespace(MultiUSRP=None)  # set per-test
+    sys.modules["uhd"] = uhd
+    sys.modules.pop("ria_toolkit_oss.sdr.usrp", None)
+    import ria_toolkit_oss.sdr.usrp as mod
+
+    monkeypatch.setattr(mod.time, "sleep", lambda *_a, **_k: None)
+    yield mod
+
+    for name, m in (("uhd", saved_uhd), ("ria_toolkit_oss.sdr.usrp", saved_usrp)):
+        if m is None:
+            sys.modules.pop(name, None)
+        else:
+            sys.modules[name] = m
+
+
+def _flaky_factory(fail_times, exc):
+    """A MultiUSRP stand-in that raises `exc` the first `fail_times` calls."""
+    calls = {"n": 0}
+
+    def make(args):
+        calls["n"] += 1
+        if calls["n"] <= fail_times:
+            raise exc
+        return f"usrp<{args}>"
+
+    make.calls = calls
+    return make
+
+
+def test_retries_transient_fx3_state_then_succeeds(usrp_mod):
+    factory = _flaky_factory(2, RuntimeError("RuntimeError: fx3 is in state 5"))
+    usrp_mod.uhd.usrp.MultiUSRP = factory
+
+    out = usrp_mod._open_multi_usrp("name=B210,", attempts=4, settle_s=0)
+
+    assert out == "usrp<name=B210,>"
+    assert factory.calls["n"] == 3  # failed twice, third succeeded
+
+
+def test_gives_up_after_attempts_and_raises_last(usrp_mod):
+    factory = _flaky_factory(99, RuntimeError("fx3 is in state 5"))
+    usrp_mod.uhd.usrp.MultiUSRP = factory
+
+    with pytest.raises(RuntimeError, match="fx3 is in state 5"):
+        usrp_mod._open_multi_usrp("name=B210,", attempts=3, settle_s=0)
+
+    assert factory.calls["n"] == 3  # exactly `attempts` tries, no infinite loop
+
+
+def test_non_transient_error_is_raised_immediately(usrp_mod):
+    factory = _flaky_factory(99, RuntimeError("EnvironmentError: no UHD images"))
+    usrp_mod.uhd.usrp.MultiUSRP = factory
+
+    with pytest.raises(RuntimeError, match="no UHD images"):
+        usrp_mod._open_multi_usrp("name=B210,", attempts=4, settle_s=0)
+
+    assert factory.calls["n"] == 1  # not retried — fails fast
+
+
+def test_success_on_first_try_does_not_retry(usrp_mod):
+    factory = _flaky_factory(0, RuntimeError("fx3 is in state 5"))
+    usrp_mod.uhd.usrp.MultiUSRP = factory
+
+    out = usrp_mod._open_multi_usrp("addr=192.168.10.2,", attempts=4, settle_s=0)
+
+    assert out == "usrp<addr=192.168.10.2,>"
+    assert factory.calls["n"] == 1
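
The `usrp_mod` fixture above swaps a stub into `sys.modules` and restores the previous entry afterwards. As a small sketch of that save/stub/restore idea outside pytest, the same steps can be wrapped in a context manager. `stubbed_module` and the module name used in the usage note are hypothetical.

```python
import contextlib
import importlib
import sys
import types

_MISSING = object()  # sentinel: distinguishes "not imported" from any real entry


@contextlib.contextmanager
def stubbed_module(name: str, stub: types.ModuleType):
    """Temporarily make `import name` resolve to `stub`, then restore the old state."""
    saved = sys.modules.get(name, _MISSING)
    sys.modules[name] = stub
    try:
        yield stub
    finally:
        if saved is _MISSING:
            sys.modules.pop(name, None)
        else:
            sys.modules[name] = saved
```

For example, inside `with stubbed_module("hypothetical_vendor_sdk", stub):` a call to `importlib.import_module("hypothetical_vendor_sdk")` returns the stub, and the name is gone from `sys.modules` once the block exits. As in the fixture, any module that imported the stub also has to be dropped from `sys.modules` and re-imported, otherwise it keeps whatever it bound at first import.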

142
tests/agent/test_usrp_rx.py Normal file
View File

@@ -0,0 +1,142 @@
+"""Hardware-free tests for the USRP continuous-streaming rx().
+
+`uhd` isn't importable without the UHD install, so we stub the bits USRP.rx()
+touches and drive it with a scripted fake rx_stream. The point is to prove the
+capture is gapless across rx() calls, the property that fixes the choppy /
+black-banded spectrogram caused by the old start/stop-per-buffer record().
+"""
+
+from __future__ import annotations
+
+import sys
+import types
+
+import numpy as np
+import pytest
+
+
+def _install_fake_uhd():
+    uhd = types.ModuleType("uhd")
+
+    class StreamCMD:
+        def __init__(self, mode):
+            self.mode = mode
+            self.stream_now = False
+            self.time_spec = None
+
+    uhd.types = types.SimpleNamespace(
+        StreamCMD=StreamCMD,
+        StreamMode=types.SimpleNamespace(start_cont="start_cont", stop_cont="stop_cont"),
+        RXMetadataErrorCode=types.SimpleNamespace(none="none", overflow="overflow", timeout="timeout"),
+    )
+    uhd.usrp = types.SimpleNamespace()
+    sys.modules["uhd"] = uhd
+    return uhd
+
+
+@pytest.fixture
+def USRP():
+    # Snapshot so the fake uhd / freshly-imported usrp don't leak into other
+    # tests (e.g. detect_available() would otherwise think usrp is importable).
+    saved_uhd = sys.modules.get("uhd")
+    saved_usrp = sys.modules.get("ria_toolkit_oss.sdr.usrp")
+
+    _install_fake_uhd()
+    sys.modules.pop("ria_toolkit_oss.sdr.usrp", None)
+    from ria_toolkit_oss.sdr.usrp import USRP as _USRP
+
+    yield _USRP
+
+    for name, mod in (("uhd", saved_uhd), ("ria_toolkit_oss.sdr.usrp", saved_usrp)):
+        if mod is None:
+            sys.modules.pop(name, None)
+        else:
+            sys.modules[name] = mod
+
+
+class _FakeStream:
+    """Delivers a contiguous ramp of samples; ``real`` part is the sample index.
+
+    ``script`` is a list of (count, error_code) the recv loop walks through.
+    """
+
+    def __init__(self, script, metadata):
+        self._script = list(script)
+        self._metadata = metadata
+        self._counter = 0
+        self.issued = []
+
+    def issue_stream_cmd(self, cmd):
+        self.issued.append(cmd.mode)
+
+    def recv(self, buffer, metadata, timeout):
+        count, err = self._script.pop(0)
+        metadata.error_code = err
+        if count > 0:
+            idx = np.arange(self._counter, self._counter + count, dtype=np.float32)
+            buffer[0, :count] = idx.astype(np.complex64)
+            self._counter += count
+        return count
+
+
+def _make_usrp(USRP, script, rx_buffer_size=4):
+    u = USRP.__new__(USRP)
+    u._rx_initialized = True
+    u._rx_streaming = False
+    u._rx_residual = np.empty(0, dtype=np.complex64)
+    u.rx_buffer_size = rx_buffer_size
+    u.timeout = 0.1
+    u._enable_rx = False
+    u.metadata = types.SimpleNamespace(error_code="none")
+    u.rx_stream = _FakeStream(script, u.metadata)
+    return u
+
+
+def test_rx_is_gapless_across_calls(USRP):
+    # rx_buffer_size=4; each recv yields 4 fresh samples. Two rx(6) calls must
+    # return a contiguous 0..11 ramp — the over-read remainder is carried over.
+    script = [(4, "none")] * 4
+    u = _make_usrp(USRP, script)
+
+    first = u.rx(6)
+    second = u.rx(6)
+
+    assert first.dtype == np.complex64 and len(first) == 6
+    combined = np.concatenate([first, second]).real
+    assert np.array_equal(combined, np.arange(12, dtype=np.float32))  # no drops, no zeros
+    assert "start_cont" in u.rx_stream.issued  # stream was started via start_cont
+    assert u.rx_stream.issued.count("start_cont") == 1  # ...and only once
+
+
+def test_rx_starts_stream_only_once(USRP):
+    u = _make_usrp(USRP, [(4, "none")] * 6)
+    u.rx(4)
+    u.rx(4)
+    assert u.rx_stream.issued.count("start_cont") == 1
+
+
+def test_rx_keeps_going_on_overflow(USRP):
+    # Overflow samples are still valid — they must be used, not dropped.
+    script = [(2, "none"), (2, "overflow"), (2, "none")]
+    u = _make_usrp(USRP, script)
+    out = u.rx(6).real
+    assert np.array_equal(out, np.arange(6, dtype=np.float32))
+
+
+def test_rx_raises_on_persistent_timeout(USRP):
+    from ria_toolkit_oss.sdr.sdr import SdrDisconnectedError
+
+    u = _make_usrp(USRP, [(0, "timeout")] * 10)
+    with pytest.raises(SdrDisconnectedError):
+        u.rx(4)
+
+
+def test_stop_rx_stream_resets_state(USRP):
+    u = _make_usrp(USRP, [(4, "none")] * 4)
+    u.rx(6)  # leaves a 2-sample residual, stream running
+    assert u._rx_streaming is True
+    assert u._rx_residual.size == 2
+
+    u._stop_rx_stream()
+    assert u._rx_streaming is False
+    assert u._rx_residual.size == 0
+    assert "stop_cont" in u.rx_stream.issued