Some checks failed
Build Sphinx Docs Set / Build Docs (pull_request) Has been cancelled
Test with tox / Test with tox (3.10) (pull_request) Has been cancelled
Test with tox / Test with tox (3.11) (pull_request) Has been cancelled
Test with tox / Test with tox (3.12) (pull_request) Has been cancelled
Build Project / Build Project (3.12) (pull_request) Has been cancelled
Build Project / Build Project (3.11) (pull_request) Has been cancelled
Build Project / Build Project (3.10) (pull_request) Has been cancelled
210 lines
7.6 KiB
Python
210 lines
7.6 KiB
Python
"""Step-A6 (Pluto lock audit) coverage.
|
|
|
|
Verifies the two invariants the handoff doc calls for when RX and TX run
|
|
concurrently on one shared SDR handle:
|
|
|
|
1. ``_param_lock`` actually serializes concurrent RX + TX setter calls — the
|
|
spec's §A6 acceptance criterion is *"``_param_lock`` instrumented for
|
|
contention"*. We drive parallel ``set_{rx,tx}_sample_rate`` calls through
|
|
the lock and assert it's hit often enough to prove both paths fight for it.
|
|
2. Under a sustained full-duplex session (RX capturing + TX transmitting on
|
|
one ``(device, identifier)``), no setter write is dropped and no exception
|
|
escapes the executor — i.e., the shared-handle assumption holds. Runs
|
|
against ``MockSDR`` per the spec; the real Pluto driver now takes the
|
|
same lock on its TX setters so the production code path is isomorphic.
|
|
|
|
The stress window is 2 seconds by default — the handoff mentions 30 s but
|
|
that's impractical in CI. Set ``RIA_LOCK_STRESS_S`` to override.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
import numpy as np
|
|
|
|
from ria_toolkit_oss.agent.config import AgentConfig
|
|
from ria_toolkit_oss.agent.streamer import Streamer
|
|
from ria_toolkit_oss.sdr.mock import MockSDR
|
|
|
|
_STRESS_S = float(os.environ.get("RIA_LOCK_STRESS_S", "2.0"))
|
|
|
|
|
|
class InstrumentedMockSDR(MockSDR):
|
|
"""MockSDR that counts lock acquisitions and exposes a real ``_param_lock``.
|
|
|
|
``_param_lock`` is inherited from ``SDR`` as a reentrant lock; we wrap it
|
|
with a counter that records every time RX or TX setters grab it, so the
|
|
test can assert real contention rather than just "the code compiles".
|
|
"""
|
|
|
|
def __init__(self, buffer_size: int):
|
|
super().__init__(buffer_size=buffer_size)
|
|
self.rx_lock_hits = 0
|
|
self.tx_lock_hits = 0
|
|
self.param_lock_hits = 0
|
|
# Shadow lock that increments a counter each time __enter__ fires.
|
|
real_lock = self._param_lock
|
|
|
|
test = self
|
|
|
|
class CountingLock:
|
|
def __enter__(self_inner):
|
|
test.param_lock_hits += 1
|
|
real_lock.acquire()
|
|
return self_inner
|
|
|
|
def __exit__(self_inner, *a):
|
|
real_lock.release()
|
|
return False
|
|
|
|
# ``threading.RLock`` interop for any code that calls acquire/release directly.
|
|
def acquire(self_inner, *a, **k):
|
|
test.param_lock_hits += 1
|
|
return real_lock.acquire(*a, **k)
|
|
|
|
def release(self_inner):
|
|
return real_lock.release()
|
|
|
|
self._param_lock = CountingLock()
|
|
|
|
# The MockSDR doesn't ship RX setter methods that hit the lock — override
|
|
# ``sample_rate`` / ``center_freq`` / ``gain`` writes to route through the
|
|
# same lock the real Pluto driver uses, so this test faithfully models the
|
|
# production contention path.
|
|
def set_rx_sample_rate(self, sample_rate):
|
|
with self._param_lock:
|
|
self.rx_lock_hits += 1
|
|
self.rx_sample_rate = float(sample_rate)
|
|
self.sample_rate = self.rx_sample_rate
|
|
|
|
def set_tx_sample_rate(self, sample_rate):
|
|
with self._param_lock:
|
|
self.tx_lock_hits += 1
|
|
self.tx_sample_rate = float(sample_rate)
|
|
# Mirror Pluto: both RX and TX write the same native attribute.
|
|
self.sample_rate = self.tx_sample_rate
|
|
|
|
|
|
class FakeWs:
|
|
def __init__(self):
|
|
self.json_sent: list[dict] = []
|
|
self.bytes_sent: list[bytes] = []
|
|
|
|
async def send_json(self, p):
|
|
self.json_sent.append(p)
|
|
|
|
async def send_bytes(self, b):
|
|
self.bytes_sent.append(b)
|
|
|
|
|
|
def _iq_frame(samples: np.ndarray) -> bytes:
|
|
interleaved = np.empty(samples.size * 2, dtype=np.float32)
|
|
interleaved[0::2] = samples.real
|
|
interleaved[1::2] = samples.imag
|
|
return interleaved.tobytes()
|
|
|
|
|
|
def test_param_lock_contended_under_concurrent_setters():
|
|
"""Run two threads that hammer RX + TX sample-rate setters and assert both
|
|
lock paths fire. This proves the lock is doing work — if either setter
|
|
bypassed ``_param_lock``, one of the counters would stay at zero."""
|
|
sdr = InstrumentedMockSDR(buffer_size=16)
|
|
stop = threading.Event()
|
|
|
|
def rx_setter():
|
|
i = 0
|
|
while not stop.is_set():
|
|
sdr.set_rx_sample_rate(1_000_000 + (i % 1000))
|
|
i += 1
|
|
|
|
def tx_setter():
|
|
i = 0
|
|
while not stop.is_set():
|
|
sdr.set_tx_sample_rate(2_000_000 + (i % 1000))
|
|
i += 1
|
|
|
|
t1 = threading.Thread(target=rx_setter)
|
|
t2 = threading.Thread(target=tx_setter)
|
|
t1.start()
|
|
t2.start()
|
|
time.sleep(min(_STRESS_S, 2.0))
|
|
stop.set()
|
|
t1.join()
|
|
t2.join()
|
|
|
|
assert sdr.rx_lock_hits > 100, f"RX setter barely ran: {sdr.rx_lock_hits}"
|
|
assert sdr.tx_lock_hits > 100, f"TX setter barely ran: {sdr.tx_lock_hits}"
|
|
# Every setter call should have passed through _param_lock exactly once.
|
|
assert sdr.param_lock_hits >= sdr.rx_lock_hits + sdr.tx_lock_hits
|
|
|
|
|
|
def test_full_duplex_stays_healthy_over_stress_window():
|
|
"""Start RX + TX on one shared SDR and drive both paths for ``_STRESS_S``
|
|
seconds, pushing binary frames and emitting ``tx_configure`` mid-stream.
|
|
The session must survive, deliver buffers in both directions, and leave
|
|
the registry clean on shutdown."""
|
|
BUF = 32
|
|
sdr = InstrumentedMockSDR(buffer_size=BUF)
|
|
|
|
async def scenario():
|
|
ws = FakeWs()
|
|
s = Streamer(ws=ws, sdr_factory=lambda d, i: sdr, cfg=AgentConfig(tx_enabled=True))
|
|
|
|
await s.on_message(
|
|
{"type": "start", "app_id": "app-1", "radio_config": {"device": "mock", "buffer_size": BUF}}
|
|
)
|
|
await s.on_message(
|
|
{
|
|
"type": "tx_start",
|
|
"app_id": "app-1",
|
|
"radio_config": {
|
|
"device": "mock",
|
|
"buffer_size": BUF,
|
|
"tx_sample_rate": 1_000_000,
|
|
"tx_center_frequency": 2.45e9,
|
|
"tx_gain": -20,
|
|
"underrun_policy": "zero",
|
|
},
|
|
}
|
|
)
|
|
|
|
marker = np.arange(BUF, dtype=np.complex64) + 1
|
|
deadline = time.monotonic() + _STRESS_S
|
|
i = 0
|
|
while time.monotonic() < deadline:
|
|
await s.on_binary(_iq_frame(marker))
|
|
if i % 8 == 0:
|
|
# Mid-stream parameter reconfiguration touches _apply_sdr_config,
|
|
# which routes through the same setters the stress test above
|
|
# verifies.
|
|
await s.on_message(
|
|
{"type": "tx_configure", "app_id": "app-1", "radio_config": {"tx_sample_rate": 1_000_000 + i}}
|
|
)
|
|
await s.on_message(
|
|
{"type": "configure", "app_id": "app-1", "radio_config": {"sample_rate": 2_000_000 + i}}
|
|
)
|
|
i += 1
|
|
await asyncio.sleep(0.005)
|
|
|
|
await s.on_message({"type": "tx_stop", "app_id": "app-1"})
|
|
await s.on_message({"type": "stop", "app_id": "app-1"})
|
|
return ws, s
|
|
|
|
ws, s = asyncio.run(scenario())
|
|
|
|
# No error frame leaked out.
|
|
errors = [m for m in ws.json_sent if m.get("type") in ("error", "tx_status") and m.get("state") == "error"]
|
|
assert errors == [], f"Unexpected error frames: {errors}"
|
|
# RX produced IQ frames and TX's callback ran — heartbeat-level contention
|
|
# check: both setter paths were hit at least once during configure dispatch.
|
|
assert ws.bytes_sent, "RX produced no IQ frames"
|
|
assert sdr.param_lock_hits > 0
|
|
# Sessions cleaned up; registry drained.
|
|
assert s._tx is None
|
|
assert s._rx is None
|
|
assert s._registry.refcount(("mock", None)) == 0
|