""" Parallel signal separation for multi-component frequency-offset signals. Provides methods to detect and separate overlapping frequency-domain signals that occupy the same time window but different frequency bands. This module implements **spectral peak detection** to identify distinct frequency components and split single time-domain annotations into frequency-specific sub-annotations. **Key Design Decisions** (per Codex review): 1. **Complex IQ Support**: Uses `scipy.signal.welch` with `return_onesided=False` for proper complex signal handling. Window length automatically adapts to signal length via `nperseg=min(nfft, len(signal))` to handle bursts >> from ria_toolkit_oss.annotations import find_spectral_components >>> # Detect the two distinct channels (returns relative frequencies) >>> components = find_spectral_components(signal, sampling_rate=20e6) >>> print(f"Found {len(components)} components") Found 2 components The module is designed to work with detected time-domain annotations, allowing splitting of overlapping signals into separate training samples. """ import json from typing import List, Optional, Tuple import numpy as np from scipy import ndimage from scipy import signal as scipy_signal from ria_toolkit_oss.datatypes import Annotation, Recording def find_spectral_components( signal_data: np.ndarray, sampling_rate: float, nfft: int = 65536, noise_threshold_db: Optional[float] = None, min_component_bw: float = 50e3, time_percentile: float = 70.0, ) -> List[Tuple[float, float, float]]: """ Find distinct frequency components using spectral peak detection. Identifies separate frequency components in a signal by analyzing the power spectral density and finding peaks corresponding to distinct signals. This is useful for separating parallel signals that occupy different frequency bands. **Frequency Representation**: Returns frequencies in **baseband/relative** Hz (centered at 0). To get absolute RF frequencies, add center_frequency_hz from recording metadata to all returned values. Algorithm: 1. Compute power spectral density using Welch (properly handles complex IQ) 2. Auto-estimate noise floor from data if not specified 3. Smooth PSD to reduce spurious peaks 4. Find local maxima above noise floor 5. Estimate bandwidth per peak using -3dB (fallback: cumulative power) 6. Filter components below minimum bandwidth threshold :param signal_data: Complex IQ signal samples (np.complex64/128) :type signal_data: np.ndarray :param sampling_rate: Sample rate in Hz :type sampling_rate: float :param nfft: FFT size / window length for Welch. Automatically capped at signal length to handle bursts (default: 65536) :type nfft: int :param noise_threshold_db: Minimum SNR threshold in dB. If None (default), auto-estimates as np.percentile(psd_db, 10). Adapt this across hardware (Pluto: ~-100, ThinkRF: ~-60). :type noise_threshold_db: Optional[float] :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz) :type min_component_bw: float :param power_threshold: Cumulative power threshold for fallback bandwidth estimation (default: 0.99 = 99% power, like OBW) :type power_threshold: float :returns: List of (center_freq_hz, lower_freq_hz, upper_freq_hz) tuples. **All frequencies are relative (baseband, 0-centered).** Add recording metadata['center_frequency'] to get absolute RF frequencies. :rtype: List[Tuple[float, float, float]] :raises ValueError: If signal has fewer than 256 samples **Example**:: >>> from ria.io import load_recording >>> from ria_toolkit_oss.annotations import find_spectral_components >>> recording = load_recording("capture.sigmf") >>> segment = recording.data[0][start:end] >>> # Components in relative (baseband) frequency >>> components = find_spectral_components(segment, sampling_rate=20e6) >>> for center_rel, lower_rel, upper_rel in components: ... # Convert to absolute RF frequency ... center_abs = recording.metadata['center_frequency'] + center_rel ... print(f"Component @ {center_abs/1e9:.3f} GHz") """ # Validate input min_samples = 256 if len(signal_data) < min_samples: raise ValueError(f"Signal too short: need at least {min_samples} samples, " f"got {len(signal_data)}.") # Compute PSD using Welch method for complex IQ signals # CRITICAL: return_onesided=False for proper complex signal handling nperseg = min(nfft, len(signal_data)) noverlap = nperseg // 2 # --- STFT --- freqs, times, Zxx = scipy_signal.stft( signal_data, fs=sampling_rate, window="blackman", nperseg=nperseg, noverlap=noverlap, return_onesided=False, boundary=None, ) # Shift zero freq to center Zxx = np.fft.fftshift(Zxx, axes=0) freqs = np.fft.fftshift(freqs) # Power spectrogram power = np.abs(Zxx) ** 2 power_db = 10 * np.log10(power + 1e-12) # --- Aggregate across time robustly --- # Using percentile instead of mean prevents short signals from being diluted freq_profile_db = np.percentile(power_db, time_percentile, axis=1) # --- Noise floor estimation --- if noise_threshold_db is None: noise_threshold_db = np.percentile(freq_profile_db, 20) threshold = noise_threshold_db + 3 # 3 dB above noise floor # --- Smooth lightly (avoid merging nearby signals) --- freq_profile_db = ndimage.gaussian_filter1d(freq_profile_db, sigma=1.5) # --- Binary mask of significant frequencies --- mask = freq_profile_db > threshold # --- Find contiguous frequency regions --- labeled, num_features = ndimage.label(mask) components = [] for region_label in range(1, num_features + 1): region_indices = np.where(labeled == region_label)[0] if len(region_indices) == 0: continue lower_idx = region_indices[0] upper_idx = region_indices[-1] lower_freq = freqs[lower_idx] upper_freq = freqs[upper_idx] bw = upper_freq - lower_freq if bw < min_component_bw: continue center_freq = (lower_freq + upper_freq) / 2 components.append((center_freq, lower_freq, upper_freq)) return components def split_annotation_by_components( annotation: Annotation, signal: np.ndarray, sampling_rate: float, center_frequency_hz: float = 0.0, nfft: int = 65536, noise_threshold_db: Optional[float] = None, min_component_bw: float = 50e3, ) -> List[Annotation]: """ Split an annotation into multiple annotations by detected frequency components. Takes an existing annotation spanning multiple frequency components and analyzes the frequency content to create separate sub-annotations for each distinct frequency component. **Use case**: Energy detection found a time window with 2-3 parallel WiFi channels. This function splits it into separate annotations per channel. **Frequency Handling**: `find_spectral_components` returns relative (baseband) frequencies. This function adds `center_frequency_hz` to convert to absolute RF frequencies for SigMF annotation bounds. This ensures correct frequency context across baseband and RF domains. :param annotation: Original annotation to split :type annotation: Annotation :param signal: Full signal array (complex IQ) :type signal: np.ndarray :param sampling_rate: Sample rate in Hz :type sampling_rate: float :param center_frequency_hz: RF center frequency to add to relative frequencies from peak detection (default: 0.0 = baseband) :type center_frequency_hz: float :param nfft: FFT size for analysis (default: 65536, auto-capped at signal length) :type nfft: int :param noise_threshold_db: Noise floor threshold in dB. If None (default), auto-estimates from data. :type noise_threshold_db: Optional[float] :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz) :type min_component_bw: float :returns: List of new annotations (one per detected component). Returns empty list if no components found or segment too short. :rtype: List[Annotation] **Example**:: >>> from ria.io import load_recording >>> from ria_toolkit_oss.annotations import split_annotation_by_components >>> recording = load_recording("capture.sigmf") >>> # Original annotation spans multiple channels >>> original = recording.annotations[0] >>> # Split using RF center frequency from metadata >>> components = split_annotation_by_components( ... original, ... recording.data[0], ... recording.metadata['sample_rate'], ... center_frequency_hz=recording.metadata.get('center_frequency', 0.0) ... ) >>> print(f"Split into {len(components)} components") Split into 2 components **Algorithm**: 1. Extract segment corresponding to annotation time bounds 2. Find frequency components in that segment (returns relative frequencies) 3. Add center_frequency_hz to get absolute RF frequencies 4. Create new annotation for each component 5. Preserve original metadata (label, type, etc.) 6. Add component info to comment JSON **Notes**: - Original annotation is not modified - Returns empty list if segment too short (<256 samples) - Segments Recording: """ Split multiple annotations in a recording by frequency components. Processes specified annotations (or all if indices=None), replacing each with its frequency-separated components. Uses RF center_frequency from recording metadata for proper absolute frequency conversion. :param recording: Recording to process :type recording: Recording :param indices: Annotation indices to split (None = all, default: None). Use indices=[] to skip splitting (returns unchanged recording). :type indices: Optional[List[int]] :param nfft: FFT size for spectral analysis (default: 65536, auto-capped at signal segment length) :type nfft: int :param noise_threshold_db: Noise floor threshold in dB. If None (default), auto-estimates from each segment. :type noise_threshold_db: Optional[float] :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz). Components narrower than this are filtered out. :type min_component_bw: float :returns: New Recording with split annotations :rtype: Recording **Example**:: >>> from ria.io import load_recording >>> from ria_toolkit_oss.annotations import split_recording_annotations >>> recording = load_recording("capture.sigmf") >>> # Split all annotations >>> split_rec = split_recording_annotations(recording) >>> print(f"Original: {len(recording.annotations)} annotations") >>> print(f"Split: {len(split_rec.annotations)} annotations") Original: 5 annotations Split: 9 annotations **Algorithm**: 1. For each annotation in indices (or all if None): 2. Call split_annotation_by_components with RF center_frequency 3. If components found, replace annotation with components 4. If no components found, keep original annotation 5. Annotations not in indices are kept unchanged **Notes**: - Original recording is not modified - Returns empty Recording.annotations if recording has no annotations - RF center_frequency from metadata ensures correct absolute frequencies - If an annotation can't be split (too short, wrong format), original kept """ if indices is None: # Split all annotations indices = list(range(len(recording.annotations))) if not recording.annotations: # No annotations to split return recording signal = recording.data[0] sample_rate = recording.metadata["sample_rate"] center_frequency = recording.metadata.get("center_frequency", 0.0) # Build new annotation list new_annotations = [] for i, anno in enumerate(recording.annotations): if i in indices: # Attempt to split this annotation try: components = split_annotation_by_components( anno, signal, sample_rate, center_frequency_hz=center_frequency, nfft=nfft, noise_threshold_db=noise_threshold_db, min_component_bw=min_component_bw, ) if components: # Split successful, use components new_annotations.extend(components) else: # No components found, keep original new_annotations.append(anno) except Exception: # Split failed for any reason, keep original new_annotations.append(anno) else: # Not in split list, keep as-is new_annotations.append(anno) return Recording(data=recording.data, metadata=recording.metadata, annotations=new_annotations)