# Agent TX Streaming — `ria-toolkit-oss` Handoff
**Paired repo:** `ria-hub` (this doc lives here, but it's written for the Claude working in `ria-toolkit-oss`)

**Source of truth for the overall design:** [Agent TX Streaming - Cross-Repo Plan.md](./Agent%20TX%20Streaming%20-%20Cross-Repo%20Plan.md). Read that first.

**Status (ria-hub side):** landed 2026-04-16. Ready to talk to a TX-capable agent.

---
## Your job
Implement Part A of the plan in `ria-toolkit-oss` (§A1–A8). The hub is already speaking the protocol below and is waiting for an agent that can:
1. Accept hub → agent binary TX buffers over an existing WebSocket.
2. Enforce the operator-configured TX interlocks (`tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`).
3. Drive `sdr.init_tx` / `_stream_tx` so a real Pluto transmits what the hub sends.
4. Report status back as `tx_status` JSON frames.
Full-duplex with the existing RX session on the same `app_id` must keep working.
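
Item 2 could be enforced with a pre-open interlock check along the lines of the sketch below, which assumes a few things. The field names follow the heartbeat payload further down. Gain is compared as "higher dB is louder", so on Pluto `-5` is louder than `-15`. An empty `tx_allowed_freq_ranges` means "no restriction". `TxCaps` and `check_tx_request` are hypothetical names, not existing `ria_toolkit_oss` API. The check returns a message suitable for `tx_status: error`, or `None` if TX may proceed, and it never touches the SDR.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TxCaps:
    """Hypothetical holder for the operator-configured TX interlocks."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None       # None = no gain cap
    tx_max_duration_s: Optional[float] = None    # enforced by a TX-loop watchdog, not here
    # [[lo_hz, hi_hz], ...]; empty list assumed to mean "no restriction"
    tx_allowed_freq_ranges: list = field(default_factory=list)


def check_tx_request(caps: TxCaps, radio_config: dict) -> Optional[str]:
    """Return a human-readable rejection message, or None if TX may start.

    Runs before the SDR is opened, so a rejection never touches hardware.
    """
    if not caps.tx_enabled:
        return "tx_enabled=False on this agent"
    gain = radio_config.get("tx_gain")
    if caps.tx_max_gain_db is not None and gain is not None and gain > caps.tx_max_gain_db:
        return f"gain {gain} exceeds tx_max_gain_db={caps.tx_max_gain_db}"
    freq = radio_config.get("tx_center_frequency")
    if caps.tx_allowed_freq_ranges and freq is not None:
        if not any(lo <= freq <= hi for lo, hi in caps.tx_allowed_freq_ranges):
            return f"frequency {freq} Hz outside tx_allowed_freq_ranges"
    return None
```

`tx_max_duration_s` is only carried here. The actual cutoff belongs to the watchdog in the TX loop, as roadmap row 7 describes.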
---
## What ria-hub does (so you know what's on the other end of the wire)
You don't need to know any of this to do your work — but if you hit a wall, here's the mental model:
| Hub-side concept | What it does |
|---|---|
| `AgentTxSink` (Python, Celery worker) | Mirrors `AgentDataSource`. Publishes `tx_start`/`tx_stop`/`tx_configure` control JSON and raw binary IQ buffers intended for the agent. |
| `/screens/agent/ws` FastAPI endpoint | The WebSocket you already connect to. Now also pumps hub → agent binary TX frames and republishes your `tx_status` JSON upstream to the Celery task. |
| Redis channels (`agent:tx_iq:*`, `agent:events:*`) | Internal to the hub. You will never see them. Everything reaches you as WS frames. |
| Capability gate | Hub refuses to launch a TX app unless it's seen a recent heartbeat from you with `"tx" ∈ capabilities` and `tx_enabled: true`. |
| Audit log (`AgentTxAudit`) | Hub persists who started what transmission at what frequency and gain. Your error messages in `tx_status` end up in that record. |

**Bottom line: from your process, this is still the same WebSocket you've been using. You're just getting new message types and a new class of binary frames going the other direction.**

---
## Protocol contract (the only thing you actually need)
Everything below is additive. Existing RX messages (`start`/`stop`/`configure` + agent → hub binary) keep their current semantics; do not touch them.
### Hub → agent
**JSON control frames** (text WS frames):
```jsonc
// Arm TX. Call sdr.init_tx with this radio_config and start _stream_tx.
// After this you'll start receiving binary frames (see below) that go into
// the stream callback.
{
"type": "tx_start",
"app_id": "app-abc",
"radio_config": {
"device": "pluto",
"identifier": "ip:192.168.3.1",
"tx_sample_rate": 1000000,
"tx_center_frequency": 2450000000,
"tx_gain": -20, // dB. Pluto: negative = attenuation.
"tx_bandwidth": 1000000, // optional
"buffer_size": 1024, // optional; complex samples per buffer
"underrun_policy": "pause" // "pause" | "zero" | "repeat"
}
}
// Stop TX, drain queue, pause_tx. RX session on the same app_id (if any)
// stays alive.
{ "type": "tx_stop", "app_id": "app-abc" }
// Apply parameter changes at the next buffer boundary. Any subset of
// radio_config fields.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }
// Advisory — safe to ignore. Hub publishes this whenever it RPUSHes a new
// binary buffer; it was wired so the WS bridge wakes up promptly. You do
// NOT need to act on it. Consider it a keepalive.
{ "type": "tx_data_available", "app_id": "app-abc" }
```
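
These text frames could be routed on the agent side as sketched below. The handler names `_handle_tx_start`, `_handle_tx_stop` and `_handle_tx_configure` come from roadmap row 5. The `TxControlRouter` class and its wiring are hypothetical. Unknown types fall through to the existing RX dispatch, and `tx_data_available` is deliberately a no-op.

```python
import json
from typing import Any, Callable, Dict, Optional


class TxControlRouter:
    """Hypothetical dispatcher for hub -> agent TX control frames."""

    def __init__(self, on_unhandled: Optional[Callable[[Dict[str, Any]], None]] = None):
        self.on_unhandled = on_unhandled  # e.g. the existing start/stop/configure RX dispatch
        self.pending_config: Dict[str, Any] = {}
        self.armed_app_id: Optional[str] = None
        self.handlers: Dict[str, Callable[[Dict[str, Any]], None]] = {
            "tx_start": self._handle_tx_start,
            "tx_stop": self._handle_tx_stop,
            "tx_configure": self._handle_tx_configure,
            "tx_data_available": lambda msg: None,  # advisory keepalive: ignore
        }

    def dispatch(self, text: str) -> None:
        msg = json.loads(text)
        handler = self.handlers.get(msg.get("type"))
        if handler is not None:
            handler(msg)
        elif self.on_unhandled is not None:
            self.on_unhandled(msg)

    def _handle_tx_start(self, msg: Dict[str, Any]) -> None:
        # Real code: check interlocks, sdr.init_tx(...), start _stream_tx, emit "armed".
        self.armed_app_id = msg["app_id"]

    def _handle_tx_stop(self, msg: Dict[str, Any]) -> None:
        # Real code: drain queue, pause_tx(), emit "done". The RX session stays alive.
        self.armed_app_id = None

    def _handle_tx_configure(self, msg: Dict[str, Any]) -> None:
        # Merge the subset now; the TX loop applies it at the next buffer boundary.
        self.pending_config.update(msg.get("radio_config", {}))
```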
**Binary frames** (binary WS frames):
* Raw interleaved `float32` IQ samples in `[-1, 1]`.
* One frame = one buffer.
* Byte length is always `num_complex_samples × 8` (8 bytes per complex sample: two float32s).
* **Only valid between `tx_start` and `tx_stop`.** If you receive a binary frame outside that window, drop it and log WARN — don't crash, don't panic.
Format validator is already in `ria_toolkit_oss.sdr.sdr._verify_sample_format` — reuse it.
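
A sketch of turning one binary frame into the complex64 buffer the TX callback consumes. The contract above doesn't state byte order, so little-endian float32 is an assumption here. The sketch also assumes the caller tracks whether a `tx_start` is active. `decode_tx_frame` and the logger name are hypothetical, and the result should still go through `_verify_sample_format`.

```python
import logging
from typing import Optional

import numpy as np

log = logging.getLogger("ria_agent.tx")  # hypothetical logger name


def decode_tx_frame(data: bytes, tx_active: bool) -> Optional[np.ndarray]:
    """Decode one hub -> agent binary frame into complex64 IQ samples.

    Returns None (after a WARN log) when the frame must be dropped.
    """
    if not tx_active:
        log.warning("dropping %d-byte TX frame received outside tx_start/tx_stop", len(data))
        return None
    if len(data) == 0 or len(data) % 8 != 0:
        log.warning("dropping TX frame with invalid length %d (not a multiple of 8)", len(data))
        return None
    # Interleaved float32 I, Q, I, Q, ... (assumed little-endian) -> complex64.
    iq = np.frombuffer(data, dtype="<f4")
    return (iq[0::2] + 1j * iq[1::2]).astype(np.complex64)
```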
### Agent → hub
**JSON status frames** (text WS frames). Use the existing `send_json` path:
```jsonc
// Lifecycle — emit on every state transition.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" }
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
// Errors — include a human-readable message. Hub surfaces it to the UI
// and writes it into the audit record.
{ "type": "tx_status", "app_id": "app-abc", "state": "error",
"message": "gain -5 exceeds tx_max_gain_db=-15" }
```
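
A small builder that keeps outgoing frames inside the state vocabulary the hub expects and refuses an `error` without a `message`. `tx_status_frame` is a hypothetical helper; its return value is meant for the existing `send_json` path.

```python
from typing import Optional

TX_STATES = {"armed", "transmitting", "underrun", "done", "error"}


def tx_status_frame(app_id: str, state: str, message: Optional[str] = None) -> dict:
    """Build a tx_status payload for the existing send_json path."""
    if state not in TX_STATES:
        raise ValueError(f"unknown tx_status state {state!r}")
    if state == "error" and not message:
        raise ValueError("error status requires a human-readable message")
    frame = {"type": "tx_status", "app_id": app_id, "state": state}
    if message:
        frame["message"] = message
    return frame
```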
**States** (hub assumes this vocabulary):
* `armed`: `init_tx` done, callback started, queue empty, nothing transmitting yet.
* `transmitting`: at least one buffer has flowed through the callback.
* `underrun`: queue drained; what you do next depends on `underrun_policy`:
  * `"pause"` → call `pause_tx()`, emit `underrun`, stay paused until the hub sends a fresh `tx_start`.
  * `"zero"` → continue with `np.zeros(...)` fills, still emit `underrun` once so the hub can show the indicator.
  * `"repeat"` → loop the last good buffer, emit `underrun` once.
* `done`: clean stop after `tx_stop`.
* `error`: capability rejection or hardware failure. Include `message`.
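
The three underrun policies could be implemented as one refill step that the TX callback calls for each buffer. The sketch below makes several assumptions:

* Buffers sit in a `queue.Queue` of complex64 arrays, with a fixed `buffer_size`.
* Returning `None` stands in for "call `pause_tx()` and stop feeding until a fresh `tx_start`".
* Recovery from underrun counts as a state transition back to `transmitting`.

`UnderrunHandler` is hypothetical. Its `in_underrun` flag is what makes `underrun` fire once per drain event rather than on every empty poll.

```python
import queue
from typing import Callable, Optional

import numpy as np


class UnderrunHandler:
    """Hypothetical per-session helper implementing underrun_policy."""

    def __init__(self, policy: str, buffer_size: int, emit: Callable[[str], None]):
        if policy not in ("pause", "zero", "repeat"):
            raise ValueError(f"unknown underrun_policy {policy!r}")
        self.policy = policy
        self.buffer_size = buffer_size
        self.emit = emit  # e.g. sends a tx_status frame with this state
        self.last_good: Optional[np.ndarray] = None
        self.in_underrun = False

    def next_buffer(self, q: "queue.Queue[np.ndarray]") -> Optional[np.ndarray]:
        """Return the next buffer to transmit, or None if TX should pause."""
        try:
            buf = q.get_nowait()
        except queue.Empty:
            if not self.in_underrun:
                self.in_underrun = True
                self.emit("underrun")  # once per drain event
            if self.policy == "zero":
                return np.zeros(self.buffer_size, dtype=np.complex64)
            if self.policy == "repeat" and self.last_good is not None:
                return self.last_good
            return None  # "pause", or "repeat" with nothing to repeat yet
        if self.in_underrun:
            self.in_underrun = False
            self.emit("transmitting")  # data is flowing again
        self.last_good = buf
        return buf
```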
**Extended heartbeat** — you are already sending heartbeats. Grow the payload:
```jsonc
{
"type": "heartbeat",
"hardware": ["mock", "pluto"],
"status": "streaming", // unchanged semantics
"capabilities": ["rx", "tx"], // NEW — derived from tx_enabled + SDR class having init_tx
"tx_enabled": true, // NEW — mirror of cfg.tx_enabled
"tx_max_gain_db": -10, // NEW — optional, from agent config
"tx_max_duration_s": 60, // NEW — optional
"tx_allowed_freq_ranges": [[2.4e9, 2.5e9]], // NEW — optional
"sessions": { // NEW — optional per-session snapshot
"rx": { "app_id": "app-abc", "state": "streaming" },
"tx": { "app_id": "app-abc", "state": "transmitting" }
},
"app_id": "app-abc" // keep for back-compat
}
```
The hub reads these fields, stores them on `ScreensAgent`, and gates TX launches on them. **If you don't advertise `tx` in `capabilities` and `tx_enabled: true`, the hub will refuse to start any TX app with HTTP 400 — no WS traffic will be generated.**
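
The extended payload could be assembled as sketched below, under three assumptions:

* "SDR class having `init_tx`" is checked with `hasattr`.
* Every agent advertises `"rx"`.
* Unset optional caps are omitted rather than sent as `null`.

`build_heartbeat` and its parameters are hypothetical.

```python
from typing import Any, Dict, List, Optional


def build_heartbeat(
    hardware: List[str],
    status: str,
    sdr_cls: type,
    tx_enabled: bool,
    app_id: Optional[str] = None,
    tx_max_gain_db: Optional[float] = None,
    tx_max_duration_s: Optional[float] = None,
    tx_allowed_freq_ranges: Optional[List[List[float]]] = None,
    sessions: Optional[Dict[str, Dict[str, str]]] = None,
) -> Dict[str, Any]:
    capabilities = ["rx"]
    # Advertise "tx" only if the operator enabled it AND the driver can transmit.
    if tx_enabled and hasattr(sdr_cls, "init_tx"):
        capabilities.append("tx")
    hb: Dict[str, Any] = {
        "type": "heartbeat",
        "hardware": hardware,
        "status": status,
        "capabilities": capabilities,
        "tx_enabled": tx_enabled,
    }
    optional = {
        "tx_max_gain_db": tx_max_gain_db,
        "tx_max_duration_s": tx_max_duration_s,
        "tx_allowed_freq_ranges": tx_allowed_freq_ranges,
        "sessions": sessions,
        "app_id": app_id,  # kept for back-compat
    }
    hb.update({k: v for k, v in optional.items() if v is not None})
    return hb
```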
### Backpressure model (what happens when you can't keep up)
* The hub caps its outbound TX queue at 200 buffers. If it fills, the hub either blocks on `write()` or drops the oldest buffer — both are benign for you.
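* On the agent side, enforce your own cap (plan §A2 suggests 8 buffers). When it is full, make the binary handler wait for space instead of dropping. The agent then stops reading from the socket, so the hub's `await ws.send` slows via TCP/WS backpressure. You don't need an application-level flow-control message.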
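
The agent-side cap can be sketched as follows. The sketch assumes the roadmap row 5 design, where a `queue.Queue` is read by the `_stream_tx` executor thread. It also assumes `ws_client.run()` awaits `on_binary` before reading the next frame. The blocking `put` runs through `run_in_executor`, so the event loop keeps servicing heartbeats and RX while the WebSocket reader stalls. Because control frames such as `tx_stop` queue behind a stalled handler, the wait is bounded, and the frame is dropped with a WARN if the consumer never frees a slot. `make_on_binary` and `TX_QUEUE_MAX` are hypothetical names.

```python
import asyncio
import logging
import queue
from typing import Awaitable, Callable

log = logging.getLogger("ria_agent.tx")  # hypothetical logger name
TX_QUEUE_MAX = 8  # plan §A2 suggestion


def make_on_binary(
    tx_queue: "queue.Queue[bytes]", put_timeout_s: float = 5.0
) -> Callable[[bytes], Awaitable[None]]:
    """Build an on_binary handler that waits (off the event loop) while the queue is full."""

    async def on_binary(data: bytes) -> None:
        loop = asyncio.get_running_loop()
        try:
            # The blocking put runs in the default executor. Awaiting it suspends this
            # coroutine, so the WS reader stops and TCP backpressure reaches the hub.
            await loop.run_in_executor(None, lambda: tx_queue.put(data, timeout=put_timeout_s))
        except queue.Full:
            # Bounded wait so a stalled consumer can't also hold up tx_stop forever.
            log.warning("TX queue still full after %.1fs; dropping %d bytes", put_timeout_s, len(data))

    return on_binary
```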
---
## Implementation roadmap (mapped to the Cross-Repo Plan)
Work in the order below. Each row is a single PR-sized unit.
| # | Plan ref | Deliverable | Acceptance |
|---|---|---|---|
| 1 | §A3 | `AgentConfig` gains `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. `save()` keeps 0600. | Unit test: round-trip through `~/.ria/agent.json`. |
| 2 | §A4 | `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` persists into config. `ria-agent stream --allow-tx` is a runtime override. | Integration test: `ria-agent register --allow-tx` then `cat ~/.ria/agent.json` shows fields. |
| 3 | §A1 | `ws_client.run()` gains an `on_binary: Callable[[bytes], Awaitable[None]]` parameter. Reconnect, heartbeat, and malformed-frame behavior are unchanged. | Existing `test_ws_client.py` still passes; new `test_ws_client_binary.py` asserts bytes reach the handler. |
| 4 | §A2 | Replace flat `self._sdr` / `self._app_id` state in `streamer.py` with `RxSession` + `TxSession` dataclasses. SDR instances cached by `(device, identifier)` so RX+TX share one handle on the same device. | Unit test: creating a TxSession on the same device as an active RxSession reuses the same SDR object. |
| 5 | §A2 | `_handle_tx_start`, `_handle_tx_stop`, `_handle_tx_configure` + `on_binary(data)` → `self._tx.queue.put(data)`. TX loop runs `_stream_tx` in an executor thread with a thread-safe `queue.Queue` adapter. | Integration test against MockSDR: tx_start → 10 binary frames → tx_stop produces exactly those samples through the callback. |
| 6 | §A2 | Underrun handling: `"pause"` / `"zero"` / `"repeat"` fills. Emits `tx_status: underrun` exactly once per drain event. | Unit test per policy against a slow producer. |
| 7 | §A2 | Cap enforcement **before** opening the SDR: reject with `tx_status: error` if `tx_enabled=False`, gain exceeds cap, freq outside allowed ranges, or duration cap exceeded (watchdog in TX loop calls `tx_stop` after `tx_max_duration_s`). | Unit test per rejection path; SDR is never opened when rejection fires. |
| 8 | §A5 | Heartbeat grows `capabilities`, `tx_enabled`, optional caps, `sessions`. | Integration test: start agent with `--allow-tx`, connect, verify heartbeat payload. |
| 9 | §A6 | Audit the Pluto driver's `_tx_lock` + `_param_lock` interaction to ensure concurrent RX + TX on the same `adi.Pluto` doesn't race on attribute writes. `MockSDR.init_tx` already exists — no change needed. | Stress test: 30 seconds of concurrent RX + TX on MockSDR with `_param_lock` instrumented for contention. |
| 10 | §A7 | Test matrix per plan: `test_streamer_tx`, `test_tx_safety`, `test_tx_underrun`, `test_full_duplex`, `test_ws_client_binary`, `test_integration_tx`. | All green in CI. |
| 11 | §A8 | Docs: new `docs/agent_tx_protocol.md` OR extended section in existing agent protocol doc. Regulatory disclaimer included. | Lints + renders. |
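
Roadmap row 1 in sketch form. The real `AgentConfig` already exists in `ria-toolkit-oss`, and its existing fields aren't shown here. This stand-in only illustrates two things: adding the four TX fields with safe defaults, and keeping the file at mode 0600 on save. The JSON layout and the `save`/`load` signatures are assumptions.

```python
import json
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class AgentConfig:
    """Stand-in for the real AgentConfig; only the new TX fields are shown."""
    tx_enabled: bool = False  # safe default: TX off
    tx_max_gain_db: Optional[float] = None
    tx_max_duration_s: Optional[float] = None
    tx_allowed_freq_ranges: List[List[float]] = field(default_factory=list)

    def save(self, path: Path) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        # New files are created 0600 (umask can only narrow it further).
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump(asdict(self), f, indent=2)
        os.chmod(path, 0o600)  # also tighten a pre-existing, looser file

    @classmethod
    def load(cls, path: Path) -> "AgentConfig":
        data = json.loads(path.read_text())
        # Ignore keys this stand-in doesn't model (the real config has more).
        known = {k: v for k, v in data.items() if k in cls.__dataclass_fields__}
        return cls(**known)
```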

**Ship order advice:** 1 → 2 → 3 → 4 → (5 || 6) → 7 → 8 → 9 → 10 → 11. Steps 1–3 are strict prerequisites for everything else. Steps 5 and 6 can parallelize. Step 7 can't land without 5.
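
Row 4's handle sharing is sketched below with a hypothetical `SdrCache` keyed on `(device, identifier)`. It takes a caller-supplied factory; in the agent, that factory would construct the real driver class. The lock makes concurrent RX and TX session setup resolve to one instance. Reference counting, so the handle closes only when both sessions end, is left out.

```python
import threading
from typing import Any, Callable, Dict, Tuple


class SdrCache:
    """Hypothetical cache so RX and TX sessions on one radio share a handle."""

    def __init__(self, factory: Callable[[str, str], Any]):
        self._factory = factory  # e.g. builds a Pluto or MockSDR instance
        self._handles: Dict[Tuple[str, str], Any] = {}
        self._lock = threading.Lock()

    def get(self, device: str, identifier: str) -> Any:
        key = (device, identifier)
        with self._lock:
            if key not in self._handles:
                self._handles[key] = self._factory(device, identifier)
            return self._handles[key]
```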
---
## Verification loop (how to prove the two sides talk)
Once you've implemented §A1–A7, use this to close the loop with the live hub:
1. On the agent host, run `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` then `ria-agent stream`.
2. Confirm the hub has seen the heartbeat: `curl $HUB/screens/agents/json | jq '.agents[] | select(.agent_id==...) | {tx_enabled, capabilities}'` should show `tx_enabled: true` and `["rx","tx"]`.
3. Create a Screens app whose manifest contains a `dataSink.type == "agent"` pointing at your `agent_id`, or use the composer UI with a `PlutoTXOp` in the graph + the new TX-sink agent picker.
4. `POST /screens/apps/{id}/start` on the hub. You should observe, in order:
   1. `tx_start` JSON on your WS.
   2. Binary frames arriving (if the hub-side operator is actually generating buffers; this may no-op for now, since the operator refactor is planned but not done).
   3. Your `tx_status: armed` JSON emitted back.
5. Stop the app. You should receive `tx_stop` and emit `tx_status: done`.
6. Provoke a rejection: set `tx_max_gain_db: -15` in your config, then start a TX app with `tx_gain: -5`. The hub should return `HTTP 400` from `/start` without any WS traffic — capability gate fires first. If you make it past the gate and it's still wrong, emit `tx_status: error` and the hub will surface the message to the UI.
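
Step 2's `jq` filter can be restated as a Python check for a scripted smoke test. This assumes the `/screens/agents/json` response has the shape the filter implies: an `agents` array whose entries carry `agent_id`, `tx_enabled`, and `capabilities`. `agent_tx_ready` is a hypothetical helper; fetching the JSON is left to the caller.

```python
from typing import Any, Dict


def agent_tx_ready(agents_json: Dict[str, Any], agent_id: str) -> bool:
    """True if the hub has recorded a TX-capable heartbeat for agent_id."""
    for agent in agents_json.get("agents", []):
        if agent.get("agent_id") == agent_id:
            return agent.get("tx_enabled") is True and "tx" in agent.get("capabilities", [])
    return False
```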
**Useful hub-side greps if something is wrong:**
* `grep -r "tx_status" controller/app/modules/screens/` — see how the hub parses your frames.
* `grep -r "tx_enabled" controller/app/modules/screens/` — see what heartbeat fields the hub reads.
* `controller/app/modules/screens/agent_ws.py:200-290` — the WS handler's JSON dispatch.
* `controller/app/modules/screens/data_sinks.py` — what the hub publishes on each control frame.
---
## Open questions (from the original plan that still apply)
Answered since the plan was written:
* ✅ **Operator name:** `PlutoTXOp` (PascalCase, stored in the hub's MongoDB `ops` collection via the application packager).
* ✅ **Redis channel naming:** kept `agent:*` prefix on the hub side — you never see this.
* ✅ **Status plumbing:** `tx_status` frames get republished on a hub-internal pub/sub and surface to the UI through the existing SSE stream. You just send the frames; the hub does the rest.
Still open (flag when you have a preference):
* **Bulk + loop fast-path.** If the hub's TX operator turns out to be a fixed recording played on loop, we could add a `{ "type": "tx_start", ..., "loop": true }` variant where the hub sends the buffer once and the agent uses the existing `tx_recording` path. Protocol-compatible with the streaming version. Defer until a real use case demands it.
* **Multi-app-per-agent.** Out of scope for v1 (§Non-goals). If/when needed: prefix binary frames with a 4-byte session header and bump a `protocol_version` in the heartbeat.
* **TX clock drift.** Relying on generous queue depth + stable local networks for v1. Longer term may need agent-side resampling.
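
If multi-app support is ever needed, the 4-byte session header could look like the sketch below. Nothing here is decided. A big-endian unsigned session id is an assumption, and mapping ids to `app_id`s (for example in `tx_start`) is left open. Both helpers are hypothetical.

```python
import struct
from typing import Tuple

_HEADER = struct.Struct(">I")  # hypothetical: 4-byte big-endian unsigned session id


def pack_session_frame(session_id: int, iq_bytes: bytes) -> bytes:
    """Prefix an interleaved-float32 IQ buffer with a session header."""
    return _HEADER.pack(session_id) + iq_bytes


def unpack_session_frame(frame: bytes) -> Tuple[int, bytes]:
    """Split a frame into (session_id, IQ payload); the payload must be whole complex samples."""
    if len(frame) < _HEADER.size or (len(frame) - _HEADER.size) % 8 != 0:
        raise ValueError(f"bad session frame length {len(frame)}")
    (session_id,) = _HEADER.unpack_from(frame)
    return session_id, frame[_HEADER.size:]
```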
---
## What lives in `ria-hub` now (reference)
You don't need to read any of this, but if you're curious or need to debug the integration, these are the load-bearing bits on the hub side:
| Path | What |
|---|---|
| `controller/app/modules/screens/data_sinks.py` | `AgentTxSink`, `LocalPlutoTxSink`, `build_data_sink` |
| `controller/app/modules/screens/agent_ws.py` | `_forward_tx_binary`, heartbeat parsing, `tx_status` republish |
| `controller/app/modules/screens/graph_derivation.py` | `_pluto_tx_spec_mapping`, `_SDR_SINK_MAP`, `_derive_data_sink` |
| `controller/app/modules/screens/routes.py` | `_check_agent_tx_capability`, `AgentTxAudit` write, `POST /apps/{id}/sink-agent` |
| `controller/app/modules/screens/models.py` | `ScreensAgent` TX fields, `AgentTxAudit` document |
| `schemas/screens/app_manifest.schema.json` | `dataSink` schema block |
| `web_src/js/components/screens/components/TxConsentModal.vue` | Pre-transmit consent dialog |
| `web_src/js/components/screens/components/SinkPanel.vue` | TX-capable agent picker + live `tx_status` indicator |
| `web_src/js/components/screens/ScreensApp.vue` | Consent gate + `tx_status` forwarding to children |
---
## Regulatory note (keep this in your docs too)
Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub shows a consent modal and writes an audit log so actions are attributable. None of this is a legal compliance layer — it's defense-in-depth.