ria-toolkit-oss/tests/transforms/test_iq_impairments.py
ben c36fdcf607
Some checks failed
Build Sphinx Docs Set / Build Docs (pull_request) Successful in 16s
Test with tox / Test with tox (3.10) (pull_request) Failing after 17m6s
Build Project / Build Project (3.10) (pull_request) Successful in 17m26s
Build Project / Build Project (3.11) (pull_request) Successful in 17m25s
Build Project / Build Project (3.12) (pull_request) Successful in 17m27s
Test with tox / Test with tox (3.12) (pull_request) Successful in 17m21s
Test with tox / Test with tox (3.11) (pull_request) Failing after 21m50s
optimizations and fixes
2026-04-01 11:57:59 -04:00

407 lines
15 KiB
Python
Raw Permalink RIA Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Unit tests for ria_toolkit_oss.transforms.iq_impairments.
Bugs/issues identified during review:
- time_shift(signal, shift=0) returns all-zeros instead of the original signal.
This is because `data[:, :-0]` evaluates as `data[:, :0]` (empty slice).
Tests marked with BUG comments document this known failure.
- resample() 'else' branch creates 'empty_array' but never returns it (dead code).
When up < down, a shorter-than-input array is returned instead of zero-padded.
- add_awgn_to_signal() contains a leftover debug print() call.
"""
import numpy as np
import pytest
from ria_toolkit_oss.datatypes import Recording
from ria_toolkit_oss.transforms import iq_impairments
# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------
# Metadata attached to every Recording constructed by these tests.
SAMPLE_METADATA = {
    "source": "test",
    "timestamp": 1700000000.0,
}

# 1x4 complex signal: sample k (1-based) equals k + kj.
DATA_4 = np.array([[k + k * 1j for k in range(1, 5)]], dtype=np.complex128)

# 1x5 complex signal: a purely real ramp 1..5 with zero imaginary part.
DATA_5 = np.array([[complex(k, 0) for k in range(1, 6)]], dtype=np.complex128)
# ---------------------------------------------------------------------------
# add_awgn_to_signal
# ---------------------------------------------------------------------------
def test_add_awgn_array_shape():
    """AWGN applied to a bare array returns an array of the same shape."""
    noisy = iq_impairments.add_awgn_to_signal(DATA_4, snr=10)
    wanted_shape = DATA_4.shape
    assert noisy.shape == wanted_shape
def test_add_awgn_array_is_complex():
    """The noisy output of add_awgn_to_signal is a complex array."""
    noisy = iq_impairments.add_awgn_to_signal(DATA_4, snr=10)
    is_complex = np.iscomplexobj(noisy)
    assert is_complex
def test_add_awgn_not_identical_to_input():
    """Adding noise at snr=10 must alter at least one sample of the input."""
    # Seed NumPy's global RNG (presumably the noise source) for repeatability.
    np.random.seed(42)
    noisy = iq_impairments.add_awgn_to_signal(DATA_4, snr=10)
    unchanged = np.array_equal(noisy, DATA_4)
    assert not unchanged
def test_add_awgn_recording_input():
    """A Recording in gives a Recording out, with metadata and shape intact."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    noisy = iq_impairments.add_awgn_to_signal(recording, snr=10)

    assert isinstance(noisy, Recording)
    assert noisy.metadata["source"] == "test"
    assert noisy.data.shape == DATA_4.shape
def test_add_awgn_recording_data_changed():
    """The samples inside a returned Recording differ from the clean input."""
    np.random.seed(42)
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    noisy = iq_impairments.add_awgn_to_signal(recording, snr=10)
    unchanged = np.array_equal(noisy.data, DATA_4)
    assert not unchanged
def test_add_awgn_invalid_real_input():
    """A real-valued (non-complex) array is rejected with ValueError."""
    not_complex = np.arange(1.0, 4.0).reshape(1, 3)  # [[1.0, 2.0, 3.0]]
    with pytest.raises(ValueError):
        iq_impairments.add_awgn_to_signal(not_complex)
def test_add_awgn_snr_approximated():
    """At snr=60 the noisy output stays close (atol=0.01) to the clean signal."""
    np.random.seed(0)
    # A high SNR leaves very little noise, so the all-ones signal dominates.
    clean = np.full((1, 100000), 1 + 0j, dtype=np.complex128)
    noisy = iq_impairments.add_awgn_to_signal(clean, snr=60)
    assert np.allclose(noisy, clean, atol=0.01)
