Agent TX Streaming — Implementation Plan (ria-toolkit-oss)
Scope: Part A of agent_tx_plan.md. This repo only.
Goal: Make the agent accept hub-originated TX control + binary IQ, stream it to the SDR in full duplex with RX, and enforce agent-local safety caps.
Acceptance: pytest tests/agent/ green; ria-agent stream --allow-tx accepts a tx_start against MockSDR and round-trips binary frames to _stream_tx.
Each phase below lands independently. After every phase the existing agent tests must still pass (no regressions), and the phase's own new tests must be green.
Preconditions
- `--allow-tx` is opt-in at CLI level. Default config has `tx_enabled=False`; the agent will reject all TX control frames from the hub.
- Pluto FDD: one `adi.Pluto` instance serves both RX and TX. We share the SDR between sessions keyed by `(device, identifier)`.
- Known pre-existing bug: `sdr/pluto.py:151` sets `_rx_initialized = False` inside `init_tx`. Our streamer's RX path (`sdr.rx(n)`) does not read this flag, so FDD still works. Leave the bug for a separate follow-up; do not refactor Pluto in this plan.
Glossary
- Session = a `(app_id, direction)` pair held by the agent: one `RxSession` or one `TxSession`.
- Direction = `"rx"` (agent → hub binary) or `"tx"` (hub → agent binary).
- Shared SDR = when the same `(device, identifier)` is referenced by an RX and TX session concurrently; both sessions hold the same driver instance.
Phase 1 — WS binary ingress
Why first: protocol-plumbing only. No behavior change for existing RX, but unblocks every later phase.
Touches
- `src/ria_toolkit_oss/agent/ws_client.py`: add optional `on_binary` callback to `WsClient.run`.
- `tests/agent/test_ws_client.py`: add a "server sends binary, handler receives" case.
Shape
```python
# ws_client.py
from typing import Awaitable, Callable

BinaryHandler = Callable[[bytes], Awaitable[None]]


# method of WsClient
async def run(
    self,
    on_message: MessageHandler,
    heartbeat: HeartbeatBuilder,
    on_binary: BinaryHandler | None = None,  # NEW, default preserves old behavior
) -> None:
    ...
    async for raw in self._ws:
        if isinstance(raw, bytes):
            if on_binary is not None:
                try:
                    await on_binary(raw)
                except Exception:
                    logger.exception("on_binary handler raised; dropping frame")
            else:
                logger.debug("Discarding unexpected %d-byte binary frame", len(raw))
            continue
        # ... existing JSON dispatch unchanged
```
Acceptance
- New test: local `websockets` server pushes a binary frame after JSON handshake → handler sees exact bytes.
- Existing `test_ws_client.py` cases still pass with `on_binary=None`.
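The routing rule that the new test exercises can be tried without a real socket. The sketch below is only an illustration: `dispatch_frames` and `fake_socket` are hypothetical names, and an async generator stands in for the `websockets` connection.

```python
import asyncio
import json


async def dispatch_frames(frames, on_message, on_binary=None):
    """Route bytes frames to on_binary and text frames to on_message."""
    async for raw in frames:
        if isinstance(raw, bytes):
            if on_binary is not None:
                await on_binary(raw)
            continue  # binary frames never reach the JSON path
        await on_message(json.loads(raw))


async def _demo():
    async def fake_socket():
        # Stand-in for the websocket: JSON handshake, then one binary frame.
        yield json.dumps({"type": "hello"})
        yield b"\x00\x01\x02\x03"

    seen_json, seen_binary = [], []

    async def on_message(msg):
        seen_json.append(msg)

    async def on_binary(data):
        seen_binary.append(data)

    await dispatch_frames(fake_socket(), on_message, on_binary)
    return seen_json, seen_binary


seen_json, seen_binary = asyncio.run(_demo())
```

The real test would replace `fake_socket()` with a local `websockets` server, but the assertion is the same: the handler sees the exact bytes the server sent.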
Phase 2 — Config + CLI TX opt-in
Why second: small, isolated, and gives the rest of the phases a real AgentConfig.tx_enabled / caps to read.
Touches
- `src/ria_toolkit_oss/agent/config.py`
- `src/ria_toolkit_oss/agent/cli.py`
- `tests/agent/test_config.py`
- new `tests/agent/test_cli_tx.py`
config.py
Extend the dataclass and preserve backward-compat for old JSON files (the existing extra trick already handles unknown keys, but we want these fields promoted to first-class):
```python
from dataclasses import dataclass, field


@dataclass
class AgentConfig:
    hub_url: str = ""
    agent_id: str = ""
    token: str = ""
    name: str = ""
    insecure: bool = False
    api_key: str = ""
    # NEW: TX interlocks
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_max_duration_s: float | None = None
    tx_allowed_freq_ranges: list[list[float]] | None = None  # JSON-friendly list-of-lists
    extra: dict = field(default_factory=dict)
```
Update load() to pull the new fields and save() to emit them. Preserve the 0o600 chmod behavior.
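As a rough illustration of the round-trip that `test_config.py` should cover, the sketch below uses a hypothetical `TxCaps` dataclass holding only the new fields, and hypothetical standalone `save`/`load` helpers. The real `AgentConfig.load()`/`save()` also handle the existing fields and `extra`.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, fields
from typing import List, Optional


@dataclass
class TxCaps:
    """Hypothetical stand-in for the TX-related slice of AgentConfig."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    tx_max_duration_s: Optional[float] = None
    tx_allowed_freq_ranges: Optional[List[List[float]]] = None


def save(cfg: TxCaps, path: str) -> None:
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(asdict(cfg), fh)
    os.chmod(path, 0o600)  # keep the file private, as the existing save() does


def load(path: str) -> TxCaps:
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)
    known = {f.name for f in fields(TxCaps)}
    # Keys absent from an old file fall back to the dataclass defaults.
    return TxCaps(**{k: v for k, v in data.items() if k in known})


original = TxCaps(tx_enabled=True, tx_max_gain_db=-10.0,
                  tx_allowed_freq_ranges=[[2.4e9, 2.5e9]])

with tempfile.TemporaryDirectory() as tmp:
    new_path = os.path.join(tmp, "agent.json")
    save(original, new_path)
    round_tripped = load(new_path)

    # A config written before this change has none of the TX keys.
    old_path = os.path.join(tmp, "old_agent.json")
    with open(old_path, "w", encoding="utf-8") as fh:
        json.dump({"hub_url": "http://hub:3005"}, fh)
    legacy = load(old_path)
```

The point is that an old file without TX keys loads with `tx_enabled=False` and no caps, so an upgraded agent stays TX-disabled until the operator opts in.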
cli.py
Two entry points need flags:
```
ria-agent register --hub ... --api-key ...
    [--allow-tx]
    [--tx-max-gain-db VALUE]
    [--tx-max-duration-s VALUE]
    [--tx-freq-range LO HI]   # repeatable: --tx-freq-range 2.4e9 2.5e9 --tx-freq-range 5.7e9 5.8e9

ria-agent stream
    [--allow-tx]              # runtime override: sets cfg.tx_enabled for this process only
```
In _cmd_register: after successful server registration, populate cfg.tx_enabled=bool(args.allow_tx) and caps from argparse before _config.save(cfg).
In _cmd_stream: if args.allow_tx: cfg.tx_enabled = True (before passing cfg to the streamer — which requires plumbing cfg in, see Phase 3).
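One way the flags could be declared with `argparse` is sketched below. The parser layout (`build_parser`, the subparser names) is an assumption about how `cli.py` is organised. The parts worth noting are `nargs=2` with `action="append"` for the repeatable range flag, and that a negative value such as `-10` parses as an argument and not as an option.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    register = sub.add_parser("register")
    register.add_argument("--hub")
    register.add_argument("--api-key")
    register.add_argument("--allow-tx", action="store_true")
    register.add_argument("--tx-max-gain-db", type=float, default=None)
    register.add_argument("--tx-max-duration-s", type=float, default=None)
    # action="append" with nargs=2 collects one [LO, HI] pair per occurrence.
    register.add_argument("--tx-freq-range", type=float, nargs=2, action="append",
                          metavar=("LO", "HI"), default=None)

    stream = sub.add_parser("stream")
    stream.add_argument("--allow-tx", action="store_true")
    return parser


args = build_parser().parse_args([
    "register", "--hub", "http://hub:3005", "--api-key", "KEY", "--allow-tx",
    "--tx-max-gain-db", "-10",
    "--tx-freq-range", "2.4e9", "2.5e9",
    "--tx-freq-range", "5.7e9", "5.8e9",
])
stream_args = build_parser().parse_args(["stream"])
```

The parsed `args.tx_freq_range` is already a list of two-element lists, which matches the JSON-friendly shape of `tx_allowed_freq_ranges`.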
Acceptance
- `test_config.py` round-trip: new fields serialize → deserialize cleanly; missing fields in old JSON default correctly.
- `test_cli_tx.py`: `register --allow-tx --tx-max-gain-db -10` writes expected JSON; `stream --allow-tx` sets runtime flag without touching disk.
Phase 3 — Streamer refactor: session model (RX behavior preserved)
Why third: the TX work needs a session-based state machine. Doing the refactor before wiring TX keeps the diff reviewable and keeps the RX regression surface contained.
Goal: replace the flat state (self._sdr, self._app_id, self._capture_task, self._pending_config, self._status) with explicit session objects and an SDR registry, without changing any observable RX behavior.
Touches
- `src/ria_toolkit_oss/agent/streamer.py` (bulk of work)
- `src/ria_toolkit_oss/agent/hardware.py` (heartbeat grows `capabilities` + optional `sessions` snapshot)
- `src/ria_toolkit_oss/agent/cli.py` (plumb `cfg` into the streamer)
- `tests/agent/test_streamer.py` + `test_hardware.py`: update for new heartbeat shape; keep all RX assertions.
Data model
```python
import asyncio
import queue
import threading
from dataclasses import dataclass, field
from typing import Any

import numpy as np


@dataclass
class RxSession:
    app_id: str
    sdr: Any
    device_key: tuple[str, str | None]  # (device, identifier)
    buffer_size: int
    task: asyncio.Task
    pending_config: dict = field(default_factory=dict)


@dataclass
class TxSession:
    app_id: str
    sdr: Any
    device_key: tuple[str, str | None]
    buffer_size: int
    queue: queue.Queue  # thread-safe; holds raw binary frames (bytes)
    stop_event: threading.Event
    # future returned by run_in_executor(...); None until the loop is launched
    task: asyncio.Future | None
    underrun_policy: str = "pause"
    pending_config: dict = field(default_factory=dict)
    last_buffer: np.ndarray | None = None  # for "repeat" policy
    started_at: float = 0.0
    max_duration_s: float | None = None
```
SDR registry (ref-counted)
```python
class _SdrRegistry:
    def __init__(self, factory):
        self._factory = factory  # (device, identifier) -> SDR
        self._instances: dict[tuple[str, str | None], tuple[Any, int]] = {}
        self._lock = threading.Lock()

    def acquire(self, device: str, identifier: str | None):
        key = (device, identifier)
        with self._lock:
            if key in self._instances:
                sdr, rc = self._instances[key]
                self._instances[key] = (sdr, rc + 1)
                return sdr, key
            sdr = self._factory(device, identifier)
            self._instances[key] = (sdr, 1)
            return sdr, key

    def release(self, key: tuple[str, str | None]) -> bool:
        with self._lock:
            sdr, rc = self._instances[key]
            if rc <= 1:
                del self._instances[key]
                return True  # caller should close()
            self._instances[key] = (sdr, rc - 1)
            return False
```
Streamer state
```python
class Streamer:
    def __init__(self, ws, cfg: AgentConfig, sdr_factory=None):
        self.ws = ws
        self._cfg = cfg
        self._registry = _SdrRegistry(sdr_factory or _default_sdr_factory)
        self._rx: RxSession | None = None
        self._tx: TxSession | None = None
```
Message dispatch
```python
async def on_message(self, msg: dict) -> None:
    t = msg.get("type")
    handlers = {
        "start": self._handle_rx_start,
        "stop": self._handle_rx_stop,
        "configure": self._handle_rx_configure,
        # TX handlers stubbed here in Phase 3, implemented in Phase 4
        "tx_start": self._handle_tx_start,
        "tx_stop": self._handle_tx_stop,
        "tx_configure": self._handle_tx_configure,
    }
    handler = handlers.get(t)
    if handler is None:
        logger.warning("Unknown server message type: %r", t)
        return
    await handler(msg)
```
Rename internals: _handle_start → _handle_rx_start, _handle_stop → _handle_rx_stop, etc. Behavior unchanged — just reading/writing self._rx in place of the old flat attributes, and going through the registry for acquire/release.
Heartbeat
```python
# streamer.py
def build_heartbeat(self) -> dict:
    status = "streaming" if (self._rx or self._tx) else "idle"
    sessions: dict = {}
    if self._rx:
        sessions["rx"] = {"app_id": self._rx.app_id, "state": "streaming"}
    if self._tx:
        sessions["tx"] = {"app_id": self._tx.app_id, "state": self._tx_state()}
    active = self._rx or self._tx
    return heartbeat_payload(
        status=status,
        app_id=active.app_id if active else None,
        cfg=self._cfg,
        sessions=sessions or None,
    )
```
Update hardware.heartbeat_payload to take cfg (for capabilities/tx_enabled) and optional sessions. Keep unknown-arg compatibility — existing tests can pass cfg=AgentConfig() to get the old shape minus the new fields.
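A minimal sketch of the extended payload builder follows, under stated assumptions. `Cfg` is a hypothetical stand-in for `AgentConfig`, the `"type": "heartbeat"` key is illustrative, and appending `"tx"` to `capabilities` when TX is enabled is an assumption (the plan only states that the list is `["rx"]` when `tx_enabled=False`).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Cfg:
    """Hypothetical stand-in for AgentConfig; only tx_enabled matters here."""
    tx_enabled: bool = False


def heartbeat_payload(status: str, app_id: Optional[str], cfg: Cfg,
                      sessions: Optional[dict] = None) -> dict:
    capabilities = ["rx"]
    if cfg.tx_enabled:
        capabilities.append("tx")  # assumed shape when TX is enabled
    payload = {
        "type": "heartbeat",  # illustrative key, not confirmed by the plan
        "status": status,
        "app_id": app_id,
        "capabilities": capabilities,
        "tx_enabled": cfg.tx_enabled,
    }
    if sessions:
        # Only emitted while at least one session is active.
        payload["sessions"] = sessions
    return payload


idle = heartbeat_payload("idle", None, Cfg())
busy = heartbeat_payload(
    "streaming", "app-1", Cfg(tx_enabled=True),
    sessions={"tx": {"app_id": "app-1", "state": "armed"}},
)
```

Omitting `sessions` when it is empty keeps the idle heartbeat close to its old shape, which should limit churn in the existing `test_hardware.py` assertions.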
Phase 3 acceptance
- All existing `test_streamer.py` / `test_integration.py` / `test_hardware.py` cases pass, with the heartbeat additions asserted in `test_hardware.py` (capabilities = `["rx"]` when `tx_enabled=False`).
- New test: two `start` messages in sequence with same `(device, identifier)` both succeed without recreating the SDR (registry hit). (This is a Phase 3 bonus: it confirms the registry works before TX consumes it.)
- New test: `tx_start` with `tx_enabled=False` returns `tx_status: error` (handler stubs can do just this much in Phase 3; the full implementation lands in Phase 4).
Phase 4 — TX implementation
Why fourth: now that binary arrives, config exists, and sessions exist, wire up real TX.
Touches
- `src/ria_toolkit_oss/agent/streamer.py`
- Potentially a small helper module `src/ria_toolkit_oss/agent/_tx_loop.py` if `streamer.py` gets unwieldy.
- `tests/agent/test_streamer_tx.py`, `test_tx_safety.py`, `test_tx_underrun.py`, `test_full_duplex.py`
Binary ingress
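```python
async def on_binary(self, data: bytes) -> None:
    tx = self._tx
    if tx is None:
        logger.debug("Dropping %d-byte binary frame: no TX session", len(data))
        return
    try:
        # Backpressure: wait up to 2 s for a free slot. The blocking put runs in a
        # worker thread so heartbeat and RX keep running on the event loop.
        await asyncio.to_thread(tx.queue.put, data, timeout=2.0)
    except queue.Full:
        logger.warning("TX queue stalled; dropping frame (agent side)")
```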
Wire this in via ws.run(..., on_binary=self.on_binary) — change run_streamer()'s ws.run call accordingly.
_handle_tx_start
```python
async def _handle_tx_start(self, msg: dict) -> None:
    app_id = msg.get("app_id") or ""
    cfg_radio = dict(msg.get("radio_config") or {})

    # 1) interlocks
    if not self._cfg.tx_enabled:
        return await self._send_tx_error(app_id, "tx disabled on this agent")
    gain = cfg_radio.get("tx_gain")
    if self._cfg.tx_max_gain_db is not None and gain is not None and float(gain) > self._cfg.tx_max_gain_db:
        return await self._send_tx_error(app_id, f"tx_gain {gain} exceeds cap {self._cfg.tx_max_gain_db}")
    freq = cfg_radio.get("tx_center_frequency")
    if self._cfg.tx_allowed_freq_ranges and freq is not None:
        if not any(lo <= float(freq) <= hi for lo, hi in self._cfg.tx_allowed_freq_ranges):
            return await self._send_tx_error(app_id, f"tx_center_frequency {freq} outside allowed ranges")
    if self._tx is not None:
        return await self._send_tx_error(app_id, "tx already active on this agent")

    # 2) device
    device = cfg_radio.pop("device", None)
    identifier = cfg_radio.pop("identifier", None)
    buffer_size = int(cfg_radio.pop("buffer_size", 1024))
    underrun_policy = cfg_radio.pop("underrun_policy", "pause")
    if not device:
        return await self._send_tx_error(app_id, "tx_start missing radio_config.device")
    try:
        sdr, device_key = self._registry.acquire(device, identifier)
    except Exception as exc:
        # acquire() failed, so there is no reference to release
        logger.exception("Failed to open %r for TX", device)
        return await self._send_tx_error(app_id, f"tx init failed: {exc}")
    try:
        _apply_sdr_config(sdr, cfg_radio)  # sets tx_* attributes via alias map
        # explicit init_tx if the driver supports it
        if hasattr(sdr, "init_tx"):
            sdr.init_tx(
                sample_rate=cfg_radio.get("tx_sample_rate"),
                center_frequency=cfg_radio.get("tx_center_frequency"),
                gain=cfg_radio.get("tx_gain"),
                channel=cfg_radio.get("tx_channel", 0),
                gain_mode=cfg_radio.get("tx_gain_mode", "manual"),
            )
    except Exception as exc:
        # drop our reference; close only if no RX session still holds the SDR
        if self._registry.release(device_key):
            sdr.close()
        logger.exception("Failed to init TX on %r", device)
        return await self._send_tx_error(app_id, f"tx init failed: {exc}")

    # 3) build session + launch loop
    self._tx = TxSession(
        app_id=app_id,
        sdr=sdr,
        device_key=device_key,
        buffer_size=buffer_size,
        queue=queue.Queue(maxsize=8),
        stop_event=threading.Event(),
        task=None,  # filled below
        underrun_policy=underrun_policy,
        max_duration_s=self._cfg.tx_max_duration_s,
        started_at=time.monotonic(),
    )
    loop = asyncio.get_running_loop()
    self._tx.task = loop.run_in_executor(None, self._tx_executor_body)
    await self._send_tx_status(app_id, "armed")
    # streamer transitions to "transmitting" on the first buffer consumed in the thread;
    # schedule a tiny watchdog that emits that status when queue count rises.
```
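The interlock checks at the top of `_handle_tx_start` could also be factored into a pure function, which would let `test_tx_safety.py` cover each cap without a streamer or SDR. The function name and signature below are hypothetical; the checks and error strings mirror the handler.

```python
from typing import Optional, Sequence


def check_tx_interlocks(
    tx_enabled: bool,
    max_gain_db: Optional[float],
    allowed_ranges: Optional[Sequence[Sequence[float]]],
    radio_config: dict,
) -> Optional[str]:
    """Return an error string if the request violates a cap, else None."""
    if not tx_enabled:
        return "tx disabled on this agent"
    gain = radio_config.get("tx_gain")
    if max_gain_db is not None and gain is not None and float(gain) > max_gain_db:
        return f"tx_gain {gain} exceeds cap {max_gain_db}"
    freq = radio_config.get("tx_center_frequency")
    if allowed_ranges and freq is not None:
        if not any(lo <= float(freq) <= hi for lo, hi in allowed_ranges):
            return f"tx_center_frequency {freq} outside allowed ranges"
    return None


ranges = [[2.4e9, 2.5e9]]
ok = check_tx_interlocks(True, -10.0, ranges, {"tx_gain": -20, "tx_center_frequency": 2.45e9})
disabled = check_tx_interlocks(False, None, None, {})
too_loud = check_tx_interlocks(True, -10.0, ranges, {"tx_gain": 0})
off_band = check_tx_interlocks(True, -10.0, ranges, {"tx_center_frequency": 915e6})
```

As in the handler, a request that omits `tx_gain` or `tx_center_frequency` passes the corresponding check, so whatever the driver defaults to is not capped.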
TX executor body
Runs in a worker thread. Blocks in the SDR's _stream_tx driven by our callback that pulls from the queue.
```python
def _tx_executor_body(self) -> None:
    tx = self._tx  # capture once; tx_stop may clear self._tx while this thread runs
    try:
        tx.sdr._stream_tx(self._tx_callback)
    except Exception:
        logger.exception("TX stream crashed")
        # surface via the asyncio side; _schedule uses the loop captured at construction
        _schedule(self._send_tx_status(tx.app_id, "error", "stream crashed"))


def _tx_callback(self, num_samples):
    tx = self._tx
    if tx is None or tx.stop_event.is_set():
        sdr = tx.sdr if tx else None
        if sdr is not None:
            sdr.pause_tx()
        return _silence(num_samples)

    # duration watchdog
    if tx.max_duration_s is not None and (time.monotonic() - tx.started_at) > tx.max_duration_s:
        tx.stop_event.set()
        tx.sdr.pause_tx()
        _schedule(self._send_tx_status(tx.app_id, "done", "max duration reached"))
        return _silence(num_samples)

    try:
        raw = tx.queue.get(timeout=0.1)
    except queue.Empty:
        if tx.stop_event.is_set():
            # tx_stop arrived while we were waiting; this is not an underrun
            return _silence(num_samples)
        return self._underrun_fill(tx, num_samples)

    # validate size before converting: 8 bytes (two float32) per complex sample
    if len(raw) != num_samples * 8:
        # malformed / wrong-sized frame; underrun this cycle
        logger.warning("TX frame size mismatch: got %d bytes, expected %d", len(raw), num_samples * 8)
        return self._underrun_fill(tx, num_samples)

    # interleaved float32 -> complex64 (zero-copy view)
    complex_samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64)
    tx.last_buffer = complex_samples
    return complex_samples
```
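The frame conversion used in the callback can be checked in isolation. In this sketch, `decode_iq_frame` is a hypothetical helper, and the hub-side packing via `tobytes()` assumes both ends use the same native byte order.

```python
import numpy as np


def decode_iq_frame(raw: bytes, num_samples: int):
    """Return a complex64 view of `raw`, or None if the frame has the wrong size."""
    if len(raw) != num_samples * 8:  # 2 x float32 per complex sample
        return None
    return np.frombuffer(raw, dtype=np.float32).view(np.complex64)


# Assumed hub-side packing: complex64 already stores I and Q as adjacent float32s.
tone = np.exp(2j * np.pi * 0.125 * np.arange(8)).astype(np.complex64)
frame = tone.tobytes()

decoded = decode_iq_frame(frame, num_samples=8)
rejected = decode_iq_frame(frame[:-4], num_samples=8)  # truncated by one float32
```

Checking the byte length before calling `np.frombuffer` matters because `frombuffer` raises `ValueError` when the buffer length is not a multiple of the element size, which would otherwise crash the executor thread.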
Helper _underrun_fill:
```python
def _underrun_fill(self, tx: TxSession, num_samples: int):
    if tx.underrun_policy == "zero":
        return np.zeros(num_samples, dtype=np.complex64)
    if tx.underrun_policy == "repeat" and tx.last_buffer is not None:
        if tx.last_buffer.size >= num_samples:
            return tx.last_buffer[:num_samples]
        # last buffer is short: pad the tail with zeros
        padding = np.zeros(num_samples - tx.last_buffer.size, dtype=np.complex64)
        return np.concatenate([tx.last_buffer, padding])
    # "pause" (default)
    tx.stop_event.set()
    tx.sdr.pause_tx()
    _schedule(self._send_tx_status(tx.app_id, "underrun"))
    return np.zeros(num_samples, dtype=np.complex64)
```
_schedule() is a tiny wrapper around asyncio.run_coroutine_threadsafe that resolves the loop once at streamer construction.
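A minimal sketch of that bridge, assuming a hypothetical `Scheduler` holder in place of the streamer and a stub `send_tx_status` coroutine:

```python
import asyncio
import threading


class Scheduler:
    """Hypothetical holder for the event loop captured on the asyncio side."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._loop = loop

    def schedule(self, coro):
        # Safe to call from any thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)


async def _demo():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    scheduler = Scheduler(loop)
    sent = []

    async def send_tx_status(app_id, state):
        # Record whether the coroutine ran on the event-loop thread.
        sent.append((app_id, state, threading.get_ident() == loop_thread))

    def worker():
        # Runs in an executor thread, like the _stream_tx callback; it cannot await.
        future = scheduler.schedule(send_tx_status("app-1", "underrun"))
        # The demo waits only so the result is deterministic; the real
        # callback would fire and forget to avoid stalling the TX loop.
        future.result(timeout=1.0)

    await loop.run_in_executor(None, worker)
    return sent


sent = asyncio.run(_demo())
```

The coroutine is created in the worker thread but executes on the loop thread, so `_send_tx_status` can touch the websocket without extra locking.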
_handle_tx_stop
```python
async def _handle_tx_stop(self, msg: dict) -> None:
    tx = self._tx
    if tx is None:
        return
    tx.stop_event.set()
    tx.sdr.pause_tx()
    # discard queued frames so stale IQ is not sent; the executor thread
    # wakes via its own 100 ms get() timeout, not via this drain
    try:
        while True:
            tx.queue.get_nowait()
    except queue.Empty:
        pass
    # wait up to ~1s for the executor thread to finish
    if tx.task is not None:
        try:
            await asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)
        except asyncio.TimeoutError:
            logger.warning("TX executor did not exit within 1s after stop")
    # release SDR reference
    should_close = self._registry.release(tx.device_key)
    if should_close:
        try:
            tx.sdr.close()
        except Exception:
            logger.exception("Error closing SDR on tx_stop")
    self._tx = None
    await self._send_tx_status(msg.get("app_id") or "", "done")
```
_handle_tx_configure
```python
async def _handle_tx_configure(self, msg: dict) -> None:
    if self._tx is None:
        return
    self._tx.pending_config.update(msg.get("radio_config") or {})
```
Consume pending_config at the top of _tx_callback before pulling from the queue (same pattern as RX's _capture_loop), using _apply_sdr_config with tx aliases.
_apply_sdr_config — extend alias map
```python
_CONFIG_ATTR_MAP = {
    # existing RX aliases...
    "sample_rate": ("sample_rate", "rx_sample_rate"),
    "center_frequency": ("center_freq", "rx_center_frequency"),
    "gain": ("gain", "rx_gain"),
    "bandwidth": ("bandwidth", "rx_bandwidth"),
    # NEW TX aliases
    "tx_sample_rate": ("tx_sample_rate",),
    "tx_center_frequency": ("tx_center_frequency", "tx_lo"),
    "tx_gain": ("tx_gain",),
    "tx_bandwidth": ("tx_bandwidth",),
}
```
Pluto has set_tx_sample_rate, set_tx_center_frequency, set_tx_gain — those are called by init_tx using the attribute values, so setting attributes via _apply_sdr_config + calling init_tx is sufficient.
Phase 4 acceptance
- `test_streamer_tx.py`: full happy path. `tx_start` against MockSDR → 3 binary frames → verify `_stream_tx` callback received them in order → `tx_stop` → session cleared, SDR closed.
- `test_tx_safety.py`: one test per cap (`tx_enabled=False`, gain cap, freq range, duplicate session). Each produces a `tx_status: error` JSON; registry shows zero outstanding acquires.
- `test_tx_underrun.py`: three tests. `pause` (session ends, `underrun` emitted), `zero` (callback returns zeros, no status change), `repeat` (callback returns last buffer).
- `test_full_duplex.py`: against MockSDR, send `start` + `tx_start` with same `(device=mock, identifier=None)` → registry ref-count = 2 → both sessions stream independently → stop one, other still runs → stop second, SDR closed.
Phase 5 — Integration + docs
Touches:
- `tests/agent/test_integration_tx.py`: end-to-end with a real local `websockets` server + MockSDR. Mirror `test_integration.py`'s shape: register → heartbeat with `tx_enabled=True` → tx_start → 3 binary frames → tx_stop.
- `docs/agent_tx_protocol.md`: short and user-facing, covering message types, binary format, heartbeat additions, interlock config, CLI examples. Link from screens_agent_handoff.md.
- `README.md` (if it mentions agent subcommands): add `--allow-tx` usage.
Real-Pluto smoke test (manual, not in CI):
- `ria-agent register --hub http://hub:3005 --api-key KEY --allow-tx --tx-max-gain-db -10 --tx-freq-range 2.4e9 2.5e9`
- `ria-agent stream`
- From a Python REPL with `websockets`, open the hub WS on the agent's behalf (bypass hub during dev), send a `tx_start` + binary frames of a 1 kHz tone → confirm carrier on a spectrum analyzer at the configured frequency.
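For the REPL step, the tone frames and the `tx_start` message might be prepared as below. This is a sketch under several assumptions: the sample rate, the `"pluto"` device name, the `app_id`, and the 0.5 amplitude are illustrative, and the sample scaling the driver expects should be checked before transmitting. Sending over the socket is left out.

```python
import json

import numpy as np

SAMPLE_RATE = 1_000_000  # assumed; must match tx_sample_rate below
BUFFER_SIZE = 1024       # complex samples per frame, matches radio_config.buffer_size
TONE_HZ = 1_000

tx_start = json.dumps({
    "type": "tx_start",
    "app_id": "smoke-test",          # illustrative
    "radio_config": {
        "device": "pluto",           # assumed device name
        "buffer_size": BUFFER_SIZE,
        "tx_sample_rate": SAMPLE_RATE,
        "tx_center_frequency": 2.45e9,  # inside the registered 2.4-2.5 GHz range
        "tx_gain": -20,                 # below the -10 dB cap
    },
})


def tone_frames(count: int):
    """Yield `count` phase-continuous frames of interleaved float32 I/Q."""
    for i in range(count):
        n = np.arange(i * BUFFER_SIZE, (i + 1) * BUFFER_SIZE)
        samples = 0.5 * np.exp(2j * np.pi * TONE_HZ * n / SAMPLE_RATE)
        yield samples.astype(np.complex64).tobytes()


frames = list(tone_frames(3))
```

Indexing the phase by absolute sample number keeps the tone continuous across frame boundaries, which avoids spurs on the analyzer that would otherwise look like an agent bug.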
File-by-file summary
| File | Phase | Change |
|---|---|---|
| `src/ria_toolkit_oss/agent/ws_client.py` | 1 | Add `on_binary` callback. |
| `src/ria_toolkit_oss/agent/config.py` | 2 | Add `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. |
| `src/ria_toolkit_oss/agent/cli.py` | 2, 3 | `--allow-tx` + cap flags on `register`; `--allow-tx` on `stream`; plumb `cfg` into Streamer. |
| `src/ria_toolkit_oss/agent/hardware.py` | 3 | `heartbeat_payload(cfg, sessions)` with `capabilities`, `tx_enabled`. |
| `src/ria_toolkit_oss/agent/streamer.py` | 3, 4 | Session refactor, SDR registry, TX dispatch, TX loop, underrun fills, `_apply_sdr_config` TX aliases. |
| `src/ria_toolkit_oss/agent/_tx_loop.py` | 4 (opt) | Extracted TX callback helpers if `streamer.py` > ~400 lines. |
| `tests/agent/test_ws_client.py` | 1 | Binary-frame case. |
| `tests/agent/test_config.py` | 2 | Round-trip new fields. |
| `tests/agent/test_cli_tx.py` | 2 | New: `--allow-tx` flag handling. |
| `tests/agent/test_hardware.py` | 3 | Heartbeat `capabilities` + `sessions`. |
| `tests/agent/test_streamer.py` | 3 | Refactor for session model; RX assertions unchanged. |
| `tests/agent/test_streamer_tx.py` | 4 | New: TX happy path. |
| `tests/agent/test_tx_safety.py` | 4 | New: cap enforcement. |
| `tests/agent/test_tx_underrun.py` | 4 | New: pause/zero/repeat policies. |
| `tests/agent/test_full_duplex.py` | 4 | New: shared SDR ref count. |
| `tests/agent/test_integration_tx.py` | 5 | New: real websockets server E2E. |
| `docs/agent_tx_protocol.md` | 5 | New: operator-facing protocol doc. |
Implementation gotchas (do not skip)
- Asyncio ↔ thread bridge. The SDR's `_stream_tx` is synchronous and runs in an executor thread. Its callback must not `await`. Use `queue.Queue` (thread-safe) for inbound buffers and `asyncio.run_coroutine_threadsafe(coro, loop)` to emit `tx_status` from inside the thread. Resolve `loop` once at streamer construction; don't call `get_event_loop()` from the thread.
- `sdr.pause_tx()` from inside the callback. Pluto's `_stream_tx` loop condition is `while self._enable_tx is True`. Calling `pause_tx()` inside the callback sets `_enable_tx = False` so the NEXT iteration exits. That's fine: it may emit one trailing zero-filled buffer. Document this; don't try to exit mid-callback.
- Queue drain on stop. When `_handle_tx_stop` sets `stop_event` and pauses TX, the executor thread may still be blocked in `queue.get(timeout=0.1)`. Draining the queue does not unblock a timed get. Rely on the 100 ms timeout; the thread exits on the next iteration. Don't try to inject a clever poison pill.
- Interleaved float32 → complex64 conversion. `np.frombuffer(buf, dtype=np.float32).view(np.complex64)` is zero-copy and correct when `len(buf)` is a multiple of 8 bytes. Validate size first; mismatched size = underrun for that cycle, don't crash the thread.
- MockSDR's `_stream_tx` (`sdr/mock.py:96-100`) calls `callback(self.rx_buffer_size)`: it passes a size, not samples. The TX callback contract is "I am given `num_samples`, I return that many complex64 samples." `test_streamer_tx` must respect this: the test's `sdr.tx_buffer_size` (if used) doesn't affect what the callback receives from mock. Simplest path: set `MockSDR.rx_buffer_size = buffer_size` in the test harness before `_stream_tx` is invoked, so the TX callback receives the right size.
- `init_tx` on MockSDR vs Pluto. MockSDR's `init_tx` sets attributes and flips `_tx_initialized = True`. Pluto's does the same plus `_rx_initialized = False` (the FDD bug). For full-duplex tests we currently target MockSDR only. Pluto FDD should work because our RX path ignores `_rx_initialized`, but the real-Pluto smoke test is the only validation. Call that out in the PR description.
- Don't block the event loop. `asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)` in `_handle_tx_stop` is non-blocking from the loop's perspective; the 1 s cap prevents a misbehaving driver from stalling heartbeat/RX.
- Heartbeat during TX. The existing heartbeat loop runs on a 30 s timer. Sessions snapshot is cheap; no locking needed if we read `self._rx` / `self._tx` references atomically (Python ref swap is GIL-safe for single field reads).
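The trailing-buffer behavior described for `pause_tx()` can be reproduced with a toy driver. `FakeSdr` below is hypothetical and only copies the loop shape the plan attributes to Pluto; plain lists stand in for sample buffers.

```python
class FakeSdr:
    """Hypothetical driver with the loop shape the plan attributes to Pluto."""

    def __init__(self, buffer_size: int):
        self.buffer_size = buffer_size
        self._enable_tx = True
        self.sent = []

    def pause_tx(self) -> None:
        self._enable_tx = False

    def _stream_tx(self, callback) -> None:
        while self._enable_tx is True:
            # The flag is only re-checked after this buffer has been sent.
            self.sent.append(callback(self.buffer_size))


sdr = FakeSdr(buffer_size=4)
frames = [[1.0] * 4, [2.0] * 4]


def callback(num_samples):
    if frames:
        return frames.pop(0)
    sdr.pause_tx()  # takes effect on the next loop check
    return [0.0] * num_samples


sdr._stream_tx(callback)
```

After the two real frames, the loop sends exactly one zero-filled buffer and then exits, which is why the callback should always return silence alongside `pause_tx()`.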
Rollout
- Open a single PR per phase (1 → 2 → 3 → 4 → 5), each green on its own.
- Phase 3 is the riskiest diff (RX refactor). Get a second reviewer if possible; the regression surface is all of current RX behavior.
- After Phase 4 merges, `ria-agent stream --allow-tx` is a usable toy: you can hand-drive it from a Python REPL with `websockets` to validate against real hardware before the hub side is ready.
- Phase 5 closes the loop and ships the user-facing docs.
Out of scope (explicit)
- Multi-app-per-agent: one RX + one TX per agent in v1. Adding session IDs to binary frames is a v2 protocol bump.
- Other TX drivers (HackRF, USRP, bladeRF): wiring `_CONFIG_ATTR_MAP` entries and verifying `_stream_tx` behavior per-driver. Tackle when the hub has an operator that targets them.
- Resampling / clock drift: agent treats the hub-supplied samples as authoritative. Drift manifests as underruns; the underrun policy is the only mitigation.
- Fixing Pluto's `init_tx` `_rx_initialized = False` reset: pre-existing, not triggered by our RX path, left for a separate cleanup.