"""Concurrent RX + TX sessions on the same agent — shared SDR via registry.""" from __future__ import annotations import asyncio import time import numpy as np from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.streamer import Streamer from ria_toolkit_oss.sdr.mock import MockSDR class FullDuplexMockSDR(MockSDR): """MockSDR with a recording TX path so the test can assert both directions.""" def __init__(self, buffer_size: int): super().__init__(buffer_size=buffer_size) self.tx_produced: list[np.ndarray] = [] def _stream_tx(self, callback): self._enable_tx = True self._tx_initialized = True while self._enable_tx: result = callback(self.rx_buffer_size) self.tx_produced.append(np.asarray(result).copy()) time.sleep(0.005) class FakeWs: def __init__(self): self.json_sent = [] self.bytes_sent = [] async def send_json(self, p): self.json_sent.append(p) async def send_bytes(self, b): self.bytes_sent.append(b) def _iq_frame(samples: np.ndarray) -> bytes: interleaved = np.empty(samples.size * 2, dtype=np.float32) interleaved[0::2] = samples.real interleaved[1::2] = samples.imag return interleaved.tobytes() def test_rx_and_tx_share_one_sdr_instance(): built: list[FullDuplexMockSDR] = [] def factory(device, identifier): sdr = FullDuplexMockSDR(buffer_size=16) built.append(sdr) return sdr async def scenario(): ws = FakeWs() s = Streamer(ws=ws, sdr_factory=factory, cfg=AgentConfig(tx_enabled=True)) # Start RX first. await s.on_message( { "type": "start", "app_id": "app-1", "radio_config": {"device": "mock", "buffer_size": 16}, } ) # Then start TX on the same device — should share the SDR handle. await s.on_message( { "type": "tx_start", "app_id": "app-1", "radio_config": { "device": "mock", "buffer_size": 16, "tx_sample_rate": 1_000_000, "tx_gain": -20, "tx_center_frequency": 2.45e9, "underrun_policy": "zero", }, } ) # Push a known TX buffer. marker = np.arange(16, dtype=np.complex64) + 7 await s.on_binary(_iq_frame(marker)) # Let both directions produce output. for _ in range(80): rx_ok = len(ws.bytes_sent) >= 2 tx_ok = any(np.array_equal(b, marker) for b in built[0].tx_produced) if built else False if rx_ok and tx_ok: break await asyncio.sleep(0.01) # Heartbeat should show both sessions. hb = s.build_heartbeat() # Stop TX first, RX keeps running. await s.on_message({"type": "tx_stop", "app_id": "app-1"}) tx_after_stop = s._tx is None rx_still_active = s._rx is not None # Now stop RX. await s.on_message({"type": "stop", "app_id": "app-1"}) return ws, s, built, hb, tx_after_stop, rx_still_active ws, s, built, hb, tx_after_stop, rx_still_active = asyncio.run(scenario()) # One SDR was built and shared. assert len(built) == 1, f"expected exactly one SDR instance, got {len(built)}" # Both directions produced output. assert len(ws.bytes_sent) >= 1, "RX produced no IQ frames" marker = np.arange(16, dtype=np.complex64) + 7 assert any( np.array_equal(b, marker) for b in built[0].tx_produced ), "TX callback never saw the pushed marker buffer" # Heartbeat reflected both sessions while they were active. assert hb["sessions"]["rx"]["app_id"] == "app-1" assert hb["sessions"]["tx"]["app_id"] == "app-1" # Stopping TX does not tear down RX. assert tx_after_stop assert rx_still_active # After both stops, registry is empty. assert s._registry.refcount(("mock", None)) == 0 assert s._rx is None assert s._tx is None