# ---------------------------------------------------------------------------
# time_shift
# ---------------------------------------------------------------------------
def test_time_shift_positive():
    """shift=2 delays the samples by two positions and zero-fills the front."""
    shifted = iq_impairments.time_shift(DATA_5, shift=2)
    want = np.array([[0, 0, 1, 2, 3]], dtype=np.complex128)
    assert np.array_equal(shifted, want)
def test_time_shift_negative():
    """shift=-2 advances the samples by two positions and zero-fills the tail."""
    shifted = iq_impairments.time_shift(DATA_5, shift=-2)
    want = np.array([[3, 4, 5, 0, 0]], dtype=np.complex128)
    assert np.array_equal(shifted, want)
def test_time_shift_shape_preserved():
    """A one-sample shift leaves the array shape untouched."""
    shifted = iq_impairments.time_shift(DATA_5, shift=1)
    wanted_shape = DATA_5.shape
    assert shifted.shape == wanted_shape
def test_time_shift_recording_input():
    """A Recording in gives a Recording out; metadata kept, data shifted."""
    recording = Recording(data=DATA_5.copy(), metadata=SAMPLE_METADATA)
    shifted = iq_impairments.time_shift(recording, shift=2)

    assert isinstance(shifted, Recording)
    assert shifted.metadata["source"] == "test"

    want = np.array([[0, 0, 1, 2, 3]], dtype=np.complex128)
    assert np.array_equal(shifted.data, want)
def test_time_shift_invalid_real_input():
    """A real-valued array (no explicit shift given) raises ValueError."""
    not_complex = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.time_shift(not_complex)
def test_time_shift_large_shift_warns():
    """A shift of 100 on the 5-sample signal must emit a UserWarning."""
    oversized_shift = 100
    with pytest.warns(UserWarning):
        iq_impairments.time_shift(DATA_5, shift=oversized_shift)
def test_time_shift_zero_is_identity():
    """A zero shift must hand back the input samples unmodified."""
    shifted = iq_impairments.time_shift(DATA_5, shift=0)
    same = np.array_equal(shifted, DATA_5)
    assert same
# ---------------------------------------------------------------------------
# frequency_shift
# ---------------------------------------------------------------------------
def test_frequency_shift_zero_is_identity():
    """shift=0.0 leaves the signal as it was, since cos(0)=1 and sin(0)=0."""
    shifted = iq_impairments.frequency_shift(DATA_4, shift=0.0)
    close = np.allclose(shifted, DATA_4)
    assert close
def test_frequency_shift_shape_preserved():
    """A 0.25 frequency shift keeps the array shape."""
    shifted = iq_impairments.frequency_shift(DATA_4, shift=0.25)
    wanted_shape = DATA_4.shape
    assert shifted.shape == wanted_shape
def test_frequency_shift_is_complex():
    """The frequency-shifted output is a complex array."""
    shifted = iq_impairments.frequency_shift(DATA_4, shift=0.1)
    is_complex = np.iscomplexobj(shifted)
    assert is_complex
def test_frequency_shift_half_nyquist():
    """A shift of 0.5 multiplies sample n by exp(j*pi*n), i.e. (-1)^n."""
    # All-ones input lying on the real axis.
    ones = np.array([[1 + 0j, 1 + 0j, 1 + 0j, 1 + 0j]], dtype=np.complex128)
    shifted = iq_impairments.frequency_shift(ones, shift=0.5)

    # Reference rotator built from the closed-form expression.
    sample_index = np.arange(4)
    want = ones * np.exp(1j * 2 * np.pi * 0.5 * sample_index)
    assert np.allclose(shifted, want)
