# Agent TX Streaming — Implementation Plan (`ria-toolkit-oss`)

**Scope:** Part A of [agent_tx_plan.md](./agent_tx_plan.md). This repo only.

**Goal:** Make the agent accept hub-originated TX control + binary IQ, stream it to the SDR in full duplex with RX, and enforce agent-local safety caps.

**Acceptance:** `pytest tests/agent/` green; `ria-agent stream --allow-tx` accepts a `tx_start` against MockSDR and round-trips binary frames to `_stream_tx`.

Each phase below lands independently. After every phase the existing agent tests must still pass (no regressions), and the phase's own new tests must be green.

---

## Preconditions

- `--allow-tx` is opt-in at CLI level. Default config has `tx_enabled=False`; the agent will reject all TX control frames from the hub.
- Pluto FDD: one `adi.Pluto` instance serves both RX and TX. We share the SDR between sessions keyed by `(device, identifier)`.
- Known pre-existing bug: [`sdr/pluto.py:151`](../src/ria_toolkit_oss/sdr/pluto.py#L151) sets `_rx_initialized = False` inside `init_tx`. Our streamer's RX path (`sdr.rx(n)`) does not read this flag, so FDD still works. Leave the bug for a separate follow-up; do not refactor Pluto in this plan.

## Glossary

- **Session** = a `(app_id, direction)` pair held by the agent: one `RxSession` or one `TxSession`.
- **Direction** = `"rx"` (agent → hub binary) or `"tx"` (hub → agent binary).
- **Shared SDR** = when the same `(device, identifier)` is referenced by an RX and TX session concurrently; both sessions hold the same driver instance.

---

## Phase 1 — WS binary ingress

**Why first:** protocol-plumbing only. No behavior change for existing RX, but unblocks every later phase.

### Touches

- `src/ria_toolkit_oss/agent/ws_client.py` — add optional `on_binary` callback to `WsClient.run`.
- `tests/agent/test_ws_client.py` — add a "server sends binary, handler receives" case.

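
Before wiring the real client, the dispatch rule itself (bytes go to `on_binary`, text keeps the existing JSON path, a raising handler drops only its own frame) can be exercised without a network. This is a minimal sketch under stated assumptions: `FakeWs` is a hypothetical in-memory stand-in for a `websockets` connection, and `dispatch()` is a hypothetical standalone copy of the receive loop in `WsClient.run`. The real test in the list above still needs a local `websockets` server.

```python
from __future__ import annotations

import asyncio
import json
import logging
from typing import Awaitable, Callable, Iterable

logger = logging.getLogger(__name__)

MessageHandler = Callable[[dict], Awaitable[None]]
BinaryHandler = Callable[[bytes], Awaitable[None]]


class FakeWs:
    """Hypothetical in-memory stand-in for a websockets connection.

    Async-iterating it yields the queued frames: str for text frames and
    bytes for binary frames, matching how websockets surfaces the two types.
    """

    def __init__(self, frames: Iterable):
        self._frames = list(frames)

    async def _gen(self):
        for frame in self._frames:
            yield frame

    def __aiter__(self):
        return self._gen()


async def dispatch(ws, on_message: MessageHandler, on_binary: BinaryHandler | None = None) -> None:
    """Hypothetical standalone version of the receive loop in WsClient.run."""
    async for raw in ws:
        if isinstance(raw, bytes):
            if on_binary is not None:
                try:
                    await on_binary(raw)
                except Exception:
                    # a bad frame must not tear down the whole connection
                    logger.exception("on_binary handler raised; dropping frame")
            else:
                logger.debug("Discarding unexpected %d-byte binary frame", len(raw))
            continue
        await on_message(json.loads(raw))  # existing JSON path, unchanged
```

Only the `bytes` branch is new; with `on_binary=None` the loop behaves exactly as before apart from a debug log.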
### Shape

```python
# ws_client.py
from typing import Awaitable, Callable

BinaryHandler = Callable[[bytes], Awaitable[None]]

async def run(
    self,
    on_message: MessageHandler,
    heartbeat: HeartbeatBuilder,
    on_binary: BinaryHandler | None = None,  # NEW, default preserves old behavior
) -> None:
    ...
    async for raw in self._ws:
        if isinstance(raw, bytes):
            if on_binary is not None:
                try:
                    await on_binary(raw)
                except Exception:
                    logger.exception("on_binary handler raised; dropping frame")
            else:
                logger.debug("Discarding unexpected %d-byte binary frame", len(raw))
            continue
        # ... existing JSON dispatch unchanged
```

### Acceptance

- New test: local `websockets` server pushes a binary frame after the JSON handshake → handler sees the exact bytes.
- Existing `test_ws_client.py` cases still pass with `on_binary=None`.

---

## Phase 2 — Config + CLI TX opt-in

**Why second:** small, isolated, and gives the remaining phases a real `AgentConfig.tx_enabled` and real caps to read.

### Touches

- `src/ria_toolkit_oss/agent/config.py`
- `src/ria_toolkit_oss/agent/cli.py`
- `tests/agent/test_config.py`
- new `tests/agent/test_cli_tx.py`

### config.py

Extend the dataclass and preserve backward compatibility for old JSON files. The existing `extra` trick already handles unknown keys, but we want these fields promoted to first-class:

```python
from dataclasses import dataclass, field

@dataclass
class AgentConfig:
    hub_url: str = ""
    agent_id: str = ""
    token: str = ""
    name: str = ""
    insecure: bool = False
    api_key: str = ""
    # NEW — TX interlocks
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_max_duration_s: float | None = None
    tx_allowed_freq_ranges: list[list[float]] | None = None  # JSON-friendly list of [lo_hz, hi_hz] pairs
    extra: dict = field(default_factory=dict)
```

Update `load()` to pull the new fields and `save()` to emit them. Preserve the `0o600` chmod behavior.

### cli.py

Two entry points need flags:

```
ria-agent register --hub ... --api-key ...
                   [--allow-tx]
                   [--tx-max-gain-db VALUE]
                   [--tx-max-duration-s VALUE]
                   [--tx-freq-range LO HI]   # repeatable: --tx-freq-range 2.4e9 2.5e9 --tx-freq-range 5.7e9 5.8e9
```

```
ria-agent stream [--allow-tx]   # runtime override: sets cfg.tx_enabled for this process only
```

In `_cmd_register`: after successful server registration, set `cfg.tx_enabled = bool(args.allow_tx)` and copy the caps from argparse before `_config.save(cfg)`.

In `_cmd_stream`: `if args.allow_tx: cfg.tx_enabled = True`, before passing `cfg` to the streamer (which requires plumbing `cfg` in; see Phase 3).

### Acceptance

- `test_config.py` round-trip: new fields serialize → deserialize cleanly; missing fields in old JSON default correctly.
- `test_cli_tx.py`: `register --allow-tx --tx-max-gain-db -10` writes the expected JSON; `stream --allow-tx` sets the runtime flag without touching disk.

---

## Phase 3 — Streamer refactor: session model (RX behavior preserved)

**Why third:** the TX work needs a session-based state machine. Doing the refactor before wiring TX keeps the diff reviewable and keeps the RX regression surface contained.

**Goal:** replace the flat state (`self._sdr`, `self._app_id`, `self._capture_task`, `self._pending_config`, `self._status`) with explicit session objects and an SDR registry, without changing any observable RX behavior.

### Touches

- `src/ria_toolkit_oss/agent/streamer.py` (bulk of work)
- `src/ria_toolkit_oss/agent/hardware.py` (heartbeat grows `capabilities` + optional `sessions` snapshot)
- `src/ria_toolkit_oss/agent/cli.py` (plumb `cfg` into the streamer)
- `tests/agent/test_streamer.py` + `test_hardware.py` — update for new heartbeat shape; keep all RX assertions.

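
The `hardware.py` bullet only names the new heartbeat fields. A minimal sketch of how they could be derived, kept separate from the existing payload so the RX-only assertions don't move. `TxFlags` and `heartbeat_additions` are hypothetical names, and `["rx", "tx"]` for the enabled case is an assumption: the plan only pins `capabilities = ["rx"]` when `tx_enabled=False`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TxFlags:
    """Hypothetical stand-in for the one AgentConfig field the heartbeat reads."""
    tx_enabled: bool = False


def heartbeat_additions(cfg: TxFlags, sessions: dict | None = None) -> dict:
    """Hypothetical helper: the keys Phase 3 adds on top of the existing heartbeat payload."""
    extra = {
        # ["rx"] for the TX-disabled case is from the plan; ["rx", "tx"] is assumed.
        "capabilities": ["rx", "tx"] if cfg.tx_enabled else ["rx"],
        "tx_enabled": cfg.tx_enabled,
    }
    if sessions:  # the streamer passes `sessions or None`, so idle agents omit the key
        extra["sessions"] = sessions
    return extra
```

Merging these keys into whatever `heartbeat_payload` already returns keeps the old fields byte-for-byte identical, which is what the "keep all RX assertions" bullet asks for.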
### Data model

```python
import asyncio
import queue
import threading
import time  # TxSession.started_at is a time.monotonic() timestamp
from dataclasses import dataclass, field
from typing import Any

import numpy as np

@dataclass
class RxSession:
    app_id: str
    sdr: Any
    device_key: tuple[str, str | None]  # (device, identifier)
    buffer_size: int
    task: asyncio.Task
    pending_config: dict = field(default_factory=dict)

@dataclass
class TxSession:
    app_id: str
    sdr: Any
    device_key: tuple[str, str | None]
    buffer_size: int
    queue: queue.Queue  # thread-safe; raw bytes frames, decoded to np.complex64 in the TX thread
    stop_event: threading.Event
    task: asyncio.Future | None = None  # from loop.run_in_executor(...); set right after construction
    underrun_policy: str = "pause"  # "pause" | "zero" | "repeat"
    pending_config: dict = field(default_factory=dict)
    last_buffer: np.ndarray | None = None  # for "repeat" policy
    started_at: float = 0.0
    max_duration_s: float | None = None
```

### SDR registry (ref-counted)

```python
class _SdrRegistry:
    """Ref-counted SDR instances keyed by (device, identifier), shared by RX and TX sessions."""

    def __init__(self, factory):
        self._factory = factory  # (device, identifier) -> SDR
        self._instances: dict[tuple[str, str | None], tuple[Any, int]] = {}
        self._lock = threading.Lock()

    def acquire(self, device: str, identifier: str | None):
        key = (device, identifier)
        with self._lock:
            if key in self._instances:
                sdr, rc = self._instances[key]
                self._instances[key] = (sdr, rc + 1)
                return sdr, key
            sdr = self._factory(device, identifier)
            self._instances[key] = (sdr, 1)
            return sdr, key

    def release(self, key: tuple[str, str | None]) -> bool:
        """Drop one reference; True means it was the last one."""
        with self._lock:
            sdr, rc = self._instances[key]
            if rc <= 1:
                del self._instances[key]
                return True  # caller should close()
            self._instances[key] = (sdr, rc - 1)
            return False
```

### Streamer state

```python
class Streamer:
    def __init__(self, ws, cfg: AgentConfig, sdr_factory=None):
        self.ws = ws
        self._cfg = cfg
        self._registry = _SdrRegistry(sdr_factory or _default_sdr_factory)
        self._rx: RxSession | None = None
        self._tx: TxSession | None = None
```

### Message dispatch

```python
async def on_message(self, msg: dict) -> None:
    t = msg.get("type")
    handlers = {
        "start": self._handle_rx_start,
        "stop": self._handle_rx_stop,
        "configure": self._handle_rx_configure,
        # TX handlers stubbed here in Phase 3, implemented in Phase 4
        "tx_start": self._handle_tx_start,
        "tx_stop": self._handle_tx_stop,
        "tx_configure": self._handle_tx_configure,
    }
    handler = handlers.get(t)
    if handler is None:
        logger.warning("Unknown server message type: %r", t)
        return
    await handler(msg)
```

Rename internals: `_handle_start → _handle_rx_start`, `_handle_stop → _handle_rx_stop`, etc. Behavior is unchanged: the handlers just read and write `self._rx` in place of the old flat attributes, and go through the registry for acquire/release.

### Heartbeat

```python
# streamer.py
def build_heartbeat(self) -> dict:
    active = self._rx or self._tx  # RX wins the top-level app_id when both run
    status = "streaming" if active else "idle"
    sessions: dict = {}
    if self._rx:
        sessions["rx"] = {"app_id": self._rx.app_id, "state": "streaming"}
    if self._tx:
        sessions["tx"] = {"app_id": self._tx.app_id, "state": self._tx_state()}
    return heartbeat_payload(
        status=status,
        app_id=active.app_id if active else None,
        cfg=self._cfg,
        sessions=sessions or None,
    )
```

Update `hardware.heartbeat_payload` to take `cfg` (for `capabilities`/`tx_enabled`) and optional `sessions`. Keep the change backward compatible: existing tests can pass `cfg=AgentConfig()` and get the old shape, plus the new fields at their TX-disabled defaults.

### Phase 3 acceptance

- All existing `test_streamer.py` / `test_integration.py` / `test_hardware.py` cases pass, with the heartbeat additions asserted in `test_hardware.py` (`capabilities == ["rx"]` when `tx_enabled=False`).
- New test: two `start` messages in sequence with the same `(device, identifier)` both succeed without recreating the SDR (registry hit). This is a Phase 3 bonus; it confirms the registry works before TX consumes it.
- New test: `tx_start` with `tx_enabled=False` returns `tx_status: error`. The handler stubs can do just this much in Phase 3; the full implementation lands in Phase 4.

---

## Phase 4 — TX implementation

**Why fourth:** now that binary arrives, config exists, and sessions exist, wire up real TX.

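
It can help to pin the interlocks down as a pure function before writing the handlers: it is trivially unit-testable for `test_tx_safety.py`, and the same function could also guard `tx_configure`, so a mid-session update can't raise gain or retune outside the caps. A sketch under those assumptions; `TxCaps` and `tx_interlock_error` are hypothetical names, and the checks, order, and messages mirror step 1 of `_handle_tx_start`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TxCaps:
    """Hypothetical stand-in for the TX fields of AgentConfig."""
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_allowed_freq_ranges: list[list[float]] | None = None


def tx_interlock_error(caps: TxCaps, radio_config: dict, tx_active: bool) -> str | None:
    """Return the first violated interlock as an error string, or None if TX may proceed."""
    if not caps.tx_enabled:
        return "tx disabled on this agent"
    gain = radio_config.get("tx_gain")
    if caps.tx_max_gain_db is not None and gain is not None and float(gain) > caps.tx_max_gain_db:
        return f"tx_gain {gain} exceeds cap {caps.tx_max_gain_db}"
    freq = radio_config.get("tx_center_frequency")
    if caps.tx_allowed_freq_ranges and freq is not None:
        # inclusive bounds: a frequency exactly at lo or hi is allowed
        if not any(lo <= float(freq) <= hi for lo, hi in caps.tx_allowed_freq_ranges):
            return f"tx_center_frequency {freq} outside allowed ranges"
    if tx_active:
        return "tx already active on this agent"
    return None
```

As in the handler, a request that omits `tx_gain` or `tx_center_frequency` skips that check rather than failing it.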
### Touches

- `src/ria_toolkit_oss/agent/streamer.py`
- Potentially a small helper module `src/ria_toolkit_oss/agent/_tx_loop.py` if `streamer.py` gets unwieldy.
- `tests/agent/test_streamer_tx.py`, `test_tx_safety.py`, `test_tx_underrun.py`, `test_full_duplex.py`

### Binary ingress

```python
async def on_binary(self, data: bytes) -> None:
    tx = self._tx
    if tx is None:
        logger.debug("Dropping %d-byte binary frame: no TX session", len(data))
        return
    try:
        # Backpressure: wait up to 2s for queue space. The blocking put runs in a
        # worker thread so heartbeat/RX keep running on the event loop; awaiting it
        # pauses ws_client's read loop, which pushes back on the hub over TCP.
        await asyncio.to_thread(tx.queue.put, data, timeout=2.0)
    except queue.Full:
        logger.warning("TX queue stalled; dropping frame (agent side)")
```

Wire this in via `ws.run(..., on_binary=self.on_binary)` and change `run_streamer()`'s `ws.run` call accordingly.

### `_handle_tx_start`

```python
async def _handle_tx_start(self, msg: dict) -> None:
    app_id = msg.get("app_id") or ""
    cfg_radio = dict(msg.get("radio_config") or {})

    # 1) interlocks
    if not self._cfg.tx_enabled:
        return await self._send_tx_error(app_id, "tx disabled on this agent")
    gain = cfg_radio.get("tx_gain")
    if self._cfg.tx_max_gain_db is not None and gain is not None and float(gain) > self._cfg.tx_max_gain_db:
        return await self._send_tx_error(app_id, f"tx_gain {gain} exceeds cap {self._cfg.tx_max_gain_db}")
    freq = cfg_radio.get("tx_center_frequency")
    if self._cfg.tx_allowed_freq_ranges and freq is not None:
        if not any(lo <= float(freq) <= hi for lo, hi in self._cfg.tx_allowed_freq_ranges):
            return await self._send_tx_error(app_id, f"tx_center_frequency {freq} outside allowed ranges")
    if self._tx is not None:
        return await self._send_tx_error(app_id, "tx already active on this agent")

    # 2) device
    device = cfg_radio.pop("device", None)
    identifier = cfg_radio.pop("identifier", None)
    buffer_size = int(cfg_radio.pop("buffer_size", 1024))
    underrun_policy = cfg_radio.pop("underrun_policy", "pause")
    if not device:
        return await self._send_tx_error(app_id, "tx_start missing radio_config.device")
    try:
        sdr, device_key = self._registry.acquire(device, identifier)
    except Exception as exc:
        # nothing was acquired, so there is nothing to release
        logger.exception("Failed to open SDR %r", device)
        return await self._send_tx_error(app_id, f"tx init failed: {exc}")
    try:
        _apply_sdr_config(sdr, cfg_radio)  # sets tx_* attributes via alias map
        # explicit init_tx if the driver supports it
        if hasattr(sdr, "init_tx"):
            sdr.init_tx(
                sample_rate=cfg_radio.get("tx_sample_rate"),
                center_frequency=cfg_radio.get("tx_center_frequency"),
                gain=cfg_radio.get("tx_gain"),
                channel=cfg_radio.get("tx_channel", 0),
                gain_mode=cfg_radio.get("tx_gain_mode", "manual"),
            )
    except Exception as exc:
        if self._registry.release(device_key):
            sdr.close()  # we held the last reference; don't leak the driver
        logger.exception("Failed to init TX on %r", device)
        return await self._send_tx_error(app_id, f"tx init failed: {exc}")

    # 3) build session + launch loop
    self._tx = TxSession(
        app_id=app_id,
        sdr=sdr,
        device_key=device_key,
        buffer_size=buffer_size,
        queue=queue.Queue(maxsize=8),
        stop_event=threading.Event(),
        task=None,  # filled below
        underrun_policy=underrun_policy,
        max_duration_s=self._cfg.tx_max_duration_s,
        started_at=time.monotonic(),
    )
    loop = asyncio.get_running_loop()
    self._tx.task = loop.run_in_executor(None, self._tx_executor_body)
    await self._send_tx_status(app_id, "armed")
    # The session moves to "transmitting" on the first buffer consumed in the thread;
    # the simplest trigger is the first successful queue.get() in _tx_callback
    # calling _schedule() with that status.
```

### TX executor body

Runs in a worker thread. Blocks in the SDR's `_stream_tx`, driven by our callback that pulls from the queue.
```python
def _silence(num_samples: int) -> np.ndarray:
    return np.zeros(num_samples, dtype=np.complex64)

def _tx_executor_body(self) -> None:
    tx = self._tx  # capture once; _handle_tx_stop may clear self._tx while we run
    try:
        tx.sdr._stream_tx(self._tx_callback)
    except Exception:
        logger.exception("TX stream crashed")
        # surface via the asyncio side; never call get_event_loop() from this thread
        _schedule(self._send_tx_status(tx.app_id, "error", "stream crashed"))

def _tx_callback(self, num_samples: int) -> np.ndarray:
    tx = self._tx
    if tx is None or tx.stop_event.is_set():
        if tx is not None:
            tx.sdr.pause_tx()
        return _silence(num_samples)

    # duration watchdog
    if tx.max_duration_s is not None and (time.monotonic() - tx.started_at) > tx.max_duration_s:
        tx.stop_event.set()
        tx.sdr.pause_tx()
        _schedule(self._send_tx_status(tx.app_id, "done", "max duration reached"))
        return _silence(num_samples)

    try:
        raw = tx.queue.get(timeout=0.1)
    except queue.Empty:
        if tx.last_buffer is None:
            # Still "armed": no frame has arrived yet, so this is not an underrun.
            return _silence(num_samples)
        return self._underrun_fill(tx, num_samples)

    # Interleaved float32 I/Q is byte-for-byte complex64 (8 bytes per sample).
    # Check the size *before* np.frombuffer, which raises on a partial element.
    if len(raw) != num_samples * 8:
        # malformed / wrong-sized frame; underrun this cycle
        logger.warning("TX frame size mismatch: got %d bytes, expected %d", len(raw), num_samples * 8)
        return self._underrun_fill(tx, num_samples)
    complex_samples = np.frombuffer(raw, dtype=np.complex64)
    tx.last_buffer = complex_samples
    return complex_samples
```

Helper `_underrun_fill`:

```python
def _underrun_fill(self, tx: TxSession, num_samples: int) -> np.ndarray:
    if tx.underrun_policy == "zero":
        return np.zeros(num_samples, dtype=np.complex64)
    if tx.underrun_policy == "repeat" and tx.last_buffer is not None:
        if tx.last_buffer.size >= num_samples:
            return tx.last_buffer[:num_samples]
        pad = np.zeros(num_samples - tx.last_buffer.size, dtype=np.complex64)
        return np.concatenate([tx.last_buffer, pad])
    # "pause" (default); also any unknown policy, and "repeat" with no buffer yet
    tx.stop_event.set()
    tx.sdr.pause_tx()
    _schedule(self._send_tx_status(tx.app_id, "underrun"))
    return np.zeros(num_samples, dtype=np.complex64)
```

`_schedule()` is a tiny wrapper around `asyncio.run_coroutine_threadsafe` that resolves the loop once at streamer construction.

### `_handle_tx_stop`

```python
async def _handle_tx_stop(self, msg: dict) -> None:
    tx = self._tx
    if tx is None:
        return
    tx.stop_event.set()
    tx.sdr.pause_tx()
    # Drain the queue: discards stale frames and frees any on_binary producer
    # blocked in put(). It cannot wake a timed get() (see gotcha 3).
    try:
        while True:
            tx.queue.get_nowait()
    except queue.Empty:
        pass
    # wait up to ~1s for the executor thread to finish
    if tx.task is not None:
        try:
            await asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)
        except asyncio.TimeoutError:
            logger.warning("TX executor did not exit within 1s after stop")
    # release SDR reference
    should_close = self._registry.release(tx.device_key)
    if should_close:
        try:
            tx.sdr.close()
        except Exception:
            logger.exception("Error closing SDR on tx_stop")
    self._tx = None
    await self._send_tx_status(tx.app_id, "done")
```

### `_handle_tx_configure`

```python
async def _handle_tx_configure(self, msg: dict) -> None:
    if self._tx is None:
        return
    update = msg.get("radio_config") or {}
    # a configure must pass the same gain/frequency caps as tx_start
    if (err := self._tx_cap_violation(update)) is not None:
        return await self._send_tx_error(self._tx.app_id, err)
    self._tx.pending_config.update(update)
```

`_tx_cap_violation` is the gain and frequency checks from step 1 of `_handle_tx_start`, factored into a helper that returns an error string or `None`.

Consume `pending_config` at the top of `_tx_callback` before pulling from the queue (same pattern as RX's `_capture_loop`), using `_apply_sdr_config` with the TX aliases.

### `_apply_sdr_config` — extend alias map

```python
_CONFIG_ATTR_MAP = {
    # existing RX aliases...
    "sample_rate": ("sample_rate", "rx_sample_rate"),
    "center_frequency": ("center_freq", "rx_center_frequency"),
    "gain": ("gain", "rx_gain"),
    "bandwidth": ("bandwidth", "rx_bandwidth"),
    # NEW TX aliases
    "tx_sample_rate": ("tx_sample_rate",),
    "tx_center_frequency": ("tx_center_frequency", "tx_lo"),
    "tx_gain": ("tx_gain",),
    "tx_bandwidth": ("tx_bandwidth",),
}
```

Pluto has `set_tx_sample_rate`, `set_tx_center_frequency`, and `set_tx_gain`; `init_tx` calls them using the attribute values, so setting attributes via `_apply_sdr_config` and then calling `init_tx` is sufficient.

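
The acceptance tests below (and the Phase 5 smoke test) need frames in exactly the format `_tx_callback` decodes. A minimal sketch of both directions, assuming native little-endian byte order on hub and agent, which the plan does not pin down; `encode_tx_frame`, `decode_tx_frame`, and `tone` are hypothetical helpers, not existing repo functions.

```python
from __future__ import annotations

import numpy as np


def encode_tx_frame(samples: np.ndarray) -> bytes:
    """Hub side: complex samples -> interleaved float32 I/Q bytes (8 bytes per sample)."""
    return np.ascontiguousarray(samples, dtype=np.complex64).tobytes()


def decode_tx_frame(raw: bytes, num_samples: int) -> np.ndarray | None:
    """Agent side, same size rule as _tx_callback; None means 'underrun this cycle'."""
    if len(raw) != num_samples * 8:
        return None
    return np.frombuffer(raw, dtype=np.complex64)  # zero-copy, read-only view


def tone(freq_hz: float, sample_rate: float, num_samples: int, start: int = 0) -> np.ndarray:
    """Complex exponential; pass start = k * num_samples to keep phase continuous across frames."""
    n = np.arange(start, start + num_samples)
    return np.exp(2j * np.pi * freq_hz * n / sample_rate).astype(np.complex64)


# e.g. three consecutive 1024-sample frames of a 1 kHz tone at 1 MS/s
frames = [encode_tx_frame(tone(1e3, 1e6, 1024, start=k * 1024)) for k in range(3)]
```

Each frame must carry exactly the `num_samples` the driver asks for. With MockSDR that is `rx_buffer_size` (gotcha 5), so a test sending 1024-sample frames should set it to 1024.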
### Phase 4 acceptance

- `test_streamer_tx.py`: full happy path — `tx_start` against MockSDR → 3 binary frames → verify the `_stream_tx` callback received them in order → `tx_stop` → session cleared, SDR closed.
- `test_tx_safety.py`: one test per cap — `tx_enabled=False`, gain cap, freq range, duplicate session. Each produces a `tx_status: error` JSON, and the rejected request leaves no outstanding acquire in the registry.
- `test_tx_underrun.py`: three tests — `pause` (session ends, `underrun` emitted), `zero` (callback returns zeros, no status change), `repeat` (callback returns last buffer).
- `test_full_duplex.py`: against MockSDR, send `start` + `tx_start` with the same `(device=mock, identifier=None)` → registry ref-count = 2 → both sessions stream independently → stop one, the other still runs → stop the second, SDR closed.

---

## Phase 5 — Integration + docs

**Touches:**

- `tests/agent/test_integration_tx.py` — end-to-end with a real local `websockets` server + MockSDR. Mirror `test_integration.py`'s shape: register → heartbeat with `tx_enabled=True` → tx_start → 3 binary frames → tx_stop.
- `docs/agent_tx_protocol.md` — short, user-facing: message types, binary format, heartbeat additions, interlock config, CLI examples. Link from [screens_agent_handoff.md](./screens_agent_handoff.md).
- `README.md` (if it mentions agent subcommands) — add `--allow-tx` usage.

**Real-Pluto smoke test** (manual, not in CI):

1. `ria-agent register --hub http://hub:3005 --api-key KEY --allow-tx --tx-max-gain-db -10 --tx-freq-range 2.4e9 2.5e9`
2. `ria-agent stream`
3. From a Python REPL with `websockets`, play the hub's side of the WS connection (bypassing the real hub during dev), send a `tx_start` + binary frames of a 1 kHz tone → confirm a carrier on a spectrum analyzer at the configured frequency.

---

## File-by-file summary

| File | Phase | Change |
|---|---|---|
| `src/ria_toolkit_oss/agent/ws_client.py` | 1 | Add `on_binary` callback. |
| `src/ria_toolkit_oss/agent/config.py` | 2 | Add `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. |
| `src/ria_toolkit_oss/agent/cli.py` | 2, 3 | `--allow-tx` + cap flags on `register`; `--allow-tx` on `stream`; plumb `cfg` into `Streamer`. |
| `src/ria_toolkit_oss/agent/hardware.py` | 3 | `heartbeat_payload(cfg, sessions)` with `capabilities`, `tx_enabled`. |
| `src/ria_toolkit_oss/agent/streamer.py` | 3, 4 | Session refactor, SDR registry, TX dispatch, TX loop, underrun fills, `_apply_sdr_config` TX aliases. |
| `src/ria_toolkit_oss/agent/_tx_loop.py` | 4 (opt) | Extracted TX callback helpers if `streamer.py` > ~400 lines. |
| `tests/agent/test_ws_client.py` | 1 | Binary-frame case. |
| `tests/agent/test_config.py` | 2 | Round-trip new fields. |
| `tests/agent/test_cli_tx.py` | 2 | New — `--allow-tx` flag handling. |
| `tests/agent/test_hardware.py` | 3 | Heartbeat `capabilities` + `sessions`. |
| `tests/agent/test_streamer.py` | 3 | Refactor for session model; RX assertions unchanged. |
| `tests/agent/test_streamer_tx.py` | 4 | New — TX happy path. |
| `tests/agent/test_tx_safety.py` | 4 | New — cap enforcement. |
| `tests/agent/test_tx_underrun.py` | 4 | New — pause/zero/repeat policies. |
| `tests/agent/test_full_duplex.py` | 4 | New — shared SDR ref count. |
| `tests/agent/test_integration_tx.py` | 5 | New — real `websockets` server E2E. |
| `docs/agent_tx_protocol.md` | 5 | New — operator-facing protocol doc. |

---

## Implementation gotchas (do not skip)

1. **Asyncio ↔ thread bridge.** The SDR's `_stream_tx` is synchronous and runs in an executor thread. Its callback must not `await`. Use `queue.Queue` (thread-safe) for inbound buffers and `asyncio.run_coroutine_threadsafe(coro, loop)` to emit `tx_status` from inside the thread. Resolve `loop` once at streamer construction; don't call `get_event_loop()` from the thread.
2. **`sdr.pause_tx()` from inside the callback.** Pluto's `_stream_tx` loop condition is `while self._enable_tx is True`. Calling `pause_tx()` inside the callback sets `_enable_tx = False`, so the loop exits at the NEXT condition check. That's fine — it may emit one trailing zero-filled buffer. Document this; don't try to exit mid-callback.
3. **Queue drain on stop.** When `_handle_tx_stop` sets `stop_event` and pauses TX, the executor thread may still be blocked in `queue.get(timeout=0.1)`. Draining the queue does not unblock a timed get. Rely on the 100 ms timeout; the thread exits on the next iteration. Don't try to be clever with a poison pill.
4. **Interleaved float32 → complex64 conversion.** `np.frombuffer(buf, dtype=np.float32).view(np.complex64)` is zero-copy and correct when `len(buf)` is a multiple of 8 bytes. Validate the size first, because `np.frombuffer` raises on a length that is not a multiple of the element size; a mismatched size is an underrun for that cycle, not a reason to crash the thread.
5. **MockSDR's `_stream_tx`** ([sdr/mock.py:96-100](../src/ria_toolkit_oss/sdr/mock.py#L96-L100)) calls `callback(self.rx_buffer_size)`: it passes a *size*, not samples. The TX callback contract is "I am given `num_samples`, I return that many complex64 samples." `test_streamer_tx` must respect this: the test's `sdr.tx_buffer_size` (if used) doesn't affect what the callback receives from the mock. Simplest path: set `MockSDR.rx_buffer_size = buffer_size` in the test harness before `_stream_tx` is invoked, so the TX callback receives the right size.
6. **`init_tx` on MockSDR vs Pluto.** MockSDR's `init_tx` [sets attributes and flips `_tx_initialized = True`](../src/ria_toolkit_oss/sdr/mock.py#L70-L81). Pluto's does the same plus `_rx_initialized = False` (the FDD bug). For full-duplex tests we currently target MockSDR only. Pluto FDD will work because our RX path ignores `_rx_initialized`, but the real-Pluto smoke test is the only validation. Call that out in the PR description.
7. **Don't block the event loop.** `asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)` in `_handle_tx_stop` is non-blocking from the loop's perspective; the 1 s cap prevents a misbehaving driver from stalling heartbeat/RX. The same rule applies to `on_binary`: a blocking `queue.put` must not run on the loop thread.
8. **Heartbeat during TX.** The existing heartbeat loop runs on a 30 s timer. The sessions snapshot is cheap and needs no locking: `self._rx`/`self._tx` are only reassigned by handlers on the event-loop thread, which is also where the heartbeat reads them (and a single attribute read is atomic under the GIL anyway).

---

## Rollout

1. Open one PR per phase (1 → 2 → 3 → 4 → 5), each green on its own.
2. Phase 3 is the riskiest diff (RX refactor). Get a second reviewer if possible; the regression surface is all of current RX behavior.
3. After Phase 4 merges, `ria-agent stream --allow-tx` is a usable toy: you can hand-drive it from a Python REPL with `websockets` to validate against real hardware before the hub side is ready.
4. Phase 5 closes the loop and ships the user-facing docs.

## Out of scope (explicit)

- **Multi-app-per-agent** — one RX + one TX per agent in v1. Adding session IDs to binary frames is a v2 protocol bump.
- **Other TX drivers** (HackRF, USRP, bladeRF) — wiring `_CONFIG_ATTR_MAP` entries and verifying `_stream_tx` behavior per driver. Tackle when the hub has an operator that targets them.
- **Resampling / clock drift** — the agent treats hub-supplied samples as authoritative. Drift manifests as underruns; the underrun policy is the only mitigation.
- **Fixing Pluto's `init_tx` `_rx_initialized = False` reset** — pre-existing, not triggered by our RX path, left for a separate cleanup.

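
The `_schedule()` helper that Phase 4 and gotcha 1 rely on is small but easy to get wrong. A minimal sketch, assuming the bridge is constructed while the event loop is already running (`asyncio.get_running_loop()` raises otherwise; if `Streamer` is built before the loop starts, capture the loop lazily on the first handler call instead). `LoopBridge` and `demo` are hypothetical names.

```python
import asyncio
import threading


class LoopBridge:
    """Hypothetical sketch of the _schedule() helper: hand coroutines from a thread to the loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._loop = loop  # captured once, on the loop thread

    def schedule(self, coro):
        # Safe from any thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    bridge = LoopBridge(loop)
    sent = []

    async def send_tx_status(app_id: str, state: str) -> None:
        # Stand-in for Streamer._send_tx_status; records whether it ran on the loop thread.
        sent.append((app_id, state, threading.get_ident() == loop_thread))

    def tx_callback_like():
        # Stand-in for _tx_callback in the executor thread: schedule, never await.
        assert threading.get_ident() != loop_thread
        return bridge.schedule(send_tx_status("app-1", "underrun"))

    fut = await loop.run_in_executor(None, tx_callback_like)
    await asyncio.wrap_future(fut)  # only the demo waits; the real callback just returns
    return sent
```

The real callback never waits on the returned future: blocking the TX thread on the loop would stall `_stream_tx`, and the status is fire-and-forget anyway.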