Compare commits

..

2 Commits

Author SHA1 Message Date
fordg1
2bb2d9d5a7 Removing logos
Some checks failed
Build Sphinx Docs Set / Build Docs (pull_request) Failing after 1s
Build Project / Build Project (3.10) (pull_request) Failing after 1s
Build Project / Build Project (3.11) (pull_request) Failing after 1s
Build Project / Build Project (3.12) (pull_request) Failing after 1s
Test with tox / Test with tox (3.10) (pull_request) Failing after 1s
Test with tox / Test with tox (3.11) (pull_request) Failing after 1s
Test with tox / Test with tox (3.12) (pull_request) Failing after 1s
2026-03-31 13:48:38 -04:00
fordg1
11d9532b5c Port annotation system from utils and fix ria package imports
Annotations package (new):
- Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong
  bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst
  detection), auto window_size scaled to 1ms, channel selection, and
  stable noise_floor baseline throughout
- Add energy_detector, cusum_annotator, parallel_signal_separator,
  qualify_slice, signal_isolation, annotation_transforms
- Add __init__.py exporting the four functions used by the CLI
- Fix all imports from utils.data → ria_toolkit_oss.datatypes

CLI annotate command (new):
- Port full annotate CLI from utils including list, add, remove, clear,
  energy, cusum, threshold, and separate subcommands
- Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* →
  ria_toolkit_oss_cli.*
- Safe overwrite logic: _annotated files always writable, originals
  protected; --overwrite writes in-place only on _annotated inputs

CLI view command:
- Add 'annotations' as a valid --type, wiring view_annotations from
  view_signal

view_signal.py:
- Add view_annotations function with blue/purple alternating palette and
  threshold %-sorted drawing order (lower % renders on top)

recording.py (datatypes):
- Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io

io/recording.py:
- Add compatibility shim in from_npy to remap utils.data.annotation.Annotation
  to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files
  pickled by the utils package
2026-03-31 13:34:00 -04:00
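The commit above describes a compatibility shim that remaps the legacy `utils.data.annotation.Annotation` module path when unpickling old `.npy` files. One common way to implement such a remap (a sketch only; the module names below are invented for the demo, not the toolkit's actual code) is a `pickle.Unpickler` subclass that overrides `find_class`:

```python
import io
import pickle
import sys
import types

class Annotation:
    """Stand-in class; the real shim would resolve to the toolkit's Annotation."""
    def __init__(self, label=""):
        self.label = label

# Simulate a pickle produced when the class lived under an old module name.
old = types.ModuleType("old_annotation_module")
old.Annotation = Annotation
Annotation.__module__ = "old_annotation_module"
sys.modules["old_annotation_module"] = old
payload = pickle.dumps(Annotation(label="wifi"))
del sys.modules["old_annotation_module"]  # the old package is gone now

# The class now lives under a new module name.
new = types.ModuleType("new_annotation_module")
new.Annotation = Annotation
sys.modules["new_annotation_module"] = new

class RemappingUnpickler(pickle.Unpickler):
    """Redirect legacy module paths to their new locations while loading."""
    REMAP = {"old_annotation_module": "new_annotation_module"}

    def find_class(self, module, name):
        return super().find_class(self.REMAP.get(module, module), name)

obj = RemappingUnpickler(io.BytesIO(payload)).load()
print(obj.label)  # wifi
```

A plain `pickle.load` would raise `ModuleNotFoundError` here; the remapping unpickler resolves the class at its new location instead.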
26 changed files with 341 additions and 1279 deletions

View File

@ -1,18 +0,0 @@
# Changelog
## [Unreleased] - 2026-02-20
### Added
- **Dual-Threshold Detection:** Logic to capture the start and end of signals, not just the peak.
- **Signal Smoothing & Noise Filters:** Prevents detections from breaking into fragments and ignores short interference spikes.
- **Auto-Frequency Calculation:** Automatically adjusts bounding boxes to fit signal frequency ranges tightly.
### Changed
- **Signal Power Detection:** Switched from raw signal strength to power for improved accuracy.
- **CLI Workflow:** `Clear` and `Remove` commands now modify files directly (in-place) to avoid redundant copies.
- **Metadata Logic:** Updated labels to show detection percentages and overhauled internal metadata cleaning.
- **Viewer UI:** Moved legend outside the plot, added a black background, and adjusted transparency for better spectrogram visibility.
### Fixed
- Prevented redundant `_annotated` suffixes in file naming patterns.
- Simplified internal math to increase processing speed and precision.
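The dual-threshold entry above can be illustrated with a toy hysteresis detector (illustrative only; the function name, thresholds, and test signal are invented, not the toolkit's implementation):

```python
import numpy as np

# Toy dual-threshold (hysteresis) detector in the spirit of the changelog entry:
# a high threshold triggers a detection, then each edge is walked outward while
# power stays above a lower threshold, capturing the full burst rather than
# just its peak.
def dual_threshold_ranges(power, high, low):
    hits = np.where(power > high)[0]
    ranges = []
    for idx in hits:
        if ranges and idx <= ranges[-1][1]:
            continue  # already covered by the previous detection
        start = int(idx)
        while start > 0 and power[start - 1] > low:
            start -= 1
        stop = int(idx)
        while stop < len(power) - 1 and power[stop + 1] > low:
            stop += 1
        ranges.append((start, stop))
    return ranges

power = np.array([0.1, 0.3, 0.9, 1.0, 0.8, 0.3, 0.1, 0.1])
print(dual_threshold_ranges(power, high=0.7, low=0.25))  # [(1, 5)]
```

Only indices 2..4 exceed the high threshold, but the reported range (1, 5) extends to where power falls below the low threshold, which is the point of the dual-threshold scheme.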

View File

@ -11,15 +11,15 @@ The Radio Dataset Framework provides a software interface to access and manipula
the need for users to interface with the source files directly. Instead, users initialize and interact with a Python
object, while the complexities of efficient data retrieval and source file manipulation are managed behind the scenes.
-Utils includes an abstract class called :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`, which defines common properties and
+Ria Toolkit OSS includes an abstract class called :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`, which defines common properties and
behaviors for all radio datasets. :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset` can be considered a blueprint for all
other radio dataset classes. This class is then subclassed to define more specific blueprints for different types
of radio datasets. For example, :py:obj:`ria_toolkit_oss.datatypes.datasets.IQDataset`, which is tailored for machine learning tasks
involving the processing of signals represented as IQ (In-phase and Quadrature) samples.
-Then, in the various project backends, there are concrete dataset classes, which inherit from both Utils and the base
+Then, in the various project backends, there are concrete dataset classes, which inherit from both Ria Toolkit OSS and the base
dataset class from the respective backend. For example, the :py:obj:`TorchIQDataset` class extends both
-:py:obj:`ria_toolkit_oss.datatypes.datasets.IQDataset` from Utils and :py:obj:`torch.ria_toolkit_oss.datatypes.IterableDataset` from
+:py:obj:`ria_toolkit_oss.datatypes.datasets.IQDataset` from Ria Toolkit OSS and :py:obj:`torch.ria_toolkit_oss.datatypes.IterableDataset` from
PyTorch, providing a concrete dataset class tailored for IQ datasets and optimized for the PyTorch backend.
Dataset initialization
@ -130,7 +130,7 @@ Dataset processing and manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
All radio datasets support methods tailored specifically for radio processing. These methods are backend-independent,
-inherited from the blueprints in Utils like :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`.
+inherited from the blueprints in Ria Toolkit OSS like :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`.
For example, we can trim down the length of the examples from 1,024 to 512 samples, and then augment the dataset:

View File

@ -1,55 +1,4 @@
"""
The annotations package contains tools and utilities for creating, managing, and processing annotations.
Provides automatic annotation generation using various signal detection algorithms:
- Energy-based detection (detect_signals_energy)
- CUSUM-based segmentation (annotate_with_cusum)
- Threshold-based qualification (threshold_qualifier)
- Signal isolation and extraction (isolate_signal)
- Occupied bandwidth analysis (calculate_occupied_bandwidth, calculate_nominal_bandwidth)
All detection functions return Recording objects with added annotations.
"""
__all__ = [
    # Energy-based detection
    "detect_signals_energy",
    "calculate_occupied_bandwidth",
    "calculate_nominal_bandwidth",
    "calculate_full_detected_bandwidth",
    "annotate_with_obw",
    # CUSUM detection
    "annotate_with_cusum",
    # Threshold detection
    "threshold_qualifier",
    # Parallel signal separation (Phase 2)
    "find_spectral_components",
    "split_annotation_by_components",
    "split_recording_annotations",
    # Signal isolation
    "isolate_signal",
    # Annotation transforms
    "remove_contained_boxes",
    "is_annotation_contained",
    # Dataset creation
    "qualify_slice_from_annotations",
]
from .annotation_transforms import is_annotation_contained, remove_contained_boxes
from .cusum_annotator import annotate_with_cusum
-from .energy_detector import (
-    annotate_with_obw,
-    calculate_full_detected_bandwidth,
-    calculate_nominal_bandwidth,
-    calculate_occupied_bandwidth,
-    detect_signals_energy,
-)
-from .parallel_signal_separator import (
-    find_spectral_components,
-    split_annotation_by_components,
-    split_recording_annotations,
-)
-from .qualify_slice import qualify_slice_from_annotations
-from .signal_isolation import isolate_signal
+from .energy_detector import detect_signals_energy
+from .parallel_signal_separator import split_recording_annotations
from .threshold_qualifier import threshold_qualifier

View File

@ -1,4 +1,4 @@
-from utils.data.annotation import Annotation
+from ria_toolkit_oss.datatypes.annotation import Annotation
# TODO figure out how to transfer labels in the merge case

View File

@ -3,7 +3,7 @@ from typing import Optional
import numpy as np
-from utils.data import Annotation, Recording
+from ria_toolkit_oss.datatypes import Annotation, Recording
def annotate_with_cusum(
@ -24,7 +24,7 @@ def annotate_with_cusum(
    changes between a low and high amplitude.
    :param recording: A ``Recording`` object to annotate.
-    :type recording: ``utils.data.Recording``
+    :type recording: ``ria_toolkit_oss.datatypes.Recording``
    :param label: Label for the detected segments.
    :type label: str
    :param window_size: The length (in samples) of the moving average window.

View File

@ -11,7 +11,7 @@ from typing import Tuple
import numpy as np
from scipy.signal import filtfilt
-from utils.data import Annotation, Recording
+from ria_toolkit_oss.datatypes import Annotation, Recording
def detect_signals_energy(
@ -73,8 +73,8 @@ def detect_signals_energy(
    **Example**::
-        >>> from utils.io import load_recording
-        >>> from utils.annotations import detect_signals_energy
+        >>> from ria.io import load_recording
+        >>> from ria_toolkit_oss.annotations import detect_signals_energy
        >>> recording = load_recording("capture.sigmf")
        >>> # Detect with NBW frequency bounds (default, best for real signals)
@ -347,7 +347,7 @@ def annotate_with_obw(
    **Example**::
-        >>> from utils.annotations import annotate_with_obw
+        >>> from ria_toolkit_oss.annotations import annotate_with_obw
        >>> annotated = annotate_with_obw(recording, label="signal_obw")
    """
    signal = recording.data[0]

View File

@ -38,7 +38,7 @@ sub-annotations.
Example:
    Two WiFi channels captured simultaneously:
-    >>> from utils.annotations import find_spectral_components
+    >>> from ria_toolkit_oss.annotations import find_spectral_components
    >>> # Detect the two distinct channels (returns relative frequencies)
    >>> components = find_spectral_components(signal, sampling_rate=20e6)
    >>> print(f"Found {len(components)} components")
@ -55,7 +55,7 @@ import numpy as np
from scipy import ndimage
from scipy import signal as scipy_signal
-from utils.data import Annotation, Recording
+from ria_toolkit_oss.datatypes import Annotation, Recording
def find_spectral_components(
@ -111,8 +111,8 @@ def find_spectral_components(
    **Example**::
-        >>> from utils.io import load_recording
-        >>> from utils.annotations import find_spectral_components
+        >>> from ria.io import load_recording
+        >>> from ria_toolkit_oss.annotations import find_spectral_components
        >>> recording = load_recording("capture.sigmf")
        >>> segment = recording.data[0][start:end]
        >>> # Components in relative (baseband) frequency
@ -241,8 +241,8 @@ def split_annotation_by_components(
    **Example**::
-        >>> from utils.io import load_recording
-        >>> from utils.annotations import split_annotation_by_components
+        >>> from ria.io import load_recording
+        >>> from ria_toolkit_oss.annotations import split_annotation_by_components
        >>> recording = load_recording("capture.sigmf")
        >>> # Original annotation spans multiple channels
        >>> original = recording.annotations[0]
@ -369,8 +369,8 @@ def split_recording_annotations(
    **Example**::
-        >>> from utils.io import load_recording
-        >>> from utils.annotations import split_recording_annotations
+        >>> from ria.io import load_recording
+        >>> from ria_toolkit_oss.annotations import split_recording_annotations
        >>> recording = load_recording("capture.sigmf")
        >>> # Split all annotations
        >>> split_rec = split_recording_annotations(recording)

View File

@ -1,6 +1,6 @@
import numpy as np
-from utils.data import Recording
+from ria_toolkit_oss.datatypes import Recording
def qualify_slice_from_annotations(recording: Recording, slice_length: int):

View File

@ -1,8 +1,8 @@
import numpy as np
from scipy.signal import butter, lfilter
-from utils.data.annotation import Annotation
-from utils.data.recording import Recording
+from ria_toolkit_oss.datatypes.annotation import Annotation
+from ria_toolkit_oss.datatypes.recording import Recording
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:

View File

@ -46,17 +46,17 @@ from typing import Optional
import numpy as np
-from utils.data import Annotation, Recording
+from ria_toolkit_oss.datatypes import Annotation, Recording
-def _find_ranges(indices, window_size):
+def _find_ranges(indices, max_gap):
    """
    Groups individual indices into continuous temporal ranges.
    Args:
        indices: Array of indices where the signal exceeded a threshold.
-        window_size: Maximum gap allowed between indices to consider them part
+        max_gap: Maximum gap allowed between indices to consider them part
            of the same range.
    Returns:
        A list of (start, stop) tuples representing detected signal segments.
@ -65,38 +65,127 @@ def _find_ranges(indices, window_size):
    if len(indices) == 0:
        return []
+    start = indices[0]
+    prev = indices[0]
    ranges = []
-    start = indices[0]
-    in_range = False
    for i in range(1, len(indices)):
-        # If the gap between current and previous index is within window_size,
-        # keep the range alive.
-        if indices[i] - indices[i - 1] <= window_size:
-            if not in_range:
-                # Start a new range
-                start = indices[i - 1]
-                in_range = True
-        else:
-            # Gap is too large; close the current range if one was active.
-            if in_range:
-                ranges.append((start, indices[i - 1]))
-                in_range = False
-    # Ensure the final segment is captured if the loop ends while in_range.
-    if in_range:
-        ranges.append((start, indices[-1]))
+        if indices[i] - prev > max_gap:
+            ranges.append((start, prev))
+            start = indices[i]
+        prev = indices[i]
+    ranges.append((start, prev))
    return ranges
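The replacement grouping logic behaves like this standalone sketch (re-typed here for illustration, since a diff page is not importable):

```python
# Standalone copy of the gap-based grouping: consecutive indices whose spacing
# is within max_gap belong to one run; a larger gap closes the run.
def find_ranges(indices, max_gap):
    if len(indices) == 0:
        return []
    start = prev = indices[0]
    ranges = []
    for i in indices[1:]:
        if i - prev > max_gap:
            ranges.append((start, prev))  # close the current run
            start = i
        prev = i
    ranges.append((start, prev))  # close the final run
    return ranges

# Two bursts of indices separated by a gap larger than max_gap:
print(find_ranges([3, 4, 5, 40, 41], max_gap=4))  # [(3, 5), (40, 41)]
```

Unlike the old version, every index (including a run of length one) is always assigned to some range, so isolated single-sample crossings are no longer silently dropped.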
def _expand_and_filter_ranges(
    smoothed_power: np.ndarray,
    initial_ranges: list[tuple[int, int]],
    boundary_val: float,
    min_duration_samples: int,
) -> list[tuple[int, int]]:
    """Apply hysteresis expansion and minimum-duration filtering."""
    out: list[tuple[int, int]] = []
    n = len(smoothed_power)
    for start, stop in initial_ranges:
        if (stop - start) < min_duration_samples:
            continue
        true_start = start
        while true_start > 0 and smoothed_power[true_start] > boundary_val:
            true_start -= 1
        true_stop = stop
        while true_stop < n - 1 and smoothed_power[true_stop] > boundary_val:
            true_stop += 1
        if (true_stop - true_start) >= min_duration_samples:
            out.append((true_start, true_stop))
    return out

def _merge_ranges(ranges: list[tuple[int, int]], max_gap: int) -> list[tuple[int, int]]:
    """Merge overlapping or near-adjacent ranges."""
    if not ranges:
        return []
    ranges = sorted(ranges, key=lambda r: r[0])
    merged = [ranges[0]]
    for s, e in ranges[1:]:
        last_s, last_e = merged[-1]
        if s <= last_e + max_gap:
            merged[-1] = (last_s, max(last_e, e))
        else:
            merged.append((s, e))
    return merged

def _estimate_noise_floor(power: np.ndarray, quantile: float = 20.0) -> float:
    """Estimate baseline from the quieter portion of the envelope."""
    return float(np.percentile(power, quantile))

def _estimate_group_gap(sample_rate: float) -> int:
    """Use a fixed temporal grouping gap instead of reusing the smoothing window."""
    return max(1, int(0.001 * sample_rate))

def _estimate_spectral_bounds(signal_segment: np.ndarray, sample_rate: float) -> tuple[float, float]:
    """Estimate occupied bandwidth from a smoothed magnitude spectrum."""
    if len(signal_segment) == 0:
        return -sample_rate / 4, sample_rate / 4
    window = np.hanning(len(signal_segment))
    windowed = signal_segment * window
    fft_data = np.abs(np.fft.fftshift(np.fft.fft(windowed)))
    fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate))
    # Smooth the spectrum so noise-like wideband bursts form a contiguous mask
    # instead of thousands of tiny isolated runs.
    spectral_smooth_bins = max(5, min(257, (len(signal_segment) // 512) | 1))
    spectral_kernel = np.ones(spectral_smooth_bins, dtype=np.float64) / spectral_smooth_bins
    smoothed_fft = np.convolve(fft_data, spectral_kernel, mode="same")
    spectral_floor = float(np.percentile(smoothed_fft, 20))
    spectral_peak = float(np.max(smoothed_fft))
    spectral_ratio = spectral_peak / max(spectral_floor, 1e-12)
    if spectral_ratio < 1.2:
        return -sample_rate / 4, sample_rate / 4
    spectral_thresh = spectral_floor + 0.1 * (spectral_peak - spectral_floor)
    sig_indices = np.where(smoothed_fft > spectral_thresh)[0]
    if len(sig_indices) == 0:
        peak_idx = int(np.argmax(smoothed_fft))
        bin_hz = sample_rate / len(signal_segment)
        half_bins = max(1, int(np.ceil(10_000.0 / bin_hz)))
        lo_idx = max(0, peak_idx - half_bins)
        hi_idx = min(len(smoothed_fft) - 1, peak_idx + half_bins)
    else:
        runs = _find_ranges(sig_indices, max_gap=max(1, spectral_smooth_bins // 2))
        peak_idx = int(np.argmax(smoothed_fft))
        lo_idx, hi_idx = min(
            runs,
            key=lambda run: 0 if run[0] <= peak_idx <= run[1] else min(abs(run[0] - peak_idx), abs(run[1] - peak_idx)),
        )
    # Prevent extremely narrow tone boxes from collapsing to just a few bins.
    min_total_bw_hz = 20_000.0
    min_half_bins = max(1, int(np.ceil((min_total_bw_hz / 2) / (sample_rate / len(signal_segment)))))
    center_idx = int(round((lo_idx + hi_idx) / 2))
    lo_idx = max(0, min(lo_idx, center_idx - min_half_bins))
    hi_idx = min(len(smoothed_fft) - 1, max(hi_idx, center_idx + min_half_bins))
    return float(fft_freqs[lo_idx]), float(fft_freqs[hi_idx])
def threshold_qualifier(
    recording: Recording,
    threshold: float,
-    window_size: Optional[int] = 1024,
+    window_size: Optional[int] = None,
    label: Optional[str] = None,
    annotation_type: Optional[str] = "standalone",
+    channel: int = 0,
) -> Recording:
    """
    Annotate a recording with bounding boxes for regions above a threshold.
@ -114,78 +203,129 @@ def threshold_qualifier(
    Args:
        recording: The Recording object containing IQ or real signal data.
        threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power.
-        window_size: Size of the smoothing filter and max gap for merging hits.
+        window_size: Size of the smoothing filter in samples. Defaults to 1ms worth of samples.
        label: Custom string label for annotations.
        annotation_type: Metadata string for the 'type' field in the annotation.
+        channel: Index of the channel to annotate. Defaults to 0.
    Returns:
        A new Recording object populated with detected Annotations.
    """
    # Extract signal and metadata
-    sample_data = recording.data[0]
+    sample_data = recording.data[channel]
    sample_rate = recording.metadata["sample_rate"]
    center_frequency = recording.metadata.get("center_frequency", 0)
+    if window_size is None:
+        window_size = max(64, int(sample_rate * 0.001))
    # --- 1. SIGNAL CONDITIONING ---
    # Convert to power (Magnitude squared)
    power_data = np.abs(sample_data) ** 2
    smoothing_window = np.ones(window_size) / window_size
    smoothed_power = np.convolve(power_data, smoothing_window, mode="same")
+    group_gap_samples = _estimate_group_gap(sample_rate)
-    # Define thresholds based on the global peak of the smoothed signal
+    # Define thresholds using peak relative to baseline.
    max_power = np.max(smoothed_power)
-    trigger_val = threshold * max_power  # High threshold to trigger detection
-    boundary_val = (threshold / 2) * max_power  # Low threshold to define signal edges
+    noise_floor = _estimate_noise_floor(smoothed_power)
+    dynamic_range_ratio = max_power / max(noise_floor, 1e-12)
+    # Soft early exit: keep a guard for low-contrast noise, but compute it from
+    # the quieter tail of the envelope so burst-heavy captures are not rejected.
+    if dynamic_range_ratio < 1.5:
+        return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations)
+    trigger_val = noise_floor + threshold * (max_power - noise_floor)
+    boundary_val = noise_floor + 0.5 * threshold * (max_power - noise_floor)
    # --- 2. INITIAL DETECTION ---
-    # Identify indices that strictly exceed the high trigger
-    indices = np.where(smoothed_power > trigger_val)[0]
-    initial_ranges = _find_ranges(indices=indices, window_size=window_size)
+    # Enforce an explicit minimum duration in seconds; this is stable across
+    # varying capture lengths and avoids over-fitting to recording length.
+    min_duration_samples = max(1, int(0.005 * sample_rate))
    annotations = []
-    threshold_base = min(sample_rate, len(sample_data))
-    for start, stop in initial_ranges:
-        if (stop - start) < (threshold_base * 0.01):
-            continue
-        # --- 3. HYSTERESIS (Boundary Expansion) ---
-        # Search backward from 'start' until power drops below the low boundary_val
-        true_start = start
-        while true_start > 0 and smoothed_power[true_start] > boundary_val:
-            true_start -= 1
-        # Search forward from 'stop' until power drops below the low boundary_val
-        true_stop = stop
-        while true_stop < len(smoothed_power) - 1 and smoothed_power[true_stop] > boundary_val:
-            true_stop += 1
+    # Pass 1: Detect stronger bursts.
+    indices = np.where(smoothed_power > trigger_val)[0]
+    pass1_initial = _find_ranges(indices=indices, max_gap=group_gap_samples)
+    pass1_ranges = _expand_and_filter_ranges(
+        smoothed_power=smoothed_power,
+        initial_ranges=pass1_initial,
+        boundary_val=boundary_val,
+        min_duration_samples=min_duration_samples,
+    )
+    # Pass 2: Recover weaker bursts on residual power not already covered.
+    # This improves recall in mixed-amplitude captures.
+    mask = np.ones_like(smoothed_power, dtype=np.float32)
+    for s, e in pass1_ranges:
+        mask[max(0, s) : min(len(mask), e)] = 0.0
+    residual_power = smoothed_power * mask
+    residual_max = float(np.max(residual_power))
+    residual_ratio = residual_max / max(noise_floor, 1e-12)
+    pass2_ranges: list[tuple[int, int]] = []
+    if residual_ratio >= 2.0:
+        weak_threshold = max(0.3, threshold * 0.7)
+        weak_trigger = noise_floor + weak_threshold * (residual_max - noise_floor)
+        weak_boundary = noise_floor + 0.5 * weak_threshold * (residual_max - noise_floor)
+        weak_indices = np.where(residual_power > weak_trigger)[0]
+        pass2_initial = _find_ranges(indices=weak_indices, max_gap=group_gap_samples)
+        pass2_ranges = _expand_and_filter_ranges(
+            smoothed_power=smoothed_power,
+            initial_ranges=pass2_initial,
+            boundary_val=weak_boundary,
+            min_duration_samples=min_duration_samples,
+        )
+    # Pass 3: Detect sustained faint bursts via macro-window averaging.
+    # Targets bursts whose peak power is near the trigger level but whose
+    # *average* power is consistently elevated above the noise floor — these
+    # are missed by peak-based detection because only a few short spikes exceed
+    # the trigger, all too brief to pass the minimum-duration filter.
+    #
+    # The mask is applied to power_data *before* convolving so that bright
+    # burst energy does not bleed through the long window into adjacent regions,
+    # which would inflate macro_residual_max and push the trigger above the
+    # faint burst's average power.
+    macro_window_size = max(window_size * 16, int(sample_rate * 0.02))
+    macro_kernel = np.ones(macro_window_size, dtype=np.float64) / macro_window_size
+    # Expand each annotated range by half the macro window on both sides so that
+    # the long convolution cannot "see" the leading/trailing edges of already-
+    # annotated bursts, which would produce spurious short fragments in Pass 3.
+    macro_expand = macro_window_size * 2
+    masked_power_for_macro = power_data.copy()
+    n = len(masked_power_for_macro)
+    for s, e in pass1_ranges + pass2_ranges:
+        masked_power_for_macro[max(0, s - macro_expand) : min(n, e + macro_expand)] = 0.0
+    macro_residual = np.convolve(masked_power_for_macro, macro_kernel, mode="same")
+    macro_residual_max = float(np.max(macro_residual))
+    pass3_ranges: list[tuple[int, int]] = []
+    if macro_residual_max / max(noise_floor, 1e-12) >= 1.3:
+        macro_trigger = noise_floor + threshold * (macro_residual_max - noise_floor)
+        macro_boundary = noise_floor + 0.5 * threshold * (macro_residual_max - noise_floor)
+        macro_indices = np.where(macro_residual > macro_trigger)[0]
+        macro_initial = _find_ranges(indices=macro_indices, max_gap=group_gap_samples)
+        pass3_ranges = _expand_and_filter_ranges(
+            smoothed_power=macro_residual,
+            initial_ranges=macro_initial,
+            boundary_val=macro_boundary,
+            min_duration_samples=min_duration_samples,
+        )
+    all_ranges = _merge_ranges(pass1_ranges + pass2_ranges + pass3_ranges, max_gap=group_gap_samples)
+    for true_start, true_stop in all_ranges:
        # --- 4. SPECTRAL ANALYSIS (Frequency Detection) ---
        signal_segment = sample_data[true_start:true_stop]
-        if len(signal_segment) > 0:
-            fft_data = np.abs(np.fft.fftshift(np.fft.fft(signal_segment)))
-            fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate))
-            # Determine frequency bounds where spectral energy is > 15% of segment peak
-            spectral_thresh = np.max(fft_data) * 0.15
-            sig_indices = np.where(fft_data > spectral_thresh)[0]
-            # Ensure the signal has some spectral width before annotating
-            if len(sig_indices) < 5:
-                continue
-            if len(sig_indices) > 0:
-                f_min, f_max = fft_freqs[sig_indices[0]], fft_freqs[sig_indices[-1]]
-            else:
-                # Default to middle half of bandwidth if no clear peaks found
-                f_min, f_max = -sample_rate / 4, sample_rate / 4
-        else:
-            f_min, f_max = -sample_rate / 4, sample_rate / 4
+        f_min, f_max = _estimate_spectral_bounds(signal_segment, sample_rate)
        # --- 5. ANNOTATION GENERATION ---
-        if label is None:
-            label = f"{int(threshold*100)}%"
+        ann_label = label if label is not None else f"{int(threshold*100)}%"
        # Pack metadata for the UI/Downstream processing
        comment_data = {
@ -202,7 +342,7 @@ def threshold_qualifier(
            sample_count=true_stop - true_start,
            freq_lower_edge=center_frequency + f_min,
            freq_upper_edge=center_frequency + f_max,
-            label=label,
+            label=ann_label,
            comment=json.dumps(comment_data),
            detail={"generator": "hysteresis_qualifier"},
        )
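The noise-floor-relative thresholds introduced above can be checked with a small worked example (values invented for illustration):

```python
# Worked example contrasting the old peak-relative trigger with the new
# noise-floor-relative thresholds. All values are made up for the demo and
# chosen to be exact in binary floating point.
noise_floor = 0.25  # baseline of the smoothed power envelope
max_power = 1.25    # peak of the smoothed power envelope
threshold = 0.5     # user sensitivity in [0, 1]

old_trigger = threshold * max_power  # old scheme ignores the baseline
new_trigger = noise_floor + threshold * (max_power - noise_floor)
new_boundary = noise_floor + 0.5 * threshold * (max_power - noise_floor)
print(old_trigger, new_trigger, new_boundary)  # 0.625 0.75 0.5
```

Anchoring both thresholds to the estimated noise floor means a given `threshold` setting behaves consistently whether the capture has a high or low baseline, instead of scaling everything off the peak alone.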

View File

@ -1,8 +0,0 @@
"""
The Data package contains abstract data types tailored for radio machine learning, such as ``Recording``, as well
as the abstract interfaces for the radio dataset and radio dataset builder framework.
"""
__all__ = ["Annotation", "Recording"]
from .annotation import Annotation
from .recording import Recording

View File

@ -1,128 +0,0 @@
from __future__ import annotations
import json
from typing import Any, Optional
from sigmf import SigMFFile
class Annotation:
"""Signal annotations are labels or additional information associated with specific data points or segments within
a signal. These annotations could be used for tasks like supervised learning, where the goal is to train a model
to recognize patterns or characteristics in the signal associated with these annotations.
Annotations can be used to label interesting points in your recording.
:param sample_start: The index of the starting sample of the annotation.
:type sample_start: int
:param sample_count: The index of the ending sample of the annotation, inclusive.
:type sample_count: int
:param freq_lower_edge: The lower frequency of the annotation.
:type freq_lower_edge: float
:param freq_upper_edge: The upper frequency of the annotation.
:type freq_upper_edge: float
:param label: The label that will be displayed with the bounding box in compatible viewers including IQEngine.
Defaults to an emtpy string.
:type label: str, optional
:param comment: A human-readable comment. Defaults to an empty string.
:type comment: str, optional
:param detail: A dictionary of user defined annotation-specific metadata. Defaults to None.
:type detail: dict, optional
"""
def __init__(
self,
sample_start: int,
sample_count: int,
freq_lower_edge: float,
freq_upper_edge: float,
label: Optional[str] = "",
comment: Optional[str] = "",
detail: Optional[dict] = None,
):
"""Initialize a new Annotation instance."""
self.sample_start = int(sample_start)
self.sample_count = int(sample_count)
self.freq_lower_edge = float(freq_lower_edge)
self.freq_upper_edge = float(freq_upper_edge)
self.label = str(label)
self.comment = str(comment)
if detail is None:
self.detail = {}
elif not _is_jsonable(detail):
raise ValueError(f"Detail object is not json serializable: {detail}")
else:
self.detail = detail
def is_valid(self) -> bool:
"""
Check that the annotation sample count is > 0 and the freq_lower_edge<freq_upper_edge.
:returns: True if valid, False if not.
"""
return self.sample_count > 0 and self.freq_lower_edge < self.freq_upper_edge
def overlap(self, other):
"""
Quantify how much the bounding box in this annotation overlaps with another annotation.
:param other: The other annotation.
:type other: Annotation
:returns: The area of the overlap in samples*frequency, or 0 if they do not overlap."""
sample_overlap_start = max(self.sample_start, other.sample_start)
sample_overlap_end = min(self.sample_start + self.sample_count, other.sample_start + other.sample_count)
freq_overlap_start = max(self.freq_lower_edge, other.freq_lower_edge)
freq_overlap_end = min(self.freq_upper_edge, other.freq_upper_edge)
if freq_overlap_start >= freq_overlap_end or sample_overlap_start >= sample_overlap_end:
return 0
else:
return (sample_overlap_end - sample_overlap_start) * (freq_overlap_end - freq_overlap_start)
def area(self):
"""
The 'area' of the bounding box, samples*frequency.
Useful to quantify annotation size.
:returns: sample length multiplied by bandwidth."""
return self.sample_count * (self.freq_upper_edge - self.freq_lower_edge)
def __eq__(self, other: Annotation) -> bool:
return self.__dict__ == other.__dict__
def to_sigmf_format(self):
"""
Returns a JSON dictionary representing this annotation formatted to be saved in a .sigmf-meta file.
"""
annotation_dict = {SigMFFile.START_INDEX_KEY: self.sample_start, SigMFFile.LENGTH_INDEX_KEY: self.sample_count}
annotation_dict["metadata"] = {
SigMFFile.LABEL_KEY: self.label,
SigMFFile.COMMENT_KEY: self.comment,
SigMFFile.FHI_KEY: self.freq_upper_edge,
SigMFFile.FLO_KEY: self.freq_lower_edge,
"ria:detail": self.detail,
}
if _is_jsonable(annotation_dict):
return annotation_dict
else:
raise ValueError("Annotation dictionary was not json serializable.")
def _is_jsonable(x: Any) -> bool:
"""
:return: True if x is JSON serializable, False otherwise.
"""
try:
json.dumps(x)
return True
except (TypeError, OverflowError):
return False
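The `overlap` method above is an axis-aligned rectangle intersection in the time-frequency plane. A minimal standalone sketch of the same arithmetic (hypothetical helper and values, independent of the class above):

```python
def box_overlap(a_start, a_count, a_flo, a_fhi, b_start, b_count, b_flo, b_fhi):
    """Intersection area of two time-frequency bounding boxes, in samples * Hz."""
    s0 = max(a_start, b_start)
    s1 = min(a_start + a_count, b_start + b_count)
    f0 = max(a_flo, b_flo)
    f1 = min(a_fhi, b_fhi)
    if s0 >= s1 or f0 >= f1:
        return 0  # no overlap in time or in frequency
    return (s1 - s0) * (f1 - f0)

# Two bursts sharing 500 samples and 1 kHz of bandwidth:
area = box_overlap(0, 1000, 0.0, 2000.0, 500, 1000, 1000.0, 3000.0)
print(area)  # 500 samples * 1000.0 Hz -> 500000.0
```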


@@ -1,853 +0,0 @@
from __future__ import annotations
import copy
import hashlib
import json
import os
import re
import time
import warnings
from typing import Any, Iterator, Optional
import numpy as np
from numpy.typing import ArrayLike
from utils.data.annotation import Annotation
PROTECTED_KEYS = ["rec_id", "timestamp"]
class Recording:
"""Tape of complex IQ (in-phase and quadrature) samples with associated metadata and annotations.
Recording data is a complex array of shape C x N, where C is the number of channels
and N is the number of samples in each channel.
Metadata is stored in a dictionary of key value pairs,
to include information such as sample_rate and center_frequency.
Annotations are a list of :ref:`Annotation <utils.data.Annotation>`,
defining bounding boxes in time and frequency with labels and metadata.
Here, signal data is represented as a NumPy array. This class is then extended in the RIA Backends to provide
support for different data structures, such as Tensors.
Recordings are long-form tapes that can be obtained either from a software-defined radio (SDR) or generated
synthetically. Machine learning datasets are then curated from collections of recordings by segmenting these
longer-form tapes into shorter units called slices.
All recordings are assigned a unique 64-character recording ID, ``rec_id``. If this field is missing from the
provided metadata, a new ID will be generated upon object instantiation.
:param data: Signal data as a tape of IQ samples, either C x N complex, where C is the number of
channels and N is number of samples in the signal. If data is a one-dimensional array of complex samples with
length N, it will be reshaped to a two-dimensional array with dimensions 1 x N.
:type data: array_like
:param metadata: Additional information associated with the recording.
:type metadata: dict, optional
:param annotations: A collection of ``Annotation`` objects defining bounding boxes.
:type annotations: list of Annotations, optional
:param dtype: Explicitly specify the data-type of the complex samples. Must be a complex NumPy type, such as
``np.complex64`` or ``np.complex128``. Default is None, in which case the type is determined implicitly. If
``data`` is a NumPy array, the Recording will use the dtype of ``data`` directly without any conversion.
:type dtype: numpy dtype object, optional
:param timestamp: The timestamp when the recording data was generated. If provided, it should be a float or integer
representing the time in seconds since epoch (e.g., ``time.time()``). Only used if the `timestamp` field is not
present in the provided metadata.
:type timestamp: float or int, optional
:raises ValueError: If data is not complex 1xN or CxN.
:raises ValueError: If metadata is not a python dict.
:raises ValueError: If metadata is not json serializable.
:raises ValueError: If annotations is not a list of valid annotation objects.
**Examples:**
>>> import numpy
>>> from utils.data import Recording, Annotation
>>> # Create an array of complex samples, just 1s in this case.
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> # Create a dictionary of relevant metadata.
>>> sample_rate = 1e6
>>> center_frequency = 2.44e9
>>> metadata = {
... "sample_rate": sample_rate,
... "center_frequency": center_frequency,
... "author": "me",
... }
>>> # Create an annotation for the annotations list.
>>> annotations = [
... Annotation(
... sample_start=0,
... sample_count=1000,
... freq_lower_edge=center_frequency - (sample_rate / 2),
... freq_upper_edge=center_frequency + (sample_rate / 2),
... label="example",
... )
... ]
>>> # Store samples, metadata, and annotations together in a convenient object.
>>> recording = Recording(data=samples, metadata=metadata, annotations=annotations)
>>> print(recording.metadata)
{'sample_rate': 1000000.0, 'center_frequency': 2440000000.0, 'author': 'me'}
>>> print(recording.annotations[0].label)
'example'
"""
def __init__( # noqa C901
self,
data: ArrayLike | list[list],
metadata: Optional[dict[str, Any]] = None,
dtype: Optional[np.dtype] = None,
timestamp: Optional[float | int] = None,
annotations: Optional[list[Annotation]] = None,
):
data_arr = np.asarray(data)
if np.iscomplexobj(data_arr):
# Expect C x N
if data_arr.ndim == 1:
self._data = np.expand_dims(data_arr, axis=0) # N -> 1 x N
elif data_arr.ndim == 2:
self._data = data_arr
else:
raise ValueError("Complex data must be C x N.")
else:
raise ValueError("Input data must be complex.")
if dtype is not None:
self._data = self._data.astype(dtype)
assert np.iscomplexobj(self._data)
if metadata is None:
self._metadata = {}
elif isinstance(metadata, dict):
self._metadata = metadata
else:
raise ValueError(f"Metadata must be a python dict, but was {type(metadata)}.")
if not _is_jsonable(metadata):
raise ValueError("Value must be JSON serializable.")
if "timestamp" not in self.metadata:
if timestamp is not None:
if not isinstance(timestamp, (int, float)):
raise ValueError(f"timestamp must be int or float, not {type(timestamp)}")
self._metadata["timestamp"] = timestamp
else:
self._metadata["timestamp"] = time.time()
else:
if not isinstance(self._metadata["timestamp"], (int, float)):
raise ValueError("timestamp must be int or float, not ", type(self._metadata["timestamp"]))
if "rec_id" not in self.metadata:
self._metadata["rec_id"] = generate_recording_id(data=self.data, timestamp=self._metadata["timestamp"])
if annotations is None:
self._annotations = []
elif isinstance(annotations, list):
self._annotations = annotations
else:
raise ValueError("Annotations must be a list or None.")
if not all(isinstance(annotation, Annotation) for annotation in self._annotations):
raise ValueError("All elements in self._annotations must be of type Annotation.")
self._index = 0
@property
def data(self) -> np.ndarray:
"""
:return: Recording data, as a complex array.
:type: np.ndarray
.. note::
For recordings with more than 1,024 samples, this property returns a read-only view of the data.
.. note::
To access specific samples, consider indexing the object directly with ``rec[c, n]``.
"""
if self._data.size > 1024:
# Returning a read-only view prevents mutation at a distance while maintaining performance.
v = self._data.view()
v.setflags(write=False)
return v
else:
return self._data.copy()
@property
def metadata(self) -> dict:
"""
:return: Dictionary of recording metadata.
:type: dict
"""
return self._metadata.copy()
@property
def annotations(self) -> list[Annotation]:
"""
:return: List of recording annotations
:type: list of Annotation objects
"""
return self._annotations.copy()
@property
def shape(self) -> tuple[int]:
"""
:return: The shape of the data array.
:type: tuple of ints
"""
return np.shape(self.data)
@property
def n_chan(self) -> int:
"""
:return: The number of channels in the recording.
:type: int
"""
return self.shape[0]
@property
def rec_id(self) -> str:
"""
:return: Recording ID.
:type: str
"""
return self.metadata["rec_id"]
@property
def dtype(self) -> str:
"""
:return: Data-type of the data array's elements.
:type: numpy dtype object
"""
return self.data.dtype
@property
def timestamp(self) -> float | int:
"""
:return: Recording timestamp (time in seconds since epoch).
:type: float or int
"""
return self.metadata["timestamp"]
@property
def sample_rate(self) -> float | None:
"""
:return: Sample rate of the recording, or None if 'sample_rate' is not in metadata.
:type: float or None
"""
return self.metadata.get("sample_rate")
@sample_rate.setter
def sample_rate(self, sample_rate: float | int) -> None:
"""Set the sample rate of the recording.
:param sample_rate: The sample rate of the recording.
:type sample_rate: float or int
:return: None
"""
self.add_to_metadata(key="sample_rate", value=sample_rate)
def astype(self, dtype: np.dtype) -> Recording:
"""Copy of the recording, data cast to a specified type.
:param dtype: Data-type to which the array is cast. Must be a complex scalar type, such as ``np.complex64`` or
``np.complex128``.
:type dtype: NumPy data type, optional
.. note:: Casting to a data type with less precision can risk losing data by truncating or rounding values,
potentially resulting in a loss of accuracy and significant information.
:return: A new recording with the same metadata and data, with dtype.
TODO: Add example usage.
"""
# Rather than check for a valid datatype, let's cast and check the result. This makes it easier to provide
# cross-platform support where the types are aliased across platforms.
with warnings.catch_warnings():
warnings.simplefilter("ignore") # Casting may generate user warnings. E.g., complex -> real
data = self.data.astype(dtype)
if np.iscomplexobj(data):
return Recording(data=data, metadata=self.metadata, annotations=self.annotations)
else:
raise ValueError("dtype must be a complex number scalar type.")
def add_to_metadata(self, key: str, value: Any) -> None:
"""Add a new key-value pair to the recording metadata.
:param key: New metadata key, must be snake_case.
:type key: str
:param value: Corresponding metadata value.
:type value: any
:raises ValueError: If key is already in metadata or if key is not a valid metadata key.
:raises ValueError: If value is not JSON serializable.
:return: None.
**Examples:**
Create a recording and add metadata:
>>> import numpy
>>> from utils.data import Recording
>>>
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>>
>>> recording = Recording(data=samples, metadata=metadata)
>>> print(recording.metadata)
{'sample_rate': 1000000.0,
'center_frequency': 2440000000.0,
'timestamp': 17369...,
'rec_id': 'fda0f41...'}
>>>
>>> recording.add_to_metadata(key="author", value="me")
>>> print(recording.metadata)
{'sample_rate': 1000000.0,
'center_frequency': 2440000000.0,
'author': 'me',
'timestamp': 17369...,
'rec_id': 'fda0f41...'}
"""
if key in self.metadata:
raise ValueError(
f"Key {key} already in metadata. Use Recording.update_metadata() to modify existing fields."
)
if not _is_valid_metadata_key(key):
raise ValueError(f"Invalid metadata key: {key}.")
if not _is_jsonable(value):
raise ValueError("Value must be JSON serializable.")
self._metadata[key] = value
def update_metadata(self, key: str, value: Any) -> None:
"""Update the value of an existing metadata key,
or add the key value pair if it does not already exist.
:param key: Existing metadata key.
:type key: str
:param value: New value to enter at key.
:type value: any
:raises ValueError: If value is not JSON serializable
:raises ValueError: If key is protected.
:return: None.
**Examples:**
Create a recording and update metadata:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... "author": "me",
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> print(recording.metadata)
{'sample_rate': 1000000.0,
'center_frequency': 2440000000.0,
'author': 'me',
'timestamp': 17369...,
'rec_id': 'fda0f41...'}
>>> recording.update_metadata(key="author", value="you")
>>> print(recording.metadata)
{'sample_rate': 1000000.0,
'center_frequency': 2440000000.0,
'author': 'you',
'timestamp': 17369...,
'rec_id': 'fda0f41...'}
"""
if key not in self.metadata:
self.add_to_metadata(key=key, value=value)
return
if not _is_jsonable(value):
raise ValueError("Value must be JSON serializable.")
if key in PROTECTED_KEYS: # Check protected keys.
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
else:
self._metadata[key] = value
def remove_from_metadata(self, key: str):
"""
Remove a key from the recording metadata.
Does not remove key if it is protected.
:param key: The key to remove.
:type key: str
:raises ValueError: If key is protected.
:return: None.
**Examples:**
Create a recording and remove metadata:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... "author": "me",
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> print(recording.metadata)
{'sample_rate': 1000000.0,
'center_frequency': 2440000000.0,
'author': 'me',
'timestamp': 17369..., # Example value
'rec_id': 'fda0f41...'} # Example value
>>> recording.remove_from_metadata(key="author")
>>> print(recording.metadata)
{'sample_rate': 1000000.0,
'center_frequency': 2440000000.0,
'timestamp': 17369..., # Example value
'rec_id': 'fda0f41...'} # Example value
"""
if key not in PROTECTED_KEYS:
self._metadata.pop(key)
else:
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
def view(self, output_path: Optional[str] = "images/signal.png", **kwargs) -> None:
"""Create a plot of various signal visualizations as a PNG image.
:param output_path: The output image path. Defaults to "images/signal.png".
:type output_path: str, optional
:param kwargs: Keyword arguments passed on to utils.view.view_sig.
:type: dict of keyword arguments
**Examples:**
Create a recording and view it as a plot in a .png image:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.view()
"""
from utils.view import view_sig
view_sig(recording=self, output_path=output_path, **kwargs)
def simple_view(self, **kwargs) -> None:
"""Create a plot of various signal visualizations as a PNG or SVG image.
:param kwargs: Keyword arguments passed on to utils.view.view_signal_simple.create_plots.
:type: dict of keyword arguments
**Examples:**
Create a recording and view it as a plot in a .png image:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.simple_view()
"""
from utils.view.view_signal_simple import view_simple_sig
view_simple_sig(recording=self, **kwargs)
def to_sigmf(
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
) -> None:
"""Write recording to a set of SigMF files.
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
:type filename: os.PathLike or str, optional
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
:type path: os.PathLike or str, optional
:param overwrite: Whether to overwrite existing files. Default is False.
:type overwrite: bool, optional
:raises IOError: If there is an issue encountered during the file writing process.
:return: None
**Examples:**
Create a recording and save it to a set of SigMF files:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.to_sigmf()
"""
from utils.io.recording import to_sigmf
to_sigmf(filename=filename, path=path, recording=self, overwrite=overwrite)
def to_npy(
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
) -> str:
"""Write recording to ``.npy`` binary file.
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
:type filename: os.PathLike or str, optional
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
:type path: os.PathLike or str, optional
:raises IOError: If there is an issue encountered during the file writing process.
:return: Path where the file was saved.
:rtype: str
**Examples:**
Create a recording and save it to a .npy file:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.to_npy()
"""
from utils.io.recording import to_npy
return to_npy(recording=self, filename=filename, path=path, overwrite=overwrite)
def to_wav(
self,
filename: Optional[str] = None,
path: Optional[os.PathLike | str] = None,
target_sample_rate: Optional[int] = 48000,
bits_per_sample: int = 32,
overwrite: bool = False,
) -> str:
"""Write recording to WAV file with embedded YAML metadata.
WAV format uses stereo audio with I (in-phase) in left channel and Q (quadrature) in right channel.
Metadata is stored in standard LIST INFO chunks with RF-specific metadata encoded as YAML
in the ICMT (comment) field for human readability.
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
:type filename: os.PathLike or str, optional
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
:type path: os.PathLike or str, optional
:param target_sample_rate: Sample rate stored in the WAV header when no sample_rate metadata
is present. IQ samples are written without decimation or interpolation. Default is 48000 Hz.
:type target_sample_rate: int, optional
:param bits_per_sample: Bits per sample (32 for float32, 16 for int16). Default is 32.
:type bits_per_sample: int, optional
:param overwrite: Whether to overwrite existing files. Default is False.
:type overwrite: bool, optional
:raises IOError: If there is an issue encountered during the file writing process.
:return: Path where the file was saved.
:rtype: str
**Examples:**
Create a recording and save it to a .wav file:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.exp(1j * 2 * numpy.pi * 0.1 * numpy.arange(10000))
>>> metadata = {"sample_rate": 1e6, "center_frequency": 915e6}
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.to_wav()
"""
from utils.io.recording import to_wav
return to_wav(
recording=self,
filename=filename,
path=path,
target_sample_rate=target_sample_rate,
bits_per_sample=bits_per_sample,
overwrite=overwrite,
)
def to_blue(
self,
filename: Optional[str] = None,
path: Optional[os.PathLike | str] = None,
data_format: str = "CI",
overwrite: bool = False,
) -> str:
"""Write recording to MIDAS Blue file format.
MIDAS Blue is a legacy RF file format with a 512-byte binary header.
Commonly used with X-Midas and other RF/radar signal processing tools.
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
:type filename: os.PathLike or str, optional
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
:type path: os.PathLike or str, optional
:param data_format: Format code (default 'CI' = complex int16).
Common formats: 'CI' (complex int16), 'CF' (complex float32), 'CD' (complex float64).
Integer formats require the IQ samples to already be scaled within [-1, 1).
:type data_format: str, optional
:param overwrite: Whether to overwrite existing files. Default is False.
:type overwrite: bool, optional
:raises IOError: If there is an issue encountered during the file writing process.
:return: Path where the file was saved.
:rtype: str
**Examples:**
Create a recording and save it to a .blue file:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {"sample_rate": 1e6, "center_frequency": 2.44e9}
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.to_blue()
"""
from utils.io.recording import to_blue
return to_blue(recording=self, filename=filename, path=path, data_format=data_format, overwrite=overwrite)
def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording:
"""Trim Recording samples to a desired length, shifting annotations to maintain alignment.
:param start_sample: The start index of the desired trimmed recording. Defaults to 0.
:type start_sample: int, optional
:param num_samples: The number of samples that the output trimmed recording will have.
:type num_samples: int
:raises IndexError: If start_sample + num_samples is greater than the length of the recording.
:raises IndexError: If sample_start < 0 or num_samples < 0.
:return: The trimmed Recording.
:rtype: Recording
**Examples:**
Create a recording and trim it:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> print(len(recording))
10000
>>> trimmed_recording = recording.trim(start_sample=1000, num_samples=1000)
>>> print(len(trimmed_recording))
1000
"""
if start_sample < 0:
raise IndexError("start_sample cannot be < 0.")
elif start_sample + num_samples > len(self):
raise IndexError(
f"start_sample {start_sample} + num_samples {num_samples} > recording length {len(self)}."
)
end_sample = start_sample + num_samples
data = self.data[:, start_sample:end_sample]
new_annotations = copy.deepcopy(self.annotations)
for annotation in new_annotations:
# trim annotation if it goes outside the trim boundaries
if annotation.sample_start < start_sample:
annotation.sample_count = annotation.sample_count - (start_sample - annotation.sample_start)
annotation.sample_start = start_sample
if annotation.sample_start + annotation.sample_count > end_sample:
annotation.sample_count = end_sample - annotation.sample_start
# shift annotation to align with the new start point
annotation.sample_start = annotation.sample_start - start_sample
return Recording(data=data, metadata=self.metadata, annotations=new_annotations)
def normalize(self) -> Recording:
"""Scale the recording data, relative to its maximum value, so that the magnitude of the maximum sample is 1.
:return: Recording where the maximum sample amplitude is 1.
:rtype: Recording
**Examples:**
Create a recording with maximum amplitude 0.5 and normalize to a maximum amplitude of 1:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64) * 0.5
>>> metadata = {
... "sample_rate": 1e6,
... "center_frequency": 2.44e9,
... }
>>> recording = Recording(data=samples, metadata=metadata)
>>> print(numpy.max(numpy.abs(recording.data)))
0.5
>>> normalized_recording = recording.normalize()
>>> print(numpy.max(numpy.abs(normalized_recording.data)))
1
"""
scaled_data = self.data / np.max(abs(self.data))
return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations)
def __len__(self) -> int:
"""The length of a recording is defined by the number of complex samples in each channel of the recording."""
return self.shape[1]
def __eq__(self, other: Recording) -> bool:
"""Two Recordings are equal if all data, metadata, and annotations are the same."""
# Note: annotation lists must match element-wise, so ordering matters here.
return (
np.array_equal(self.data, other.data)
and self.metadata == other.metadata
and self.annotations == other.annotations
)
def __ne__(self, other: Recording) -> bool:
"""Two Recordings are equal if all data, and metadata, and annotations are the same."""
return not self.__eq__(other=other)
def __iter__(self) -> Iterator:
self._index = 0
return self
def __next__(self) -> np.ndarray:
if self._index < self.n_chan:
to_ret = self.data[self._index]
self._index += 1
return to_ret
else:
raise StopIteration
def __getitem__(self, key: int | tuple[int] | slice) -> np.ndarray | np.complexfloating:
"""If key is an integer, tuple of integers, or a slice, return the corresponding samples.
For arrays with 1,024 or fewer samples, return a copy of the recording data. For larger arrays, return a
read-only view. This prevents mutation at a distance while maintaining performance.
"""
if isinstance(key, (int, tuple, slice)):
v = self._data[key]
if isinstance(v, np.complexfloating):
return v
elif v.size > 1024:
v.setflags(write=False) # Make view read-only.
return v
else:
return v.copy()
else:
raise ValueError(f"Key must be an integer, tuple, or slice but was {type(key)}.")
def __setitem__(self, *args, **kwargs) -> None:
"""Raise an error if an attempt is made to assign to the recording."""
raise ValueError("Assignment to Recording is not allowed.")
def generate_recording_id(data: np.ndarray, timestamp: Optional[float | int] = None) -> str:
"""Generate unique 64-character recording ID. The recording ID is generated by hashing the recording data with
the time at which the recording data was generated. If no timestamp is provided, the current time is used.
:param data: Tape of IQ samples, as a NumPy array.
:type data: np.ndarray
:param timestamp: Unix timestamp in seconds. Defaults to None.
:type timestamp: float or int, optional
:return: 64-character hash (SHA-256 hexdigest), to be used as the recording ID.
:rtype: str
"""
if timestamp is None:
timestamp = time.time()
byte_sequence = data.tobytes() + str(timestamp).encode("utf-8")
sha256_hash = hashlib.sha256(byte_sequence)
return sha256_hash.hexdigest()
def _is_jsonable(x: Any) -> bool:
"""
:return: True if x is JSON serializable, False otherwise.
"""
try:
json.dumps(x)
return True
except (TypeError, OverflowError):
return False
def _is_valid_metadata_key(key: Any) -> bool:
"""
:return: True if key is a valid metadata key, False otherwise.
"""
if isinstance(key, str) and key.islower() and re.match(pattern=r"^[a-z_]+$", string=key) is not None:
return True
else:
return False
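`generate_recording_id` above hashes the raw sample bytes together with the timestamp string, which is why the ID is always 64 characters: SHA-256 produces a 32-byte digest, hex-encoded. A quick standalone sketch of the same recipe (hypothetical values):

```python
import hashlib
import time

import numpy as np

data = np.ones(16, dtype=np.complex64)
timestamp = time.time()

# Same recipe as generate_recording_id: sample bytes + timestamp string.
byte_sequence = data.tobytes() + str(timestamp).encode("utf-8")
rec_id = hashlib.sha256(byte_sequence).hexdigest()

assert len(rec_id) == 64  # 32-byte digest, hex-encoded

# Identical data with a different timestamp yields a different ID.
other = hashlib.sha256(data.tobytes() + str(timestamp + 1).encode("utf-8")).hexdigest()
assert rec_id != other
```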


@@ -601,7 +601,7 @@ class Recording:
         >>> recording = Recording(data=samples, metadata=metadata)
         >>> recording.to_wav()
         """
-        from utils.io.recording import to_wav
+        from ria_toolkit_oss.io.recording import to_wav
         return to_wav(
             recording=self,
@@ -651,7 +651,7 @@ class Recording:
         >>> recording = Recording(data=samples, metadata=metadata)
         >>> recording.to_blue()
         """
-        from utils.io.recording import to_blue
+        from ria_toolkit_oss.io.recording import to_blue
         return to_blue(recording=self, filename=filename, path=path, data_format=data_format, overwrite=overwrite)


@@ -134,6 +134,27 @@ def from_npy(file: os.PathLike | str, legacy: bool = False) -> Recording:
             annotations = list(np.load(f, allow_pickle=True))
         except EOFError:
             annotations = []
+        except ModuleNotFoundError:
+            # File was pickled with utils.data.Annotation — remap to ria_toolkit_oss
+            import pickle
+            import sys
+            import types
+            import ria_toolkit_oss.datatypes.annotation as _ann_mod
+            utils_shim = types.ModuleType("utils")
+            utils_data = types.ModuleType("utils.data")
+            utils_data_annotation = types.ModuleType("utils.data.annotation")
+            utils_data_annotation.Annotation = _ann_mod.Annotation
+            utils_shim.data = utils_data
+            utils_data.annotation = utils_data_annotation
+            sys.modules.setdefault("utils", utils_shim)
+            sys.modules.setdefault("utils.data", utils_data)
+            sys.modules.setdefault("utils.data.annotation", utils_data_annotation)
+            f.seek(0)
+            np.load(f, allow_pickle=True)  # skip data
+            np.load(f, allow_pickle=True)  # skip metadata
+            annotations = list(np.load(f, allow_pickle=True))
     recording = Recording(data=data, metadata=metadata, annotations=annotations)
     return recording

Binary file not shown (image, 90 KiB)

Binary file not shown (image, 19 KiB)


@ -4,16 +4,21 @@ import textwrap
from typing import Optional from typing import Optional
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import numpy as np import numpy as np
from matplotlib import gridspec from matplotlib import gridspec
from matplotlib.patches import Patch
from PIL import Image from PIL import Image
from scipy.fft import fft, fftshift from scipy.fft import fft, fftshift
from scipy.signal import spectrogram from scipy.signal import spectrogram
from scipy.signal.windows import hann from scipy.signal.windows import hann
from utils.data.recording import Recording from ria_toolkit_oss.datatypes.recording import Recording
from utils.view.tools import COLORS, decimate, extract_metadata_fields, set_path from ria_toolkit_oss.view.tools import (
COLORS,
decimate,
extract_metadata_fields,
set_path,
)
def get_fft_size(plot_length): def get_fft_size(plot_length):
@ -57,15 +62,15 @@ def view_annotations(
sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata) sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
annotations = recording.annotations annotations = recording.annotations
# 2. Setup Color Mapping (No more hardcoded yellow fallback!) # 2. Setup Color Mapping
# available_colors = [ available_colors = [
# COLORS.get("magenta", "magenta"), COLORS.get("magenta", "magenta"),
# COLORS.get("accent", "cyan"), COLORS.get("accent", "cyan"),
# COLORS.get("light", "white"), COLORS.get("light", "white"),
# "lime", "lime",
# ] ]
palette = ["#FF00FF", "#00FF00", "#00FFFF", "#FFFF00", "#FF8000"] palette = ["#2196F3", "#9C27B0", "#64B5F6", "#7B1FA2", "#5C6BC0", "#CE93D8", "#1565C0", "#7C4DFF"]
unique_labels = sorted(list(set(ann.label for ann in annotations if ann.label))) unique_labels = sorted(list(set(ann.label for ann in annotations if ann.label)))
label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)} label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)}
@@ -74,18 +79,21 @@ def view_annotations(
         complex_signal, NFFT=256, Fs=sample_rate, Fc=center_frequency, noverlap=128, cmap="twilight"
     )
-    # 4. Draw Annotations
-    for annotation in annotations:
-        # --- DEFINING VARIABLES FIRST ---
+    # 4. Draw Annotations (highest threshold % first so lower % renders on top)
+    def _threshold_sort_key(ann):
+        try:
+            return int(ann.label.rstrip("%"))
+        except (ValueError, AttributeError):
+            return 0
+    for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
         t_start = annotation.sample_start / sample_rate
         t_width = annotation.sample_count / sample_rate
         f_start = annotation.freq_lower_edge
         f_height = annotation.freq_upper_edge - annotation.freq_lower_edge
-        # Look up the color for this specific label
         ann_color = label_to_color.get(annotation.label, "gray")
-        # Draw the Rectangle
         rect = plt.Rectangle(
             (t_start, f_start), t_width, f_height, linewidth=1.5, edgecolor=ann_color, facecolor="none", alpha=0.8
         )
@@ -101,7 +109,7 @@ def view_annotations(
     ax.set_title(title, fontsize=title_fontsize, pad=20)
     ax.set_xlabel("Time (s)", fontsize=12)
     ax.set_ylabel("Frequency (MHz)", fontsize=12)
-    ax.grid(alpha=0.1)  # Add faint grid
+    ax.grid(alpha=0.1)
     output_path, _ = set_path(output_path=output_path)
     plt.savefig(output_path, dpi=dpi, bbox_inches="tight")
@@ -279,7 +287,9 @@ def view_sig(
     )
     set_spines(spec_ax, spines)
-    spec_ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize)
+    spec_ax.set_title("Spectrogram", fontsize=subtitle_fontsize)
+    spec_ax.set_ylabel("Frequency (Hz)")
+    spec_ax.set_xlabel("Time (s)")
     if iq:
         iq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
@@ -363,11 +373,7 @@ def view_sig(
     set_spines(meta_ax, spines)
     if logo and os.path.isfile(logo_path):
-        # logo_ax = plt.subplot(gs[plot_y_indx:, 2])
-        logo_pos = [0.75, 0.05, 0.2, 0.08]
-        logo_ax = fig.add_axes(logo_pos, anchor="SE", zorder=10)
-        plot_x_indx = plot_x_indx + 1
+        logo_ax = plt.subplot(gs[plot_y_indx + 2 :, 2])
         logo_ax.axis("off")
         try:
@@ -386,6 +392,7 @@ def view_sig(
         hspace=2.5,  # Vertical space between subplots
     )
+    # save path handling
     output_path, _ = set_path(output_path=output_path)
     plt.savefig(output_path, dpi=dpi)
     print(f"Saved signal plot to {output_path}")

View File

@@ -3,7 +3,6 @@
 from __future__ import annotations
 import gc
-import json
 from typing import Optional
 import matplotlib
@@ -12,54 +11,13 @@ import numpy as np
 from scipy.fft import fft, fftshift
 from scipy.signal.windows import hann
-from utils.data.recording import Recording
-from utils.view.tools import COLORS, decimate, extract_metadata_fields, set_path
-def _add_annotations(annotations, compact_mode, show_labels, sample_rate_hz, center_freq_hz, ax2):
-    if annotations and not compact_mode:
-        for annotation in annotations:
-            start_idx = annotation.get("core:sample_start", 0)
-            length = annotation.get("core:sample_count", 0)
-            start_time = start_idx / sample_rate_hz
-            end_time = (start_idx + length) / sample_rate_hz
-            freq_low = annotation.get("core:freq_lower_edge", center_freq_hz - sample_rate_hz / 4)
-            freq_high = annotation.get("core:freq_upper_edge", center_freq_hz + sample_rate_hz / 4)
-            comment = annotation.get("core:comment", "{}")
-            try:
-                comment_data = json.loads(comment) if isinstance(comment, str) else comment
-                ann_type = comment_data.get("type", "unknown")
-                if ann_type == "intersection":
-                    color = COLORS["success"]
-                elif ann_type == "parallel":
-                    color = COLORS["primary"]
-                elif ann_type == "standalone":
-                    color = COLORS["warning"]
-                else:
-                    color = COLORS["error"]
-            except Exception:
-                color = COLORS["error"]
-            rect = plt.Rectangle(
-                (start_time, freq_low),
-                end_time - start_time,
-                freq_high - freq_low,
-                color=color,
-                alpha=0.4,
-                linewidth=2,
-            )
-            ax2.add_patch(rect)
-            if show_labels:
-                label = annotation.get("core:label", "Signal")
-                ax2.text(
-                    start_time,
-                    freq_high,
-                    label,
-                    color=COLORS["light"],
-                    fontsize=10,
-                    bbox=dict(boxstyle="round,pad=0.2", facecolor=color, alpha=0.7),
-                )
+from ria_toolkit_oss.datatypes.recording import Recording
+from ria_toolkit_oss.view.tools import (
+    COLORS,
+    decimate,
+    extract_metadata_fields,
+    set_path,
+)
 def _get_nfft_size(signal, fast_mode):
@@ -180,7 +138,6 @@ def detect_constellation_symbols(signal: np.ndarray, method: str = "differential
 def view_simple_sig(
     recording: Recording,
-    annotations: Optional[list] = None,
     output_path: Optional[str] = "images/signal.png",
     saveplot: Optional[bool] = True,
     fast_mode: Optional[bool] = False,
@@ -304,15 +261,6 @@ def view_simple_sig(
     ax2.set_title("Spectrogram", loc="left", pad=10)
-    _add_annotations(
-        annotations=annotations,
-        compact_mode=compact_mode,
-        show_labels=show_labels,
-        sample_rate_hz=sample_rate_hz,
-        center_freq_hz=center_freq_hz,
-        ax2=ax2,
-    )
     if ax_constellation is not None:
         constellation_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=50_000, fast_max=20_000)
         method = "differential" if fast_mode else "combined"
@@ -362,7 +310,7 @@ def view_simple_sig(
     else:
         plt.tight_layout()
     if show_title:
-        plt.subplots_adjust(top=0.92)
+        plt.subplots_adjust(top=0.90)
     if saveplot:
         output_path, extension = set_path(output_path=output_path)

View File

@@ -11,8 +11,8 @@ from ria_toolkit_oss.annotations import (
     split_recording_annotations,
     threshold_qualifier,
 )
-from ria_toolkit_oss.data import Annotation
-from ria_toolkit_oss.data.recording import Recording
+from ria_toolkit_oss.datatypes import Annotation
+from ria_toolkit_oss.datatypes.recording import Recording
 from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav
 from ria_toolkit_oss_cli.ria_toolkit_oss.common import format_frequency, format_sample_count
@@ -50,13 +50,15 @@ def detect_input_format(filepath):
 def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
     input_path = Path(input_path)
+    input_is_annotated = input_path.stem.endswith("_annotated")
     if output_path:
         target = Path(output_path)
-        final_path = target
+    elif overwrite and input_is_annotated:
+        # Write back in-place only when the input is already an _annotated file
+        target = input_path
     else:
-        annotated_name = f"{input_path.stem}_annotated"
-        target = input_path.with_name(f"{annotated_name}{input_path.suffix}")
+        target = input_path.with_name(f"{input_path.stem}_annotated{input_path.suffix}")
     if fmt == "sigmf":
         final_path = normalize_sigmf_path(target)
@@ -67,8 +69,10 @@ def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
     if not quiet:
         click.echo(f"Saving to: {final_path}")
-    if final_path.exists() and not overwrite and final_path != input_path:
-        click.echo(f"Error: {final_path} already exists. Use --overwrite to replace it.", err=True)
+    # Always allow writing to _annotated files; guard against overwriting originals
+    target_is_annotated = final_path.stem.endswith("_annotated")
+    if final_path.exists() and not target_is_annotated and final_path != input_path:
+        click.echo(f"Error: {final_path} is not an annotated file and cannot be overwritten.", err=True)
         return None
     return final_path
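The safe-overwrite rules in this hunk reduce to a small target-selection decision: an explicit output path wins, `--overwrite` writes in place only for `*_annotated` inputs, and everything else gets a fresh `*_annotated` sibling. A minimal sketch of just that step (the `pick_target` helper name and file names are hypothetical, and the SigMF path normalization is omitted):

```python
from pathlib import Path


def pick_target(input_path, output_path=None, overwrite=False):
    # Explicit output wins; --overwrite writes in place only when the input is
    # already an _annotated file; otherwise create an _annotated sibling.
    input_path = Path(input_path)
    input_is_annotated = input_path.stem.endswith("_annotated")
    if output_path:
        return Path(output_path)
    if overwrite and input_is_annotated:
        return input_path
    return input_path.with_name(f"{input_path.stem}_annotated{input_path.suffix}")


print(pick_target("capture.npy"))                            # capture_annotated.npy
print(pick_target("capture_annotated.npy", overwrite=True))  # capture_annotated.npy
```

The net effect: originals are never silently clobbered, while re-running an annotator on an already-annotated file can update it in place.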
@@ -226,8 +230,8 @@ def list(input, verbose):
     \b
     Examples:
-        utils annotate list recording.sigmf-data
-        utils annotate list signal.npy --verbose
+        ria annotate list recording.sigmf-data
+        ria annotate list signal.npy --verbose
     """
     try:
         recording = load_recording(input)
@@ -295,8 +299,8 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_
     \b
     Examples:
-        utils annotate add file.npy --start 1000 --count 500 --label wifi
-        utils annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
+        ria annotate add file.npy --start 1000 --count 500 --label wifi
+        ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
     """
     try:
         recording = load_recording(input)
@@ -378,12 +382,12 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_
 def remove(input, index, output, overwrite, quiet):
     """Remove annotation by index.
-    Use 'utils annotate list' to see annotation indices.
+    Use 'ria annotate list' to see annotation indices.
     \b
     Examples:
-        utils annotate remove signal.sigmf-data 2
-        utils annotate remove file.npy 0
+        ria annotate remove signal.sigmf-data 2
+        ria annotate remove file.npy 0
     """
     try:
         recording = load_recording(input)
@@ -432,8 +436,8 @@ def clear(input, output, overwrite, force, quiet):
     \b
     Examples:
-        utils annotate clear signal.sigmf-data
-        utils annotate clear file.npy --force
+        ria annotate clear signal.sigmf-data
+        ria annotate clear file.npy --force
     """
     try:
         recording = load_recording(input)
@@ -528,10 +532,10 @@ def energy(
     \b
     Examples:
-        utils annotate energy capture.sigmf-data --label burst
-        utils annotate energy signal.npy --threshold 1.5 --min-distance 10000
-        utils annotate energy signal.sigmf-data --freq-method obw
-        utils annotate energy signal.sigmf-data --freq-method full-detected
+        ria annotate energy capture.sigmf-data --label burst
+        ria annotate energy signal.npy --threshold 1.5 --min-distance 10000
+        ria annotate energy signal.sigmf-data --freq-method obw
+        ria annotate energy signal.sigmf-data --freq-method full-detected
     """
     try:
@@ -607,8 +611,8 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
     \b
     Examples:
-        utils annotate cusum signal.sigmf-data --min-duration 5.0
-        utils annotate cusum data.npy --min-duration 10.0 --label state
+        ria annotate cusum signal.sigmf-data --min-duration 5.0
+        ria annotate cusum data.npy --min-duration 10.0 --label state
     """
     try:
         recording = load_recording(input)
@@ -654,7 +658,7 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
 @click.argument("input", type=click.Path(exists=True))
 @click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)")
 @click.option("--label", type=str, default=None, help="Annotation label")
-@click.option("--window-size", type=int, default=1024, help="Smoothing window size")
+@click.option("--window-size", type=int, default=None, help="Smoothing window size in samples (default: 1ms at recording sample rate)")
 @click.option(
     "--type",
     "annotation_type",
@@ -662,10 +666,11 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
     default="standalone",
     help="Annotation type",
 )
+@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
 @click.option("--output", "-o", type=click.Path(), help="Output file path")
 @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
 @click.option("--quiet", is_flag=True, help="Quiet mode")
-def threshold(input, threshold, label, window_size, annotation_type, output, overwrite, quiet):
+def threshold(input, threshold, label, window_size, annotation_type, channel, output, overwrite, quiet):
     """Auto-detect signals using threshold method.
     Detects samples above a percentage of maximum magnitude. Best for simple
@@ -673,8 +678,8 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove
     \b
     Examples:
-        utils annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
-        utils annotate threshold data.npy --threshold 0.5 --window-size 2048
+        ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
+        ria annotate threshold data.npy --threshold 0.5 --window-size 2048
     """
     if not (0.0 <= threshold <= 1.0):
         raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}")
@@ -689,7 +694,8 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove
     if not quiet:
         click.echo("\nDetecting signals using threshold qualifier...")
         click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude")
-        click.echo(f" Window size: {window_size} samples")
+        click.echo(f" Window size: {'auto (1ms)' if window_size is None else f'{window_size} samples'}")
+        click.echo(f" Channel: {channel}")
     try:
         initial_count = len(recording.annotations)
@@ -699,6 +705,7 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove
             window_size=window_size,
             label=label,
             annotation_type=annotation_type,
+            channel=channel,
        )
     added = len(recording.annotations) - initial_count
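The new `--window-size` default of `None` means "auto: 1 ms of samples at the recording's sample rate". A sketch of how that resolution could look, assuming the qualifier clamps to at least one sample (the `auto_window_size` helper name is hypothetical; the real resolution lives inside `threshold_qualifier`):

```python
def auto_window_size(sample_rate_hz, requested=None):
    # None means "auto": one millisecond of samples, clamped to at least 1.
    if requested is not None:
        return requested
    return max(1, int(sample_rate_hz * 1e-3))


print(auto_window_size(10e6))        # 10 Msps -> 10000-sample smoothing window
print(auto_window_size(10e6, 2048))  # an explicit --window-size always wins
```

Scaling the smoothing window with the sample rate keeps the detector's time resolution constant (~1 ms) regardless of how the recording was captured.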
@@ -747,10 +754,10 @@ def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output,
     \b
     Examples:
-        utils annotate separate capture.sigmf-data
-        utils annotate separate signal.npy --indices 0,1,2
-        utils annotate separate data.sigmf-data --noise-threshold-db -70
-        utils annotate separate signal.npy --min-component-bw 100000
+        ria annotate separate capture.sigmf-data
+        ria annotate separate signal.npy --indices 0,1,2
+        ria annotate separate data.sigmf-data --noise-threshold-db -70
+        ria annotate separate signal.npy --min-component-bw 100000
     """
     try:
try: try:

View File

@@ -2,6 +2,7 @@
 """
 This module contains all the CLI bindings for the ria package.
 """
+from .annotate import annotate
 from .capture import capture
 from .combine import combine
@@ -16,7 +17,7 @@ from .init import init
 from .split import split
 from .transform import transform
 from .transmit import transmit
-from .view import viewe
+from .view import view
 # Aliases
 synth = generate

View File

@@ -232,8 +232,8 @@ def generate():
     \b
     Examples:
-        utils synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf
-        utils synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf
+        ria synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf
+        ria synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf
     """
     pass

View File

@@ -264,13 +264,13 @@ def transform():
     Examples:\n
     \b
     # List available augmentations
-    utils transform augment --list
+    ria transform augment --list
     \b
     # Apply channel swap
-    utils transform augment channel_swap input.npy
+    ria transform augment channel_swap input.npy
     \b
     # Apply AWGN impairment
-    utils transform impair awgn input.npy --snr-db 15
+    ria transform impair awgn input.npy --snr-db 15
     """
     pass

View File

@@ -33,13 +33,9 @@ VISUALIZATION_TYPES = {
             "dark",
             "spines",
         ],
     },
-    "annotations": {
-        "function": view_annotations,
-        "description": "Annotation-focused spectrogram view",
-        "options": ["channel", "dark"],
-    },
     "channels": {"function": view_channels, "description": "Multi-channel IQ and spectrogram view", "options": []},
+    "annotations": {"function": view_annotations, "description": "Annotated spectrogram view", "options": ["channel", "dark"]},
 }
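The `VISUALIZATION_TYPES` table above drives the `--type` dispatch in the view command: the CLI looks up the requested type and calls its bound function. A self-contained sketch of that pattern (the stub view functions and return strings are hypothetical; the real ones render plots):

```python
def view_channels(recording, **options):
    # Stub standing in for the real multi-channel renderer.
    return f"channels view of {recording}"


def view_annotations(recording, **options):
    # Stub standing in for the real annotated-spectrogram renderer.
    return f"annotations view of {recording}"


VISUALIZATION_TYPES = {
    "channels": {"function": view_channels, "description": "Multi-channel IQ and spectrogram view", "options": []},
    "annotations": {"function": view_annotations, "description": "Annotated spectrogram view", "options": ["channel", "dark"]},
}

# Dispatch: resolve --type to its entry, then call the bound function.
entry = VISUALIZATION_TYPES["annotations"]
print(entry["function"]("rec.sigmf-data"))  # annotations view of rec.sigmf-data
```

Keeping the per-type option list (`"options"`) next to the function lets the CLI validate which flags apply to which view without hardcoding that mapping elsewhere.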

View File

@@ -1,6 +1,6 @@
 # CLI Tests
-Comprehensive test suite for the utils CLI commands.
+Comprehensive test suite for the ria CLI commands.
 ## Test Structure

View File

@@ -1 +1 @@
-"""Tests for utils CLI commands."""
+"""Tests for ria CLI commands."""