def test_frequency_shift_recording_input():
    """A Recording in gives a Recording out, with metadata and shape intact."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    shifted = iq_impairments.frequency_shift(recording, shift=0.25)

    assert isinstance(shifted, Recording)
    assert shifted.metadata["source"] == "test"
    assert shifted.data.shape == DATA_4.shape
def test_frequency_shift_out_of_range_positive():
    """A shift above 0.5 (here 0.6) is rejected with ValueError."""
    too_high = 0.6
    with pytest.raises(ValueError):
        iq_impairments.frequency_shift(DATA_4, shift=too_high)
def test_frequency_shift_out_of_range_negative():
    """A shift below -0.5 (here -0.51) is rejected with ValueError."""
    too_low = -0.51
    with pytest.raises(ValueError):
        iq_impairments.frequency_shift(DATA_4, shift=too_low)
def test_frequency_shift_invalid_real_input():
    """A real-valued (non-complex) array is rejected with ValueError."""
    not_complex = np.arange(1.0, 4.0).reshape(1, 3)  # [[1.0, 2.0, 3.0]]
    with pytest.raises(ValueError):
        iq_impairments.frequency_shift(not_complex, shift=0.1)
def test_frequency_shift_boundary_values():
    """The edge shifts +0.5 and -0.5 are both accepted without raising."""
    for edge in (0.5, -0.5):
        iq_impairments.frequency_shift(DATA_4, shift=edge)
# ---------------------------------------------------------------------------
# phase_shift
# ---------------------------------------------------------------------------
def test_phase_shift_zero_is_identity():
    """phase=0.0 returns the signal unchanged."""
    rotated = iq_impairments.phase_shift(DATA_4, phase=0.0)
    close = np.allclose(rotated, DATA_4)
    assert close
def test_phase_shift_pi_negates():
    """A rotation by pi flips the sign of every sample, since exp(j*pi) = -1."""
    rotated = iq_impairments.phase_shift(DATA_4, phase=np.pi)
    want = -DATA_4
    assert np.allclose(rotated, want)
def test_phase_shift_half_pi():
    """A rotation by pi/2 equals multiplication by j, since exp(j*pi/2) = j."""
    quarter_turn = np.pi / 2
    rotated = iq_impairments.phase_shift(DATA_4, phase=quarter_turn)
    want = DATA_4 * 1j
    assert np.allclose(rotated, want)
def test_phase_shift_shape_preserved():
    """A pi/4 phase rotation keeps the array shape."""
    rotated = iq_impairments.phase_shift(DATA_4, phase=np.pi / 4)
    wanted_shape = DATA_4.shape
    assert rotated.shape == wanted_shape
def test_phase_shift_recording_input():
    """A Recording in gives a Recording out; metadata kept, data rotated by j."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    rotated = iq_impairments.phase_shift(recording, phase=np.pi / 2)

    assert isinstance(rotated, Recording)
    assert rotated.metadata["source"] == "test"

    want = DATA_4 * 1j
    assert np.allclose(rotated.data, want)
def test_phase_shift_out_of_range_positive():
    """A phase just above pi is rejected with ValueError."""
    too_high = np.pi + 0.01
    with pytest.raises(ValueError):
        iq_impairments.phase_shift(DATA_4, phase=too_high)
def test_phase_shift_out_of_range_negative():
    """A phase just below -pi is rejected with ValueError."""
    too_low = -np.pi - 0.01
    with pytest.raises(ValueError):
        iq_impairments.phase_shift(DATA_4, phase=too_low)
def test_phase_shift_boundary_values():
    """The edge phases +pi and -pi are both accepted without raising."""
    for edge in (np.pi, -np.pi):
        iq_impairments.phase_shift(DATA_4, phase=edge)
def test_phase_shift_invalid_real_input():
    """A real-valued (non-complex) array is rejected with ValueError."""
    not_complex = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.phase_shift(not_complex, phase=0.0)
# ---------------------------------------------------------------------------
# iq_imbalance
# ---------------------------------------------------------------------------
def test_iq_imbalance_basic_shape():
    """iq_imbalance returns an array with the same shape as its input."""
    settings = {"amplitude_imbalance": 1.0, "phase_imbalance": 0.1, "dc_offset": 0.0}
    impaired = iq_impairments.iq_imbalance(DATA_4, **settings)
    assert impaired.shape == DATA_4.shape
def test_iq_imbalance_is_complex():
    """The impaired output is a complex array."""
    settings = {"amplitude_imbalance": 1.0, "phase_imbalance": 0.1, "dc_offset": 0.0}
    impaired = iq_impairments.iq_imbalance(DATA_4, **settings)
    is_complex = np.iscomplexobj(impaired)
    assert is_complex
def test_iq_imbalance_changes_signal():
    """Non-zero amplitude, phase and DC settings must alter the signal."""
    impaired = iq_impairments.iq_imbalance(
        DATA_4,
        amplitude_imbalance=3.0,
        phase_imbalance=0.5,
        dc_offset=2.0,
    )
    unchanged = np.allclose(impaired, DATA_4)
    assert not unchanged
