Compare commits
No commits in common. "ee2ce3b1f44ecdb0bdb21f1d8ed60454c6716240" and "2bb2d9d5a780dbc17172135a5a1f10eba14b1af4" have entirely different histories.
ee2ce3b1f4
...
2bb2d9d5a7
18
CHANGELOG.md
18
CHANGELOG.md
|
|
@ -1,18 +0,0 @@
|
||||||
# Changelog
|
|
||||||
|
|
||||||
## [Unreleased] - 2026-02-20
|
|
||||||
|
|
||||||
### Added
|
|
||||||
- **Dual-Threshold Detection:** Logic to capture the start and end of signals, not just the peak.
|
|
||||||
- **Signal Smoothing & Noise Filters:** Prevents detections from breaking into fragments and ignores short interference spikes.
|
|
||||||
- **Auto-Frequency Calculation:** Automatically adjusts bounding boxes to fit signal frequency ranges tightly.
|
|
||||||
|
|
||||||
### Changed
|
|
||||||
- **Signal Power Detection:** Switched from raw signal strength to power for improved accuracy.
|
|
||||||
- **CLI Workflow:** `Clear` and `Remove` commands now modify files directly (in-place) to avoid redundant copies.
|
|
||||||
- **Metadata Logic:** Updated labels to show detection percentages and overhauled internal metadata cleaning.
|
|
||||||
- **Viewer UI:** Moved legend outside the plot, added a black background, and adjusted transparency for better spectrogram visibility.
|
|
||||||
|
|
||||||
### Fixed
|
|
||||||
- Prevented redundant `_annotated` suffixes in file naming patterns.
|
|
||||||
- Simplified internal math to increase processing speed and precision.
|
|
||||||
|
|
@ -1,62 +1,4 @@
|
||||||
<<<<<<< HEAD
|
|
||||||
|
|
||||||
"""
|
|
||||||
The annotations package contains tools and utilities for creating, managing, and processing annotations.
|
|
||||||
|
|
||||||
Provides automatic annotation generation using various signal detection algorithms:
|
|
||||||
- Energy-based detection (detect_signals_energy)
|
|
||||||
- CUSUM-based segmentation (annotate_with_cusum)
|
|
||||||
- Threshold-based qualification (threshold_qualifier)
|
|
||||||
- Signal isolation and extraction (isolate_signal)
|
|
||||||
- Occupied bandwidth analysis (calculate_occupied_bandwidth, calculate_nominal_bandwidth)
|
|
||||||
|
|
||||||
All detection functions return Recording objects with added annotations.
|
|
||||||
"""
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
# Energy-based detection
|
|
||||||
"detect_signals_energy",
|
|
||||||
"calculate_occupied_bandwidth",
|
|
||||||
"calculate_nominal_bandwidth",
|
|
||||||
"calculate_full_detected_bandwidth",
|
|
||||||
"annotate_with_obw",
|
|
||||||
# CUSUM detection
|
|
||||||
"annotate_with_cusum",
|
|
||||||
# Threshold detection
|
|
||||||
"threshold_qualifier",
|
|
||||||
# Parallel signal separation (Phase 2)
|
|
||||||
"find_spectral_components",
|
|
||||||
"split_annotation_by_components",
|
|
||||||
"split_recording_annotations",
|
|
||||||
# Signal isolation
|
|
||||||
"isolate_signal",
|
|
||||||
# Annotation transforms
|
|
||||||
"remove_contained_boxes",
|
|
||||||
"is_annotation_contained",
|
|
||||||
# Dataset creation
|
|
||||||
"qualify_slice_from_annotations",
|
|
||||||
]
|
|
||||||
|
|
||||||
from .annotation_transforms import is_annotation_contained, remove_contained_boxes
|
|
||||||
from .cusum_annotator import annotate_with_cusum
|
|
||||||
from .energy_detector import (
|
|
||||||
annotate_with_obw,
|
|
||||||
calculate_full_detected_bandwidth,
|
|
||||||
calculate_nominal_bandwidth,
|
|
||||||
calculate_occupied_bandwidth,
|
|
||||||
detect_signals_energy,
|
|
||||||
)
|
|
||||||
from .parallel_signal_separator import (
|
|
||||||
find_spectral_components,
|
|
||||||
split_annotation_by_components,
|
|
||||||
split_recording_annotations,
|
|
||||||
)
|
|
||||||
from .qualify_slice import qualify_slice_from_annotations
|
|
||||||
from .signal_isolation import isolate_signal
|
|
||||||
from .threshold_qualifier import threshold_qualifier
|
|
||||||
=======
|
|
||||||
from .cusum_annotator import annotate_with_cusum
|
from .cusum_annotator import annotate_with_cusum
|
||||||
from .energy_detector import detect_signals_energy
|
from .energy_detector import detect_signals_energy
|
||||||
from .parallel_signal_separator import split_recording_annotations
|
from .parallel_signal_separator import split_recording_annotations
|
||||||
from .threshold_qualifier import threshold_qualifier
|
from .threshold_qualifier import threshold_qualifier
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,4 @@
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data.annotation import Annotation
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes.annotation import Annotation
|
from ria_toolkit_oss.datatypes.annotation import Annotation
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
# TODO figure out how to transfer labels in the merge case
|
# TODO figure out how to transfer labels in the merge case
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,7 @@ from typing import Optional
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data import Annotation, Recording
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes import Annotation, Recording
|
from ria_toolkit_oss.datatypes import Annotation, Recording
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
|
|
||||||
def annotate_with_cusum(
|
def annotate_with_cusum(
|
||||||
|
|
@ -28,11 +24,7 @@ def annotate_with_cusum(
|
||||||
changes between a low and high amplitude.
|
changes between a low and high amplitude.
|
||||||
|
|
||||||
:param recording: A ``Recording`` object to annotate.
|
:param recording: A ``Recording`` object to annotate.
|
||||||
<<<<<<< HEAD
|
|
||||||
:type recording: ``utils.data.Recording``
|
|
||||||
=======
|
|
||||||
:type recording: ``ria_toolkit_oss.datatypes.Recording``
|
:type recording: ``ria_toolkit_oss.datatypes.Recording``
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
:param label: Label for the detected segments.
|
:param label: Label for the detected segments.
|
||||||
:type label: str
|
:type label: str
|
||||||
:param window_size: The length (in samples) of the moving average window.
|
:param window_size: The length (in samples) of the moving average window.
|
||||||
|
|
|
||||||
|
|
@ -11,11 +11,7 @@ from typing import Tuple
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from scipy.signal import filtfilt
|
from scipy.signal import filtfilt
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data import Annotation, Recording
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes import Annotation, Recording
|
from ria_toolkit_oss.datatypes import Annotation, Recording
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
|
|
||||||
def detect_signals_energy(
|
def detect_signals_energy(
|
||||||
|
|
@ -77,13 +73,8 @@ def detect_signals_energy(
|
||||||
|
|
||||||
**Example**::
|
**Example**::
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
>>> from utils.io import load_recording
|
|
||||||
>>> from utils.annotations import detect_signals_energy
|
|
||||||
=======
|
|
||||||
>>> from ria.io import load_recording
|
>>> from ria.io import load_recording
|
||||||
>>> from ria_toolkit_oss.annotations import detect_signals_energy
|
>>> from ria_toolkit_oss.annotations import detect_signals_energy
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
>>> recording = load_recording("capture.sigmf")
|
>>> recording = load_recording("capture.sigmf")
|
||||||
|
|
||||||
>>> # Detect with NBW frequency bounds (default, best for real signals)
|
>>> # Detect with NBW frequency bounds (default, best for real signals)
|
||||||
|
|
@ -356,11 +347,7 @@ def annotate_with_obw(
|
||||||
|
|
||||||
**Example**::
|
**Example**::
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
>>> from utils.annotations import annotate_with_obw
|
|
||||||
=======
|
|
||||||
>>> from ria_toolkit_oss.annotations import annotate_with_obw
|
>>> from ria_toolkit_oss.annotations import annotate_with_obw
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
>>> annotated = annotate_with_obw(recording, label="signal_obw")
|
>>> annotated = annotate_with_obw(recording, label="signal_obw")
|
||||||
"""
|
"""
|
||||||
signal = recording.data[0]
|
signal = recording.data[0]
|
||||||
|
|
|
||||||
|
|
@ -38,11 +38,7 @@ sub-annotations.
|
||||||
Example:
|
Example:
|
||||||
Two WiFi channels captured simultaneously:
|
Two WiFi channels captured simultaneously:
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
>>> from utils.annotations import find_spectral_components
|
|
||||||
=======
|
|
||||||
>>> from ria_toolkit_oss.annotations import find_spectral_components
|
>>> from ria_toolkit_oss.annotations import find_spectral_components
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
>>> # Detect the two distinct channels (returns relative frequencies)
|
>>> # Detect the two distinct channels (returns relative frequencies)
|
||||||
>>> components = find_spectral_components(signal, sampling_rate=20e6)
|
>>> components = find_spectral_components(signal, sampling_rate=20e6)
|
||||||
>>> print(f"Found {len(components)} components")
|
>>> print(f"Found {len(components)} components")
|
||||||
|
|
@ -59,11 +55,7 @@ import numpy as np
|
||||||
from scipy import ndimage
|
from scipy import ndimage
|
||||||
from scipy import signal as scipy_signal
|
from scipy import signal as scipy_signal
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data import Annotation, Recording
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes import Annotation, Recording
|
from ria_toolkit_oss.datatypes import Annotation, Recording
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
|
|
||||||
def find_spectral_components(
|
def find_spectral_components(
|
||||||
|
|
@ -119,13 +111,8 @@ def find_spectral_components(
|
||||||
|
|
||||||
**Example**::
|
**Example**::
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
>>> from utils.io import load_recording
|
|
||||||
>>> from utils.annotations import find_spectral_components
|
|
||||||
=======
|
|
||||||
>>> from ria.io import load_recording
|
>>> from ria.io import load_recording
|
||||||
>>> from ria_toolkit_oss.annotations import find_spectral_components
|
>>> from ria_toolkit_oss.annotations import find_spectral_components
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
>>> recording = load_recording("capture.sigmf")
|
>>> recording = load_recording("capture.sigmf")
|
||||||
>>> segment = recording.data[0][start:end]
|
>>> segment = recording.data[0][start:end]
|
||||||
>>> # Components in relative (baseband) frequency
|
>>> # Components in relative (baseband) frequency
|
||||||
|
|
@ -254,13 +241,8 @@ def split_annotation_by_components(
|
||||||
|
|
||||||
**Example**::
|
**Example**::
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
>>> from utils.io import load_recording
|
|
||||||
>>> from utils.annotations import split_annotation_by_components
|
|
||||||
=======
|
|
||||||
>>> from ria.io import load_recording
|
>>> from ria.io import load_recording
|
||||||
>>> from ria_toolkit_oss.annotations import split_annotation_by_components
|
>>> from ria_toolkit_oss.annotations import split_annotation_by_components
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
>>> recording = load_recording("capture.sigmf")
|
>>> recording = load_recording("capture.sigmf")
|
||||||
>>> # Original annotation spans multiple channels
|
>>> # Original annotation spans multiple channels
|
||||||
>>> original = recording.annotations[0]
|
>>> original = recording.annotations[0]
|
||||||
|
|
@ -387,13 +369,8 @@ def split_recording_annotations(
|
||||||
|
|
||||||
**Example**::
|
**Example**::
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
>>> from utils.io import load_recording
|
|
||||||
>>> from utils.annotations import split_recording_annotations
|
|
||||||
=======
|
|
||||||
>>> from ria.io import load_recording
|
>>> from ria.io import load_recording
|
||||||
>>> from ria_toolkit_oss.annotations import split_recording_annotations
|
>>> from ria_toolkit_oss.annotations import split_recording_annotations
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
>>> recording = load_recording("capture.sigmf")
|
>>> recording = load_recording("capture.sigmf")
|
||||||
>>> # Split all annotations
|
>>> # Split all annotations
|
||||||
>>> split_rec = split_recording_annotations(recording)
|
>>> split_rec = split_recording_annotations(recording)
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,6 @@
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data import Recording
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes import Recording
|
from ria_toolkit_oss.datatypes import Recording
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
|
|
||||||
def qualify_slice_from_annotations(recording: Recording, slice_length: int):
|
def qualify_slice_from_annotations(recording: Recording, slice_length: int):
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,8 @@
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from scipy.signal import butter, lfilter
|
from scipy.signal import butter, lfilter
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data.annotation import Annotation
|
|
||||||
from utils.data.recording import Recording
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes.annotation import Annotation
|
from ria_toolkit_oss.datatypes.annotation import Annotation
|
||||||
from ria_toolkit_oss.datatypes.recording import Recording
|
from ria_toolkit_oss.datatypes.recording import Recording
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
|
|
||||||
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:
|
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:
|
||||||
|
|
|
||||||
|
|
@ -46,29 +46,17 @@ from typing import Optional
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
from utils.data import Annotation, Recording
|
|
||||||
|
|
||||||
|
|
||||||
def _find_ranges(indices, window_size):
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes import Annotation, Recording
|
from ria_toolkit_oss.datatypes import Annotation, Recording
|
||||||
|
|
||||||
|
|
||||||
def _find_ranges(indices, max_gap):
|
def _find_ranges(indices, max_gap):
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
Groups individual indices into continuous temporal ranges.
|
Groups individual indices into continuous temporal ranges.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
indices: Array of indices where the signal exceeded a threshold.
|
indices: Array of indices where the signal exceeded a threshold.
|
||||||
<<<<<<< HEAD
|
|
||||||
window_size: Maximum gap allowed between indices to consider them part
|
|
||||||
of the same range.
|
|
||||||
=======
|
|
||||||
max_gap: Maximum gap allowed between indices to consider them part
|
max_gap: Maximum gap allowed between indices to consider them part
|
||||||
of the same range.
|
of the same range.
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A list of (start, stop) tuples representing detected signal segments.
|
A list of (start, stop) tuples representing detected signal segments.
|
||||||
|
|
@ -77,30 +65,6 @@ def _find_ranges(indices, max_gap):
|
||||||
if len(indices) == 0:
|
if len(indices) == 0:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
ranges = []
|
|
||||||
|
|
||||||
start = indices[0]
|
|
||||||
in_range = False
|
|
||||||
|
|
||||||
for i in range(1, len(indices)):
|
|
||||||
# If the gap between current and previous index is within window_size,
|
|
||||||
# keep the range alive.
|
|
||||||
if indices[i] - indices[i - 1] <= window_size:
|
|
||||||
if not in_range:
|
|
||||||
# Start a new range
|
|
||||||
start = indices[i - 1]
|
|
||||||
in_range = True
|
|
||||||
else:
|
|
||||||
# Gap is too large; close the current range if one was active.
|
|
||||||
if in_range:
|
|
||||||
ranges.append((start, indices[i - 1]))
|
|
||||||
in_range = False
|
|
||||||
|
|
||||||
# Ensure the final segment is captured if the loop ends while in_range.
|
|
||||||
if in_range:
|
|
||||||
ranges.append((start, indices[-1]))
|
|
||||||
=======
|
|
||||||
start = indices[0]
|
start = indices[0]
|
||||||
prev = indices[0]
|
prev = indices[0]
|
||||||
ranges = []
|
ranges = []
|
||||||
|
|
@ -112,19 +76,10 @@ def _find_ranges(indices, max_gap):
|
||||||
prev = indices[i]
|
prev = indices[i]
|
||||||
|
|
||||||
ranges.append((start, prev))
|
ranges.append((start, prev))
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
return ranges
|
return ranges
|
||||||
|
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
def threshold_qualifier(
|
|
||||||
recording: Recording,
|
|
||||||
threshold: float,
|
|
||||||
window_size: Optional[int] = 1024,
|
|
||||||
label: Optional[str] = None,
|
|
||||||
annotation_type: Optional[str] = "standalone",
|
|
||||||
=======
|
|
||||||
def _expand_and_filter_ranges(
|
def _expand_and_filter_ranges(
|
||||||
smoothed_power: np.ndarray,
|
smoothed_power: np.ndarray,
|
||||||
initial_ranges: list[tuple[int, int]],
|
initial_ranges: list[tuple[int, int]],
|
||||||
|
|
@ -231,7 +186,6 @@ def threshold_qualifier(
|
||||||
label: Optional[str] = None,
|
label: Optional[str] = None,
|
||||||
annotation_type: Optional[str] = "standalone",
|
annotation_type: Optional[str] = "standalone",
|
||||||
channel: int = 0,
|
channel: int = 0,
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
) -> Recording:
|
) -> Recording:
|
||||||
"""
|
"""
|
||||||
Annotate a recording with bounding boxes for regions above a threshold.
|
Annotate a recording with bounding boxes for regions above a threshold.
|
||||||
|
|
@ -249,27 +203,15 @@ def threshold_qualifier(
|
||||||
Args:
|
Args:
|
||||||
recording: The Recording object containing IQ or real signal data.
|
recording: The Recording object containing IQ or real signal data.
|
||||||
threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power.
|
threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power.
|
||||||
<<<<<<< HEAD
|
|
||||||
window_size: Size of the smoothing filter and max gap for merging hits.
|
|
||||||
label: Custom string label for annotations.
|
|
||||||
annotation_type: Metadata string for the 'type' field in the annotation.
|
|
||||||
=======
|
|
||||||
window_size: Size of the smoothing filter in samples. Defaults to 1ms worth of samples.
|
window_size: Size of the smoothing filter in samples. Defaults to 1ms worth of samples.
|
||||||
label: Custom string label for annotations.
|
label: Custom string label for annotations.
|
||||||
annotation_type: Metadata string for the 'type' field in the annotation.
|
annotation_type: Metadata string for the 'type' field in the annotation.
|
||||||
channel: Index of the channel to annotate. Defaults to 0.
|
channel: Index of the channel to annotate. Defaults to 0.
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A new Recording object populated with detected Annotations.
|
A new Recording object populated with detected Annotations.
|
||||||
"""
|
"""
|
||||||
# Extract signal and metadata
|
# Extract signal and metadata
|
||||||
<<<<<<< HEAD
|
|
||||||
sample_data = recording.data[0]
|
|
||||||
sample_rate = recording.metadata["sample_rate"]
|
|
||||||
center_frequency = recording.metadata.get("center_frequency", 0)
|
|
||||||
|
|
||||||
=======
|
|
||||||
sample_data = recording.data[channel]
|
sample_data = recording.data[channel]
|
||||||
sample_rate = recording.metadata["sample_rate"]
|
sample_rate = recording.metadata["sample_rate"]
|
||||||
center_frequency = recording.metadata.get("center_frequency", 0)
|
center_frequency = recording.metadata.get("center_frequency", 0)
|
||||||
|
|
@ -277,69 +219,11 @@ def threshold_qualifier(
|
||||||
if window_size is None:
|
if window_size is None:
|
||||||
window_size = max(64, int(sample_rate * 0.001))
|
window_size = max(64, int(sample_rate * 0.001))
|
||||||
|
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
# --- 1. SIGNAL CONDITIONING ---
|
# --- 1. SIGNAL CONDITIONING ---
|
||||||
# Convert to power (Magnitude squared)
|
# Convert to power (Magnitude squared)
|
||||||
power_data = np.abs(sample_data) ** 2
|
power_data = np.abs(sample_data) ** 2
|
||||||
smoothing_window = np.ones(window_size) / window_size
|
smoothing_window = np.ones(window_size) / window_size
|
||||||
smoothed_power = np.convolve(power_data, smoothing_window, mode="same")
|
smoothed_power = np.convolve(power_data, smoothing_window, mode="same")
|
||||||
<<<<<<< HEAD
|
|
||||||
|
|
||||||
# Define thresholds based on the global peak of the smoothed signal
|
|
||||||
max_power = np.max(smoothed_power)
|
|
||||||
trigger_val = threshold * max_power # High threshold to trigger detection
|
|
||||||
boundary_val = (threshold / 2) * max_power # Low threshold to define signal edges
|
|
||||||
|
|
||||||
# --- 2. INITIAL DETECTION ---
|
|
||||||
# Identify indices that strictly exceed the high trigger
|
|
||||||
indices = np.where(smoothed_power > trigger_val)[0]
|
|
||||||
initial_ranges = _find_ranges(indices=indices, window_size=window_size)
|
|
||||||
|
|
||||||
annotations = []
|
|
||||||
|
|
||||||
threshold_base = min(sample_rate, len(sample_data))
|
|
||||||
|
|
||||||
for start, stop in initial_ranges:
|
|
||||||
if (stop - start) < (threshold_base * 0.01):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# --- 3. HYSTERESIS (Boundary Expansion) ---
|
|
||||||
# Search backward from 'start' until power drops below the low boundary_val
|
|
||||||
true_start = start
|
|
||||||
while true_start > 0 and smoothed_power[true_start] > boundary_val:
|
|
||||||
true_start -= 1
|
|
||||||
|
|
||||||
# Search forward from 'stop' until power drops below the low boundary_val
|
|
||||||
true_stop = stop
|
|
||||||
while true_stop < len(smoothed_power) - 1 and smoothed_power[true_stop] > boundary_val:
|
|
||||||
true_stop += 1
|
|
||||||
|
|
||||||
# --- 4. SPECTRAL ANALYSIS (Frequency Detection) ---
|
|
||||||
signal_segment = sample_data[true_start:true_stop]
|
|
||||||
if len(signal_segment) > 0:
|
|
||||||
fft_data = np.abs(np.fft.fftshift(np.fft.fft(signal_segment)))
|
|
||||||
fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate))
|
|
||||||
|
|
||||||
# Determine frequency bounds where spectral energy is > 15% of segment peak
|
|
||||||
spectral_thresh = np.max(fft_data) * 0.15
|
|
||||||
sig_indices = np.where(fft_data > spectral_thresh)[0]
|
|
||||||
|
|
||||||
# Ensure the signal has some spectral width before annotating
|
|
||||||
if len(sig_indices) < 5:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if len(sig_indices) > 0:
|
|
||||||
f_min, f_max = fft_freqs[sig_indices[0]], fft_freqs[sig_indices[-1]]
|
|
||||||
else:
|
|
||||||
# Default to middle half of bandwidth if no clear peaks found
|
|
||||||
f_min, f_max = -sample_rate / 4, sample_rate / 4
|
|
||||||
else:
|
|
||||||
f_min, f_max = -sample_rate / 4, sample_rate / 4
|
|
||||||
|
|
||||||
# --- 5. ANNOTATION GENERATION ---
|
|
||||||
if label is None:
|
|
||||||
label = f"{int(threshold*100)}%"
|
|
||||||
=======
|
|
||||||
group_gap_samples = _estimate_group_gap(sample_rate)
|
group_gap_samples = _estimate_group_gap(sample_rate)
|
||||||
|
|
||||||
# Define thresholds using peak relative to baseline.
|
# Define thresholds using peak relative to baseline.
|
||||||
|
|
@ -442,7 +326,6 @@ def threshold_qualifier(
|
||||||
|
|
||||||
# --- 5. ANNOTATION GENERATION ---
|
# --- 5. ANNOTATION GENERATION ---
|
||||||
ann_label = label if label is not None else f"{int(threshold*100)}%"
|
ann_label = label if label is not None else f"{int(threshold*100)}%"
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
# Pack metadata for the UI/Downstream processing
|
# Pack metadata for the UI/Downstream processing
|
||||||
comment_data = {
|
comment_data = {
|
||||||
|
|
@ -459,11 +342,7 @@ def threshold_qualifier(
|
||||||
sample_count=true_stop - true_start,
|
sample_count=true_stop - true_start,
|
||||||
freq_lower_edge=center_frequency + f_min,
|
freq_lower_edge=center_frequency + f_min,
|
||||||
freq_upper_edge=center_frequency + f_max,
|
freq_upper_edge=center_frequency + f_max,
|
||||||
<<<<<<< HEAD
|
|
||||||
label=label,
|
|
||||||
=======
|
|
||||||
label=ann_label,
|
label=ann_label,
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
comment=json.dumps(comment_data),
|
comment=json.dumps(comment_data),
|
||||||
detail={"generator": "hysteresis_qualifier"},
|
detail={"generator": "hysteresis_qualifier"},
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,8 +0,0 @@
|
||||||
"""
|
|
||||||
The Data package contains abstract data types tailored for radio machine learning, such as ``Recording``, as well
|
|
||||||
as the abstract interfaces for the radio dataset and radio dataset builder framework.
|
|
||||||
"""
|
|
||||||
|
|
||||||
__all__ = ["Annotation", "Recording"]
|
|
||||||
from .annotation import Annotation
|
|
||||||
from .recording import Recording
|
|
||||||
|
|
@ -1,128 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
from typing import Any, Optional
|
|
||||||
|
|
||||||
from sigmf import SigMFFile
|
|
||||||
|
|
||||||
|
|
||||||
class Annotation:
|
|
||||||
"""Signal annotations are labels or additional information associated with specific data points or segments within
|
|
||||||
a signal. These annotations could be used for tasks like supervised learning, where the goal is to train a model
|
|
||||||
to recognize patterns or characteristics in the signal associated with these annotations.
|
|
||||||
|
|
||||||
Annotations can be used to label interesting points in your recording.
|
|
||||||
|
|
||||||
:param sample_start: The index of the starting sample of the annotation.
|
|
||||||
:type sample_start: int
|
|
||||||
:param sample_count: The index of the ending sample of the annotation, inclusive.
|
|
||||||
:type sample_count: int
|
|
||||||
:param freq_lower_edge: The lower frequency of the annotation.
|
|
||||||
:type freq_lower_edge: float
|
|
||||||
:param freq_upper_edge: The upper frequency of the annotation.
|
|
||||||
:type freq_upper_edge: float
|
|
||||||
:param label: The label that will be displayed with the bounding box in compatible viewers including IQEngine.
|
|
||||||
Defaults to an emtpy string.
|
|
||||||
:type label: str, optional
|
|
||||||
:param comment: A human-readable comment. Defaults to an empty string.
|
|
||||||
:type comment: str, optional
|
|
||||||
:param detail: A dictionary of user defined annotation-specific metadata. Defaults to None.
|
|
||||||
:type detail: dict, optional
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
sample_start: int,
|
|
||||||
sample_count: int,
|
|
||||||
freq_lower_edge: float,
|
|
||||||
freq_upper_edge: float,
|
|
||||||
label: Optional[str] = "",
|
|
||||||
comment: Optional[str] = "",
|
|
||||||
detail: Optional[dict] = None,
|
|
||||||
):
|
|
||||||
"""Initialize a new Annotation instance."""
|
|
||||||
self.sample_start = int(sample_start)
|
|
||||||
self.sample_count = int(sample_count)
|
|
||||||
self.freq_lower_edge = float(freq_lower_edge)
|
|
||||||
self.freq_upper_edge = float(freq_upper_edge)
|
|
||||||
self.label = str(label)
|
|
||||||
self.comment = str(comment)
|
|
||||||
|
|
||||||
if detail is None:
|
|
||||||
self.detail = {}
|
|
||||||
elif not _is_jsonable(detail):
|
|
||||||
raise ValueError(f"Detail object is not json serializable: {detail}")
|
|
||||||
else:
|
|
||||||
self.detail = detail
|
|
||||||
|
|
||||||
def is_valid(self) -> bool:
|
|
||||||
"""
|
|
||||||
Check that the annotation sample count is > 0 and the freq_lower_edge<freq_upper_edge.
|
|
||||||
|
|
||||||
:returns: True if valid, False if not.
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self.sample_count > 0 and self.freq_lower_edge < self.freq_upper_edge
|
|
||||||
|
|
||||||
def overlap(self, other):
|
|
||||||
"""
|
|
||||||
Quantify how much the bounding box in this annotation overlaps with another annotation.
|
|
||||||
|
|
||||||
:param other: The other annotation.
|
|
||||||
:type other: Annotation
|
|
||||||
|
|
||||||
:returns: The area of the overlap in samples*frequency, or 0 if they do not overlap."""
|
|
||||||
|
|
||||||
sample_overlap_start = max(self.sample_start, other.sample_start)
|
|
||||||
sample_overlap_end = min(self.sample_start + self.sample_count, other.sample_start + other.sample_count)
|
|
||||||
|
|
||||||
freq_overlap_start = max(self.freq_lower_edge, other.freq_lower_edge)
|
|
||||||
freq_overlap_end = min(self.freq_upper_edge, other.freq_upper_edge)
|
|
||||||
|
|
||||||
if freq_overlap_start >= freq_overlap_end or sample_overlap_start >= sample_overlap_end:
|
|
||||||
return 0
|
|
||||||
else:
|
|
||||||
return (sample_overlap_end - sample_overlap_start) * (freq_overlap_end - freq_overlap_start)
|
|
||||||
|
|
||||||
def area(self):
|
|
||||||
"""
|
|
||||||
The 'area' of the bounding box, samples*frequency.
|
|
||||||
Useful to quantify annotation size.
|
|
||||||
|
|
||||||
:returns: sample length multiplied by bandwidth."""
|
|
||||||
|
|
||||||
return self.sample_count * (self.freq_upper_edge - self.freq_lower_edge)
|
|
||||||
|
|
||||||
def __eq__(self, other: Annotation) -> bool:
|
|
||||||
return self.__dict__ == other.__dict__
|
|
||||||
|
|
||||||
def to_sigmf_format(self):
|
|
||||||
"""
|
|
||||||
Returns a JSON dictionary representing this annotation formatted to be saved in a .sigmf-meta file.
|
|
||||||
"""
|
|
||||||
|
|
||||||
annotation_dict = {SigMFFile.START_INDEX_KEY: self.sample_start, SigMFFile.LENGTH_INDEX_KEY: self.sample_count}
|
|
||||||
|
|
||||||
annotation_dict["metadata"] = {
|
|
||||||
SigMFFile.LABEL_KEY: self.label,
|
|
||||||
SigMFFile.COMMENT_KEY: self.comment,
|
|
||||||
SigMFFile.FHI_KEY: self.freq_upper_edge,
|
|
||||||
SigMFFile.FLO_KEY: self.freq_lower_edge,
|
|
||||||
"ria:detail": self.detail,
|
|
||||||
}
|
|
||||||
|
|
||||||
if _is_jsonable(annotation_dict):
|
|
||||||
return annotation_dict
|
|
||||||
else:
|
|
||||||
raise ValueError("Annotation dictionary was not json serializable.")
|
|
||||||
|
|
||||||
|
|
||||||
def _is_jsonable(x: Any) -> bool:
|
|
||||||
"""
|
|
||||||
:return: True if x is JSON serializable, False otherwise.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
json.dumps(x)
|
|
||||||
return True
|
|
||||||
except (TypeError, OverflowError):
|
|
||||||
return False
|
|
||||||
|
|
@ -1,853 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import copy
|
|
||||||
import hashlib
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
import warnings
|
|
||||||
from typing import Any, Iterator, Optional
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from numpy.typing import ArrayLike
|
|
||||||
|
|
||||||
from utils.data.annotation import Annotation
|
|
||||||
|
|
||||||
PROTECTED_KEYS = ["rec_id", "timestamp"]
|
|
||||||
|
|
||||||
|
|
||||||
class Recording:
|
|
||||||
"""Tape of complex IQ (in-phase and quadrature) samples with associated metadata and annotations.
|
|
||||||
|
|
||||||
Recording data is a complex array of shape C x N, where C is the number of channels
|
|
||||||
and N is the number of samples in each channel.
|
|
||||||
|
|
||||||
Metadata is stored in a dictionary of key value pairs,
|
|
||||||
to include information such as sample_rate and center_frequency.
|
|
||||||
|
|
||||||
Annotations are a list of :ref:`Annotation <utils.data.Annotation>`,
|
|
||||||
defining bounding boxes in time and frequency with labels and metadata.
|
|
||||||
|
|
||||||
Here, signal data is represented as a NumPy array. This class is then extended in the RIA Backends to provide
|
|
||||||
support for different data structures, such as Tensors.
|
|
||||||
|
|
||||||
Recordings are long-form tapes can be obtained either from a software-defined radio (SDR) or generated
|
|
||||||
synthetically. Then, machine learning datasets are curated from collection of recordings by segmenting these
|
|
||||||
longer-form tapes into shorter units called slices.
|
|
||||||
|
|
||||||
All recordings are assigned a unique 64-character recording ID, ``rec_id``. If this field is missing from the
|
|
||||||
provided metadata, a new ID will be generated upon object instantiation.
|
|
||||||
|
|
||||||
:param data: Signal data as a tape IQ samples, either C x N complex, where C is the number of
|
|
||||||
channels and N is number of samples in the signal. If data is a one-dimensional array of complex samples with
|
|
||||||
length N, it will be reshaped to a two-dimensional array with dimensions 1 x N.
|
|
||||||
:type data: array_like
|
|
||||||
|
|
||||||
:param metadata: Additional information associated with the recording.
|
|
||||||
:type metadata: dict, optional
|
|
||||||
:param annotations: A collection of ``Annotation`` objects defining bounding boxes.
|
|
||||||
:type annotations: list of Annotations, optional
|
|
||||||
|
|
||||||
:param dtype: Explicitly specify the data-type of the complex samples. Must be a complex NumPy type, such as
|
|
||||||
``np.complex64`` or ``np.complex128``. Default is None, in which case the type is determined implicitly. If
|
|
||||||
``data`` is a NumPy array, the Recording will use the dtype of ``data`` directly without any conversion.
|
|
||||||
:type dtype: numpy dtype object, optional
|
|
||||||
:param timestamp: The timestamp when the recording data was generated. If provided, it should be a float or integer
|
|
||||||
representing the time in seconds since epoch (e.g., ``time.time()``). Only used if the `timestamp` field is not
|
|
||||||
present in the provided metadata.
|
|
||||||
:type dtype: float or int, optional
|
|
||||||
|
|
||||||
:raises ValueError: If data is not complex 1xN or CxN.
|
|
||||||
:raises ValueError: If metadata is not a python dict.
|
|
||||||
:raises ValueError: If metadata is not json serializable.
|
|
||||||
:raises ValueError: If annotations is not a list of valid annotation objects.
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording, Annotation
|
|
||||||
|
|
||||||
>>> # Create an array of complex samples, just 1s in this case.
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
|
|
||||||
>>> # Create a dictionary of relevant metadata.
|
|
||||||
>>> sample_rate = 1e6
|
|
||||||
>>> center_frequency = 2.44e9
|
|
||||||
>>> metadata = {
|
|
||||||
... "sample_rate": sample_rate,
|
|
||||||
... "center_frequency": center_frequency,
|
|
||||||
... "author": "me",
|
|
||||||
... }
|
|
||||||
|
|
||||||
>>> # Create an annotation for the annotations list.
|
|
||||||
>>> annotations = [
|
|
||||||
... Annotation(
|
|
||||||
... sample_start=0,
|
|
||||||
... sample_count=1000,
|
|
||||||
... freq_lower_edge=center_frequency - (sample_rate / 2),
|
|
||||||
... freq_upper_edge=center_frequency + (sample_rate / 2),
|
|
||||||
... label="example",
|
|
||||||
... )
|
|
||||||
... ]
|
|
||||||
|
|
||||||
>>> # Store samples, metadata, and annotations together in a convenient object.
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata, annotations=annotations)
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0, 'center_frequency': 2440000000.0, 'author': 'me'}
|
|
||||||
>>> print(recording.annotations[0].label)
|
|
||||||
'example'
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__( # noqa C901
|
|
||||||
self,
|
|
||||||
data: ArrayLike | list[list],
|
|
||||||
metadata: Optional[dict[str, any]] = None,
|
|
||||||
dtype: Optional[np.dtype] = None,
|
|
||||||
timestamp: Optional[float | int] = None,
|
|
||||||
annotations: Optional[list[Annotation]] = None,
|
|
||||||
):
|
|
||||||
|
|
||||||
data_arr = np.asarray(data)
|
|
||||||
|
|
||||||
if np.iscomplexobj(data_arr):
|
|
||||||
# Expect C x N
|
|
||||||
if data_arr.ndim == 1:
|
|
||||||
self._data = np.expand_dims(data_arr, axis=0) # N -> 1 x N
|
|
||||||
elif data_arr.ndim == 2:
|
|
||||||
self._data = data_arr
|
|
||||||
else:
|
|
||||||
raise ValueError("Complex data must be C x N.")
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise ValueError("Input data must be complex.")
|
|
||||||
|
|
||||||
if dtype is not None:
|
|
||||||
self._data = self._data.astype(dtype)
|
|
||||||
|
|
||||||
assert np.iscomplexobj(self._data)
|
|
||||||
|
|
||||||
if metadata is None:
|
|
||||||
self._metadata = {}
|
|
||||||
elif isinstance(metadata, dict):
|
|
||||||
self._metadata = metadata
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Metadata must be a python dict, but was {type(metadata)}.")
|
|
||||||
|
|
||||||
if not _is_jsonable(metadata):
|
|
||||||
raise ValueError("Value must be JSON serializable.")
|
|
||||||
|
|
||||||
if "timestamp" not in self.metadata:
|
|
||||||
if timestamp is not None:
|
|
||||||
if not isinstance(timestamp, (int, float)):
|
|
||||||
raise ValueError(f"timestamp must be int or float, not {type(timestamp)}")
|
|
||||||
self._metadata["timestamp"] = timestamp
|
|
||||||
else:
|
|
||||||
self._metadata["timestamp"] = time.time()
|
|
||||||
else:
|
|
||||||
if not isinstance(self._metadata["timestamp"], (int, float)):
|
|
||||||
raise ValueError("timestamp must be int or float, not ", type(self._metadata["timestamp"]))
|
|
||||||
|
|
||||||
if "rec_id" not in self.metadata:
|
|
||||||
self._metadata["rec_id"] = generate_recording_id(data=self.data, timestamp=self._metadata["timestamp"])
|
|
||||||
|
|
||||||
if annotations is None:
|
|
||||||
self._annotations = []
|
|
||||||
elif isinstance(annotations, list):
|
|
||||||
self._annotations = annotations
|
|
||||||
else:
|
|
||||||
raise ValueError("Annotations must be a list or None.")
|
|
||||||
|
|
||||||
if not all(isinstance(annotation, Annotation) for annotation in self._annotations):
|
|
||||||
raise ValueError("All elements in self._annotations must be of type Annotation.")
|
|
||||||
|
|
||||||
self._index = 0
|
|
||||||
|
|
||||||
@property
|
|
||||||
def data(self) -> np.ndarray:
|
|
||||||
"""
|
|
||||||
:return: Recording data, as a complex array.
|
|
||||||
:type: np.ndarray
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
For recordings with more than 1,024 samples, this property returns a read-only view of the data.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
To access specific samples, consider indexing the object directly with ``rec[c, n]``.
|
|
||||||
"""
|
|
||||||
if self._data.size > 1024:
|
|
||||||
# Returning a read-only view prevents mutation at a distance while maintaining performance.
|
|
||||||
v = self._data.view()
|
|
||||||
v.setflags(write=False)
|
|
||||||
return v
|
|
||||||
else:
|
|
||||||
return self._data.copy()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def metadata(self) -> dict:
|
|
||||||
"""
|
|
||||||
:return: Dictionary of recording metadata.
|
|
||||||
:type: dict
|
|
||||||
"""
|
|
||||||
return self._metadata.copy()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def annotations(self) -> list[Annotation]:
|
|
||||||
"""
|
|
||||||
:return: List of recording annotations
|
|
||||||
:type: list of Annotation objects
|
|
||||||
"""
|
|
||||||
return self._annotations.copy()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def shape(self) -> tuple[int]:
|
|
||||||
"""
|
|
||||||
:return: The shape of the data array.
|
|
||||||
:type: tuple of ints
|
|
||||||
"""
|
|
||||||
return np.shape(self.data)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def n_chan(self) -> int:
|
|
||||||
"""
|
|
||||||
:return: The number of channels in the recording.
|
|
||||||
:type: int
|
|
||||||
"""
|
|
||||||
return self.shape[0]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def rec_id(self) -> str:
|
|
||||||
"""
|
|
||||||
:return: Recording ID.
|
|
||||||
:type: str
|
|
||||||
"""
|
|
||||||
return self.metadata["rec_id"]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def dtype(self) -> str:
|
|
||||||
"""
|
|
||||||
:return: Data-type of the data array's elements.
|
|
||||||
:type: numpy dtype object
|
|
||||||
"""
|
|
||||||
return self.data.dtype
|
|
||||||
|
|
||||||
@property
|
|
||||||
def timestamp(self) -> float | int:
|
|
||||||
"""
|
|
||||||
:return: Recording timestamp (time in seconds since epoch).
|
|
||||||
:type: float or int
|
|
||||||
"""
|
|
||||||
return self.metadata["timestamp"]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def sample_rate(self) -> float | None:
|
|
||||||
"""
|
|
||||||
:return: Sample rate of the recording, or None if 'sample_rate' is not in metadata.
|
|
||||||
:type: str
|
|
||||||
"""
|
|
||||||
return self.metadata.get("sample_rate")
|
|
||||||
|
|
||||||
@sample_rate.setter
|
|
||||||
def sample_rate(self, sample_rate: float | int) -> None:
|
|
||||||
"""Set the sample rate of the recording.
|
|
||||||
|
|
||||||
:param sample_rate: The sample rate of the recording.
|
|
||||||
:type sample_rate: float or int
|
|
||||||
|
|
||||||
:return: None
|
|
||||||
"""
|
|
||||||
self.add_to_metadata(key="sample_rate", value=sample_rate)
|
|
||||||
|
|
||||||
def astype(self, dtype: np.dtype) -> Recording:
|
|
||||||
"""Copy of the recording, data cast to a specified type.
|
|
||||||
|
|
||||||
.. todo: This method is not yet implemented.
|
|
||||||
|
|
||||||
:param dtype: Data-type to which the array is cast. Must be a complex scalar type, such as ``np.complex64`` or
|
|
||||||
``np.complex128``.
|
|
||||||
:type dtype: NumPy data type, optional
|
|
||||||
|
|
||||||
.. note: Casting to a data type with less precision can risk losing data by truncating or rounding values,
|
|
||||||
potentially resulting in a loss of accuracy and significant information.
|
|
||||||
|
|
||||||
:return: A new recording with the same metadata and data, with dtype.
|
|
||||||
|
|
||||||
TODO: Add example usage.
|
|
||||||
"""
|
|
||||||
# Rather than check for a valid datatype, let's cast and check the result. This makes it easier to provide
|
|
||||||
# cross-platform support where the types are aliased across platforms.
|
|
||||||
with warnings.catch_warnings():
|
|
||||||
warnings.simplefilter("ignore") # Casting may generate user warnings. E.g., complex -> real
|
|
||||||
data = self.data.astype(dtype)
|
|
||||||
|
|
||||||
if np.iscomplexobj(data):
|
|
||||||
return Recording(data=data, metadata=self.metadata, annotations=self.annotations)
|
|
||||||
else:
|
|
||||||
raise ValueError("dtype must be a complex number scalar type.")
|
|
||||||
|
|
||||||
def add_to_metadata(self, key: str, value: Any) -> None:
|
|
||||||
"""Add a new key-value pair to the recording metadata.
|
|
||||||
|
|
||||||
:param key: New metadata key, must be snake_case.
|
|
||||||
:type key: str
|
|
||||||
:param value: Corresponding metadata value.
|
|
||||||
:type value: any
|
|
||||||
|
|
||||||
:raises ValueError: If key is already in metadata or if key is not a valid metadata key.
|
|
||||||
:raises ValueError: If value is not JSON serializable.
|
|
||||||
|
|
||||||
:return: None.
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and add metadata:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
>>>
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
>>> "sample_rate": 1e6,
|
|
||||||
>>> "center_frequency": 2.44e9,
|
|
||||||
>>> }
|
|
||||||
>>>
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0,
|
|
||||||
'center_frequency': 2440000000.0,
|
|
||||||
'timestamp': 17369...,
|
|
||||||
'rec_id': 'fda0f41...'}
|
|
||||||
>>>
|
|
||||||
>>> recording.add_to_metadata(key="author", value="me")
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0,
|
|
||||||
'center_frequency': 2440000000.0,
|
|
||||||
'author': 'me',
|
|
||||||
'timestamp': 17369...,
|
|
||||||
'rec_id': 'fda0f41...'}
|
|
||||||
"""
|
|
||||||
if key in self.metadata:
|
|
||||||
raise ValueError(
|
|
||||||
f"Key {key} already in metadata. Use Recording.update_metadata() to modify existing fields."
|
|
||||||
)
|
|
||||||
|
|
||||||
if not _is_valid_metadata_key(key):
|
|
||||||
raise ValueError(f"Invalid metadata key: {key}.")
|
|
||||||
|
|
||||||
if not _is_jsonable(value):
|
|
||||||
raise ValueError("Value must be JSON serializable.")
|
|
||||||
|
|
||||||
self._metadata[key] = value
|
|
||||||
|
|
||||||
def update_metadata(self, key: str, value: Any) -> None:
|
|
||||||
"""Update the value of an existing metadata key,
|
|
||||||
or add the key value pair if it does not already exist.
|
|
||||||
|
|
||||||
:param key: Existing metadata key.
|
|
||||||
:type key: str
|
|
||||||
:param value: New value to enter at key.
|
|
||||||
:type value: any
|
|
||||||
|
|
||||||
:raises ValueError: If value is not JSON serializable
|
|
||||||
:raises ValueError: If key is protected.
|
|
||||||
|
|
||||||
:return: None.
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and update metadata:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
>>> "sample_rate": 1e6,
|
|
||||||
>>> "center_frequency": 2.44e9,
|
|
||||||
>>> "author": "me"
|
|
||||||
>>> }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0,
|
|
||||||
'center_frequency': 2440000000.0,
|
|
||||||
'author': "me",
|
|
||||||
'timestamp': 17369...
|
|
||||||
'rec_id': 'fda0f41...'}
|
|
||||||
|
|
||||||
>>> recording.update_metadata(key="author", value=you")
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0,
|
|
||||||
'center_frequency': 2440000000.0,
|
|
||||||
'author': "you",
|
|
||||||
'timestamp': 17369...
|
|
||||||
'rec_id': 'fda0f41...'}
|
|
||||||
"""
|
|
||||||
if key not in self.metadata:
|
|
||||||
self.add_to_metadata(key=key, value=value)
|
|
||||||
|
|
||||||
if not _is_jsonable(value):
|
|
||||||
raise ValueError("Value must be JSON serializable.")
|
|
||||||
|
|
||||||
if key in PROTECTED_KEYS: # Check protected keys.
|
|
||||||
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
|
|
||||||
|
|
||||||
else:
|
|
||||||
self._metadata[key] = value
|
|
||||||
|
|
||||||
def remove_from_metadata(self, key: str):
|
|
||||||
"""
|
|
||||||
Remove a key from the recording metadata.
|
|
||||||
Does not remove key if it is protected.
|
|
||||||
|
|
||||||
:param key: The key to remove.
|
|
||||||
:type key: str
|
|
||||||
|
|
||||||
:raises ValueError: If key is protected.
|
|
||||||
|
|
||||||
:return: None.
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and add metadata:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
... "sample_rate": 1e6,
|
|
||||||
... "center_frequency": 2.44e9,
|
|
||||||
... }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0,
|
|
||||||
'center_frequency': 2440000000.0,
|
|
||||||
'timestamp': 17369..., # Example value
|
|
||||||
'rec_id': 'fda0f41...'} # Example value
|
|
||||||
|
|
||||||
>>> recording.add_to_metadata(key="author", value="me")
|
|
||||||
>>> print(recording.metadata)
|
|
||||||
{'sample_rate': 1000000.0,
|
|
||||||
'center_frequency': 2440000000.0,
|
|
||||||
'author': 'me',
|
|
||||||
'timestamp': 17369..., # Example value
|
|
||||||
'rec_id': 'fda0f41...'} # Example value
|
|
||||||
"""
|
|
||||||
if key not in PROTECTED_KEYS:
|
|
||||||
self._metadata.pop(key)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
|
|
||||||
|
|
||||||
def view(self, output_path: Optional[str] = "images/signal.png", **kwargs) -> None:
|
|
||||||
"""Create a plot of various signal visualizations as a PNG image.
|
|
||||||
|
|
||||||
:param output_path: The output image path. Defaults to "images/signal.png".
|
|
||||||
:type output_path: str, optional
|
|
||||||
:param kwargs: Keyword arguments passed on to utils.view.view_sig.
|
|
||||||
:type: dict of keyword arguments
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and view it as a plot in a .png image:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
>>> "sample_rate": 1e6,
|
|
||||||
>>> "center_frequency": 2.44e9,
|
|
||||||
>>> }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> recording.view()
|
|
||||||
"""
|
|
||||||
from utils.view import view_sig
|
|
||||||
|
|
||||||
view_sig(recording=self, output_path=output_path, **kwargs)
|
|
||||||
|
|
||||||
def simple_view(self, **kwargs) -> None:
|
|
||||||
"""Create a plot of various signal visualizations as a PNG or SVG image.
|
|
||||||
|
|
||||||
:param kwargs: Keyword arguments passed on to utils.view.view_signal_simple.create_plots.
|
|
||||||
:type: dict of keyword arguments
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and view it as a plot in a .png image:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
>>> "sample_rate": 1e6,
|
|
||||||
>>> "center_frequency": 2.44e9,
|
|
||||||
>>> }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> recording.simple_view()
|
|
||||||
"""
|
|
||||||
from utils.view.view_signal_simple import view_simple_sig
|
|
||||||
|
|
||||||
view_simple_sig(recording=self, **kwargs)
|
|
||||||
|
|
||||||
def to_sigmf(
|
|
||||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
|
||||||
) -> None:
|
|
||||||
"""Write recording to a set of SigMF files.
|
|
||||||
|
|
||||||
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
|
||||||
|
|
||||||
:param recording: The recording to be written to file.
|
|
||||||
:type recording: utils.data.Recording
|
|
||||||
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
|
||||||
:type filename: os.PathLike or str, optional
|
|
||||||
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
|
|
||||||
:type path: os.PathLike or str, optional
|
|
||||||
|
|
||||||
:raises IOError: If there is an issue encountered during the file writing process.
|
|
||||||
|
|
||||||
:return: None
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and view it as a plot in a `.png` image:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
... "sample_rate": 1e6,
|
|
||||||
... "center_frequency": 2.44e9,
|
|
||||||
... }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> recording.view()
|
|
||||||
"""
|
|
||||||
from utils.io.recording import to_sigmf
|
|
||||||
|
|
||||||
to_sigmf(filename=filename, path=path, recording=self, overwrite=overwrite)
|
|
||||||
|
|
||||||
def to_npy(
|
|
||||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
|
||||||
) -> str:
|
|
||||||
"""Write recording to ``.npy`` binary file.
|
|
||||||
|
|
||||||
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
|
||||||
:type filename: os.PathLike or str, optional
|
|
||||||
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
|
|
||||||
:type path: os.PathLike or str, optional
|
|
||||||
|
|
||||||
:raises IOError: If there is an issue encountered during the file writing process.
|
|
||||||
|
|
||||||
:return: Path where the file was saved.
|
|
||||||
:rtype: str
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and save it to a .npy file:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
>>> "sample_rate": 1e6,
|
|
||||||
>>> "center_frequency": 2.44e9,
|
|
||||||
>>> }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> recording.to_npy()
|
|
||||||
"""
|
|
||||||
from utils.io.recording import to_npy
|
|
||||||
|
|
||||||
to_npy(recording=self, filename=filename, path=path, overwrite=overwrite)
|
|
||||||
|
|
||||||
def to_wav(
|
|
||||||
self,
|
|
||||||
filename: Optional[str] = None,
|
|
||||||
path: Optional[os.PathLike | str] = None,
|
|
||||||
target_sample_rate: Optional[int] = 48000,
|
|
||||||
bits_per_sample: int = 32,
|
|
||||||
overwrite: bool = False,
|
|
||||||
) -> str:
|
|
||||||
"""Write recording to WAV file with embedded YAML metadata.
|
|
||||||
|
|
||||||
WAV format uses stereo audio with I (in-phase) in left channel and Q (quadrature) in right channel.
|
|
||||||
Metadata is stored in standard LIST INFO chunks with RF-specific metadata encoded as YAML
|
|
||||||
in the ICMT (comment) field for human readability.
|
|
||||||
|
|
||||||
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
|
||||||
:type filename: os.PathLike or str, optional
|
|
||||||
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
|
|
||||||
:type path: os.PathLike or str, optional
|
|
||||||
:param target_sample_rate: Sample rate stored in the WAV header when no sample_rate metadata
|
|
||||||
is present. IQ samples are written without decimation or interpolation. Default is 48000 Hz.
|
|
||||||
:type target_sample_rate: int, optional
|
|
||||||
:param bits_per_sample: Bits per sample (32 for float32, 16 for int16). Default is 32.
|
|
||||||
:type bits_per_sample: int, optional
|
|
||||||
:param overwrite: Whether to overwrite existing files. Default is False.
|
|
||||||
:type overwrite: bool, optional
|
|
||||||
|
|
||||||
:raises IOError: If there is an issue encountered during the file writing process.
|
|
||||||
|
|
||||||
:return: Path where the file was saved.
|
|
||||||
:rtype: str
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and save it to a .wav file:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
>>> samples = numpy.exp(1j * 2 * numpy.pi * 0.1 * numpy.arange(10000))
|
|
||||||
>>> metadata = {"sample_rate": 1e6, "center_frequency": 915e6}
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> recording.to_wav()
|
|
||||||
"""
|
|
||||||
from utils.io.recording import to_wav
|
|
||||||
|
|
||||||
return to_wav(
|
|
||||||
recording=self,
|
|
||||||
filename=filename,
|
|
||||||
path=path,
|
|
||||||
target_sample_rate=target_sample_rate,
|
|
||||||
bits_per_sample=bits_per_sample,
|
|
||||||
overwrite=overwrite,
|
|
||||||
)
|
|
||||||
|
|
||||||
def to_blue(
|
|
||||||
self,
|
|
||||||
filename: Optional[str] = None,
|
|
||||||
path: Optional[os.PathLike | str] = None,
|
|
||||||
data_format: str = "CI",
|
|
||||||
overwrite: bool = False,
|
|
||||||
) -> str:
|
|
||||||
"""Write recording to MIDAS Blue file format.
|
|
||||||
|
|
||||||
MIDAS Blue is a legacy RF file format with a 512-byte binary header.
|
|
||||||
Commonly used with X-Midas and other RF/radar signal processing tools.
|
|
||||||
|
|
||||||
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
|
||||||
:type filename: os.PathLike or str, optional
|
|
||||||
:param path: The directory path to where the recording is to be saved. Defaults to recordings/.
|
|
||||||
:type path: os.PathLike or str, optional
|
|
||||||
:param data_format: Format code (default 'CI' = complex int16).
|
|
||||||
Common formats: 'CI' (complex int16), 'CF' (complex float32), 'CD' (complex float64).
|
|
||||||
Integer formats require the IQ samples to already be scaled within [-1, 1).
|
|
||||||
:type data_format: str, optional
|
|
||||||
:param overwrite: Whether to overwrite existing files. Default is False.
|
|
||||||
:type overwrite: bool, optional
|
|
||||||
|
|
||||||
:raises IOError: If there is an issue encountered during the file writing process.
|
|
||||||
|
|
||||||
:return: Path where the file was saved.
|
|
||||||
:rtype: str
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and save it to a .blue file:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {"sample_rate": 1e6, "center_frequency": 2.44e9}
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> recording.to_blue()
|
|
||||||
"""
|
|
||||||
from utils.io.recording import to_blue
|
|
||||||
|
|
||||||
return to_blue(recording=self, filename=filename, path=path, data_format=data_format, overwrite=overwrite)
|
|
||||||
|
|
||||||
def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording:
|
|
||||||
"""Trim Recording samples to a desired length, shifting annotations to maintain alignment.
|
|
||||||
|
|
||||||
:param start_sample: The start index of the desired trimmed recording. Defaults to 0.
|
|
||||||
:type start_sample: int, optional
|
|
||||||
:param num_samples: The number of samples that the output trimmed recording will have.
|
|
||||||
:type num_samples: int
|
|
||||||
:raises IndexError: If start_sample + num_samples is greater than the length of the recording.
|
|
||||||
:raises IndexError: If sample_start < 0 or num_samples < 0.
|
|
||||||
|
|
||||||
:return: The trimmed Recording.
|
|
||||||
:rtype: Recording
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording and trim it:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
|
||||||
>>> metadata = {
|
|
||||||
... "sample_rate": 1e6,
|
|
||||||
... "center_frequency": 2.44e9,
|
|
||||||
... }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> print(len(recording))
|
|
||||||
10000
|
|
||||||
|
|
||||||
>>> trimmed_recording = recording.trim(start_sample=1000, num_samples=1000)
|
|
||||||
>>> print(len(trimmed_recording))
|
|
||||||
1000
|
|
||||||
"""
|
|
||||||
|
|
||||||
if start_sample < 0:
|
|
||||||
raise IndexError("start_sample cannot be < 0.")
|
|
||||||
elif start_sample + num_samples > len(self):
|
|
||||||
raise IndexError(
|
|
||||||
f"start_sample {start_sample} + num_samples {num_samples} > recording length {len(self)}."
|
|
||||||
)
|
|
||||||
|
|
||||||
end_sample = start_sample + num_samples
|
|
||||||
|
|
||||||
data = self.data[:, start_sample:end_sample]
|
|
||||||
|
|
||||||
new_annotations = copy.deepcopy(self.annotations)
|
|
||||||
for annotation in new_annotations:
|
|
||||||
# trim annotation if it goes outside the trim boundaries
|
|
||||||
if annotation.sample_start < start_sample:
|
|
||||||
annotation.sample_count = annotation.sample_count - (start_sample - annotation.sample_start)
|
|
||||||
annotation.sample_start = start_sample
|
|
||||||
|
|
||||||
if annotation.sample_start + annotation.sample_count > end_sample:
|
|
||||||
annotation.sample_count = end_sample - annotation.sample_start
|
|
||||||
|
|
||||||
# shift annotation to align with the new start point
|
|
||||||
annotation.sample_start = annotation.sample_start - start_sample
|
|
||||||
|
|
||||||
return Recording(data=data, metadata=self.metadata, annotations=new_annotations)
|
|
||||||
|
|
||||||
def normalize(self) -> Recording:
|
|
||||||
"""Scale the recording data, relative to its maximum value, so that the magnitude of the maximum sample is 1.
|
|
||||||
|
|
||||||
:return: Recording where the maximum sample amplitude is 1.
|
|
||||||
:rtype: Recording
|
|
||||||
|
|
||||||
**Examples:**
|
|
||||||
|
|
||||||
Create a recording with maximum amplitude 0.5 and normalize to a maximum amplitude of 1:
|
|
||||||
|
|
||||||
>>> import numpy
|
|
||||||
>>> from utils.data import Recording
|
|
||||||
|
|
||||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64) * 0.5
|
|
||||||
>>> metadata = {
|
|
||||||
... "sample_rate": 1e6,
|
|
||||||
... "center_frequency": 2.44e9,
|
|
||||||
... }
|
|
||||||
|
|
||||||
>>> recording = Recording(data=samples, metadata=metadata)
|
|
||||||
>>> print(numpy.max(numpy.abs(recording.data)))
|
|
||||||
0.5
|
|
||||||
|
|
||||||
>>> normalized_recording = recording.normalize()
|
|
||||||
>>> print(numpy.max(numpy.abs(normalized_recording.data)))
|
|
||||||
1
|
|
||||||
"""
|
|
||||||
scaled_data = self.data / np.max(abs(self.data))
|
|
||||||
return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations)
|
|
||||||
|
|
||||||
def __len__(self) -> int:
|
|
||||||
"""The length of a recording is defined by the number of complex samples in each channel of the recording."""
|
|
||||||
return self.shape[1]
|
|
||||||
|
|
||||||
def __eq__(self, other: Recording) -> bool:
|
|
||||||
"""Two Recordings are equal if all data, metadata, and annotations are the same."""
|
|
||||||
|
|
||||||
# counter used to allow for differently ordered annotation lists
|
|
||||||
return (
|
|
||||||
np.array_equal(self.data, other.data)
|
|
||||||
and self.metadata == other.metadata
|
|
||||||
and self.annotations == other.annotations
|
|
||||||
)
|
|
||||||
|
|
||||||
def __ne__(self, other: Recording) -> bool:
|
|
||||||
"""Two Recordings are equal if all data, and metadata, and annotations are the same."""
|
|
||||||
return not self.__eq__(other=other)
|
|
||||||
|
|
||||||
def __iter__(self) -> Iterator:
|
|
||||||
self._index = 0
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __next__(self) -> np.ndarray:
|
|
||||||
if self._index < self.n_chan:
|
|
||||||
to_ret = self.data[self._index]
|
|
||||||
self._index += 1
|
|
||||||
return to_ret
|
|
||||||
else:
|
|
||||||
raise StopIteration
|
|
||||||
|
|
||||||
def __getitem__(self, key: int | tuple[int] | slice) -> np.ndarray | np.complexfloating:
    """Return the samples selected by *key* (an integer, tuple of integers, or slice).

    Selections larger than 1,024 samples come back as read-only views, which
    avoids copying large arrays while still preventing mutation at a distance;
    smaller selections come back as copies.
    """
    if not isinstance(key, (int, tuple, slice)):
        raise ValueError(f"Key must be an integer, tuple, or slice but was {type(key)}.")

    selected = self._data[key]
    if isinstance(selected, np.complexfloating):
        # Scalar result: already immutable, return directly.
        return selected
    if selected.size > 1024:
        selected.setflags(write=False)  # Large result: hand back a read-only view.
        return selected
    return selected.copy()  # Small result: cheap to copy.
|
|
||||||
|
|
||||||
def __setitem__(self, *args, **kwargs) -> None:
    """Raise an error if an attempt is made to assign to the recording.

    Recording samples are exposed read-only; in-place item assignment is
    always rejected regardless of the key or value.

    :raises ValueError: Always.
    """
    raise ValueError("Assignment to Recording is not allowed.")
|
|
||||||
|
|
||||||
|
|
||||||
def generate_recording_id(data: np.ndarray, timestamp: Optional[float | int] = None) -> str:
    """Generate a unique 64-character recording ID.

    The ID is the SHA-256 hex digest of the raw recording bytes concatenated
    with the UTF-8 encoding of the timestamp. If no timestamp is provided, the
    current time is used (making the result differ between calls).

    :param data: Tape of IQ samples, as a NumPy array.
    :type data: np.ndarray
    :param timestamp: Unix timestamp in seconds. Defaults to None.
    :type timestamp: float or int, optional

    :return: 64-character hexadecimal SHA-256 digest, to be used as the recording ID.
    :rtype: str
    """
    if timestamp is None:
        timestamp = time.time()

    byte_sequence = data.tobytes() + str(timestamp).encode("utf-8")
    sha256_hash = hashlib.sha256(byte_sequence)

    return sha256_hash.hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
def _is_jsonable(x: Any) -> bool:
    """
    :return: True if x is JSON serializable, False otherwise.
    """
    try:
        json.dumps(x)
    except (TypeError, OverflowError):
        return False
    return True
|
|
||||||
|
|
||||||
|
|
||||||
def _is_valid_metadata_key(key: Any) -> bool:
    """
    :return: True if key is a valid metadata key, False otherwise.

    A valid key is a string made only of lowercase letters and underscores.
    The islower() check additionally requires at least one cased character,
    so a key of underscores alone is rejected.
    """
    return (
        isinstance(key, str)
        and key.islower()
        and re.match(pattern=r"^[a-z_]+$", string=key) is not None
    )
|
|
||||||
BIN
src/ria_toolkit_oss/view/graphics/Qoherent-logo-black-transparent.png
(Stored with Git LFS)
BIN
src/ria_toolkit_oss/view/graphics/Qoherent-logo-black-transparent.png
(Stored with Git LFS)
Binary file not shown.
BIN
src/ria_toolkit_oss/view/graphics/Qoherent-logo-white-transparent.png
(Stored with Git LFS)
BIN
src/ria_toolkit_oss/view/graphics/Qoherent-logo-white-transparent.png
(Stored with Git LFS)
Binary file not shown.
|
|
@ -7,14 +7,18 @@ import matplotlib.pyplot as plt
|
||||||
from matplotlib.patches import Patch
|
from matplotlib.patches import Patch
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from matplotlib import gridspec
|
from matplotlib import gridspec
|
||||||
from matplotlib.patches import Patch
|
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from scipy.fft import fft, fftshift
|
from scipy.fft import fft, fftshift
|
||||||
from scipy.signal import spectrogram
|
from scipy.signal import spectrogram
|
||||||
from scipy.signal.windows import hann
|
from scipy.signal.windows import hann
|
||||||
|
|
||||||
from utils.data.recording import Recording
|
from ria_toolkit_oss.datatypes.recording import Recording
|
||||||
from utils.view.tools import COLORS, decimate, extract_metadata_fields, set_path
|
from ria_toolkit_oss.view.tools import (
|
||||||
|
COLORS,
|
||||||
|
decimate,
|
||||||
|
extract_metadata_fields,
|
||||||
|
set_path,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_fft_size(plot_length):
|
def get_fft_size(plot_length):
|
||||||
|
|
@ -58,17 +62,6 @@ def view_annotations(
|
||||||
sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
|
sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
|
||||||
annotations = recording.annotations
|
annotations = recording.annotations
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
# 2. Setup Color Mapping (No more hardcoded yellow fallback!)
|
|
||||||
# available_colors = [
|
|
||||||
# COLORS.get("magenta", "magenta"),
|
|
||||||
# COLORS.get("accent", "cyan"),
|
|
||||||
# COLORS.get("light", "white"),
|
|
||||||
# "lime",
|
|
||||||
# ]
|
|
||||||
|
|
||||||
palette = ["#FF00FF", "#00FF00", "#00FFFF", "#FFFF00", "#FF8000"]
|
|
||||||
=======
|
|
||||||
# 2. Setup Color Mapping
|
# 2. Setup Color Mapping
|
||||||
available_colors = [
|
available_colors = [
|
||||||
COLORS.get("magenta", "magenta"),
|
COLORS.get("magenta", "magenta"),
|
||||||
|
|
@ -78,7 +71,6 @@ def view_annotations(
|
||||||
]
|
]
|
||||||
|
|
||||||
palette = ["#2196F3", "#9C27B0", "#64B5F6", "#7B1FA2", "#5C6BC0", "#CE93D8", "#1565C0", "#7C4DFF"]
|
palette = ["#2196F3", "#9C27B0", "#64B5F6", "#7B1FA2", "#5C6BC0", "#CE93D8", "#1565C0", "#7C4DFF"]
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
unique_labels = sorted(list(set(ann.label for ann in annotations if ann.label)))
|
unique_labels = sorted(list(set(ann.label for ann in annotations if ann.label)))
|
||||||
label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)}
|
label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)}
|
||||||
|
|
||||||
|
|
@ -87,11 +79,6 @@ def view_annotations(
|
||||||
complex_signal, NFFT=256, Fs=sample_rate, Fc=center_frequency, noverlap=128, cmap="twilight"
|
complex_signal, NFFT=256, Fs=sample_rate, Fc=center_frequency, noverlap=128, cmap="twilight"
|
||||||
)
|
)
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
# 4. Draw Annotations
|
|
||||||
for annotation in annotations:
|
|
||||||
# --- DEFINING VARIABLES FIRST ---
|
|
||||||
=======
|
|
||||||
# 4. Draw Annotations (highest threshold % first so lower % renders on top)
|
# 4. Draw Annotations (highest threshold % first so lower % renders on top)
|
||||||
def _threshold_sort_key(ann):
|
def _threshold_sort_key(ann):
|
||||||
try:
|
try:
|
||||||
|
|
@ -100,21 +87,13 @@ def view_annotations(
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
|
for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
t_start = annotation.sample_start / sample_rate
|
t_start = annotation.sample_start / sample_rate
|
||||||
t_width = annotation.sample_count / sample_rate
|
t_width = annotation.sample_count / sample_rate
|
||||||
f_start = annotation.freq_lower_edge
|
f_start = annotation.freq_lower_edge
|
||||||
f_height = annotation.freq_upper_edge - annotation.freq_lower_edge
|
f_height = annotation.freq_upper_edge - annotation.freq_lower_edge
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
# Look up the color for this specific label
|
|
||||||
ann_color = label_to_color.get(annotation.label, "gray")
|
ann_color = label_to_color.get(annotation.label, "gray")
|
||||||
|
|
||||||
# Draw the Rectangle
|
|
||||||
=======
|
|
||||||
ann_color = label_to_color.get(annotation.label, "gray")
|
|
||||||
|
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
rect = plt.Rectangle(
|
rect = plt.Rectangle(
|
||||||
(t_start, f_start), t_width, f_height, linewidth=1.5, edgecolor=ann_color, facecolor="none", alpha=0.8
|
(t_start, f_start), t_width, f_height, linewidth=1.5, edgecolor=ann_color, facecolor="none", alpha=0.8
|
||||||
)
|
)
|
||||||
|
|
@ -130,11 +109,7 @@ def view_annotations(
|
||||||
ax.set_title(title, fontsize=title_fontsize, pad=20)
|
ax.set_title(title, fontsize=title_fontsize, pad=20)
|
||||||
ax.set_xlabel("Time (s)", fontsize=12)
|
ax.set_xlabel("Time (s)", fontsize=12)
|
||||||
ax.set_ylabel("Frequency (MHz)", fontsize=12)
|
ax.set_ylabel("Frequency (MHz)", fontsize=12)
|
||||||
<<<<<<< HEAD
|
|
||||||
ax.grid(alpha=0.1) # Add faint grid
|
|
||||||
=======
|
|
||||||
ax.grid(alpha=0.1)
|
ax.grid(alpha=0.1)
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
output_path, _ = set_path(output_path=output_path)
|
output_path, _ = set_path(output_path=output_path)
|
||||||
plt.savefig(output_path, dpi=dpi, bbox_inches="tight")
|
plt.savefig(output_path, dpi=dpi, bbox_inches="tight")
|
||||||
|
|
@ -312,7 +287,9 @@ def view_sig(
|
||||||
)
|
)
|
||||||
|
|
||||||
set_spines(spec_ax, spines)
|
set_spines(spec_ax, spines)
|
||||||
spec_ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize)
|
spec_ax.set_title("Spectrogram", fontsize=subtitle_fontsize)
|
||||||
|
spec_ax.set_ylabel("Frequency (Hz)")
|
||||||
|
spec_ax.set_xlabel("Time (s)")
|
||||||
|
|
||||||
if iq:
|
if iq:
|
||||||
iq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
|
iq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
|
||||||
|
|
@ -396,11 +373,7 @@ def view_sig(
|
||||||
set_spines(meta_ax, spines)
|
set_spines(meta_ax, spines)
|
||||||
|
|
||||||
if logo and os.path.isfile(logo_path):
|
if logo and os.path.isfile(logo_path):
|
||||||
# logo_ax = plt.subplot(gs[plot_y_indx:, 2])
|
logo_ax = plt.subplot(gs[plot_y_indx + 2 :, 2])
|
||||||
logo_pos = [0.75, 0.05, 0.2, 0.08]
|
|
||||||
logo_ax = fig.add_axes(logo_pos, anchor="SE", zorder=10)
|
|
||||||
plot_x_indx = plot_x_indx + 1
|
|
||||||
|
|
||||||
logo_ax.axis("off")
|
logo_ax.axis("off")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -419,6 +392,7 @@ def view_sig(
|
||||||
hspace=2.5, # Vertical space between subplots
|
hspace=2.5, # Vertical space between subplots
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# save path handling
|
||||||
output_path, _ = set_path(output_path=output_path)
|
output_path, _ = set_path(output_path=output_path)
|
||||||
plt.savefig(output_path, dpi=dpi)
|
plt.savefig(output_path, dpi=dpi)
|
||||||
print(f"Saved signal plot to {output_path}")
|
print(f"Saved signal plot to {output_path}")
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import gc
|
import gc
|
||||||
import json
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import matplotlib
|
import matplotlib
|
||||||
|
|
@ -12,54 +11,13 @@ import numpy as np
|
||||||
from scipy.fft import fft, fftshift
|
from scipy.fft import fft, fftshift
|
||||||
from scipy.signal.windows import hann
|
from scipy.signal.windows import hann
|
||||||
|
|
||||||
from utils.data.recording import Recording
|
from ria_toolkit_oss.datatypes.recording import Recording
|
||||||
from utils.view.tools import COLORS, decimate, extract_metadata_fields, set_path
|
from ria_toolkit_oss.view.tools import (
|
||||||
|
COLORS,
|
||||||
|
decimate,
|
||||||
def _add_annotations(annotations, compact_mode, show_labels, sample_rate_hz, center_freq_hz, ax2):
|
extract_metadata_fields,
|
||||||
if annotations and not compact_mode:
|
set_path,
|
||||||
for annotation in annotations:
|
)
|
||||||
start_idx = annotation.get("core:sample_start", 0)
|
|
||||||
length = annotation.get("core:sample_count", 0)
|
|
||||||
start_time = start_idx / sample_rate_hz
|
|
||||||
end_time = (start_idx + length) / sample_rate_hz
|
|
||||||
freq_low = annotation.get("core:freq_lower_edge", center_freq_hz - sample_rate_hz / 4)
|
|
||||||
freq_high = annotation.get("core:freq_upper_edge", center_freq_hz + sample_rate_hz / 4)
|
|
||||||
comment = annotation.get("core:comment", "{}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
comment_data = json.loads(comment) if isinstance(comment, str) else comment
|
|
||||||
ann_type = comment_data.get("type", "unknown")
|
|
||||||
if ann_type == "intersection":
|
|
||||||
color = COLORS["success"]
|
|
||||||
elif ann_type == "parallel":
|
|
||||||
color = COLORS["primary"]
|
|
||||||
elif ann_type == "standalone":
|
|
||||||
color = COLORS["warning"]
|
|
||||||
else:
|
|
||||||
color = COLORS["error"]
|
|
||||||
except Exception:
|
|
||||||
color = COLORS["error"]
|
|
||||||
|
|
||||||
rect = plt.Rectangle(
|
|
||||||
(start_time, freq_low),
|
|
||||||
end_time - start_time,
|
|
||||||
freq_high - freq_low,
|
|
||||||
color=color,
|
|
||||||
alpha=0.4,
|
|
||||||
linewidth=2,
|
|
||||||
)
|
|
||||||
ax2.add_patch(rect)
|
|
||||||
if show_labels:
|
|
||||||
label = annotation.get("core:label", "Signal")
|
|
||||||
ax2.text(
|
|
||||||
start_time,
|
|
||||||
freq_high,
|
|
||||||
label,
|
|
||||||
color=COLORS["light"],
|
|
||||||
fontsize=10,
|
|
||||||
bbox=dict(boxstyle="round,pad=0.2", facecolor=color, alpha=0.7),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_nfft_size(signal, fast_mode):
|
def _get_nfft_size(signal, fast_mode):
|
||||||
|
|
@ -180,7 +138,6 @@ def detect_constellation_symbols(signal: np.ndarray, method: str = "differential
|
||||||
|
|
||||||
def view_simple_sig(
|
def view_simple_sig(
|
||||||
recording: Recording,
|
recording: Recording,
|
||||||
annotations: Optional[list] = None,
|
|
||||||
output_path: Optional[str] = "images/signal.png",
|
output_path: Optional[str] = "images/signal.png",
|
||||||
saveplot: Optional[bool] = True,
|
saveplot: Optional[bool] = True,
|
||||||
fast_mode: Optional[bool] = False,
|
fast_mode: Optional[bool] = False,
|
||||||
|
|
@ -304,15 +261,6 @@ def view_simple_sig(
|
||||||
|
|
||||||
ax2.set_title("Spectrogram", loc="left", pad=10)
|
ax2.set_title("Spectrogram", loc="left", pad=10)
|
||||||
|
|
||||||
_add_annotations(
|
|
||||||
annotations=annotations,
|
|
||||||
compact_mode=compact_mode,
|
|
||||||
show_labels=show_labels,
|
|
||||||
sample_rate_hz=sample_rate_hz,
|
|
||||||
center_freq_hz=center_freq_hz,
|
|
||||||
ax2=ax2,
|
|
||||||
)
|
|
||||||
|
|
||||||
if ax_constellation is not None:
|
if ax_constellation is not None:
|
||||||
constellation_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=50_000, fast_max=20_000)
|
constellation_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=50_000, fast_max=20_000)
|
||||||
method = "differential" if fast_mode else "combined"
|
method = "differential" if fast_mode else "combined"
|
||||||
|
|
@ -362,7 +310,7 @@ def view_simple_sig(
|
||||||
else:
|
else:
|
||||||
plt.tight_layout()
|
plt.tight_layout()
|
||||||
if show_title:
|
if show_title:
|
||||||
plt.subplots_adjust(top=0.92)
|
plt.subplots_adjust(top=0.90)
|
||||||
|
|
||||||
if saveplot:
|
if saveplot:
|
||||||
output_path, extension = set_path(output_path=output_path)
|
output_path, extension = set_path(output_path=output_path)
|
||||||
|
|
|
||||||
|
|
@ -11,13 +11,8 @@ from ria_toolkit_oss.annotations import (
|
||||||
split_recording_annotations,
|
split_recording_annotations,
|
||||||
threshold_qualifier,
|
threshold_qualifier,
|
||||||
)
|
)
|
||||||
<<<<<<< HEAD
|
|
||||||
from ria_toolkit_oss.data import Annotation
|
|
||||||
from ria_toolkit_oss.data.recording import Recording
|
|
||||||
=======
|
|
||||||
from ria_toolkit_oss.datatypes import Annotation
|
from ria_toolkit_oss.datatypes import Annotation
|
||||||
from ria_toolkit_oss.datatypes.recording import Recording
|
from ria_toolkit_oss.datatypes.recording import Recording
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav
|
from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav
|
||||||
from ria_toolkit_oss_cli.ria_toolkit_oss.common import format_frequency, format_sample_count
|
from ria_toolkit_oss_cli.ria_toolkit_oss.common import format_frequency, format_sample_count
|
||||||
|
|
||||||
|
|
@ -55,15 +50,6 @@ def detect_input_format(filepath):
|
||||||
|
|
||||||
def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
|
def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
|
||||||
input_path = Path(input_path)
|
input_path = Path(input_path)
|
||||||
<<<<<<< HEAD
|
|
||||||
|
|
||||||
if output_path:
|
|
||||||
target = Path(output_path)
|
|
||||||
final_path = target
|
|
||||||
else:
|
|
||||||
annotated_name = f"{input_path.stem}_annotated"
|
|
||||||
target = input_path.with_name(f"{annotated_name}{input_path.suffix}")
|
|
||||||
=======
|
|
||||||
input_is_annotated = input_path.stem.endswith("_annotated")
|
input_is_annotated = input_path.stem.endswith("_annotated")
|
||||||
|
|
||||||
if output_path:
|
if output_path:
|
||||||
|
|
@ -73,7 +59,6 @@ def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
|
||||||
target = input_path
|
target = input_path
|
||||||
else:
|
else:
|
||||||
target = input_path.with_name(f"{input_path.stem}_annotated{input_path.suffix}")
|
target = input_path.with_name(f"{input_path.stem}_annotated{input_path.suffix}")
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
if fmt == "sigmf":
|
if fmt == "sigmf":
|
||||||
final_path = normalize_sigmf_path(target)
|
final_path = normalize_sigmf_path(target)
|
||||||
|
|
@ -84,15 +69,10 @@ def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
|
||||||
if not quiet:
|
if not quiet:
|
||||||
click.echo(f"Saving to: {final_path}")
|
click.echo(f"Saving to: {final_path}")
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
if final_path.exists() and not overwrite and final_path != input_path:
|
|
||||||
click.echo(f"Error: {final_path} already exists. Use --overwrite to replace it.", err=True)
|
|
||||||
=======
|
|
||||||
# Always allow writing to _annotated files; guard against overwriting originals
|
# Always allow writing to _annotated files; guard against overwriting originals
|
||||||
target_is_annotated = final_path.stem.endswith("_annotated")
|
target_is_annotated = final_path.stem.endswith("_annotated")
|
||||||
if final_path.exists() and not target_is_annotated and final_path != input_path:
|
if final_path.exists() and not target_is_annotated and final_path != input_path:
|
||||||
click.echo(f"Error: {final_path} is not an annotated file and cannot be overwritten.", err=True)
|
click.echo(f"Error: {final_path} is not an annotated file and cannot be overwritten.", err=True)
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return final_path
|
return final_path
|
||||||
|
|
@ -250,13 +230,8 @@ def list(input, verbose):
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate list recording.sigmf-data
|
|
||||||
utils annotate list signal.npy --verbose
|
|
||||||
=======
|
|
||||||
ria annotate list recording.sigmf-data
|
ria annotate list recording.sigmf-data
|
||||||
ria annotate list signal.npy --verbose
|
ria annotate list signal.npy --verbose
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
recording = load_recording(input)
|
recording = load_recording(input)
|
||||||
|
|
@ -324,13 +299,8 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate add file.npy --start 1000 --count 500 --label wifi
|
|
||||||
utils annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
|
|
||||||
=======
|
|
||||||
ria annotate add file.npy --start 1000 --count 500 --label wifi
|
ria annotate add file.npy --start 1000 --count 500 --label wifi
|
||||||
ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
|
ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
recording = load_recording(input)
|
recording = load_recording(input)
|
||||||
|
|
@ -412,21 +382,12 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_
|
||||||
def remove(input, index, output, overwrite, quiet):
|
def remove(input, index, output, overwrite, quiet):
|
||||||
"""Remove annotation by index.
|
"""Remove annotation by index.
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
Use 'utils annotate list' to see annotation indices.
|
|
||||||
|
|
||||||
\b
|
|
||||||
Examples:
|
|
||||||
utils annotate remove signal.sigmf-data 2
|
|
||||||
utils annotate remove file.npy 0
|
|
||||||
=======
|
|
||||||
Use 'ria annotate list' to see annotation indices.
|
Use 'ria annotate list' to see annotation indices.
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
ria annotate remove signal.sigmf-data 2
|
ria annotate remove signal.sigmf-data 2
|
||||||
ria annotate remove file.npy 0
|
ria annotate remove file.npy 0
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
recording = load_recording(input)
|
recording = load_recording(input)
|
||||||
|
|
@ -475,13 +436,8 @@ def clear(input, output, overwrite, force, quiet):
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate clear signal.sigmf-data
|
|
||||||
utils annotate clear file.npy --force
|
|
||||||
=======
|
|
||||||
ria annotate clear signal.sigmf-data
|
ria annotate clear signal.sigmf-data
|
||||||
ria annotate clear file.npy --force
|
ria annotate clear file.npy --force
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
recording = load_recording(input)
|
recording = load_recording(input)
|
||||||
|
|
@ -576,17 +532,10 @@ def energy(
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate energy capture.sigmf-data --label burst
|
|
||||||
utils annotate energy signal.npy --threshold 1.5 --min-distance 10000
|
|
||||||
utils annotate energy signal.sigmf-data --freq-method obw
|
|
||||||
utils annotate energy signal.sigmf-data --freq-method full-detected
|
|
||||||
=======
|
|
||||||
ria annotate energy capture.sigmf-data --label burst
|
ria annotate energy capture.sigmf-data --label burst
|
||||||
ria annotate energy signal.npy --threshold 1.5 --min-distance 10000
|
ria annotate energy signal.npy --threshold 1.5 --min-distance 10000
|
||||||
ria annotate energy signal.sigmf-data --freq-method obw
|
ria annotate energy signal.sigmf-data --freq-method obw
|
||||||
ria annotate energy signal.sigmf-data --freq-method full-detected
|
ria annotate energy signal.sigmf-data --freq-method full-detected
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
|
@ -662,13 +611,8 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate cusum signal.sigmf-data --min-duration 5.0
|
|
||||||
utils annotate cusum data.npy --min-duration 10.0 --label state
|
|
||||||
=======
|
|
||||||
ria annotate cusum signal.sigmf-data --min-duration 5.0
|
ria annotate cusum signal.sigmf-data --min-duration 5.0
|
||||||
ria annotate cusum data.npy --min-duration 10.0 --label state
|
ria annotate cusum data.npy --min-duration 10.0 --label state
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
recording = load_recording(input)
|
recording = load_recording(input)
|
||||||
|
|
@ -714,11 +658,7 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
|
||||||
@click.argument("input", type=click.Path(exists=True))
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
@click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)")
|
@click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)")
|
||||||
@click.option("--label", type=str, default=None, help="Annotation label")
|
@click.option("--label", type=str, default=None, help="Annotation label")
|
||||||
<<<<<<< HEAD
|
|
||||||
@click.option("--window-size", type=int, default=1024, help="Smoothing window size")
|
|
||||||
=======
|
|
||||||
@click.option("--window-size", type=int, default=None, help="Smoothing window size in samples (default: 1ms at recording sample rate)")
|
@click.option("--window-size", type=int, default=None, help="Smoothing window size in samples (default: 1ms at recording sample rate)")
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
@click.option(
|
@click.option(
|
||||||
"--type",
|
"--type",
|
||||||
"annotation_type",
|
"annotation_type",
|
||||||
|
|
@ -726,18 +666,11 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
|
||||||
default="standalone",
|
default="standalone",
|
||||||
help="Annotation type",
|
help="Annotation type",
|
||||||
)
|
)
|
||||||
<<<<<<< HEAD
|
|
||||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
|
||||||
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
|
||||||
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
|
||||||
def threshold(input, threshold, label, window_size, annotation_type, output, overwrite, quiet):
|
|
||||||
=======
|
|
||||||
@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
|
@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
|
||||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
def threshold(input, threshold, label, window_size, annotation_type, channel, output, overwrite, quiet):
|
def threshold(input, threshold, label, window_size, annotation_type, channel, output, overwrite, quiet):
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""Auto-detect signals using threshold method.
|
"""Auto-detect signals using threshold method.
|
||||||
|
|
||||||
Detects samples above a percentage of maximum magnitude. Best for simple
|
Detects samples above a percentage of maximum magnitude. Best for simple
|
||||||
|
|
@ -745,13 +678,8 @@ def threshold(input, threshold, label, window_size, annotation_type, channel, ou
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
|
|
||||||
utils annotate threshold data.npy --threshold 0.5 --window-size 2048
|
|
||||||
=======
|
|
||||||
ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
|
ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
|
||||||
ria annotate threshold data.npy --threshold 0.5 --window-size 2048
|
ria annotate threshold data.npy --threshold 0.5 --window-size 2048
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
"""
|
"""
|
||||||
if not (0.0 <= threshold <= 1.0):
|
if not (0.0 <= threshold <= 1.0):
|
||||||
raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}")
|
raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}")
|
||||||
|
|
@ -766,12 +694,8 @@ def threshold(input, threshold, label, window_size, annotation_type, channel, ou
|
||||||
if not quiet:
|
if not quiet:
|
||||||
click.echo("\nDetecting signals using threshold qualifier...")
|
click.echo("\nDetecting signals using threshold qualifier...")
|
||||||
click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude")
|
click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude")
|
||||||
<<<<<<< HEAD
|
|
||||||
click.echo(f" Window size: {window_size} samples")
|
|
||||||
=======
|
|
||||||
click.echo(f" Window size: {'auto (1ms)' if window_size is None else f'{window_size} samples'}")
|
click.echo(f" Window size: {'auto (1ms)' if window_size is None else f'{window_size} samples'}")
|
||||||
click.echo(f" Channel: {channel}")
|
click.echo(f" Channel: {channel}")
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
initial_count = len(recording.annotations)
|
initial_count = len(recording.annotations)
|
||||||
|
|
@ -781,10 +705,7 @@ def threshold(input, threshold, label, window_size, annotation_type, channel, ou
|
||||||
window_size=window_size,
|
window_size=window_size,
|
||||||
label=label,
|
label=label,
|
||||||
annotation_type=annotation_type,
|
annotation_type=annotation_type,
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
channel=channel,
|
channel=channel,
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
)
|
)
|
||||||
added = len(recording.annotations) - initial_count
|
added = len(recording.annotations) - initial_count
|
||||||
|
|
||||||
|
|
@ -833,17 +754,10 @@ def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output,
|
||||||
|
|
||||||
\b
|
\b
|
||||||
Examples:
|
Examples:
|
||||||
<<<<<<< HEAD
|
|
||||||
utils annotate separate capture.sigmf-data
|
|
||||||
utils annotate separate signal.npy --indices 0,1,2
|
|
||||||
utils annotate separate data.sigmf-data --noise-threshold-db -70
|
|
||||||
utils annotate separate signal.npy --min-component-bw 100000
|
|
||||||
=======
|
|
||||||
ria annotate separate capture.sigmf-data
|
ria annotate separate capture.sigmf-data
|
||||||
ria annotate separate signal.npy --indices 0,1,2
|
ria annotate separate signal.npy --indices 0,1,2
|
||||||
ria annotate separate data.sigmf-data --noise-threshold-db -70
|
ria annotate separate data.sigmf-data --noise-threshold-db -70
|
||||||
ria annotate separate signal.npy --min-component-bw 100000
|
ria annotate separate signal.npy --min-component-bw 100000
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,7 @@
|
||||||
"""
|
"""
|
||||||
This module contains all the CLI bindings for the ria package.
|
This module contains all the CLI bindings for the ria package.
|
||||||
"""
|
"""
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
|
|
||||||
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
|
|
||||||
from .annotate import annotate
|
from .annotate import annotate
|
||||||
from .capture import capture
|
from .capture import capture
|
||||||
from .combine import combine
|
from .combine import combine
|
||||||
|
|
@ -20,7 +17,7 @@ from .init import init
|
||||||
from .split import split
|
from .split import split
|
||||||
from .transform import transform
|
from .transform import transform
|
||||||
from .transmit import transmit
|
from .transmit import transmit
|
||||||
from .view import viewe
|
from .view import view
|
||||||
|
|
||||||
# Aliases
|
# Aliases
|
||||||
synth = generate
|
synth = generate
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,6 @@ VISUALIZATION_TYPES = {
|
||||||
"dark",
|
"dark",
|
||||||
"spines",
|
"spines",
|
||||||
],
|
],
|
||||||
},
|
|
||||||
"annotations": {
|
|
||||||
"function": view_annotations,
|
|
||||||
"description": "Annotation-focused spectrogram view",
|
|
||||||
"options": ["channel", "dark"],
|
|
||||||
},
|
},
|
||||||
"channels": {"function": view_channels, "description": "Multi-channel IQ and spectrogram view", "options": []},
|
"channels": {"function": view_channels, "description": "Multi-channel IQ and spectrogram view", "options": []},
|
||||||
"annotations": {"function": view_annotations, "description": "Annotated spectrogram view", "options": ["channel", "dark"]},
|
"annotations": {"function": view_annotations, "description": "Annotated spectrogram view", "options": ["channel", "dark"]},
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user