M
madrigal
8a66860d33
All checks were successful
Build Sphinx Docs Set / Build Docs (pull_request) Successful in 15m51s
Build Project / Build Project (3.10) (pull_request) Successful in 16m14s
Build Project / Build Project (3.11) (pull_request) Successful in 17m9s
Build Project / Build Project (3.12) (pull_request) Successful in 2m29s
Test with tox / Test with tox (3.12) (pull_request) Successful in 21m28s
Test with tox / Test with tox (3.10) (pull_request) Successful in 22m50s
Test with tox / Test with tox (3.11) (pull_request) Successful in 23m18s
110 lines
3.3 KiB
Python
110 lines
3.3 KiB
Python
"""QA metrics for captured RF recordings."""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass, field
|
||
|
||
import numpy as np
|
||
|
||
from ria_toolkit_oss.data.recording import Recording
|
||
|
||
from .campaign import QAConfig
|
||
|
||
|
||
@dataclass
|
||
class QAResult:
|
||
"""Result of QA checks on a single recording."""
|
||
|
||
passed: bool
|
||
flagged: bool # True if any metric is below threshold (but not hard-failed)
|
||
snr_db: float
|
||
duration_s: float
|
||
issues: list[str] = field(default_factory=list)
|
||
|
||
def to_dict(self) -> dict:
|
||
return {
|
||
"passed": self.passed,
|
||
"flagged": self.flagged,
|
||
"snr_db": round(self.snr_db, 2),
|
||
"duration_s": round(self.duration_s, 3),
|
||
"issues": self.issues,
|
||
}
|
||
|
||
|
||
def estimate_snr_db(samples: np.ndarray, signal_fraction: float = 0.7) -> float:
|
||
"""Estimate SNR from IQ samples using PSD-based signal/noise separation.
|
||
|
||
Computes an FFT of the samples and assumes the top ``signal_fraction``
|
||
of power bins are signal and the remainder are noise. This is a
|
||
heuristic appropriate for a controlled testbed where a single dominant
|
||
signal is expected.
|
||
|
||
Args:
|
||
samples: 1-D complex array of IQ samples.
|
||
signal_fraction: Fraction of PSD bins to treat as signal (0–1).
|
||
|
||
Returns:
|
||
Estimated SNR in dB, or 0.0 if the noise floor is zero.
|
||
"""
|
||
n_fft = min(4096, len(samples))
|
||
window = np.hanning(n_fft)
|
||
psd = np.abs(np.fft.fft(samples[:n_fft] * window)) ** 2
|
||
|
||
psd_sorted = np.sort(psd)[::-1]
|
||
n_signal = min(max(1, int(n_fft * signal_fraction)), n_fft - 1)
|
||
signal_power = psd_sorted[:n_signal].mean()
|
||
noise_power = psd_sorted[n_signal:].mean()
|
||
|
||
if noise_power <= 0.0:
|
||
return 0.0
|
||
return float(10.0 * np.log10(signal_power / noise_power))
|
||
|
||
|
||
def check_recording(recording: Recording, config: QAConfig) -> QAResult:
|
||
"""Run QA checks on a recording against the campaign QA config.
|
||
|
||
Checks performed:
|
||
- Duration: number of samples / sample_rate >= min_duration_s
|
||
- SNR: estimated SNR >= snr_threshold_db
|
||
|
||
Args:
|
||
recording: Recording to evaluate.
|
||
config: QA thresholds from the campaign config.
|
||
|
||
Returns:
|
||
QAResult with pass/flag status and per-metric details.
|
||
"""
|
||
issues: list[str] = []
|
||
flagged = False
|
||
|
||
# --- Duration check ---
|
||
sample_rate = recording.metadata.get("sample_rate", 1.0)
|
||
n_samples = recording.data.shape[-1]
|
||
duration_s = n_samples / sample_rate if sample_rate else 0.0
|
||
|
||
if duration_s < config.min_duration_s:
|
||
issues.append(f"Duration too short: {duration_s:.1f}s < {config.min_duration_s:.1f}s threshold")
|
||
flagged = True
|
||
|
||
# --- SNR check ---
|
||
samples = recording.data[0] if recording.data.ndim > 1 else recording.data
|
||
snr_db = estimate_snr_db(samples)
|
||
|
||
if snr_db < config.snr_threshold_db:
|
||
issues.append(f"SNR below threshold: {snr_db:.1f} dB < {config.snr_threshold_db:.1f} dB")
|
||
flagged = True
|
||
|
||
# In flag_for_review mode: flag but don't hard-fail
|
||
if config.flag_for_review:
|
||
passed = True # always accept; human reviews flagged recordings
|
||
else:
|
||
passed = not flagged
|
||
|
||
return QAResult(
|
||
passed=passed,
|
||
flagged=flagged,
|
||
snr_db=snr_db,
|
||
duration_s=duration_s,
|
||
issues=issues,
|
||
)
|