def test_iq_imbalance_recording_input():
    """A Recording in gives a Recording out, with metadata and shape intact."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    impaired = iq_impairments.iq_imbalance(
        recording,
        amplitude_imbalance=1.0,
        phase_imbalance=0.1,
        dc_offset=0.0,
    )

    assert isinstance(impaired, Recording)
    assert impaired.metadata["source"] == "test"
    assert impaired.data.shape == DATA_4.shape
def test_iq_imbalance_phase_out_of_range_positive():
    """A phase_imbalance just above pi is rejected with ValueError."""
    too_high = np.pi + 0.01
    with pytest.raises(ValueError):
        iq_impairments.iq_imbalance(DATA_4, phase_imbalance=too_high)
def test_iq_imbalance_phase_out_of_range_negative():
    """A phase_imbalance just below -pi is rejected with ValueError."""
    too_low = -np.pi - 0.01
    with pytest.raises(ValueError):
        iq_impairments.iq_imbalance(DATA_4, phase_imbalance=too_low)
def test_iq_imbalance_phase_boundary_values():
    """The edge phase_imbalance values +pi and -pi are accepted without raising."""
    for edge in (np.pi, -np.pi):
        iq_impairments.iq_imbalance(DATA_4, phase_imbalance=edge)
def test_iq_imbalance_invalid_real_input():
    """A real-valued (non-complex) array is rejected with ValueError."""
    not_complex = np.arange(1.0, 4.0).reshape(1, 3)  # [[1.0, 2.0, 3.0]]
    with pytest.raises(ValueError):
        iq_impairments.iq_imbalance(not_complex)
def test_iq_imbalance_amplitude_symmetry():
    """Flipping the sign of amplitude_imbalance must give a different output.

    The intent is that the sign selects which of I and Q is scaled; this test
    only checks the weaker property that the +3.0 and -3.0 results differ.
    """
    # Phase imbalance and DC offset are zeroed so only the amplitude term varies.
    fixed = {"phase_imbalance": 0.0, "dc_offset": 0.0}
    pos = iq_impairments.iq_imbalance(DATA_4, amplitude_imbalance=3.0, **fixed)
    neg = iq_impairments.iq_imbalance(DATA_4, amplitude_imbalance=-3.0, **fixed)
    identical = np.allclose(pos, neg)
    assert not identical
def test_iq_imbalance_dc_offset_zero_doubles_signal():
    """BUG documentation: with dc_offset=0 dB the signal comes back doubled.

    The formula `data + (10^(dc_offset/20) * real + j * 10^(dc_offset/20) * imag)`
    reduces to `data + data` when dc_offset is 0, so the output is twice the
    input rather than the input plus a constant DC term. The assertion pins
    the *current* (buggy) result so that a fix shows up as a test failure.
    """
    # A single purely real sample keeps phase effects out of the picture.
    sample = np.array([[2 + 0j]], dtype=np.complex128)
    impaired = iq_impairments.iq_imbalance(
        sample,
        amplitude_imbalance=0.0,
        phase_imbalance=0.0,
        dc_offset=0.0,
    )
    # If dc_offset=0 meant "no DC", the result would be about 2+0j.
    # With the bug it is 2 * sample = [[4+0j]], which is what is pinned here.
    real_part_doubled = np.allclose(impaired.real, 4.0)
    assert real_part_doubled, (
        "dc_offset=0 currently doubles the signal (adds 1× copy). "
        "If this assertion fails, the dc_offset formula has been fixed — update this test."
    )
# ---------------------------------------------------------------------------
# resample
# ---------------------------------------------------------------------------
def test_resample_upsample_shape():
    """With up=2, down=1 the output is still one row of the input's length."""
    samples = np.array([[1 + 1j, 2 + 2j, 4 + 4j, 8 + 8j]], dtype=np.complex128)
    resampled = iq_impairments.resample(samples, up=2, down=1)
    # A longer resampled result is expected to be cut back to the input length.
    out_shape = resampled.shape
    assert out_shape[0] == 1
    assert out_shape[1] == samples.shape[1]
def test_resample_is_complex():
    """The resampled output is a complex array."""
    resampled = iq_impairments.resample(DATA_4, up=2, down=1)
    is_complex = np.iscomplexobj(resampled)
    assert is_complex
def test_resample_recording_input():
    """A Recording in gives a Recording out with its metadata preserved."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    resampled = iq_impairments.resample(recording, up=2, down=1)

    assert isinstance(resampled, Recording)
    assert resampled.metadata["source"] == "test"
def test_resample_unchanged_ratio():
    """Equal up and down factors (3/3) keep the sample count unchanged."""
    resampled = iq_impairments.resample(DATA_4, up=3, down=3)
    wanted_length = DATA_4.shape[1]
    assert resampled.shape[1] == wanted_length
def test_resample_invalid_real_input():
    """A real-valued array (no explicit up/down given) raises ValueError."""
    not_complex = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.resample(not_complex)
def test_resample_downsample_returns_same_length():
    """With up=1, down=2 the output must still match the input length.

    The intended behaviour is that the shorter resampled signal is zero-padded
    back to the original number of samples.
    """
    samples = np.array([[k + k * 1j for k in range(1, 7)]], dtype=np.complex128)
    resampled = iq_impairments.resample(samples, up=1, down=2)
    wanted_length = samples.shape[1]
    assert resampled.shape[1] == wanted_length