"""Tests for TxExecutor — signal synthesis and step execution.""" from __future__ import annotations import threading from unittest.mock import MagicMock, patch import numpy as np import pytest from ria_toolkit_oss.orchestration.tx_executor import TxExecutor def _cfg(modulation="QPSK", symbol_rate=100_000, steps=None): return { "id": "test-tx", "type": "sdr", "control_method": "sdr_agent", "sdr_agent": { "modulation": modulation, "symbol_rate": symbol_rate, "center_frequency": 0.0, "filter": "rrc", "rolloff": 0.35, }, "schedule": steps or [{"label": "step1", "duration": 0.001, "power_dbm": -10}], } # --------------------------------------------------------------------------- # Initialisation # --------------------------------------------------------------------------- class TestTxExecutorInit: def test_stores_sdr_device(self): ex = TxExecutor(_cfg(), sdr_device="pluto") assert ex.sdr_device == "pluto" def test_stop_event_created_when_not_supplied(self): ex = TxExecutor(_cfg()) assert isinstance(ex.stop_event, threading.Event) assert not ex.stop_event.is_set() def test_accepts_external_stop_event(self): ev = threading.Event() ex = TxExecutor(_cfg(), stop_event=ev) assert ex.stop_event is ev # --------------------------------------------------------------------------- # run() — schedule iteration # --------------------------------------------------------------------------- class TestTxExecutorRun: def test_empty_schedule_returns_immediately(self): cfg = _cfg(steps=[]) ex = TxExecutor(cfg) ex.run() # must not raise or block def test_pre_set_stop_event_skips_all_steps(self): ev = threading.Event() ev.set() ex = TxExecutor(_cfg(), stop_event=ev) # If stop was set, _execute_step should never be called. # run() should return cleanly without attempting synthesis. ex.run() def test_no_sdr_falls_back_to_simulation(self, monkeypatch): """Without SDR hardware TxExecutor simulates by calling stop_event.wait.""" cfg = _cfg(steps=[{"label": "s", "duration": 0.001, "power_dbm": 0}]) waited = [] real_ev = threading.Event() orig_wait = real_ev.wait def _fake_wait(timeout=None): waited.append(timeout) return False monkeypatch.setattr(real_ev, "wait", _fake_wait) # Patch SDR init to always fail (forces simulation path) with patch.object(TxExecutor, "_init_sdr", lambda self, *a, **kw: setattr(self, "_sdr", None)): ex = TxExecutor(cfg, sdr_device="nonexistent_xyz", stop_event=real_ev) ex.run() assert len(waited) >= 1, "expected stop_event.wait to be called for simulation" # --------------------------------------------------------------------------- # _synthesise — all modulation types and filter types # --------------------------------------------------------------------------- class TestSynthesise: @pytest.fixture(autouse=True) def _ex(self): self.ex = TxExecutor(_cfg()) def _synth(self, mod, num_samples=256): return self.ex._synthesise(mod, sps=4, num_samples=num_samples, filter_type="rrc", rolloff=0.35) @pytest.mark.parametrize("mod", ["BPSK", "QPSK", "8PSK", "16QAM", "64QAM", "256QAM"]) def test_psk_qam_returns_complex64_array(self, mod): sig = self._synth(mod) assert sig.dtype == np.complex64 assert len(sig) == 256 def test_fsk_returns_correct_length(self): sig = self._synth("FSK") assert len(sig) == 256 def test_ook_returns_correct_length(self): sig = self._synth("OOK") assert len(sig) == 256 def test_gmsk_returns_correct_length(self): sig = self._synth("GMSK") assert len(sig) == 256 def test_oqpsk_returns_correct_length(self): sig = self._synth("OQPSK") assert len(sig) == 256 @pytest.mark.parametrize("mod", ["BPSK", "QPSK", "16QAM", "FSK", "OOK", "GMSK"]) def test_samples_are_finite(self, mod): sig = self._synth(mod) assert np.all(np.isfinite(sig.real)), f"{mod}: non-finite real samples" assert np.all(np.isfinite(sig.imag)), f"{mod}: non-finite imag samples" def test_unknown_modulation_defaults_to_qpsk(self): sig = self._synth("UNKNOWN_MOD_XYZ") assert len(sig) == 256 assert sig.dtype == np.complex64 @pytest.mark.parametrize("filter_type", ["rrc", "rc", "gaussian", "rect", "none"]) def test_all_filter_types(self, filter_type): sig = self.ex._synthesise("QPSK", sps=4, num_samples=128, filter_type=filter_type, rolloff=0.35) assert len(sig) == 128 @pytest.mark.parametrize("n", [64, 128, 512, 1024]) def test_output_length_matches_requested_samples(self, n): sig = self._synth("QPSK", num_samples=n) assert len(sig) == n def test_bpsk_output_is_complex_not_real(self): sig = self._synth("BPSK") # complex64 always has imag part; just check dtype assert sig.dtype == np.complex64 def test_256qam_correct_length(self): sig = self._synth("256QAM") assert len(sig) == 256