ria-toolkit-oss/src/ria_toolkit_oss/sdr/mock.py
ben 9a960e2f29
Some checks failed
Build Sphinx Docs Set / Build Docs (pull_request) Failing after 1s
Build Project / Build Project (3.10) (pull_request) Successful in 57s
Build Project / Build Project (3.11) (pull_request) Successful in 1m7s
Build Project / Build Project (3.12) (pull_request) Successful in 56s
Test with tox / Test with tox (3.12) (pull_request) Failing after 5m13s
Test with tox / Test with tox (3.11) (pull_request) Failing after 5m48s
Test with tox / Test with tox (3.10) (pull_request) Failing after 8m46s
zfp functionality and servers
2026-03-31 13:51:10 -04:00

132 lines
4.5 KiB
Python

"""Simulated SDR device for testing without hardware.
Set ``recorder.device = "mock"`` (or ``"sim"``) in a campaign config to use
this driver. The inference loop can also use it by specifying ``device:
"mock"`` in the SDR start request.
The mock generates complex float32 AWGN samples normalised to [-1, 1].
It satisfies both interfaces used in this codebase:
- ``record(num_samples)`` / ``_stream_rx(callback)`` — used by
``CampaignExecutor`` (inherits from ``SDR`` base class).
- ``rx(num_samples)`` — PlutoSDR-style interface used by the controller
inference loop.
"""
from __future__ import annotations
import time
import numpy as np
from ria_toolkit_oss.sdr.sdr import SDR
_DEFAULT_BUFFER_SIZE = 4096
# Simulated sample rate throttle: sleep this long between buffers so the
# loop does not spin at 100% CPU. 10 ms ≈ 100 buffers/s which is fine for
# tests and campaign execution timing.
_SLEEP_PER_BUFFER_S = 0.01
class MockSDR(SDR):
"""Software-simulated SDR that generates AWGN noise.
Args:
buffer_size: Number of complex samples per streaming buffer.
seed: Optional RNG seed for reproducible output.
"""
def __init__(self, buffer_size: int = _DEFAULT_BUFFER_SIZE, seed: int | None = None):
super().__init__()
self.rx_buffer_size: int = buffer_size
self._rng = np.random.default_rng(seed)
# Direct attribute aliases used by _apply_sdr_config in the controller.
self.center_freq: float = 2.45e9
self.sample_rate: float = 10e6
self.gain: float = 40.0
# ------------------------------------------------------------------
# Abstract method implementations
# ------------------------------------------------------------------
def init_rx(
self,
sample_rate: float,
center_frequency: float,
gain,
channel: int = 0,
gain_mode: str = "manual",
) -> None:
self.rx_sample_rate = float(sample_rate)
self.rx_center_frequency = float(center_frequency)
self.rx_gain = 40.0 if gain is None else float(gain)
# Mirror to the attribute names used by _apply_sdr_config.
self.sample_rate = self.rx_sample_rate
self.center_freq = self.rx_center_frequency
self.gain = self.rx_gain
self._rx_initialized = True
def init_tx(
self,
sample_rate: float,
center_frequency: float,
gain,
channel: int = 0,
gain_mode: str = "manual",
) -> None:
self.tx_sample_rate = float(sample_rate)
self.tx_center_frequency = float(center_frequency)
self.tx_gain = 40.0 if gain is None else float(gain)
self._tx_initialized = True
def _stream_rx(self, callback) -> None:
"""Generate 1-D AWGN buffers and pass each to *callback* until stopped.
Uses 1-D arrays so the base class ``_validate_buffer`` check does not
incorrectly flag them as corrupted (the (1, N) form triggers a false
positive in the all-same-value check).
"""
self._enable_rx = True
while self._enable_rx:
buf = self._awgn(self.rx_buffer_size)
callback(buf)
time.sleep(_SLEEP_PER_BUFFER_S)
def _stream_tx(self, callback) -> None:
self._enable_tx = True
while self._enable_tx:
callback(self.rx_buffer_size)
time.sleep(_SLEEP_PER_BUFFER_S)
def set_clock_source(self, source: str) -> None:
pass # no-op
def close(self) -> None:
self._enable_rx = False
self._enable_tx = False
self._rx_initialized = False
self._tx_initialized = False
# ------------------------------------------------------------------
# PlutoSDR-style interface used by the controller inference loop
# ------------------------------------------------------------------
def rx(self, num_samples: int) -> np.ndarray:
"""Return *num_samples* complex64 AWGN samples (PlutoSDR-style)."""
return self._awgn(num_samples)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _awgn(self, n: int) -> np.ndarray:
"""Return *n* normalised complex64 AWGN samples as a 1-D array."""
real = self._rng.standard_normal(n).astype(np.float32)
imag = self._rng.standard_normal(n).astype(np.float32)
buf = real + 1j * imag
peak = np.abs(buf).max()
if peak > 1e-9:
buf /= peak
return buf