M
muq
93ae08bc91
The OSS threshold_qualifier was last synced from utils on Feb 23 2026, before the major robustness improvements landed in utils on Mar 19 2026. This commit brings it fully up to date. Changes ported from utils: - Multi-pass detection (Pass 1 strong burst, Pass 2 weak residual, Pass 3 sustained faint burst via macro-window averaging) - Noise floor estimation via percentile instead of simple max*threshold - Dynamic range ratio guard (early exit on low-contrast captures) - Improved _find_ranges, _expand_and_filter_ranges, _merge_ranges helpers - Spectral smoothing in _estimate_spectral_bounds for wideband bursts - Minimum duration filter expressed in absolute time (5ms) not sample count Also includes the Pass 2 hysteresis spillover fix: - Pass 2 expansion now runs against residual_power (masked) instead of smoothed_power, preventing it from walking into Pass 1 territory - Pass 2 mask now has a window_size guard band around Pass 1 ranges, matching the guard already used in Pass 3 Only change from utils: import swapped to ria_toolkit_oss.datatypes.
98 lines
3.5 KiB
Python
98 lines
3.5 KiB
Python
import numpy as np
|
|
from scipy.signal import butter, lfilter
|
|
|
|
from ria_toolkit_oss.datatypes.annotation import Annotation
|
|
from ria_toolkit_oss.datatypes.recording import Recording
|
|
|
|
|
|
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:
|
|
"""
|
|
Slice, filter and frequency shift the input recording according to the bounding box defined by the annotation.
|
|
|
|
:param recording: The input Recording to be sliced.
|
|
:type recording: Recording
|
|
:param annotation: The Annotation object defining the area of the recording to isolate.
|
|
:type annotation: Annotation
|
|
:param decimate: Decimate the input signal after filtering to reduce the sample rate.
|
|
:type decimate: bool
|
|
|
|
:returns: The subsection of the original recording defined by the annotation.
|
|
:rtype: Recording"""
|
|
|
|
sample_start = max(0, annotation.sample_start)
|
|
sample_stop = min(len(recording), annotation.sample_start + annotation.sample_count)
|
|
|
|
anno_base_center_freq = (annotation.freq_lower_edge + annotation.freq_upper_edge) / 2 - recording.metadata.get(
|
|
"center_frequency", 0
|
|
)
|
|
|
|
anno_bw = annotation.freq_upper_edge - annotation.freq_lower_edge
|
|
|
|
signal_slice = recording.data[0, sample_start:sample_stop]
|
|
|
|
# normalize
|
|
signal_slice = signal_slice / np.max(np.abs(signal_slice))
|
|
|
|
isolation_bw = anno_bw
|
|
|
|
# frequency shift the center of the box about zero
|
|
shifted_signal_slice = frequency_shift_iq_samples(
|
|
iq_samples=signal_slice,
|
|
sample_rate=recording.metadata["sample_rate"],
|
|
shift_frequency=-1 * anno_base_center_freq,
|
|
)
|
|
|
|
# filter
|
|
if isolation_bw < recording.metadata["sample_rate"] - 1:
|
|
filtered_signal = apply_complex_lowpass_filter(
|
|
signal=shifted_signal_slice, cutoff_frequency=isolation_bw, sample_rate=recording.metadata["sample_rate"]
|
|
)
|
|
|
|
else:
|
|
filtered_signal = shifted_signal_slice
|
|
|
|
output = Recording(data=[filtered_signal], metadata=recording.metadata)
|
|
return output
|
|
|
|
|
|
def frequency_shift_iq_samples(iq_samples, sample_rate, shift_frequency):
|
|
# Number of samples
|
|
num_samples = len(iq_samples)
|
|
|
|
# Create a time vector from 0 to the total duration in seconds
|
|
time_vector = np.arange(num_samples) / sample_rate
|
|
|
|
# Generate the complex exponential for the frequency shift
|
|
complex_exponential = np.exp(1j * 2 * np.pi * shift_frequency * time_vector)
|
|
|
|
# Apply the frequency shift to the IQ samples
|
|
shifted_samples = iq_samples * complex_exponential
|
|
|
|
return shifted_samples
|
|
|
|
|
|
# Function to apply a lowpass Butterworth filter to a complex signal
|
|
def apply_complex_lowpass_filter(signal, cutoff_frequency, sample_rate, order=5):
|
|
# Design the lowpass filter
|
|
b, a = design_complex_lowpass_filter(cutoff_frequency, sample_rate, order)
|
|
|
|
# Apply the lowpass filter
|
|
filtered_signal = lfilter(b, a, signal)
|
|
return filtered_signal
|
|
|
|
|
|
def design_complex_lowpass_filter(cutoff_frequency, sample_rate, order=5):
|
|
# Nyquist frequency for complex signals is the sample rate
|
|
nyquist = sample_rate
|
|
|
|
# Ensure the cutoff frequency is positive and within the Nyquist limit
|
|
if cutoff_frequency <= 0 or cutoff_frequency > nyquist:
|
|
raise ValueError("Cutoff frequency must be between 0 and the Nyquist frequency.")
|
|
|
|
# Normalize the cutoff frequency to the Nyquist frequency
|
|
cutoff_normalized = cutoff_frequency / nyquist
|
|
|
|
# Create a Butterworth lowpass filter
|
|
b, a = butter(order, cutoff_normalized, btype="low")
|
|
return b, a
|