Compare commits

...

1 Commits

Author SHA1 Message Date
81b0f28507 adding docs so they are not lost 2026-06-16 11:54:05 -04:00
10 changed files with 2438 additions and 0 deletions

View File

@ -0,0 +1,223 @@
# Agent TX Streaming — `ria-toolkit-oss` Handoff
**Paired repo:** `ria-hub` (this doc lives here, but it's written for the Claude working in `ria-toolkit-oss`)
**Source of truth for the overall design:** [Agent TX Streaming - Cross-Repo Plan.md](./Agent%20TX%20Streaming%20-%20Cross-Repo%20Plan.md) — read that first.
**Status (ria-hub side):** landed 2026-04-16. Ready to talk to a TX-capable agent.
---
## Your job
Implement Part A of the plan in `ria-toolkit-oss` (§A1A8). The hub is already speaking the protocol below and waiting for an agent that can:
1. Accept hub → agent binary TX buffers over an existing WebSocket.
2. Enforce the operator-configured TX interlocks (`tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`).
3. Drive `sdr.init_tx` / `_stream_tx` so a real Pluto transmits what the hub sends.
4. Report status back as `tx_status` JSON frames.
Full-duplex with the existing RX session on the same `app_id` must keep working.
---
## What ria-hub does (so you know what's on the other end of the wire)
You don't need to know any of this to do your work — but if you hit a wall, here's the mental model:
| Hub-side concept | What it does |
|---|---|
| `AgentTxSink` (Python, Celery worker) | Mirrors `AgentDataSource`. Publishes `tx_start`/`tx_stop`/`tx_configure` control JSON and raw binary IQ buffers intended for the agent. |
| `/screens/agent/ws` FastAPI endpoint | The WebSocket you already connect to. Now also pumps hub → agent binary TX frames and republishes your `tx_status` JSON upstream to the Celery task. |
| Redis channels (`agent:tx_iq:*`, `agent:events:*`) | Internal to the hub. You will never see them. Everything reaches you as WS frames. |
| Capability gate | Hub refuses to launch a TX app unless it's seen a recent heartbeat from you with `"tx" ∈ capabilities` and `tx_enabled: true`. |
| Audit log (`AgentTxAudit`) | Hub persists who started what transmission at what frequency and gain. Your error messages in `tx_status` end up in that record. |
**Bottom line: from your process, this is still the same WebSocket you've been using. You're just getting new message types and a new class of binary frames going the other direction.**
---
## Protocol contract (the only thing you actually need)
All additions. Existing RX messages (`start`/`stop`/`configure` + agent → hub binary) keep their current semantics — do not touch them.
### Hub → agent
**JSON control frames** (text WS frames):
```jsonc
// Arm TX. Call sdr.init_tx with this radio_config and start _stream_tx.
// After this you'll start receiving binary frames (see below) that go into
// the stream callback.
{
"type": "tx_start",
"app_id": "app-abc",
"radio_config": {
"device": "pluto",
"identifier": "ip:192.168.3.1",
"tx_sample_rate": 1000000,
"tx_center_frequency": 2450000000,
"tx_gain": -20, // dB. Pluto: negative = attenuation.
"tx_bandwidth": 1000000, // optional
"buffer_size": 1024, // optional; complex samples per buffer
"underrun_policy": "pause" // "pause" | "zero" | "repeat"
}
}
// Stop TX, drain queue, pause_tx. RX session on the same app_id (if any)
// stays alive.
{ "type": "tx_stop", "app_id": "app-abc" }
// Apply parameter changes at the next buffer boundary. Any subset of
// radio_config fields.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }
// Advisory — safe to ignore. Hub publishes this whenever it RPUSHes a new
// binary buffer; it was wired so the WS bridge wakes up promptly. You do
// NOT need to act on it. Consider it a keepalive.
{ "type": "tx_data_available", "app_id": "app-abc" }
```
**Binary frames** (binary WS frames):
* Raw interleaved `float32` IQ samples in `[-1, 1]`.
* One frame = one buffer.
* Byte length is always `num_complex_samples × 8` (8 bytes per complex sample: two float32s).
* **Only valid between `tx_start` and `tx_stop`.** If you receive a binary frame outside that window, drop it and log WARN — don't crash, don't panic.
Format validator is already in `ria_toolkit_oss.sdr.sdr._verify_sample_format` — reuse it.
### Agent → hub
**JSON status frames** (text WS frames). Use the existing `send_json` path:
```jsonc
// Lifecycle — emit on every state transition.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" }
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
// Errors — include a human-readable message. Hub surfaces it to the UI
// and writes it into the audit record.
{ "type": "tx_status", "app_id": "app-abc", "state": "error",
"message": "gain -5 exceeds tx_max_gain_db=-15" }
```
**States** (hub assumes this vocabulary):
* `armed``init_tx` done, callback started, queue empty, nothing transmitting yet.
* `transmitting` — at least one buffer has flowed through the callback.
* `underrun` — queue drained; what you do next depends on `underrun_policy`:
* `"pause"` → call `pause_tx()`, emit `underrun`, stay paused until the hub sends a fresh `tx_start`.
* `"zero"` → continue with `np.zeros(...)` fills, still emit `underrun` once so the hub can show the indicator.
* `"repeat"` → loop the last good buffer, emit `underrun` once.
* `done` — clean stop after `tx_stop`.
* `error` — capability rejection or hardware failure. Include `message`.
**Extended heartbeat** — you are already sending heartbeats. Grow the payload:
```jsonc
{
"type": "heartbeat",
"hardware": ["mock", "pluto"],
"status": "streaming", // unchanged semantics
"capabilities": ["rx", "tx"], // NEW — derived from tx_enabled + SDR class having init_tx
"tx_enabled": true, // NEW — mirror of cfg.tx_enabled
"tx_max_gain_db": -10, // NEW — optional, from agent config
"tx_max_duration_s": 60, // NEW — optional
"tx_allowed_freq_ranges": [[2.4e9, 2.5e9]], // NEW — optional
"sessions": { // NEW — optional per-session snapshot
"rx": { "app_id": "app-abc", "state": "streaming" },
"tx": { "app_id": "app-abc", "state": "transmitting" }
},
"app_id": "app-abc" // keep for back-compat
}
```
The hub reads these fields, stores them on `ScreensAgent`, and gates TX launches on them. **If you don't advertise `tx` in `capabilities` and `tx_enabled: true`, the hub will refuse to start any TX app with HTTP 400 — no WS traffic will be generated.**
### Backpressure model (what happens when you can't keep up)
* The hub caps its outbound TX queue at 200 buffers. If it fills, the hub either blocks on `write()` or drops the oldest buffer — both are benign for you.
* On the agent side, enforce your own cap (plan §A2 suggests 8 buffers). When full, `await ws.send` on the hub will slow via TCP/WS backpressure. You don't need an application-level flow-control message.
---
## Implementation roadmap (mapped to the Cross-Repo Plan)
Work in the order below. Each row is a single PR-sized unit.
| # | Plan ref | Deliverable | Acceptance |
|---|---|---|---|
| 1 | §A3 | `AgentConfig` gains `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. `save()` keeps 0600. | Unit test: round-trip through `~/.ria/agent.json`. |
| 2 | §A4 | `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` persists into config. `ria-agent stream --allow-tx` is a runtime override. | Integration test: `ria-agent register --allow-tx` then `cat ~/.ria/agent.json` shows fields. |
| 3 | §A1 | `ws_client.run()` grows a `on_binary: Callable[[bytes], Awaitable[None]]` parameter. Reconnect + heartbeat + malformed-frame behavior unchanged. | Existing `test_ws_client.py` still passes; new `test_ws_client_binary.py` asserts bytes reach the handler. |
| 4 | §A2 | Replace flat `self._sdr` / `self._app_id` state in `streamer.py` with `RxSession` + `TxSession` dataclasses. SDR instances cached by `(device, identifier)` so RX+TX share one handle on the same device. | Unit test: creating a TxSession on the same device as an active RxSession reuses the same SDR object. |
| 5 | §A2 | `_handle_tx_start`, `_handle_tx_stop`, `_handle_tx_configure` + `on_binary(data)``self._tx.queue.put(data)`. TX loop runs `_stream_tx` in an executor thread with a thread-safe `queue.Queue` adapter. | Integration test against MockSDR: tx_start → 10 binary frames → tx_stop produces exactly those samples through the callback. |
| 6 | §A2 | Underrun handling: `"pause"` / `"zero"` / `"repeat"` fills. Emits `tx_status: underrun` exactly once per drain event. | Unit test per policy against a slow producer. |
| 7 | §A2 | Cap enforcement **before** opening the SDR: reject with `tx_status: error` if `tx_enabled=False`, gain exceeds cap, freq outside allowed ranges, or duration cap exceeded (watchdog in TX loop calls `tx_stop` after `tx_max_duration_s`). | Unit test per rejection path; SDR is never opened when rejection fires. |
| 8 | §A5 | Heartbeat grows `capabilities`, `tx_enabled`, optional caps, `sessions`. | Integration test: start agent with `--allow-tx`, connect, verify heartbeat payload. |
| 9 | §A6 | Audit the Pluto driver's `_tx_lock` + `_param_lock` interaction to ensure concurrent RX + TX on the same `adi.Pluto` doesn't race on attribute writes. `MockSDR.init_tx` already exists — no change needed. | Stress test: 30 seconds of concurrent RX + TX on MockSDR with `_param_lock` instrumented for contention. |
| 10 | §A7 | Test matrix per plan: `test_streamer_tx`, `test_tx_safety`, `test_tx_underrun`, `test_full_duplex`, `test_ws_client_binary`, `test_integration_tx`. | All green in CI. |
| 11 | §A8 | Docs: new `docs/agent_tx_protocol.md` OR extended section in existing agent protocol doc. Regulatory disclaimer included. | Lints + renders. |
**Ship order advice:** 1 → 2 → 3 → 4 → (5 || 6) → 7 → 8 → 9 → 10 → 11. Steps 13 are strict prerequisites for everything else. Steps 5 and 6 can parallelize. Step 7 can't land without 5.
---
## Verification loop (how to prove the two sides talk)
Once you've implemented §A1A7, use this to close the loop with the live hub:
1. On the agent host, run `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` then `ria-agent stream`.
2. Confirm the hub has seen the heartbeat: `curl $HUB/screens/agents/json | jq '.agents[] | select(.agent_id==...) | {tx_enabled, capabilities}'` should show `tx_enabled: true` and `["rx","tx"]`.
3. Create a Screens app whose manifest contains a `dataSink.type == "agent"` pointing at your `agent_id`, or use the composer UI with a `PlutoTXOp` in the graph + the new TX-sink agent picker.
4. `POST /screens/apps/{id}/start` on the hub. You should observe, in order:
1. `tx_start` JSON on your WS.
2. Binary frames arriving (if the hub-side operator is actually generating buffers — may no-op for now since the operator refactor is planned but not done).
3. Your `tx_status: armed` JSON emitted back.
5. Stop the app. You should receive `tx_stop` and emit `tx_status: done`.
6. Provoke a rejection: set `tx_max_gain_db: -15` in your config, then start a TX app with `tx_gain: -5`. The hub should return `HTTP 400` from `/start` without any WS traffic — capability gate fires first. If you make it past the gate and it's still wrong, emit `tx_status: error` and the hub will surface the message to the UI.
**Useful hub-side greps if something is wrong:**
* `grep -r "tx_status" controller/app/modules/screens/` — see how the hub parses your frames.
* `grep -r "tx_enabled" controller/app/modules/screens/` — see what heartbeat fields the hub reads.
* `controller/app/modules/screens/agent_ws.py:200-290` — the WS handler's JSON dispatch.
* `controller/app/modules/screens/data_sinks.py` — what the hub publishes on each control frame.
---
## Open questions (from the original plan that still apply)
Answered since the plan was written:
* ✅ **Operator name:** `PlutoTXOp` (PascalCase, stored in the hub's MongoDB `ops` collection via the application packager).
* ✅ **Redis channel naming:** kept `agent:*` prefix on the hub side — you never see this.
* ✅ **Status plumbing:** `tx_status` frames get republished on a hub-internal pub/sub and surface to the UI through the existing SSE stream. You just send the frames; the hub does the rest.
Still open (flag when you have a preference):
* **Bulk + loop fast-path.** If the hub's TX operator turns out to be a fixed recording played on loop, we could add a `{ "type": "tx_start", ..., "loop": true }` variant where the hub sends the buffer once and the agent uses the existing `tx_recording` path. Protocol-compatible with the streaming version. Defer until a real use case demands it.
* **Multi-app-per-agent.** Out of scope for v1 (§Non-goals). If/when needed: prefix binary frames with a 4-byte session header and bump a `protocol_version` in the heartbeat.
* **TX clock drift.** Relying on generous queue depth + stable local networks for v1. Longer term may need agent-side resampling.
---
## What lives in `ria-hub` now (reference)
You don't need to read any of this, but if you're curious or need to debug the integration, these are the load-bearing bits on the hub side:
| Path | What |
|---|---|
| `controller/app/modules/screens/data_sinks.py` | `AgentTxSink`, `LocalPlutoTxSink`, `build_data_sink` |
| `controller/app/modules/screens/agent_ws.py` | `_forward_tx_binary`, heartbeat parsing, `tx_status` republish |
| `controller/app/modules/screens/graph_derivation.py` | `_pluto_tx_spec_mapping`, `_SDR_SINK_MAP`, `_derive_data_sink` |
| `controller/app/modules/screens/routes.py` | `_check_agent_tx_capability`, `AgentTxAudit` write, `POST /apps/{id}/sink-agent` |
| `controller/app/modules/screens/models.py` | `ScreensAgent` TX fields, `AgentTxAudit` document |
| `schemas/screens/app_manifest.schema.json` | `dataSink` schema block |
| `web_src/js/components/screens/components/TxConsentModal.vue` | Pre-transmit consent dialog |
| `web_src/js/components/screens/components/SinkPanel.vue` | TX-capable agent picker + live `tx_status` indicator |
| `web_src/js/components/screens/ScreensApp.vue` | Consent gate + `tx_status` forwarding to children |
---
## Regulatory note (keep this in your docs too)
Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub shows a consent modal and writes an audit log so actions are attributable. None of this is a legal compliance layer — it's defense-in-depth.

View File

@ -0,0 +1,568 @@
# Agent TX Streaming — Implementation Plan (`ria-toolkit-oss`)
**Scope:** Part A of [agent_tx_plan.md](./agent_tx_plan.md). This repo only.
**Goal:** Make the agent accept hub-originated TX control + binary IQ, stream it to the SDR in full duplex with RX, and enforce agent-local safety caps.
**Acceptance:** `pytest tests/agent/` green; `ria-agent stream --allow-tx` accepts a `tx_start` against MockSDR and round-trips binary frames to `_stream_tx`.
Each phase below lands independently. After every phase the existing agent tests must still pass (no regressions), and the phase's own new tests must be green.
---
## Preconditions
- `--allow-tx` is opt-in at CLI level. Default config has `tx_enabled=False`; the agent will reject all TX control frames from the hub.
- Pluto FDD: one `adi.Pluto` instance serves both RX and TX. We share the SDR between sessions keyed by `(device, identifier)`.
- Known pre-existing bug: [`sdr/pluto.py:151`](../src/ria_toolkit_oss/sdr/pluto.py#L151) sets `_rx_initialized = False` inside `init_tx`. Our streamer's RX path (`sdr.rx(n)`) does not read this flag, so FDD still works. Leave the bug for a separate follow-up; do not refactor Pluto in this plan.
## Glossary
- **Session** = a `(app_id, direction)` pair held by the agent: one `RxSession` or one `TxSession`.
- **Direction** = `"rx"` (agent → hub binary) or `"tx"` (hub → agent binary).
- **Shared SDR** = when the same `(device, identifier)` is referenced by an RX and TX session concurrently; both sessions hold the same driver instance.
---
## Phase 1 — WS binary ingress
**Why first:** protocol-plumbing only. No behavior change for existing RX, but unblocks every later phase.
### Touches
- `src/ria_toolkit_oss/agent/ws_client.py` — add optional `on_binary` callback to `WsClient.run`.
- `tests/agent/test_ws_client.py` — add a "server sends binary, handler receives" case.
### Shape
```python
# ws_client.py
BinaryHandler = Callable[[bytes], Awaitable[None]]
async def run(
self,
on_message: MessageHandler,
heartbeat: HeartbeatBuilder,
on_binary: BinaryHandler | None = None, # NEW, default preserves old behavior
) -> None:
...
async for raw in self._ws:
if isinstance(raw, bytes):
if on_binary is not None:
try:
await on_binary(raw)
except Exception:
logger.exception("on_binary handler raised; dropping frame")
else:
logger.debug("Discarding unexpected %d-byte binary frame", len(raw))
continue
# ... existing JSON dispatch unchanged
```
### Acceptance
- New test: local `websockets` server pushes a binary frame after JSON handshake → handler sees exact bytes.
- Existing `test_ws_client.py` cases still pass with `on_binary=None`.
---
## Phase 2 — Config + CLI TX opt-in
**Why second:** small, isolated, and gives the rest of the phases a real `AgentConfig.tx_enabled` / caps to read.
### Touches
- `src/ria_toolkit_oss/agent/config.py`
- `src/ria_toolkit_oss/agent/cli.py`
- `tests/agent/test_config.py`
- new `tests/agent/test_cli_tx.py`
### config.py
Extend the dataclass and preserve backward-compat for old JSON files (the existing `extra` trick already handles unknown keys, but we want these fields promoted to first-class):
```python
@dataclass
class AgentConfig:
hub_url: str = ""
agent_id: str = ""
token: str = ""
name: str = ""
insecure: bool = False
api_key: str = ""
# NEW — TX interlocks
tx_enabled: bool = False
tx_max_gain_db: float | None = None
tx_max_duration_s: float | None = None
tx_allowed_freq_ranges: list[list[float]] | None = None # JSON-friendly list-of-lists
extra: dict = field(default_factory=dict)
```
Update `load()` to pull the new fields and `save()` to emit them. Preserve the `0o600` chmod behavior.
### cli.py
Two entry points need flags:
```
ria-agent register --hub ... --api-key ...
[--allow-tx]
[--tx-max-gain-db VALUE]
[--tx-max-duration-s VALUE]
[--tx-freq-range LO HI] # repeatable: --tx-freq-range 2.4e9 2.5e9 --tx-freq-range 5.7e9 5.8e9
```
```
ria-agent stream
[--allow-tx] # runtime override: sets cfg.tx_enabled for this process only
```
In `_cmd_register`: after successful server registration, populate `cfg.tx_enabled=bool(args.allow_tx)` and caps from argparse before `_config.save(cfg)`.
In `_cmd_stream`: `if args.allow_tx: cfg.tx_enabled = True` (before passing `cfg` to the streamer — which requires plumbing `cfg` in, see Phase 3).
### Acceptance
- `test_config.py` round-trip: new fields serialize → deserialize cleanly; missing fields in old JSON default correctly.
- `test_cli_tx.py`: `register --allow-tx --tx-max-gain-db -10` writes expected JSON; `stream --allow-tx` sets runtime flag without touching disk.
---
## Phase 3 — Streamer refactor: session model (RX behavior preserved)
**Why third:** the TX work needs a session-based state machine. Doing the refactor before wiring TX keeps the diff reviewable and keeps the RX regression surface contained.
**Goal:** replace the flat state (`self._sdr`, `self._app_id`, `self._capture_task`, `self._pending_config`, `self._status`) with explicit session objects and an SDR registry, without changing any observable RX behavior.
### Touches
- `src/ria_toolkit_oss/agent/streamer.py` (bulk of work)
- `src/ria_toolkit_oss/agent/hardware.py` (heartbeat grows `capabilities` + optional `sessions` snapshot)
- `src/ria_toolkit_oss/agent/cli.py` (plumb `cfg` into the streamer)
- `tests/agent/test_streamer.py` + `test_hardware.py` — update for new heartbeat shape; keep all RX assertions.
### Data model
```python
from dataclasses import dataclass, field
@dataclass
class RxSession:
app_id: str
sdr: Any
device_key: tuple[str, str | None] # (device, identifier)
buffer_size: int
task: asyncio.Task
pending_config: dict = field(default_factory=dict)
@dataclass
class TxSession:
app_id: str
sdr: Any
device_key: tuple[str, str | None]
buffer_size: int
queue: queue.Queue # thread-safe; bytes -> np.complex64 buffers
stop_event: threading.Event
task: asyncio.Task # wraps run_in_executor(sdr._stream_tx, ...)
underrun_policy: str = "pause"
pending_config: dict = field(default_factory=dict)
last_buffer: np.ndarray | None = None # for "repeat" policy
started_at: float = 0.0
max_duration_s: float | None = None
```
### SDR registry (ref-counted)
```python
class _SdrRegistry:
def __init__(self, factory):
self._factory = factory # (device, identifier) -> SDR
self._instances: dict[tuple[str, str|None], tuple[Any, int]] = {}
self._lock = threading.Lock()
def acquire(self, device: str, identifier: str | None):
key = (device, identifier)
with self._lock:
if key in self._instances:
sdr, rc = self._instances[key]
self._instances[key] = (sdr, rc + 1)
return sdr, key
sdr = self._factory(device, identifier)
self._instances[key] = (sdr, 1)
return sdr, key
def release(self, key: tuple[str, str|None]) -> bool:
with self._lock:
sdr, rc = self._instances[key]
if rc <= 1:
del self._instances[key]
return True # caller should close()
self._instances[key] = (sdr, rc - 1)
return False
```
### Streamer state
```python
class Streamer:
def __init__(self, ws, cfg: AgentConfig, sdr_factory=None):
self.ws = ws
self._cfg = cfg
self._registry = _SdrRegistry(sdr_factory or _default_sdr_factory)
self._rx: RxSession | None = None
self._tx: TxSession | None = None
```
### Message dispatch
```python
async def on_message(self, msg: dict) -> None:
t = msg.get("type")
handlers = {
"start": self._handle_rx_start,
"stop": self._handle_rx_stop,
"configure": self._handle_rx_configure,
# TX handlers stubbed here in Phase 3, implemented in Phase 4
"tx_start": self._handle_tx_start,
"tx_stop": self._handle_tx_stop,
"tx_configure": self._handle_tx_configure,
}
handler = handlers.get(t)
if handler is None:
logger.warning("Unknown server message type: %r", t)
return
await handler(msg)
```
Rename internals: `_handle_start → _handle_rx_start`, `_handle_stop → _handle_rx_stop`, etc. Behavior unchanged — just reading/writing `self._rx` in place of the old flat attributes, and going through the registry for acquire/release.
### Heartbeat
```python
# streamer.py
def build_heartbeat(self) -> dict:
status = "streaming" if (self._rx or self._tx) else "idle"
sessions: dict = {}
if self._rx: sessions["rx"] = {"app_id": self._rx.app_id, "state": "streaming"}
if self._tx: sessions["tx"] = {"app_id": self._tx.app_id, "state": self._tx_state()}
return heartbeat_payload(
status=status,
app_id=(self._rx or self._tx).app_id if (self._rx or self._tx) else None,
cfg=self._cfg,
sessions=sessions or None,
)
```
Update `hardware.heartbeat_payload` to take `cfg` (for `capabilities`/`tx_enabled`) and optional `sessions`. Keep unknown-arg compatibility — existing tests can pass `cfg=AgentConfig()` to get the old shape minus the new fields.
### Phase 3 acceptance
- All existing `test_streamer.py` / `test_integration.py` / `test_hardware.py` cases pass, with the heartbeat additions asserted in `test_hardware.py` (capabilities = `["rx"]` when `tx_enabled=False`).
- New test: two `start` messages in sequence with same `(device, identifier)` both succeed without recreating the SDR (registry hit). (This is a Phase 3 bonus — confirms the registry works before TX consumes it.)
- New test: `tx_start` with `tx_enabled=False` returns `tx_status: error` (handler stubs can do just this much in Phase 3, full implementation lands in Phase 4).
---
## Phase 4 — TX implementation
**Why fourth:** now that binary arrives, config exists, and sessions exist, wire up real TX.
### Touches
- `src/ria_toolkit_oss/agent/streamer.py`
- Potentially a small helper module `src/ria_toolkit_oss/agent/_tx_loop.py` if streamer.py gets unwieldy.
- `tests/agent/test_streamer_tx.py`, `test_tx_safety.py`, `test_tx_underrun.py`, `test_full_duplex.py`
### Binary ingress
```python
async def on_binary(self, data: bytes) -> None:
if self._tx is None:
logger.debug("Dropping %d-byte binary frame: no TX session", len(data))
return
try:
self._tx.queue.put(data, timeout=2.0) # backpressure: block if full
except queue.Full:
logger.warning("TX queue stalled; dropping frame (agent side)")
```
Wire this in via `ws.run(..., on_binary=self.on_binary)` — change `run_streamer()`'s `ws.run` call accordingly.
### `_handle_tx_start`
```python
async def _handle_tx_start(self, msg: dict) -> None:
app_id = msg.get("app_id") or ""
cfg_radio = dict(msg.get("radio_config") or {})
# 1) interlocks
if not self._cfg.tx_enabled:
return await self._send_tx_error(app_id, "tx disabled on this agent")
gain = cfg_radio.get("tx_gain")
if self._cfg.tx_max_gain_db is not None and gain is not None and float(gain) > self._cfg.tx_max_gain_db:
return await self._send_tx_error(app_id, f"tx_gain {gain} exceeds cap {self._cfg.tx_max_gain_db}")
freq = cfg_radio.get("tx_center_frequency")
if self._cfg.tx_allowed_freq_ranges and freq is not None:
if not any(lo <= float(freq) <= hi for lo, hi in self._cfg.tx_allowed_freq_ranges):
return await self._send_tx_error(app_id, f"tx_center_frequency {freq} outside allowed ranges")
if self._tx is not None:
return await self._send_tx_error(app_id, "tx already active on this agent")
# 2) device
device = cfg_radio.pop("device", None)
identifier = cfg_radio.pop("identifier", None)
buffer_size = int(cfg_radio.pop("buffer_size", 1024))
underrun_policy = cfg_radio.pop("underrun_policy", "pause")
if not device:
return await self._send_tx_error(app_id, "tx_start missing radio_config.device")
try:
sdr, device_key = self._registry.acquire(device, identifier)
_apply_sdr_config(sdr, cfg_radio) # sets tx_* attributes via alias map
# explicit init_tx if the driver supports it
if hasattr(sdr, "init_tx"):
sdr.init_tx(
sample_rate=cfg_radio.get("tx_sample_rate"),
center_frequency=cfg_radio.get("tx_center_frequency"),
gain=cfg_radio.get("tx_gain"),
channel=cfg_radio.get("tx_channel", 0),
gain_mode=cfg_radio.get("tx_gain_mode", "manual"),
)
except Exception as exc:
self._registry.release(device_key)
logger.exception("Failed to init TX on %r", device)
return await self._send_tx_error(app_id, f"tx init failed: {exc}")
# 3) build session + launch loop
self._tx = TxSession(
app_id=app_id,
sdr=sdr,
device_key=device_key,
buffer_size=buffer_size,
queue=queue.Queue(maxsize=8),
stop_event=threading.Event(),
task=None, # filled below
underrun_policy=underrun_policy,
max_duration_s=self._cfg.tx_max_duration_s,
started_at=time.monotonic(),
)
loop = asyncio.get_running_loop()
self._tx.task = loop.run_in_executor(None, self._tx_executor_body)
await self._send_tx_status(app_id, "armed")
# streamer transitions to "transmitting" on the first buffer consumed in the thread;
# schedule a tiny watchdog that emits that status when queue count rises.
```
### TX executor body
Runs in a worker thread. Blocks in the SDR's `_stream_tx` driven by our callback that pulls from the queue.
```python
def _tx_executor_body(self) -> None:
sdr = self._tx.sdr
try:
sdr._stream_tx(self._tx_callback)
except Exception:
logger.exception("TX stream crashed")
# surface via asyncio side
asyncio.run_coroutine_threadsafe(
self._send_tx_status(self._tx.app_id, "error", "stream crashed"),
asyncio.get_event_loop(),
)
def _tx_callback(self, num_samples):
tx = self._tx
if tx is None or tx.stop_event.is_set():
sdr = tx.sdr if tx else None
if sdr is not None:
sdr.pause_tx()
return _silence(num_samples)
# duration watchdog
if tx.max_duration_s is not None and (time.monotonic() - tx.started_at) > tx.max_duration_s:
tx.stop_event.set()
tx.sdr.pause_tx()
_schedule(self._send_tx_status(tx.app_id, "done", "max duration reached"))
return _silence(num_samples)
try:
raw = tx.queue.get(timeout=0.1)
except queue.Empty:
return self._underrun_fill(tx, num_samples)
samples = np.frombuffer(raw, dtype=np.float32)
# interleaved float32 -> complex64
if samples.size % 2 != 0 or samples.size // 2 != num_samples:
# malformed / wrong-sized frame; underrun this cycle
logger.warning("TX frame size mismatch: got %d floats, expected %d", samples.size, num_samples * 2)
return self._underrun_fill(tx, num_samples)
complex_samples = samples.reshape(-1, 2).view(np.complex64).reshape(-1)
tx.last_buffer = complex_samples
return complex_samples
```
Helper `_underrun_fill`:
```python
def _underrun_fill(self, tx: TxSession, num_samples: int):
if tx.underrun_policy == "zero":
return np.zeros(num_samples, dtype=np.complex64)
if tx.underrun_policy == "repeat" and tx.last_buffer is not None:
return tx.last_buffer[:num_samples] if tx.last_buffer.size >= num_samples \
else np.concatenate([tx.last_buffer,
np.zeros(num_samples - tx.last_buffer.size, dtype=np.complex64)])
# "pause" (default)
tx.stop_event.set()
tx.sdr.pause_tx()
_schedule(self._send_tx_status(tx.app_id, "underrun"))
return np.zeros(num_samples, dtype=np.complex64)
```
`_schedule()` is a tiny wrapper around `asyncio.run_coroutine_threadsafe` that resolves the loop once at streamer construction.
### `_handle_tx_stop`
```python
async def _handle_tx_stop(self, msg: dict) -> None:
tx = self._tx
if tx is None:
return
tx.stop_event.set()
tx.sdr.pause_tx()
# drain the queue so the executor thread wakes
try:
while True:
tx.queue.get_nowait()
except queue.Empty:
pass
# wait up to ~1s for the executor thread to finish
if tx.task is not None:
try:
await asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)
except asyncio.TimeoutError:
logger.warning("TX executor did not exit within 1s after stop")
# release SDR reference
should_close = self._registry.release(tx.device_key)
if should_close:
try:
tx.sdr.close()
except Exception:
logger.exception("Error closing SDR on tx_stop")
self._tx = None
await self._send_tx_status(msg.get("app_id") or "", "done")
```
### `_handle_tx_configure`
```python
async def _handle_tx_configure(self, msg: dict) -> None:
if self._tx is None:
return
self._tx.pending_config.update(msg.get("radio_config") or {})
```
Consume `pending_config` at the top of `_tx_callback` before pulling from the queue (same pattern as RX's `_capture_loop`), using `_apply_sdr_config` with tx aliases.
### `_apply_sdr_config` — extend alias map
```python
_CONFIG_ATTR_MAP = {
# existing RX aliases...
"sample_rate": ("sample_rate", "rx_sample_rate"),
"center_frequency": ("center_freq", "rx_center_frequency"),
"gain": ("gain", "rx_gain"),
"bandwidth": ("bandwidth", "rx_bandwidth"),
# NEW TX aliases
"tx_sample_rate": ("tx_sample_rate",),
"tx_center_frequency": ("tx_center_frequency", "tx_lo"),
"tx_gain": ("tx_gain",),
"tx_bandwidth": ("tx_bandwidth",),
}
```
Pluto has `set_tx_sample_rate`, `set_tx_center_frequency`, `set_tx_gain` — those are called by `init_tx` using the attribute values, so setting attributes via `_apply_sdr_config` + calling `init_tx` is sufficient.
### Phase 4 acceptance
- `test_streamer_tx.py`: full happy path — `tx_start` against MockSDR → 3 binary frames → verify `_stream_tx` callback received them in order → `tx_stop` → session cleared, SDR closed.
- `test_tx_safety.py`: one test per cap — `tx_enabled=False`, gain cap, freq range, duplicate session. Each produces a `tx_status: error` JSON; registry shows zero outstanding acquires.
- `test_tx_underrun.py`: three tests — `pause` (session ends, `underrun` emitted), `zero` (callback returns zeros, no status change), `repeat` (callback returns last buffer).
- `test_full_duplex.py`: against MockSDR, send `start` + `tx_start` with same `(device=mock, identifier=None)` → registry ref-count = 2 → both sessions stream independently → stop one, other still runs → stop second, SDR closed.
---
## Phase 5 — Integration + docs
**Touches:**
- `tests/agent/test_integration_tx.py` — end-to-end with a real local `websockets` server + MockSDR. Mirror `test_integration.py`'s shape: register → heartbeat with `tx_enabled=True` → tx_start → 3 binary frames → tx_stop.
- `docs/agent_tx_protocol.md` — short, user-facing: message types, binary format, heartbeat additions, interlock config, CLI examples. Link from [screens_agent_handoff.md](./screens_agent_handoff.md).
- `README.md` (if it mentions agent subcommands) — add `--allow-tx` usage.
**Real-Pluto smoke test** (manual, not in CI):
1. `ria-agent register --hub http://hub:3005 --api-key KEY --allow-tx --tx-max-gain-db -10 --tx-freq-range 2.4e9 2.5e9`
2. `ria-agent stream`
3. From a Python REPL with `websockets`, open the hub WS on the agent's behalf (bypass hub during dev), send a `tx_start` + binary frames of a 1kHz tone → confirm carrier on a spectrum analyzer at the configured frequency.
---
## File-by-file summary
| File | Phase | Change |
|---|---|---|
| `src/ria_toolkit_oss/agent/ws_client.py` | 1 | Add `on_binary` callback. |
| `src/ria_toolkit_oss/agent/config.py` | 2 | Add `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. |
| `src/ria_toolkit_oss/agent/cli.py` | 2, 4 | `--allow-tx` + cap flags on `register`; `--allow-tx` on `stream`; plumb `cfg` into `Streamer`. |
| `src/ria_toolkit_oss/agent/hardware.py` | 3 | `heartbeat_payload(cfg, sessions)` with `capabilities`, `tx_enabled`. |
| `src/ria_toolkit_oss/agent/streamer.py` | 3, 4 | Session refactor, SDR registry, TX dispatch, TX loop, underrun fills, `_apply_sdr_config` TX aliases. |
| `src/ria_toolkit_oss/agent/_tx_loop.py` | 4 (opt) | Extracted TX callback helpers if streamer.py > ~400 lines. |
| `tests/agent/test_ws_client.py` | 1 | Binary-frame case. |
| `tests/agent/test_config.py` | 2 | Round-trip new fields. |
| `tests/agent/test_cli_tx.py` | 2 | New — `--allow-tx` flag handling. |
| `tests/agent/test_hardware.py` | 3 | Heartbeat `capabilities` + `sessions`. |
| `tests/agent/test_streamer.py` | 3 | Refactor for session model; RX assertions unchanged. |
| `tests/agent/test_streamer_tx.py` | 4 | New — TX happy path. |
| `tests/agent/test_tx_safety.py` | 4 | New — cap enforcement. |
| `tests/agent/test_tx_underrun.py` | 4 | New — pause/zero/repeat policies. |
| `tests/agent/test_full_duplex.py` | 4 | New — shared SDR ref count. |
| `tests/agent/test_integration_tx.py` | 5 | New — real `websockets` server E2E. |
| `docs/agent_tx_protocol.md` | 5 | New — operator-facing protocol doc. |
---
## Implementation gotchas (do not skip)
1. **Asyncio ↔ thread bridge.** The SDR's `_stream_tx` is synchronous and runs in an executor thread. Its callback must not `await`. Use `queue.Queue` (thread-safe) for inbound buffers and `asyncio.run_coroutine_threadsafe(coro, loop)` to emit `tx_status` from inside the thread. Resolve `loop` once at streamer construction; don't call `get_event_loop()` from the thread.
2. **`sdr.pause_tx()` from inside the callback.** Pluto's `_stream_tx` loop condition is `while self._enable_tx is True`. Calling `pause_tx()` inside the callback sets `_enable_tx = False` so the NEXT iteration exits. That's fine — it may emit one trailing zero-filled buffer. Document this; don't try to exit mid-callback.
3. **Queue drain on stop.** When `_handle_tx_stop` sets `stop_event` and pauses TX, the executor thread may still be blocked in `queue.get(timeout=0.1)`. Draining the queue does not unblock a timed get. Rely on the 100ms timeout; the thread exits on the next iteration. Don't try to clever-inject a poison pill.
4. **Interleaved float32 → complex64 conversion.** `np.frombuffer(buf, dtype=np.float32).view(np.complex64)` is zero-copy and correct when `buf.size` is a multiple of 8 bytes. Validate size first; mismatched size = underrun for that cycle, don't crash the thread.
5. **MockSDR's `_stream_tx`** ([sdr/mock.py:96-100](../src/ria_toolkit_oss/sdr/mock.py#L96-L100)) calls `callback(self.rx_buffer_size)` — it passes a *size*, not samples. The TX callback contract is "I am given `num_samples`, I return that many complex64 samples." `test_streamer_tx` must respect this: the test's `sdr.tx_buffer_size` (if used) doesn't affect what the callback receives from mock. Simplest path: set `MockSDR.rx_buffer_size = buffer_size` in the test harness before `_stream_tx` is invoked, so the TX callback receives the right size.
6. **`init_tx` on MockSDR vs Pluto.** MockSDR's `init_tx` [sets attributes and flips `_tx_initialized = True`](../src/ria_toolkit_oss/sdr/mock.py#L70-L81). Pluto's does the same plus `_rx_initialized = False` (the FDD bug). For full-duplex tests we currently target MockSDR only — Pluto FDD will work because our RX path ignores `_rx_initialized`, but the real-Pluto smoke test is the only validation. Call that out in the PR description.
7. **Don't block the event loop.** `asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)` in `_handle_tx_stop` is non-blocking from the loop's perspective — the 1s cap prevents a misbehaving driver from stalling heartbeat/RX.
8. **Heartbeat during TX.** The existing heartbeat loop runs on a 30s timer. Sessions snapshot is cheap; no locking needed if we read `self._rx`/`self._tx` references atomically (Python ref swap is GIL-safe for single field reads).
---
## Rollout
1. Open a single PR per phase (1 → 2 → 3 → 4 → 5), each green on its own.
2. Phase 3 is the riskiest diff (RX refactor). Get a second reviewer if possible; the regression surface is all of current RX behavior.
3. After Phase 4 merges, `ria-agent stream --allow-tx` is a usable toy — you can hand-drive it from a Python REPL with `websockets` to validate against real hardware before the hub side is ready.
4. Phase 5 closes the loop and ships the user-facing docs.
## Out of scope (explicit)
- **Multi-app-per-agent** — one RX + one TX per agent in v1. Adding session IDs to binary frames is a v2 protocol bump.
- **Other TX drivers** (HackRF, USRP, bladeRF) — wiring `_CONFIG_ATTR_MAP` entries and verifying `_stream_tx` behavior per-driver. Tackle when the hub has an operator that targets them.
- **Resampling / clock drift** — agent treats the hub-supplied samples as authoritative. Drift manifests as underruns; the underrun policy is the only mitigation.
- **Fixing Pluto's `init_tx` `_rx_initialized = False` reset** — pre-existing, not triggered by our RX path, left for a separate cleanup.

420
docs/agent_tx_plan.md Normal file
View File

@ -0,0 +1,420 @@
# Agent TX Streaming — Cross-Repo Plan
**Repos:** `ria-toolkit-oss`, `ria-hub`, Screens frontend
**Status:** Proposal / pre-implementation
**Prerequisites:** The RX-streaming work from [screens_agent_handoff.md](./screens_agent_handoff.md) and [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md) is landed (agent WS protocol, `AgentDataSource`, `/screens/agents/register`, `/screens/agent/ws`).
## Goal
Let a Screens app running on the hub drive a **remote agent's Pluto** (or other TX-capable SDR) to transmit — streaming IQ buffers end-to-end from an operator like `plutoTXoperator` into the agent's `sdr.tx()` path. Mirror image of what `AgentDataSource` already does for RX.
## Non-goals (v1)
- Multi-tenant radio sharing (one app owns the radio at a time per agent).
- Bulk/upload-once TX — superseded by streaming per request.
- Arbitrary waveform generation in the agent. The agent is dumb pipe + hardware control; signal generation stays on the hub.
## Key design decisions
| # | Decision | Value |
|---|---|---|
| D1 | **Delivery mode** | **Streaming**. Hub pushes binary IQ buffers continuously over the existing WS; agent's `_stream_tx` callback pulls them from an in-agent queue. |
| D2 | **Full-duplex** | **Yes.** A single `app_id` may own both an RX session and a TX session on the same agent concurrently. Same physical SDR handle serves both (Pluto is FDD-capable; `init_rx` and `init_tx` are independent on one `adi.Pluto` instance). |
| D3 | **Safety caps** | **Agent-enforced.** `~/.ria/agent.json` holds `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, optional `tx_allowed_freq_ranges: [[low,high], …]`. Agent rejects `tx_start` frames that violate any of these, independent of what the hub sends. |
| D4 | **Buffer format** | Interleaved float32 IQ, range `[-1, 1]` — same as RX. Format-validated by `ria_toolkit_oss.sdr.sdr._verify_sample_format`. |
| D5 | **Protocol evolution** | Keep existing RX messages (`start`/`stop`/`configure`) unchanged for back-compat. Add parallel `tx_start`/`tx_stop`/`tx_configure`. Heartbeat grows to advertise capabilities. |
| D6 | **Underrun policy** | Default `pause`: if the TX queue empties, agent calls `pause_tx()` and emits `tx_status: underrun`. Hub must recover by sending a fresh `tx_start` + buffers. Configurable per session via `radio_config.underrun_policy ∈ {"pause", "zero", "repeat"}`. |
| D7 | **Backpressure** | Rely on TCP/WS backpressure. Agent caps inbound TX queue at 8 buffers; `await ws.send` on the hub side slows when the agent doesn't drain. No application-level flow control in v1. |
| D8 | **Session identity** | `app_id` identifies a Screens app. Each app has at most one RX session and one TX session per agent. Binary direction disambiguates: agent → hub binary = RX IQ; hub → agent binary = TX IQ. |
## Protocol specification
Additions only. Existing RX messages from [screens_agent_handoff.md §Phase 4](./screens_agent_handoff.md) are unchanged.
### Hub → agent (JSON)
```jsonc
// Arm the TX side. Agent calls init_tx, starts the stream_tx thread with an empty queue.
// After this, hub sends binary TX buffers on the same WS.
{
"type": "tx_start",
"app_id": "app-abc",
"radio_config": {
"device": "pluto",
"identifier": "ip:192.168.3.1",
"tx_sample_rate": 1000000,
"tx_center_frequency": 2450000000,
"tx_gain": -20, // dB, negative = attenuation on Pluto
"tx_bandwidth": 1000000, // optional
"buffer_size": 1024,
"underrun_policy": "pause" // "pause" | "zero" | "repeat"
}
}
// Apply parameter changes at the next buffer boundary.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }
// Stop TX, drain queue, pause_tx, release TX side (RX may continue if a separate RX session is live).
{ "type": "tx_stop", "app_id": "app-abc" }
```
### Hub → agent (binary)
Raw interleaved float32 IQ in `[-1, 1]`. One WS frame = one buffer = `buffer_size` complex samples = `buffer_size * 2 * 4` bytes. Delivered only between `tx_start` and `tx_stop`. Binary frames arriving outside that window are discarded and logged at WARN.
### Agent → hub (JSON)
```jsonc
// Lifecycle events.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" } // queue empty; TX paused
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
{ "type": "tx_status", "app_id": "app-abc", "state": "error", "message": "gain -5 exceeds tx_max_gain_db=-15" }
// Reject reasons from agent-enforced caps/interlocks are surfaced via tx_status:error.
```
### Heartbeat extension
Existing `{type: heartbeat, hardware[], status}` grows:
```jsonc
{
"type": "heartbeat",
"hardware": ["mock", "pluto"],
"status": "streaming", // unchanged semantics
"capabilities": ["rx", "tx"], // NEW — derived from tx_enabled + SDR class having init_tx
"tx_enabled": true, // NEW — mirrors config flag
"sessions": { // NEW — optional per-session snapshot
"rx": { "app_id": "app-abc", "state": "streaming" },
"tx": { "app_id": "app-abc", "state": "transmitting" }
},
"app_id": "app-abc" // kept for back-compat
}
```
---
## Part A — `ria-toolkit-oss` (this repo)
### A1. `agent/ws_client.py`
Currently the WS client drops server → agent binary (`ws_client.py:77-79`). Add a binary handler alongside the JSON one.
```python
BinaryHandler = Callable[[bytes], Awaitable[None]]
async def run(
self,
on_message: MessageHandler,
heartbeat: HeartbeatBuilder,
on_binary: BinaryHandler | None = None,
) -> None:
...
async for raw in self._ws:
if isinstance(raw, bytes):
if on_binary is not None:
await on_binary(raw)
continue
...
```
Keep the reconnect, heartbeat, and malformed-frame behavior unchanged.
### A2. `agent/streamer.py` — add TX sessions
Replace the flat `self._sdr` / `self._app_id` / `self._capture_task` state with a session model:
```python
@dataclass
class RxSession:
app_id: str
sdr: Any
buffer_size: int
task: asyncio.Task
pending_config: dict
@dataclass
class TxSession:
app_id: str
sdr: Any
queue: asyncio.Queue[bytes] # bounded, maxsize=8
task: asyncio.Task # runs _stream_tx in executor
underrun_policy: str
pending_config: dict
bytes_transmitted: int = 0
started_at: float = 0.0 # for tx_max_duration_s enforcement
```
The streamer holds `self._rx: RxSession | None` and `self._tx: TxSession | None`. SDR instances are cached by `(device, identifier)` — when RX and TX name the same device, both sessions share one handle (matters for Pluto FDD).
**New handlers**:
- `_handle_tx_start(msg)` — check `cfg.tx_enabled`, validate gain/duration/freq against caps, open/resolve SDR, `sdr.init_tx(...)`, start `_tx_loop`, emit `tx_status: armed`.
- `_handle_tx_stop(msg)` — cancel TX task, `sdr.pause_tx()`, drain queue, release SDR if no RX session on it, emit `tx_status: done`.
- `_handle_tx_configure(msg)` — stash into `self._tx.pending_config`, applied at next buffer boundary (same pattern as RX).
- `on_binary(data)` — if `self._tx`: `await self._tx.queue.put(data)` (awaiting here is the backpressure mechanism). Else: log and drop.
**TX loop** (runs in an executor thread via `loop.run_in_executor`, like the RX capture loop):
```python
def _tx_callback(num_samples: int) -> np.ndarray:
# Called by sdr._stream_tx on every buffer boundary.
try:
raw = self._tx_queue_sync.get(timeout=0.1)
except queue.Empty:
return self._underrun_fill(num_samples) # policy-driven
samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64)
if len(samples) < num_samples:
return _pad_zero(samples, num_samples)
return samples[:num_samples]
```
Use a thread-safe `queue.Queue` for the TX side (the `asyncio.Queue` lives on the event loop; the executor thread reads from a sibling `queue.Queue` fed by a tiny asyncio→threading adapter).
**Underrun fills**:
- `"pause"`: signal the main loop to call `sdr.pause_tx()`, emit `tx_status: underrun`, exit the callback.
- `"zero"`: return `np.zeros(num_samples, dtype=np.complex64)`.
- `"repeat"`: return the last good buffer (cached). If no buffer yet: zeros.
**Cap enforcement** in `_handle_tx_start` (before opening the SDR):
```python
if not self._cfg.tx_enabled:
return await self._send_error_tx(app_id, "tx disabled on this agent")
if (cap := self._cfg.tx_max_gain_db) is not None and tx_gain > cap:
return await self._send_error_tx(app_id, f"gain {tx_gain} exceeds cap {cap}")
if (cap := self._cfg.tx_max_duration_s) is not None:
# enforced by a watchdog in _tx_loop that calls tx_stop after cap seconds
...
for (lo, hi) in self._cfg.tx_allowed_freq_ranges or []:
if lo <= tx_center_frequency <= hi:
break
else:
if self._cfg.tx_allowed_freq_ranges:
return await self._send_error_tx(app_id, f"freq {tx_center_frequency} outside allowed ranges")
```
### A3. `agent/config.py`
Extend `AgentConfig`:
```python
@dataclass
class AgentConfig:
# existing fields…
tx_enabled: bool = False
tx_max_gain_db: float | None = None
tx_max_duration_s: float | None = None
tx_allowed_freq_ranges: list[tuple[float, float]] | None = None
```
`save()` preserves existing 0600 perms.
### A4. `agent/cli.py`
- `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` — persist the interlock into config.
- `ria-agent stream --allow-tx` — runtime override (sets `cfg.tx_enabled=True` for the life of the process without writing config).
- `ria-agent detect` unchanged.
### A5. `agent/hardware.py`
```python
def heartbeat_payload(status, app_id=None, *, cfg: AgentConfig, sessions: dict | None = None) -> dict:
caps = ["rx"]
if cfg.tx_enabled:
caps.append("tx")
payload = {
"type": "heartbeat",
"hardware": available_devices(),
"status": status,
"capabilities": caps,
"tx_enabled": cfg.tx_enabled,
}
if app_id:
payload["app_id"] = app_id
if sessions:
payload["sessions"] = sessions
return payload
```
### A6. SDR layer
- **Audit**: [`sdr/pluto.py`](../src/ria_toolkit_oss/sdr/pluto.py) `tx_recording` + `_stream_tx` paths already use `_tx_lock` (line 31, 323, 360). Double-check concurrent-with-RX behavior: the `adi.Pluto` Python object is not thread-safe for arbitrary attribute writes, so all `set_tx_*` / `set_rx_*` must go through the shared `_param_lock` (already present at [`sdr/sdr.py:44`](../src/ria_toolkit_oss/sdr/sdr.py#L44)). Verify `rx()` in a loop + `_stream_tx` in another thread don't step on each other.
- **MockSDR** already has `init_tx` + `_stream_tx` (`sdr/mock.py:70-100`). No changes needed for mock-based tests.
- **Other TX-capable drivers** (blade, usrp, hackrf): out of scope for v1; leave their `init_tx` as-is.
### A7. Tests (`tests/agent/`)
- `test_streamer_tx.py``tx_start` → binary frames → `_stream_tx` callback pulls correct samples → `tx_stop` cleans up.
- `test_tx_safety.py` — cap violations (gain, duration, freq, `tx_enabled=False`) each produce `tx_status: error` and never open the SDR.
- `test_tx_underrun.py` — each policy (`pause`, `zero`, `repeat`) exercised against a fake slow producer.
- `test_full_duplex.py` — one `app_id` sends `start` + `tx_start`; both sessions share one MockSDR; both produce their expected frames; stopping one does not stop the other.
- `test_ws_client_binary.py` — binary frames now reach the binary handler.
- `test_integration_tx.py` — end-to-end against local `websockets` server + MockSDR.
### A8. Docs
- Add a TX section to any existing agent protocol doc (or create `docs/agent_tx_protocol.md`).
- Include a regulatory disclaimer: the operator is responsible for transmissions. The agent is an enabler, not a policy layer beyond the interlocks.
---
## Part B — `ria-hub`
> Paths below are conceptual — confirm against the actual module layout in `ria-hub` before editing. Anchor points reference the RX handoff at [screens_agent_handoff.md §Part B](./screens_agent_handoff.md).
### B1. `AgentTxSink` (new)
Mirror of `AgentDataSource`. Location: `controller/app/modules/screens/data_sinks.py` (or wherever output sinks live in `ria-hub`).
Responsibilities:
- `prepare(radio_config)` — send `tx_start` via Redis pub/sub on `screens:agent:{agent_id}:tx` → WS proxy → agent.
- `write(buffer: np.ndarray | bytes)` — convert to interleaved float32 bytes, send as binary over the WS. Awaits on WS backpressure.
- `configure(partial_radio_config)` — send `tx_configure`.
- `close()` — send `tx_stop`.
- Subscribes to the agent's `tx_status` frames (via the same Redis pub/sub channel used for RX status today) and surfaces state back to the orchestrator. An `error` state aborts the Celery task.
### B2. Refactor `plutoTXoperator`
The existing operator presumably calls `radio.tx(...)` against a directly-attached Pluto. Abstract the "output" into an injectable sink:
```python
class PlutoTxOperator:
def __init__(self, sink: TxSink, ...):
self.sink = sink # AgentTxSink when dataSink.type == "agent", else LocalPlutoTxSink
def run(self, ...):
self.sink.prepare(self.radio_config)
while not stop:
buf = self._generate_next_buffer()
self.sink.write(buf)
self.sink.close()
```
The local path (existing direct-hardware behavior) becomes `LocalPlutoTxSink`, a thin wrapper around the current `radio.tx` calls. No behavior change for existing deployments.
`build_data_sink()` (to match `build_data_source()` from B1/B6) routes on `dataSink.type`.
### B3. Manifest schema
Add `dataSink` alongside `dataSource` in the manifest. New `type: "agent"`:
```json
{
"dataSource": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "sample_rate": 1000000, "center_frequency": 2450000000, "gain": 40 } },
"dataSink": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "tx_sample_rate": 1000000, "tx_center_frequency": 2450000000, "tx_gain": -20, "underrun_policy": "pause" } }
}
```
Update Pydantic models + JSON schema validators in `controller/app/modules/screens/graph_derivation.py` (or equivalent). When `dataSource.agent_id == dataSink.agent_id` and both target `pluto` with the same `identifier`, the agent will naturally share one SDR handle — no special-casing needed on the hub side.
### B4. WS endpoint extensions
`/screens/agent/ws` already exists. Add:
- Support for hub → agent **binary frames** (currently binary is agent → hub only). FastAPI's `WebSocket.send_bytes` works directly; just route binary from the Redis pub/sub channel through to the WS.
- New Redis pub/sub channel `screens:agent:{agent_id}:tx` for outbound TX control JSON + a separate `screens:agent:{agent_id}:tx_bin` for outbound binary. (Two channels because many Redis brokers don't love mixing binary into text-keyed channels; if your deployment uses Redis 6+ with `SUBSCRIBE` that handles bytes, one channel is fine.)
### B5. Celery wiring
When `dataSink.type == "agent"`, the Celery task that runs the TX-containing graph uses `AgentTxSink` instead of a local sink. The operator code (`plutoTXoperator`) is unchanged because the sink abstraction hides the difference.
Full-duplex: a single task with both `dataSource.type == "agent"` and `dataSink.type == "agent"` pointing at the same agent spawns both the RX consumer loop (existing `AgentDataSource.next_chunk` via BLPOP) and the TX producer loop (`AgentTxSink.write`). Both sides are wired up before any capture frames are sent.
### B6. Capability gating
Before any control path sends `tx_start`:
```python
agent = get_agent(agent_id)
if agent.last_heartbeat.age > 60: # stale
raise HTTPException(503, "agent not responding")
if "tx" not in agent.last_heartbeat.capabilities:
raise HTTPException(400, "agent has not opted in to transmission (tx_enabled=false)")
```
Surface clear errors to the Screens UI so the user knows it's an agent config issue, not an app config issue.
### B7. Audit log
New MongoDB collection `agent_tx_audit`:
```
{
agent_id, app_id, user_id,
center_frequency_hz, tx_gain_db, duration_s, num_samples,
started_at, ended_at, terminal_status, // "done" | "error" | "underrun" | "cancelled"
error_message?
}
```
Write on every `tx_start`. Update on terminal `tx_status`. Index on `{agent_id, started_at}` for admin-view queries.
### B8. Registration — no change needed
`POST /screens/agents/register` and `~/.ria/agent.json` already cover credential storage. The TX interlock (`tx_enabled`, caps) is written by the *agent operator* via `ria-agent register --allow-tx`; the hub only reads the heartbeat to learn whether an agent will accept TX.
---
## Part C — Screens (Vue 3 frontend)
### C1. App composer
- **Agent picker** (exists from RX work) grows a "TX capable" filter toggle; hides agents whose heartbeat `capabilities` lacks `"tx"`.
- When the graph contains `plutoTXoperator` (or any future TX operator):
- Render a **dataSink** section mirroring dataSource.
- Fields: device, agent_id, identifier, tx_sample_rate, tx_center_frequency, tx_gain, underrun_policy.
- Validation: tx_center_frequency within radio band; tx_gain within agent-advertised max (read from heartbeat when available).
### C2. Run-time UI
- **Consent modal** on "Start" for any app whose manifest contains a `dataSink.type: "agent"`:
> "This app will transmit on **2.450 GHz** at **-20 dB** through agent **lab-pluto-01**. I confirm this transmission is permitted under my local radio regulations."
Required checkbox, cannot be remembered across apps.
- **TX status indicator** in the running-app view: shows `armed` / `transmitting` / `underrun` state from `tx_status` frames. Red banner on `underrun` or `error`.
- **Stop TX button** always visible during transmission; fires `tx_stop` immediately. Separate from "Stop app" (which also stops RX).
### C3. Admin view
Extend the agents list from B8 of the RX handoff:
- Column: **TX**: `enabled` / `disabled` / `in-use by app X`.
- Agent detail page: show `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`, and the last 10 rows from `agent_tx_audit` filtered to this agent.
---
## Rollout order
1. **Part A §A1-A3, A7** — agent-side TX session + binary ingress + safety, all behind `--allow-tx`. Mock-based tests. Shippable standalone; no consumer yet.
2. **Part B §B1-B5** — hub sink + manifest + WS extension + Celery wiring. End-to-end test: Screens app with `plutoTXoperator` + agent sink → real Pluto in the lab → verify carrier on a spectrum analyzer.
3. **Part B §B6-B7** — capability gating + audit log. Blocks general release, not lab use.
4. **Part C §C1** — composer UI for TX apps.
5. **Part C §C2-C3** — consent modal + admin view. Gate for first non-internal user.
Parts A + B can land on parallel branches and meet at step 2's integration test. Part C can start in parallel with B once the manifest shape in B3 is stable.
## Test matrix (integration)
| Scenario | Expected |
|---|---|
| App with RX only, agent connected | RX as today (regression guard) |
| App with TX only, agent `tx_enabled=True` | TX starts, underrun → pause, stop cleans up |
| App with RX + TX same agent, same device | One Pluto handle serves both; independent gains/frequencies |
| App with TX, agent `tx_enabled=False` | Hub rejects at gate with 400; no WS traffic generated |
| App with TX, gain exceeds agent cap | `tx_status: error`; SDR never opened |
| Hub stops sending TX buffers mid-stream | `underrun` emitted after queue drains; agent paused cleanly |
| WS drops during TX | Agent cancels TX task, pauses hardware, reconnects; hub must re-issue `tx_start` |
| Agent process killed during TX | Hardware stops (existing `close()` already handles this; verify `_tx_lock` released) |
## Open questions
- **Waveform source**: is `plutoTXoperator` a real-time generator emitting on a clock, or does it synthesize a fixed recording and loop? If the latter, worth exposing a "bulk + loop" fast-path — hub sends the buffer once, agent loops it via existing `tx_recording`. Same protocol (`tx_start` + one buffer + `loop: true`), much less WS traffic.
- **Multi-app-per-agent**: out of scope for v1 (§Non-goals). When needed: add a session id to binary frames (4-byte prefix: magic + stream_id + reserved), bump a `protocol_version` in the heartbeat.
- **Streaming TX clock drift**: if hub and agent sample clocks drift, repeating zeros on underrun is audible/visible in the spectrum. Longer term: agent-side resampling or PLL, both expensive. v1: rely on generous queue depth + stable local networks.
- **Other TX-capable SDRs**: HackRF, USRP, bladeRF. The `_CONFIG_ATTR_MAP` in [`agent/streamer.py:169-175`](../src/ria_toolkit_oss/agent/streamer.py#L169-L175) will need per-driver entries when those come online.
## Regulatory note
Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub's consent modal and audit log exist so actions are attributable. None of this is a legal compliance layer — it's a defense-in-depth mechanism.

185
docs/agent_tx_protocol.md Normal file
View File

@ -0,0 +1,185 @@
# Agent TX Protocol
Operator-facing reference for the TX streaming extensions to the agent
WebSocket protocol. Implementation plan: [agent_tx_implementation_plan.md](./agent_tx_implementation_plan.md).
Cross-repo design: [agent_tx_plan.md](./agent_tx_plan.md).
> **Regulatory note.** Transmission is regulated in every jurisdiction. The
> agent-side interlocks documented below let you configure safe defaults
> for your deployment. They do not replace licensing or responsibility
> for your own emissions. The RIA Hub's consent modal and audit log make
> actions attributable — they are not a legal-compliance layer.
## Opt-in
TX is **disabled by default**. The hub cannot make the agent transmit unless
the operator has explicitly opted in on the agent host.
Two equivalent opt-in paths:
```bash
# Persist to ~/.ria/agent.json so the agent always allows TX.
ria-agent register --hub http://HUB:3005 --api-key KEY \
--allow-tx \
--tx-max-gain-db -10 \
--tx-max-duration-s 60 \
--tx-freq-range 2.4e9 2.5e9 \
--tx-freq-range 5.7e9 5.8e9
```
```bash
# Runtime-only override (does not touch disk).
ria-agent stream --allow-tx
```
Caps:
| Flag | Config key | Effect |
|---|---|---|
| `--tx-max-gain-db VALUE` | `tx_max_gain_db` | Reject any `tx_start` whose `tx_gain > VALUE` |
| `--tx-max-duration-s VALUE` | `tx_max_duration_s` | Auto-stop any TX session after `VALUE` seconds (watchdog in the TX loop) |
| `--tx-freq-range LO HI` (repeatable) | `tx_allowed_freq_ranges` | Reject any `tx_start` whose `tx_center_frequency` falls outside all configured ranges |
The agent enforces each cap **before** opening the SDR. A violating
`tx_start` produces a `tx_status: error` frame and never touches hardware.
## Heartbeat advertisement
Every heartbeat now includes:
```jsonc
{
"type": "heartbeat",
"hardware": ["mock", "pluto"],
"status": "streaming",
"capabilities": ["rx", "tx"], // "tx" present only when tx_enabled=True
"tx_enabled": true,
"sessions": { // omitted when no session is live
"rx": { "app_id": "app-1", "state": "streaming" },
"tx": { "app_id": "app-1", "state": "transmitting" }
}
}
```
Hubs should read `capabilities` to decide whether to surface TX operators
against this agent in the Screens app composer.
## Control messages
### Hub → agent (JSON)
```jsonc
// Arm the TX side. Agent validates interlocks, opens/resolves the SDR,
// and transitions into "armed". The next binary frames are consumed as
// TX IQ buffers.
{
"type": "tx_start",
"app_id": "app-1",
"radio_config": {
"device": "pluto",
"identifier": "ip:192.168.3.1",
"tx_sample_rate": 1000000,
"tx_center_frequency": 2450000000,
"tx_gain": -20, // dB; Pluto uses negative attenuation
"tx_bandwidth": 1000000, // optional
"buffer_size": 1024,
"underrun_policy": "pause" // "pause" (default) | "zero" | "repeat"
}
}
// Update parameters at the next buffer boundary. No re-arm needed.
{ "type": "tx_configure", "app_id": "app-1",
"radio_config": { "tx_gain": -25 } }
// Stop TX, drain the inbound queue, pause_tx, release the SDR (if no RX
// session is still using it). A new tx_start can follow immediately.
{ "type": "tx_stop", "app_id": "app-1" }
```
### Hub → agent (binary)
- Raw interleaved float32 IQ, normalised to `[-1, 1]`.
- One WebSocket frame = one buffer = `buffer_size` complex samples =
`buffer_size * 2 * 4` bytes.
- Accepted only while a TX session is live. Frames outside that window
are logged and dropped.
- Malformed frames (odd float count, wrong size) trigger one underrun
cycle but do not crash the stream.
### Agent → hub (JSON)
```jsonc
{ "type": "tx_status", "app_id": "app-1", "state": "armed" }
{ "type": "tx_status", "app_id": "app-1", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-1", "state": "underrun" }
{ "type": "tx_status", "app_id": "app-1", "state": "done" }
{ "type": "tx_status", "app_id": "app-1", "state": "error",
"message": "tx_gain -5 exceeds cap -15.0" }
```
Transitions:
```
tx_start tx_stop
—————————————————▶ armed ▶ transmitting ——————————▶ done
│ │
│ │ queue empties + policy="pause"
│ ▼
│ underrun ▶ done (auto-teardown)
└─ interlock / init failure ▶ error (no session)
```
## Underrun policies
When the inbound TX queue is empty at a buffer boundary:
| Policy | Behavior |
|---|---|
| `pause` *(default)* | Callback returns silence, calls `pause_tx()`, flips the session into `underrun`. Watchdog emits `tx_status: underrun` + `tx_status: done` and tears down. Hub must re-issue `tx_start` to resume. |
| `zero` | Callback returns a zero-filled buffer. Session stays alive; no status change. Carrier continues with dead air. |
| `repeat` | Callback returns the most recently transmitted buffer. If no buffer has arrived yet, falls back to zero for that cycle. |
Choose `pause` for correctness-sensitive workloads (any data modulation
where zero-fill or repeat corrupts the stream). Choose `zero` or `repeat`
for continuous-carrier use cases where brief stalls are acceptable.
## Concurrent RX + TX
A single `app_id` may hold both an RX session (`start`/`stop`) and a TX
session (`tx_start`/`tx_stop`) on the same agent at the same time. When
both reference the same `(device, identifier)`, the agent shares a single
driver instance between the two sessions (ref-counted release on stop).
Multi-app sharing of one SDR is not supported in v1. A second `tx_start`
with a different `app_id` while another TX session is live produces
`tx_status: error "tx already active on this agent"`.
## Buffer format recap
- **Direction** is the only framing: hub → agent binary means TX,
agent → hub binary means RX.
- **Layout**: `[I0, Q0, I1, Q1, …]` as little-endian float32.
- **Size**: `buffer_size * 2 * 4` bytes. Mismatched sizes are treated as
a single-cycle underrun (malformed frame).
- **Range**: samples must lie in `[-1, 1]`. Out-of-range values are
transmitted as-is; the SDR driver may clip.
## Configuration reference
`~/.ria/agent.json` is written by `ria-agent register` and read by
`ria-agent stream`. Minimum schema with TX:
```json
{
"hub_url": "https://hub.example.com",
"agent_id": "agent-abc123",
"token": "rha_...",
"tx_enabled": true,
"tx_max_gain_db": -10.0,
"tx_max_duration_s": 60,
"tx_allowed_freq_ranges": [[2.4e9, 2.5e9], [5.7e9, 5.8e9]]
}
```
File permissions are enforced to `0600` by `save()`.

View File

@ -0,0 +1,232 @@
---
name: Per-user agent registration keys
description: Replace the shared [wac] API_KEY with per-user registration keys issued from the RIA Agents page on RIA Hub.
type: plan
---
# Per-user agent registration keys — plan
**Status:** design only; nothing implemented.
**Owner (toolkit side):** `ria-toolkit-oss`
**Owner (hub side):** `ria-hub` / `controller`
**Related:** [screens_agent_handoff.md](./screens_agent_handoff.md), [agent_tx_protocol.md](./agent_tx_protocol.md)
---
## Context (current state)
Today, `ria-agent register` calls `POST {hub_url}/screens/agents/register` with
an `X-API-Key` header ([cli.py:41-64](../src/ria_toolkit_oss/agent/cli.py#L41-L64)).
The hub validates that header against a single shared secret — `[wac] API_KEY`
in the hub's `app.ini` ([legacy_executor.py:821-822](../src/ria_toolkit_oss/agent/legacy_executor.py#L821-L822)).
The hub responds with `{agent_id, token}`; the agent persists both to
`~/.ria/agent.json` and uses `token` as the bearer on the WS handshake
afterwards.
Consequences of the shared secret:
- Every agent operator holds the same key → no per-user attribution in logs.
- Revoking one operator forces a rotation across every deployed agent.
- Key-in-CLI-history leaks escalate to the whole fleet.
- Nothing ties a registered agent to a human in the hub's user table.
## Goal
A user signs into `riahub.ai`, opens an **RIA Agents** page, mints a key, and
uses it once with `ria-agent register`. The resulting agent is owned by that
user; the key can be revoked without affecting anyone else's agents.
The agent-side `token` returned by `/screens/agents/register` keeps its current
role (bearer for the WS handshake). Only the *registration* credential
changes.
---
## User flow
1. User signs into `https://riahub.ai`.
2. User navigates to **Settings → RIA Agents** (or a top-level `/agents`
page — see open question O1).
3. User clicks **Generate registration key**. A modal shows the key **once**,
with copy-to-clipboard. Only a prefix + hash is stored server-side.
4. User runs, on the agent host:
```
ria-agent register --hub https://riahub.ai --api-key ria_reg_<...>
```
5. Hub validates the key, creates an agent row owned by the user, marks the
key as `consumed` (one-shot) or bumps `last_used_at` (multi-use — see O2),
and returns `{agent_id, token}` exactly as today.
6. The agent list on the same page shows the new agent's `name`, `hardware[]`,
`last_heartbeat`, and **Revoke** / **Rename** actions.
---
## Scope split
### Toolkit (`ria-toolkit-oss`)
The CLI already sends `X-API-Key`, so no protocol change is required. Two
small quality-of-life changes:
| # | Change | File |
|---|--------|------|
| T1 | Update `--api-key` help text and [cli.py:8 docstring](../src/ria_toolkit_oss/agent/cli.py#L8) to say "personal registration key from the RIA Agents page" rather than "Hub API key". | [agent/cli.py](../src/ria_toolkit_oss/agent/cli.py) |
| T2 | On registration failure, if the response body is JSON with a `reason` field (`invalid_key` / `expired` / `already_consumed` / `revoked`), surface it verbatim instead of the raw `HTTPError`. Makes user-facing errors actionable. | [agent/cli.py:56-61](../src/ria_toolkit_oss/agent/cli.py#L56-L61) |
No change to `config.py`, `ws_client.py`, or the streamer — the `token`
returned by register is still what authenticates the WS connection.
### Hub (`ria-hub` / `controller`)
Paths below are inferred from [screens_agent_handoff.md](./screens_agent_handoff.md)
(`controller/app/modules/...`). Hub team should sanity-check before starting.
#### Prior art — check RIA Conductor first
The RIA Conductor feature is believed to already implement similar key
generation (likely for authenticating conductors to the hub). **Before
building anything in this section, read the Conductor key code** and decide
whether to:
- **Reuse** it as-is (shared `registration_keys` table, `kind` column
discriminating `conductor` vs. `agent`) — preferred if the shapes line up.
- **Extract** the hashing / minting / revoke primitives into a shared
`registration_keys` module that both features depend on.
- **Fork** a parallel `agent_registration_keys` table — only if the
Conductor model is materially different (e.g. per-org scoping, different
lifetime rules) and forcing a merge would distort one or both features.
Whichever path is chosen should be decided up front and noted on the PR, so
we don't end up with two near-identical key subsystems by accident. The
security notes below (argon2id, one-time reveal, rate limits, audit logging)
apply regardless of which path is taken — confirm Conductor already does
these; if not, the fix belongs in the shared code, not this feature.
#### Data model
New collection (Mongo) or table (if Postgres is used for users):
```
registration_keys
_id
user_id # FK to hub users
name # user-supplied label, e.g. "lab laptop"
key_prefix # first 8 chars of the plaintext, for UI display
key_hash # argon2id or bcrypt of the full plaintext
created_at
expires_at # optional; null = no expiry
consumed_at # null until first successful registration (if one-shot)
revoked_at # null unless explicitly revoked
last_used_at # updated on every successful use (if multi-use)
```
Augment the existing agents collection with `owner_user_id` (FK) and
`registered_via_key_id` (FK to `registration_keys._id`).
Decide O2 before building: one-shot vs. reusable. Recommendation: one-shot by
default with an optional "reusable for N days" toggle, since one-shot is the
lower-blast-radius default and matches how GitHub/Gitea deploy keys behave.
#### Endpoints
| # | Endpoint | Notes |
|---|----------|-------|
| H1 | `POST /api/v1/user/registration-keys` | Auth: session cookie. Body: `{name, expires_in_days?, reusable?}`. Returns plaintext key **once**. |
| H2 | `GET /api/v1/user/registration-keys` | Auth: session cookie. Lists the caller's keys (prefix + metadata, never plaintext). |
| H3 | `DELETE /api/v1/user/registration-keys/{id}` | Auth: session cookie. Revokes. |
| H4 | `POST /screens/agents/register` (existing) | Change auth: look up `X-API-Key` by hash instead of string-compare against `[wac] API_KEY`. Reject if revoked / expired / consumed. Set `owner_user_id` on the new agent row. |
| H5 | `GET /api/v1/user/agents` | Auth: session cookie. Lists the caller's agents for the UI. |
| H6 | `DELETE /api/v1/user/agents/{id}` | Auth: session cookie. De-registers and closes any live WS. |
H4 is the only backwards-incompatible change. See the migration section for
how to ship it without breaking existing deployments.
#### Frontend
New page — **Settings → RIA Agents** — two panels:
- **Registration keys:** table (name, prefix, created, expires, last used,
revoke button) + "Generate" button that opens the one-time-reveal modal.
- **Agents:** table (name, hardware, status, last heartbeat, rename, revoke).
Matches the existing Gitea-style Settings sidebar if RIA Hub is Gitea-based
(O3).
---
## Migration from the shared `[wac] API_KEY`
The shared key is likely in use on every existing deployment. To avoid a
flag day:
1. **Dual-accept window.** H4 accepts *either* a per-user key (lookup by
hash) *or* the legacy `[wac] API_KEY` string. When the legacy key is used,
the resulting agent has `owner_user_id = null` and a warning is logged.
2. **Admin UI surfaces "unowned" agents** so an admin can re-assign them or
ask owners to re-register.
3. **Deprecation window of one release**, then H4 rejects the legacy key and
the `[wac] API_KEY` config is removed from `app.ini`.
No toolkit-side migration needed — existing `~/.ria/agent.json` files already
store the post-registration `token`, which keeps working regardless of how
registration itself was authenticated.
---
## Security notes
- Store `key_hash` with a password hash (argon2id), not a fast hash. The key
is a secret-equivalent: treat it like a password.
- Plaintext key format: `ria_reg_<base64url of 32 random bytes>`. Prefix makes
the purpose obvious in leaked logs and lets scanners (trufflehog etc.)
recognize it.
- One-time reveal in the UI — never persist or re-display the plaintext.
- Rate-limit H4 per source IP and per `key_prefix` to blunt brute-force on
leaked prefixes. Lock a key out after N failed attempts in M minutes.
- Log every H4 call (success + failure, with key prefix and source IP)
to the audit trail.
---
## Open questions
- **O1.** Where does the page live? A top-level `/agents` route is
discoverable; `/user/settings/agents` matches Gitea's existing IA. Pick
before F7 (frontend task).
- **O2.** One-shot vs. reusable keys (default and whether both are offered).
Recommendation above; needs product sign-off.
- **O3.** Is RIA Hub's web UI really a Gitea fork? URL patterns
(`/qoherent/-/packages/...`, `.git` clones) suggest yes, but the "Settings"
integration plan depends on confirming this. If it isn't, F7 is a standalone
page instead.
- **O4.** Does the agent bearer `token` need per-user scoping too, or is
ownership-at-registration enough? Today the token is opaque and not tied
to a user in the WS handler. Probably fine to defer until after per-user
keys ship.
- **O5.** Should admins be able to mint keys on behalf of other users (for
onboarding)? If yes, H1 needs an admin-scoped variant.
- **O6.** Conductor reuse decision — reuse / extract / fork. Must be answered
before any hub-side code lands. See "Prior art" above.
---
## Out of scope
- SSO / OIDC for agent-to-hub auth (current `token` bearer is kept as-is).
- Per-agent capability scoping beyond what `--allow-tx` already does at
registration time.
- Fleet provisioning (N agents from one key); covered instead by "reusable"
flag in O2 if that's the chosen default.
---
## MVP cut
If the hub team wants the smallest shippable slice:
- H1, H2, H3, H4 (with dual-accept), H5.
- Frontend: registration-keys panel only; reuse the existing agents admin
view if one already exists.
- T1 toolkit copy-change.
Defer H6, rename flows, T2, and audit logging to a follow-up.

104
docs/ria_app_hub_handoff.md Normal file
View File

@ -0,0 +1,104 @@
# `ria-app` Hub-Side Handoff
**Repo:** `ria-hub`
**Goal:** Make containerized apps built by Application Composer self-describing so the new `ria-app` CLI in `ria-toolkit-oss` can auto-configure GPU/USB/network flags at `docker run` time. No user copy-paste of flags.
---
## Context — what exists today
In `ria-toolkit-oss` (branch `screens-connection`) there is now a `ria-app` CLI:
```bash
ria-app configure --registry registry.riahub.ai --namespace qoherent
ria-app pull <app>[:tag]
ria-app run <app>[:tag] [--config config.yaml]
ria-app list
ria-app logs <app> [-f]
ria-app stop <app>
```
`ria-app run` inspects OCI image labels and auto-adds runtime flags:
| Label | Value (example) | Effect |
|---|---|---|
| `ria.profile` | `native-x86`, `nvidia-x86`, `holoscan` | `nvidia`/`holoscan`/`cuda` → adds `--gpus all` |
| `ria.hardware` | comma list: `pluto,usrp,rtlsdr,hackrf,bladerf,thinkrf` | USB-attached SDRs → `--device /dev/bus/usb`; networked SDRs → `--net host` |
| `ria.app` | `<app-name>` | Used by `ria-app list` to filter images |
| `ria.version` | `<git sha or semver>` | Informational |
If the labels are missing, `ria-app run` still works but can't auto-configure — the user has to pass `--docker-args ...` themselves. So the value here is entirely in getting CI to stamp the labels.
---
## What to change in `ria-hub`
### 1. Stamp OCI labels on every built image
In the Application Composer build flow (follow the path from `application_composer.go:172` `ComposerBuildTrigger` → generated `.riahub/workflows/*.yml``sample-build-tools` `full_generator.py``Dockerfile` emission), add `LABEL` instructions to the generated Dockerfile. The values should be computed from the app JSON the user submitted, not hard-coded:
```dockerfile
LABEL ria.app="${APP_NAME}"
LABEL ria.profile="${PROFILE}" # native-x86 | nvidia-x86 | holoscan | ...
LABEL ria.hardware="${HARDWARE_CSV}" # e.g. "pluto,usrp" (empty string if none)
LABEL ria.version="${GIT_SHA}"
LABEL ria.operators="${OPERATORS_CSV}" # optional, nice for debugging
```
`HARDWARE_CSV` derivation: walk the operator graph in the submitted app JSON and collect the set of hardware backends that any operator requires. The mapping from operator → hardware tag should live next to the existing `operator_generator.py` apt-dep resolution (that code already knows, per operator, whether it needs `libuhd-dev`, `libad9361-dev`, `libhackrf-dev`, `librtlsdr-dev`, etc.). Reuse that table — just emit the short tag (`usrp`, `pluto`, `hackrf`, `rtlsdr`) alongside the apt package name.
Allowed hardware tags (must match what `ria-app` recognizes):
- `pluto`, `rtlsdr`, `hackrf`, `bladerf` → USB
- `usrp`, `thinkrf` → network
- (extend here when new SDR backends are added)
If an operator needs both (e.g. Pluto over USB *and* its iio network endpoint), list it once — `ria-app` already applies both USB and host-net when `pluto` appears.
### 2. Prefer `LABEL` over `ARG`-only
The CI job likely already passes things like `APP_NAME` and `GIT_SHA` as build args. Those args disappear after build unless promoted to `LABEL`. Make sure each of the five labels above ends up in the final image layer (verify with `docker image inspect --format '{{json .Config.Labels}}' <ref>`).
### 3. Push with both `:<sha>` and `:latest` tags
`ria-app` defaults to `:latest` when the user omits a tag. If CI only pushes immutable SHA tags today, also push `:latest` on main-branch builds so `ria-app run my-classifier` Just Works.
### 4. (Optional but recommended) App index endpoint
Add `GET /apps` to the hub API returning something like:
```json
[
{
"name": "my-classifier",
"image": "registry.riahub.ai/qoherent/my-classifier:latest",
"profile": "nvidia-x86",
"hardware": ["pluto"],
"updated_at": "2026-04-14T10:00:00Z"
}
]
```
This lets `ria-app list --remote` show available apps without the user knowing image names. Not required for MVP — skip if it adds scope.
### 5. (Optional) Ship a default `config.yaml` inside the image at a known path
`ria-app run --config <path>` mounts the user's config to `/config/config.yaml` and sets `RIA_CONFIG=/config/config.yaml`. The runtime already falls back to an embedded config per your handoff notes, so this just needs to keep working — no change unless you want to standardize the embedded path.
---
## Acceptance checklist
- [ ] A Composer-built image for a native-x86 app with a Pluto operator has labels: `ria.profile=native-x86`, `ria.hardware=pluto`, `ria.app=<name>`, `ria.version=<sha>`.
- [ ] A Composer-built image for an nvidia-x86 app has `ria.profile=nvidia-x86`.
- [ ] `docker image inspect --format '{{json .Config.Labels}}' <ref>` shows all five labels.
- [ ] `:latest` tag is pushed for main-branch builds.
- [ ] Running `ria-app run <app>` on a user's machine starts the container with the right `--gpus` / `--device` / `--net` flags without the user passing anything beyond the app name.
---
## Out of scope
- Anything on the `ria-toolkit-oss` side — the CLI is already implemented on branch `screens-connection`.
- Changes to the generated C++ code, CMakeLists, or runtime config lookup.
- Artifact downloads — we're distributing via the container registry only.

View File

@ -0,0 +1,257 @@
# Screens Agent Streamer — Hand-off
**Branch:** `screens-connection` in `ria-toolkit-oss`
**Status:** Part A complete (tests passing, 25/25). Part B is pending in the `ria-hub` repo.
**Related docs:** [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md), [../screens_connection_updates.md](../screens_connection_updates.md)
---
## What's done (Part A — this repo)
### Phase 1 — SDR foundation
- Added `ria_toolkit_oss.sdr.detect_available() -> dict[str, type]` that probes
every driver module and returns the map of importable driver classes.
Importability is used as a proxy for "user has installed this driver's
optional dep"; it does **not** probe for physical hardware.
- Added `SdrDisconnectedError` (subclass of `SDRError`) plus a
`translate_disconnect(exc)` helper in [sdr/sdr.py](../src/ria_toolkit_oss/sdr/sdr.py).
Pattern-matches USB/device-drop exceptions (`ENODEV`/`EIO`, "no such device",
"broken pipe", etc.) and converts them so the streamer can distinguish a
real hardware failure from a transient error.
- Audited every driver under `sdr/` for GUI imports — none found. All drivers
are headless-clean at import time.
- `Pluto.rx(num_samples)` now wraps `self.radio.rx()` with
`translate_disconnect`. The same one-liner pattern can be applied to other
drivers (hackrf/rtlsdr/usrp/blade/thinkrf) when they get wired to the
streamer — deferred until each is needed.
**Not done (Phase 1 Task 4):** SigMF recording validation across radio types —
needs real captures from each device, out of scope without hardware.
### Phase 2 — Agent package restructure
Moved the former `src/ria_toolkit_oss/agent.py` into a package:
```
src/ria_toolkit_oss/agent/
├── __init__.py # re-exports NodeAgent + main for back-compat
├── legacy_executor.py # former agent.py, unchanged behavior
├── streamer.py # new WebSocket IQ streamer
├── ws_client.py # persistent WS client + heartbeat + reconnect
├── hardware.py # wraps sdr.detect_available() for heartbeat payloads
├── config.py # ~/.ria/agent.json load/save (0600 perms)
└── cli.py # unified CLI (run / stream / detect / register)
```
Back-compat preserved: `from ria_toolkit_oss.agent import NodeAgent` still
works, and bare `ria-agent ...` with no subcommand (or with legacy flags
like `--hub`) falls through to the original long-poll executor.
### Phase 3 — Streamer implementation
- `ws_client.WsClient``async run()` loop, JSON heartbeats on a timer,
auto-reconnect on drop, bearer-token auth header on connect.
- `streamer.Streamer` — handles `start` / `stop` / `configure` messages,
opens the SDR via an injectable `sdr_factory`, runs capture in an executor
thread, and ships raw interleaved float32 IQ bytes per `rx()` call.
- `hardware.heartbeat_payload(status, app_id)``{type, hardware[], status}`.
- `config.AgentConfig` — dataclass round-tripped through JSON, unknown keys
preserved in an `extra` dict.
- CLI subcommands: `run` (legacy), `stream`, `detect`, `register`.
### Phase 4 — Protocol
Matches `screens_connection_updates.md` §"WebSocket Protocol" exactly:
| Direction | Message |
|-----------|---------|
| A → S (JSON) | `{type: heartbeat, hardware[], status}` |
| A → S (JSON) | `{type: status, status, app_id}` |
| A → S (JSON) | `{type: error, app_id, message}` |
| A → S (binary) | raw interleaved float32 IQ, one frame per `rx()` |
| S → A (JSON) | `{type: start, app_id, radio_config}` |
| S → A (JSON) | `{type: stop, app_id}` |
| S → A (JSON) | `{type: configure, app_id, radio_config}` — applied at next boundary |
**Auth decision:** bearer token in `Authorization` header on the initial
handshake. (Open questions in the plan around first-frame auth / mid-buffer
`configure` / backpressure remain open.)
### Phase 5 — Tests
25 tests, all green under `poetry run pytest tests/agent/`:
- `test_hardware.py``detect_available`, heartbeat payload shape
- `test_config.py` — round-trip, missing-file fallback, extra-key preservation
- `test_streamer.py` — start/stream/stop against `MockSDR` + fake WS, error
frames, configure queueing
- `test_disconnect.py``translate_disconnect` patterns + streamer reports
`SDR disconnected:` and closes the SDR
- `test_ws_client.py` — real local `websockets` server: heartbeat on connect,
auto-reconnect after server drop, malformed-frame resilience
- `test_integration.py` — end-to-end heartbeat → start → 3 binary IQ frames → stop
- `test_legacy.py` — regression: `NodeAgent` still importable
### Dependency / build changes
- Added `websockets (>=12.0,<14.0)` to `[tool.poetry.group.agent.dependencies]`.
- Repointed the `ria-agent` console script from `ria_toolkit_oss.agent:main`
to `ria_toolkit_oss.agent.cli:main` (the unified CLI, which still calls
through to the legacy `main` for back-compat).
### Files changed
```
M pyproject.toml
M poetry.lock
M src/ria_toolkit_oss/sdr/__init__.py
M src/ria_toolkit_oss/sdr/sdr.py
M src/ria_toolkit_oss/sdr/pluto.py
R src/ria_toolkit_oss/agent.py -> src/ria_toolkit_oss/agent/legacy_executor.py
A src/ria_toolkit_oss/agent/{__init__,cli,config,hardware,streamer,ws_client}.py
A tests/agent/{__init__,test_config,test_disconnect,test_hardware,
test_integration,test_legacy,test_streamer,test_ws_client}.py
A docs/screens_agent_streamer_plan.md
A docs/screens_agent_handoff.md (this file)
```
---
## What's left (Part B — `ria-hub` repo)
In priority order. B1 + B2 + B3 + B6 are the MVP; everything else hardens.
### Server-side (MVP)
| # | Task | File / area |
|---|------|-------------|
| B1 | Implement `AgentDataSource` (DataSource ABC, reads IQ from WS connection) and register in `build_data_source()` | `controller/app/modules/screens/data_sources.py` |
| B2 | Add `"agent"` to `dataSource.type` enum in manifest schema; update Pydantic / JSON schema validators | `controller/app/modules/screens/graph_derivation.py` + schema files |
| B3 | Agent WebSocket endpoint `POST /api/agent/ws` (or `GET` with Upgrade) — accepts agent connections, auths on bearer token, bridges the connection to the Celery task's `AgentDataSource` | new `controller/app/modules/agent/routes.py` |
| B6 | Celery wiring: when `dataSource.type == "agent"`, look up the connected agent by `agent_id`, forward `radio_config` as a `start` message, and feed received IQ bytes into the inference loop via `AgentDataSource.next_chunk()` | `controller/app/modules/screens/tasks.py` |
### Server-side (hardening)
| # | Task |
|---|------|
| B4 | Agent registry — MongoDB collection: `agent_id`, `hardware[]`, `last_heartbeat`, `online`, registration tokens |
| B5 | `POST /api/agent/register` returning `{agent_id, token}` for `~/.ria/agent.json` |
### Frontend
| # | Task |
|---|------|
| B7 | Device / agent picker in the Screens app config (Vue 3) |
| B8 | Agents list / status admin view |
### Protocol contract (keep in lockstep with Part A)
- Binary frames are interleaved float32 IQ, one frame per `radio.rx()` call.
- `radio_config` is forwarded verbatim from manifest `dataSource.params`.
Minimum keys the agent handles: `device`, `identifier`, `sample_rate`,
`center_frequency`, `gain`, `buffer_size`.
- `configure` messages from the server apply at the next capture boundary
(current agent implementation).
- Agent authenticates with bearer token in `Authorization` header on the
handshake.
### Still-open protocol questions (pin down before B3 lands)
- Auth frame as fallback when proxies strip `Authorization`?
- Mid-buffer `configure` application for tighter retune latency?
- Backpressure policy when the server's inference loop is slower than the
agent's `rx()` cadence — drop, queue, or pause the agent via a `pause`
control frame?
### Manifest example (new `agent` mode)
```json
{
"dataSource": {
"type": "agent",
"device": "pluto",
"agent_id": "agent-abc123",
"params": {
"identifier": "ip:192.168.3.1",
"sample_rate": 1000000,
"center_frequency": 2450000000,
"gain": 40,
"buffer_size": 1024
}
},
"preprocess": "magnitude_phase_window_stats",
"config": {"inference": {"knownDevices": [], "interval": 1}}
}
```
### Pipeline invariant
**No changes to `inference_core.py`, `preprocessors.py`, or
`run_onnx_chain_loop()`.** `AgentDataSource` is a drop-in replacement for
`SdrDataSource`; everything downstream is unchanged.
---
## Local test setup
### 1. Branch layout
- `ria-toolkit-oss` — branch **`screens-connection`** (this work).
- `ria-hub` — whatever branch you'll be doing Part B on (create a matching
`screens-connection` branch there to keep the hand-off clean).
### 2. Install the toolkit branch into your local RIA Hub
The hub declares `ria-toolkit-oss` as a git dep in its `pyproject.toml`.
Point it at this branch:
```toml
# ria-hub/pyproject.toml
ria-toolkit-oss = { git = "https://riahub.ai/qoherent/ria-toolkit-oss.git", branch = "screens-connection" }
```
Or for a fully local dev loop (edits visible without reinstall):
```bash
# from ria-hub repo
poetry run pip install -e /home/qrf/ria-toolkit-oss
# plus the agent extra so websockets is available server-side too
poetry run pip install websockets
```
### 3. Commit this branch and push (optional, for CI / shared dev)
```bash
cd /home/qrf/ria-toolkit-oss
git add -A
git commit -m "Add WebSocket IQ streamer agent (Part A)"
git push -u origin screens-connection
```
### 4. Try the agent locally against a mock server
You can exercise the whole Part A stack without RIA Hub by running the
integration test:
```bash
cd /home/qrf/ria-toolkit-oss
poetry run pytest tests/agent/test_integration.py -v
```
Or manually, once Part B lands:
```bash
# on the user machine (mock hardware is fine)
ria-agent register --url https://localhost:8000 --token <tok>
ria-agent detect # lists: mock, pluto, ... whatever's importable
ria-agent stream --url ws://localhost:8000/api/agent/ws
```
### 5. End-to-end integration test (after Part B MVP)
1. Start RIA Hub (FastAPI + Celery + Mongo + Redis) with the Part B branch.
2. Run `ria-agent stream` on your laptop.
3. Create a Screens app with `dataSource.type: "agent"` and `device: "mock"`.
4. Start the app; confirm the agent logs "streaming" and the SSE stream
shows inference metrics flowing.

View File

@ -0,0 +1,156 @@
# Screens Agent Streamer — Implementation Plan
**Source doc:** [screens_connection_updates.md](../screens_connection_updates.md)
**Created:** 2026-04-13
**Goal:** Add a thin WebSocket-based IQ streaming agent to `ria-toolkit-oss`, alongside the existing long-poll `NodeAgent`, and wire up the RIA Hub server to consume it.
---
## Architectural Decision
The existing [src/ria_toolkit_oss/agent.py](../src/ria_toolkit_oss/agent.py) (`NodeAgent`) uses HTTP long-polling and runs ONNX inference locally. It stays as-is.
The new **streamer agent** described in `screens_connection_updates.md` is a *different* execution mode — thin, WebSocket-based, server-driven inference. It will be added as a new submodule and exposed as a new CLI subcommand. Both modes coexist; users pick one based on deployment needs.
---
## Part A — ria-toolkit-oss (this repo)
### Phase 1 — SDR foundation
| # | Task | File(s) | Priority |
|---|------|---------|----------|
| 1 | Add `detect_available() -> dict[str, type]` that probes every driver without importing GUI deps | [src/ria_toolkit_oss/sdr/__init__.py](../src/ria_toolkit_oss/sdr/__init__.py) | P0 |
| 2 | Audit each SDR driver for headless cleanliness (no matplotlib/Qt at import time) | `src/ria_toolkit_oss/sdr/{pluto,hackrf,rtlsdr,usrp,blade,thinkrf}.py` | P1 |
| 3 | Raise typed `SdrDisconnectedError` from `radio.rx()` on USB drop instead of crashing | `src/ria_toolkit_oss/sdr/sdr.py` + drivers | P1 |
| 4 | Validate `load_recording` against SigMF captures from each radio type | `tests/io/` | P2 |
### Phase 2 — Agent package restructure (non-breaking)
Promote the existing module to a package and add the streamer next to it:
```
src/ria_toolkit_oss/
├── agent.py # DELETE after move
└── agent/
├── __init__.py # re-export NodeAgent for back-compat
├── legacy_executor.py # former agent.py (NodeAgent, unchanged behavior)
├── streamer.py # NEW — thin IQ streamer loop
├── ws_client.py # NEW — persistent WS client + heartbeat + reconnect
├── hardware.py # NEW — wraps sdr.detect_available()
├── config.py # shared: ~/.ria/agent.json load/save
└── cli.py # unified CLI with subcommands
```
Back-compat requirement: `from ria_toolkit_oss.agent import NodeAgent` must keep working. Keep the `ria-agent` console script entry point; expose both modes via subcommands.
### Phase 3 — Streamer implementation
| # | Task | File | Priority |
|---|------|------|----------|
| 5 | `ws_client.py` — persistent WebSocket, auto-reconnect, heartbeat loop | `src/ria_toolkit_oss/agent/ws_client.py` | P0 |
| 6 | `streamer.py` — main loop: receive `start` → open SDR → `radio.rx()` → send binary IQ → handle `stop`/`configure` | `src/ria_toolkit_oss/agent/streamer.py` | P0 |
| 7 | `hardware.py` — heartbeat payload builder, uses `sdr.detect_available()` | `src/ria_toolkit_oss/agent/hardware.py` | P0 |
| 8 | `config.py``~/.ria/agent.json` read/write, registration token storage | `src/ria_toolkit_oss/agent/config.py` | P1 |
| 9 | CLI subcommands: `ria-agent register`, `detect`, `stream` (new), `run` (legacy long-poll) | `src/ria_toolkit_oss/agent/cli.py` | P0 |
| 10 | Add `websockets` to dependencies | [pyproject.toml](../pyproject.toml) | P0 |
### Phase 4 — WebSocket protocol
Implement exactly the messages from `screens_connection_updates.md` §"WebSocket Protocol":
**Agent → Server (JSON control):**
```json
{"type": "heartbeat", "hardware": ["pluto", "hackrf"], "status": "idle"}
{"type": "status", "status": "streaming", "app_id": "abc"}
{"type": "error", "app_id": "abc", "message": "USB device disconnected"}
```
**Agent → Server (binary data):** raw interleaved float32 IQ bytes per `radio.rx()` call.
**Server → Agent (JSON control):**
```json
{"type": "start", "app_id": "...", "radio_config": {"device": "pluto", ...}}
{"type": "stop", "app_id": "..."}
{"type": "configure", "app_id": "...", "radio_config": {"center_frequency": 915000000}}
```
The agent does **not** interpret manifests, models, or preprocessing — it just applies `radio_config` to `ria_toolkit_oss.sdr.<device>`.
### Phase 5 — Tests
| # | Task | Location |
|---|------|----------|
| 11 | Unit: streamer loop against `sdr.mock` + fake WS | `tests/agent/test_streamer.py` |
| 12 | Unit: `ws_client` reconnect/heartbeat timing | `tests/agent/test_ws_client.py` |
| 13 | Unit: `hardware.detect_available()` and heartbeat payload | `tests/agent/test_hardware.py` |
| 14 | Integration: local `websockets` server → mock SDR → full start/stream/stop cycle | `tests/agent/test_integration.py` |
| 15 | Regression: `NodeAgent` still importable and functional | `tests/agent/test_legacy.py` |
---
## Part B — RIA Hub (hand-off to separate session)
> **Give this section to Claude in the ria-hub repo session.** It depends on Part A shipping first (or at minimum, stubbed protocol).
### Server-side tasks
| # | Task | File / Area | Priority |
|---|------|-------------|----------|
| B1 | Implement `AgentDataSource` (DataSource ABC, reads IQ from WebSocket connection) and register it in `build_data_source()` | `controller/app/modules/screens/data_sources.py` | P0 |
| B2 | Add `"agent"` to `dataSource.type` enum in manifest schema; update Pydantic/JSON schema validators | `controller/app/modules/screens/graph_derivation.py` + manifest schema files | P0 |
| B3 | Add agent WebSocket endpoint `POST /api/agent/ws` — accepts agent connections, auth via registration token, bridges the connection to the Celery task's `AgentDataSource` | `controller/app/modules/agent/routes.py` (new) | P0 |
| B4 | Agent registry: MongoDB collection tracking `agent_id`, hardware list, last heartbeat, online status, registration tokens | `controller/app/modules/agent/models.py` (new) | P1 |
| B5 | Registration endpoint `POST /api/agent/register` returning agent credentials for `~/.ria/agent.json` | `controller/app/modules/agent/routes.py` | P1 |
| B6 | Celery task wiring: when manifest `dataSource.type == "agent"`, look up the connected agent by `agent_id`, forward `radio_config` to it, and feed received IQ chunks into the inference loop via `AgentDataSource.next_chunk()` | `controller/app/modules/screens/tasks.py` | P0 |
| B7 | Device/agent picker UI in Screens app config (Vue 3) | frontend screens panel | P2 |
| B8 | Agents list/status admin view | frontend admin area | P2 |
### Protocol contract (must match Part A exactly)
See `screens_connection_updates.md` §"WebSocket Protocol". Key invariants:
- Binary frames are interleaved float32 IQ, one frame per `radio.rx()` call.
- `radio_config` is derived from the manifest's `dataSource.params` and forwarded verbatim.
- The server sends `configure` for retune-without-stop flow; the agent is responsible for applying it at the next capture boundary.
### Manifest example (new mode)
```json
{
"dataSource": {
"type": "agent",
"device": "pluto",
"agent_id": "agent-abc123",
"params": {
"identifier": "ip:192.168.3.1",
"sample_rate": 1000000,
"center_frequency": 2450000000,
"gain": 40,
"buffer_size": 1024
}
},
"preprocess": "magnitude_phase_window_stats",
"config": {"inference": {"knownDevices": [], "interval": 1}}
}
```
### Pipeline invariant
**No changes to `inference_core.py`, `preprocessors.py`, or `run_onnx_chain_loop()`.** `AgentDataSource` is a drop-in for `SdrDataSource`; everything downstream is unchanged.
---
## Rollout Order
1. Part A Phase 1 (SDR foundation) — safe to land independently.
2. Part A Phases 23 (agent package + streamer) — lands the `ria-agent stream` CLI; no server needed to build/test against a mock WS.
3. Part B B1, B2, B3, B6 — minimum viable server path; lets a real agent connect end-to-end.
4. Part A Phase 5 integration test against a dev RIA Hub instance.
5. Part B B4, B5 (registry + registration) — hardens multi-agent deployments.
6. Part B B7, B8 (UI) — operator-facing polish.
## Open Questions
- Auth model for the WebSocket handshake: bearer token in header, query param, or first-message auth frame?
- Should `configure` apply mid-buffer or only at capture boundaries? (Doc implies boundaries; confirm for retune latency budget.)
- Backpressure policy when the server's inference loop is slower than the agent's `rx()` cadence — drop frames, queue, or pause?

View File

@ -0,0 +1,123 @@
# Bug: `SpectrogramDashboardOp` destructor calls `std::terminate`
## Summary
`SpectrogramDashboardOp` spawns an HTTP server thread during setup but its destructor
does not `join()` or `detach()` it. Per the C++ standard, destroying a joinable
`std::thread` calls `std::terminate()` — so **any** shutdown path kills the app:
init failure, Ctrl-C, or normal exit at end of `main`.
## Evidence
Built app (`new_dashboard`) crashes on shutdown with this backtrace:
```
#3 __GI_raise
#4 __GI_abort
#5 libstdc++ (std::terminate handler)
#6 libstdc++
#7 std::terminate()
#8 std::thread::~thread()
#9 ria::ops::SpectrogramDashboardOp::~SpectrogramDashboardOp()
#10 __gnu_cxx::new_allocator<SpectrogramDashboardOp>::destroy(...)
...
#23 ria::Pipeline::~Pipeline()
#24 main
```
The stack shows the failure is entirely inside the op's own destructor — not
downstream of any flow / port-wiring issue. The op's startup message
`HTTP server started on port 8080` prints just before the crash, confirming the
server thread is running and joinable when destruction begins.
## Reproduction
1. Build any RIA app that includes `SpectrogramDashboardOp`.
2. Run the container; it crashes with `terminate called without an active exception`
regardless of whether other operators succeed or fail.
## Root cause
Standard C++ invariant:
> If a `std::thread` object is destroyed while still `joinable()`, the destructor
> calls `std::terminate()`.
> — [cppreference.com/w/cpp/thread/thread/~thread](https://en.cppreference.com/w/cpp/thread/thread/~thread)
The destructor needs to (a) signal the server to stop, (b) wait for the thread
to exit, and (c) join it before the `std::thread` member is destroyed.
## Fix
In `SpectrogramDashboardOp`:
```cpp
SpectrogramDashboardOp::~SpectrogramDashboardOp() {
// 1. Tell the HTTP server / websocket server to stop accepting
// and to return from its serve loop. Exact call depends on the
// HTTP library in use:
// - cpp-httplib: server_.stop();
// - Boost.Beast: acceptor_.close(); io_context_.stop();
// - custom: shutdown_flag_.store(true); close(listen_fd_);
if (server_) {
server_->stop();
}
// 2. Join the thread if it was ever started.
if (http_thread_.joinable()) {
http_thread_.join();
}
}
```
If multiple threads are owned (e.g. separate WebSocket broadcaster, update-rate
timer), join **each** of them.
## Related checks
While fixing this op, audit any other operator in the same repo that owns a
thread:
```bash
grep -rn "std::thread " src/
```
For each match, confirm the owning class's destructor does:
```cpp
if (thread_.joinable()) thread_.join();
```
plus whatever shutdown signal is needed to make the thread actually return.
## Acceptance
- `SpectrogramDashboardOp` destructor joins all spawned threads.
- A RIA app containing this op exits cleanly on `Ctrl-C` with no
`terminate called without an active exception` message.
- Forcing an init failure (e.g. a bad `websocket_port`) produces a readable
exception message instead of `SIGABRT`.
---
## Prompt to paste into Claude Code (in the op's repo)
> `SpectrogramDashboardOp` has a latent bug: its destructor lets a joinable
> `std::thread` (the HTTP server thread that prints "HTTP server started on
> port 8080") go out of scope, which per the C++ standard calls
> `std::terminate()`. This makes any built RIA app containing this op crash on
> every shutdown path — init failure, normal exit, and Ctrl-C — with the
> unhelpful message `terminate called without an active exception`. Stack trace
> at the point of abort goes through `std::thread::~thread()`
> `SpectrogramDashboardOp::~SpectrogramDashboardOp()`.
>
> Fix the destructor: (a) signal the HTTP server to stop (e.g. `server_->stop()`
> for cpp-httplib, or close the listening socket + set a shutdown flag), then
> (b) `if (http_thread_.joinable()) http_thread_.join();`. Apply the same pattern
> to any other `std::thread` members the op owns (WebSocket broadcaster, rate
> timer, etc.). Then grep for other `std::thread` members in this repo and audit
> their owners' destructors for the same bug.
>
> Acceptance: the op's destructor joins every thread it starts; a test that
> constructs and immediately destroys the op exits cleanly; Ctrl-C on a running
> app produces no `terminate` message.

View File

@ -0,0 +1,170 @@
# Agent CLI Simplification — Handoff to ria-toolkit-oss
**Repo:** `ria-toolkit-oss`, branch `screens-connection`
**Goal:** Reduce agent setup from 3+ commands to 2 simple ones.
---
## Current UX (painful)
```bash
# Step 1: Register via curl against FastAPI directly
curl -X POST http://hub:8005/screens/agents/register \
-H 'X-API-Key: supersecretapikey' \
-d '{"name": "my-agent"}'
# → {"agent_id": "agent-55cf3c5b8137f6f3", "token": "45Hbt..."}
# Step 2: Manually save credentials
ria-agent register \
--url http://hub:8005 \
--token 45HbtlpVDX7_XTF47biDcLcyiVmM51icEZVJ7J_UrEE \
--agent-id agent-55cf3c5b8137f6f3
# Step 3: Stream (with manual URL construction)
ria-agent stream \
--url "ws://hub:8005/screens/agent/ws?agent_id=agent-55cf3c5b8137f6f3" \
--token 45HbtlpVDX7_XTF47biDcLcyiVmM51icEZVJ7J_UrEE
```
Problems:
- User must know the FastAPI port (8005), not just the hub URL (3005)
- `register` subcommand only saves locally — doesn't call the server
- `_derive_ws_url` builds `/api/agent/ws/{agent_id}` but server endpoint is `/screens/agent/ws?agent_id=...`
- User must copy-paste agent_id and token between commands
---
## Target UX
```bash
# One-time setup: register with the hub (hits server, saves config)
ria-agent register --hub http://whitehorse:3005 --api-key supersecretapikey --name lab-pluto
# Stream (reads config, connects automatically)
ria-agent stream
```
That's it. Two commands, no copy-pasting.
---
## Changes needed in ria-toolkit-oss
### 1. `cli.py` — Make `register` call the server
Current `_cmd_register` just saves to `~/.ria/agent.json`. It should:
1. POST to `{hub_url}/screens/agents/register` with `X-API-Key` header
2. Receive `{agent_id, token}` from the server
3. Save everything to `~/.ria/agent.json`
```python
def _cmd_register(args: argparse.Namespace) -> int:
import urllib.request
import json as _json
hub_url = args.hub.rstrip("/")
api_key = args.api_key
# Call the server to register
url = f"{hub_url}/screens/agents/register"
body = _json.dumps({"name": args.name or ""}).encode()
req = urllib.request.Request(
url,
data=body,
headers={
"Content-Type": "application/json",
"X-API-Key": api_key,
},
)
try:
with urllib.request.urlopen(req) as resp:
data = _json.loads(resp.read())
except Exception as e:
print(f"error: registration failed: {e}", file=sys.stderr)
return 1
agent_id = data["agent_id"]
token = data["token"]
# Save to config
cfg = _config.load()
cfg.hub_url = hub_url
cfg.agent_id = agent_id
cfg.token = token
if args.name:
cfg.name = args.name
cfg.insecure = bool(args.insecure)
path = _config.save(cfg)
print(f"Registered agent: {agent_id}")
print(f"Credentials saved to {path}")
return 0
```
Update the argparse for `register`:
```python
p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials")
p_reg.add_argument("--hub", required=True, help="RIA Hub URL (e.g. http://whitehorse:3005)")
p_reg.add_argument("--api-key", required=True, help="Hub API key for authentication")
p_reg.add_argument("--name", default=None, help="Human-friendly agent name")
p_reg.add_argument("--insecure", action="store_true", help="Skip TLS verification")
```
Remove `--url`, `--token`, `--agent-id` from register — those are now server-generated.
### 2. `cli.py` — Fix `_derive_ws_url`
Current (wrong):
```python
suffix = f"/api/agent/ws/{agent_id}" if agent_id else "/api/agent/ws"
```
Should be:
```python
suffix = f"/screens/agent/ws?agent_id={agent_id}" if agent_id else "/screens/agent/ws"
```
### 3. `cli.py` — Make `stream` zero-arg by default
Current `_cmd_stream` already loads config and derives the URL — it just needs the URL fix above. After that, bare `ria-agent stream` works if `register` was run first.
### 4. `config.py` — Add `api_key` field (optional)
Add `api_key: str = ""` to `AgentConfig` so the hub API key can be persisted for re-registration or other API calls. Not strictly required but useful.
---
## Changes already done in ria-hub (Part B)
The server side is ready:
- `POST /screens/agents/register` — accepts `{"name": "..."}` with `X-API-Key` header, returns `{"agent_id": "...", "token": "..."}`
- `GET /screens/agent/ws?agent_id=...` — WebSocket endpoint, authenticates via `Authorization: Bearer {token}` header
- Agent token is hashed (SHA-256) and stored in MongoDB; lookup happens on WS connect
The Go proxy for `/screens/agents/register` through port 3005 still needs to be added (currently agents must hit FastAPI port 8005 directly). That's a ria-hub task, not ria-toolkit-oss.
---
## Summary of file changes
| File | Change |
|------|--------|
| `src/ria_toolkit_oss/agent/cli.py` | `register` calls server API, new flags `--hub`/`--api-key`; fix `_derive_ws_url` path |
| `src/ria_toolkit_oss/agent/config.py` | Optional: add `api_key` field to `AgentConfig` |
| `tests/agent/test_cli.py` | Update register tests for new server-calling behavior |
---
## Validated E2E flow (what works today)
We tested the full pipeline on whitehorse with a real Pluto SDR:
1. Agent connects via WebSocket with bearer token auth ✅
2. Server sends `start` with `radio_config` via Redis pub/sub → agent ✅
3. Agent opens Pluto, streams interleaved float32 IQ via binary WS frames ✅
4. FastAPI pushes frames to Redis list, Celery worker's `AgentDataSource.next_chunk()` BLPOP reads them ✅
5. Inference loop runs on live agent data identically to direct SDR mode ✅
The only manual friction is the multi-step registration and URL construction — which these CLI changes eliminate.