cli #15

Merged
madrigal merged 28 commits from cli into main 2025-12-22 10:42:57 -05:00
31 changed files with 165 additions and 2378 deletions
Showing only changes of commit 18395a0af8 - Show all commits

View File

@ -12,6 +12,7 @@ maintainers = [
{ name = "Benjamin Chinnery", email = "ben@qoherent.ai" }, { name = "Benjamin Chinnery", email = "ben@qoherent.ai" },
{ name = "Ashkan Beigi", email = "ash@qoherent.ai" }, { name = "Ashkan Beigi", email = "ash@qoherent.ai" },
{ name = "Madrigal Weersink", email = "madrigal@qoherent.ai" }, { name = "Madrigal Weersink", email = "madrigal@qoherent.ai" },
{ name = "Gillian Ford", email = "gillian@qoherent.ai" }
] ]
keywords = [ keywords = [
"radio", "radio",
@ -67,7 +68,8 @@ all-sdr = [
[tool.poetry] [tool.poetry]
packages = [ packages = [
{ include = "ria_toolkit_oss", from = "src" } { include = "ria_toolkit_oss", from = "src" },
{ include = "ria_toolkit_oss_cli", from = "src/ria_toolkit_oss" }
] ]
include = [ include = [
"**/*.so", # Required for Nuitkaification "**/*.so", # Required for Nuitkaification
@ -97,6 +99,9 @@ pylint = "^3.2.6" # For pyreverse, to automate the creation of UML diagrams
"Source" = "https://riahub.ai/qoherent/ria-toolkit-oss" "Source" = "https://riahub.ai/qoherent/ria-toolkit-oss"
"Issues Board" = "https://riahub.ai/qoherent/ria-toolkit-oss/issues" "Issues Board" = "https://riahub.ai/qoherent/ria-toolkit-oss/issues"
[tool.poetry.scripts]
ria = "ria_toolkit_oss.ria_toolkit_oss_cli.cli:cli"
[tool.black] [tool.black]
line-length = 119 line-length = 119
target-version = ["py310"] target-version = ["py310"]

View File

@ -1,803 +0,0 @@
"""Annotate command - Automatic detection and manual annotation management."""
import json
from pathlib import Path
import click
from src.ria_toolkit_oss.annotations import (
annotate_with_cusum,
detect_signals_energy,
split_recording_annotations,
threshold_qualifier,
)
from ria_toolkit_oss.datatypes import Annotation
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav
from ria_toolkit_oss.common import format_frequency, format_sample_count
def normalize_sigmf_path(filepath):
"""Normalize SigMF path to base name without extension."""
path = Path(filepath)
# Handle .sigmf-data, .sigmf-meta, or .sigmf
if ".sigmf" in path.suffix:
# Remove the suffix to get base name
return path.with_suffix("")
else:
return path
def detect_input_format(filepath):
"""Detect file format from extension."""
path = Path(filepath)
ext = path.suffix.lower()
if ext in [".sigmf-data", ".sigmf-meta"]:
return "sigmf"
elif path.name.endswith(".sigmf"):
return "sigmf"
elif ext == ".npy":
return "npy"
elif ext == ".wav":
return "wav"
elif ext == ".blue":
return "blue"
else:
raise click.ClickException(f"Unknown format for '{filepath}'. Supported: .sigmf, .npy, .wav, .blue")
def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
if fmt == "sigmf":
if not quiet:
click.echo(f"Saving annotations to: {output_path}.sigmf-meta")
if output_path:
# Normalize the output path for consistency
return normalize_sigmf_path(output_path)
else:
# Auto-generate from input path
return normalize_sigmf_path(input_path)
else:
if not quiet:
click.echo(f"Saving to: {output_path}")
if output_path:
return Path(output_path)
else:
# Other formats: add _annotated suffix unless --overwrite
if overwrite:
return input_path
else:
return input_path.with_name(input_path.stem + "_annotated" + input_path.suffix)
def save_recording_auto(recording, output_path, input_path, quiet=False, overwrite=False):
"""Save recording, auto-detecting format from extension.
For SigMF: Only overwrites metadata file, data file is unchanged
For other formats: Creates _annotated copy by default, unless overwrite=True
"""
input_path = Path(input_path)
fmt = detect_input_format(input_path)
# Determine output path
output_path = determine_output_path(
input_path=input_path, output_path=output_path, fmt=fmt, quiet=quiet, overwrite=overwrite
)
if fmt == "sigmf":
# Normalize path for SigMF
base_path = output_path
stem = base_path.name
parent = base_path.parent
# For SigMF: only save metadata, copy data if needed
meta_path = parent / f"{stem}.sigmf-meta"
data_path = parent / f"{stem}.sigmf-data"
# If output is different from input, copy data file
input_base = normalize_sigmf_path(input_path)
if input_base != base_path:
import shutil
# Construct input data path correctly
# input_base is like /path/to/recording or /path/to/recording.sigmf
# We need /path/to/recording.sigmf-data
if str(input_base).endswith(".sigmf"):
input_data = Path(str(input_base).replace(".sigmf", ".sigmf-data"))
else:
input_data = input_base.parent / f"{input_base.name}.sigmf-data"
if not quiet:
click.echo(f" Copying: {data_path}")
shutil.copy2(input_data, data_path)
# Always save metadata (this is the whole point)
to_sigmf(recording, filename=stem, path=parent, overwrite=True)
if not quiet:
click.echo(f" Updated: {meta_path}")
if input_base != base_path:
click.echo(f" Created: {data_path}")
elif fmt == "npy":
to_npy(recording, filename=output_path.stem, path=output_path.parent, overwrite=True)
if not quiet:
click.echo(f" Created: {output_path}")
elif fmt == "wav":
to_wav(recording, filename=output_path.stem, path=output_path.parent, overwrite=True)
if not quiet:
click.echo(f" Created: {output_path}")
elif fmt == "blue":
to_blue(recording, filename=output_path.stem, path=output_path.parent, overwrite=True)
if not quiet:
click.echo(f" Created: {output_path}")
def determine_frequency_bounds(recording: Recording, freq_lower, freq_upper):
# Handle frequency bounds
if (freq_lower is None) != (freq_upper is None):
raise click.ClickException("Must specify both --freq-lower and --freq-upper, or neither")
if freq_lower is None:
# Default to full bandwidth
sample_rate = recording.metadata.get("sample_rate", 1)
center_freq = recording.metadata.get("center_frequency", 0)
freq_lower = center_freq - (sample_rate / 2)
freq_upper = center_freq + (sample_rate / 2)
freq_default = True
else:
freq_default = False
if freq_lower >= freq_upper:
raise click.ClickException(
f"Invalid frequency range: lower ({format_frequency(freq_lower)}) "
f"must be < upper ({format_frequency(freq_upper)})"
)
return freq_lower, freq_upper, freq_default
def get_indices_list(indices, recording: Recording):
if indices:
try:
indices_list = [int(idx.strip()) for idx in indices.split(",")]
# Validate indices
for idx in indices_list:
if idx < 0 or idx >= len(recording.annotations):
raise click.ClickException(
f"Invalid index {idx}. Recording has {len(recording.annotations)} annotation(s)"
)
except ValueError as e:
raise click.ClickException(f"Invalid indices format. Expected comma-separated integers: {e}")
return indices_list
else:
return None
# ============================================================================
# Main command group
# ============================================================================
@click.group()
def annotate():
"""Manage and auto-detect annotations on RF recordings.
\b
Subcommands:
list - List annotations
add - Add manual annotation
remove - Remove annotation by index
clear - Clear all annotations
energy - Auto-detect using energy method
cusum - Auto-detect using CUSUM method
threshold - Auto-detect using threshold method
separate - Split annotations by frequency components (Phase 2)
\b
File Path Handling:
- SigMF files: Pass .sigmf-data, .sigmf-meta, or base name
- Other formats: .npy, .wav, .blue files
\b
Output Behavior:
- SigMF: Updates .sigmf-meta only (data unchanged), in-place
- Other: Creates _annotated copy unless --overwrite specified
"""
pass
# ============================================================================
# List subcommand
# ============================================================================
@annotate.command()
@click.argument("input", type=click.Path(exists=True))
@click.option("--verbose", is_flag=True, help="Show detailed annotation info")
def list(input, verbose):
"""List all annotations in a recording.
\b
Examples:
ria annotate list recording.sigmf-data
ria annotate list signal.npy --verbose
"""
try:
recording = load_recording(input)
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
if len(recording.annotations) == 0:
click.echo(f"No annotations in {Path(input).name}")
return
click.echo(f"\nAnnotations in {Path(input).name}:")
for i, ann in enumerate(recording.annotations):
# Parse type from comment JSON
try:
comment_data = json.loads(ann.comment)
ann_type = comment_data.get("type", "unknown")
user_comment = comment_data.get("user_comment", "")
except (json.JSONDecodeError, TypeError):
ann_type = "unknown"
user_comment = ann.comment or ""
# Basic info
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
click.echo(
f" [{i}] Samples {format_sample_count(ann.sample_start)}-"
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}"
)
click.echo(f" Type: {ann_type}")
if verbose:
if user_comment:
click.echo(f" Comment: {user_comment}")
click.echo(f" Frequency: {freq_range}")
if ann.detail:
click.echo(f" Detail: {ann.detail}")
click.echo(f"\nTotal: {len(recording.annotations)} annotation(s)")
# ============================================================================
# Add subcommand
# ============================================================================
@annotate.command(context_settings={"max_content_width": 200})
@click.argument("input", type=click.Path(exists=True))
@click.option("--start", type=int, required=True, help="Start sample index")
@click.option("--count", type=int, required=True, help="Sample count")
@click.option("--label", type=str, required=True, help="Annotation label")
@click.option("--freq-lower", type=float, help="Lower frequency edge (Hz)")
@click.option("--freq-upper", type=float, help="Upper frequency edge (Hz)")
@click.option("--comment", type=str, help="Human-readable comment")
@click.option(
"--type",
"annotation_type",
type=click.Choice(["standalone", "parallel", "intersection"]),
default="standalone",
help="Annotation type",
)
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_type, output, overwrite, quiet):
"""Add a manual annotation.
\b
Examples:
ria annotate add file.npy --start 1000 --count 500 --label wifi
ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
"""
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
# Validate sample range
n_samples = len(recording.data[0])
if start < 0:
raise click.ClickException(f"--start must be >= 0, got {start}")
if count <= 0:
raise click.ClickException(f"--count must be > 0, got {count}")
if start + count > n_samples:
raise click.ClickException(
f"Invalid annotation range:\n"
f" Start: {start:,}\n"
f" Count: {count:,}\n"
f" End: {start + count:,}\n"
f"Recording only has {n_samples:,} samples"
)
# Handle frequency bounds
freq_lower, freq_upper, freq_default = determine_frequency_bounds(
recording=recording, freq_lower=freq_lower, freq_upper=freq_upper
)
# Build comment JSON
comment_data = {"type": annotation_type}
if comment:
comment_data["user_comment"] = comment
# Create annotation
ann = Annotation(
sample_start=start,
sample_count=count,
freq_lower_edge=freq_lower,
freq_upper_edge=freq_upper,
label=label,
comment=json.dumps(comment_data),
detail={},
)
recording._annotations.append(ann)
if not quiet:
click.echo("\nAdding annotation:")
click.echo(f" Start: {format_sample_count(start)}")
click.echo(f" Count: {format_sample_count(count)} samples")
freq_str = (
"full bandwidth" if freq_default else f"{format_frequency(freq_lower)} - {format_frequency(freq_upper)}"
)
click.echo(f" Frequency: {freq_str}")
click.echo(f" Label: {label}")
click.echo(f" Type: {annotation_type}")
if comment:
click.echo(f" Comment: {comment}")
try:
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"Failed to save: {e}")
# ============================================================================
# Remove subcommand
# ============================================================================
@annotate.command(context_settings={"max_content_width": 200})
@click.argument("input", type=click.Path(exists=True))
@click.argument("index", type=int)
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def remove(input, index, output, overwrite, quiet):
"""Remove annotation by index.
Use 'ria annotate list' to see annotation indices.
\b
Examples:
ria annotate remove signal.sigmf-data 2
ria annotate remove file.npy 0
"""
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
if index < 0 or index >= len(recording.annotations):
raise click.ClickException(
f"Cannot remove annotation at index {index}\n"
f"Recording has {len(recording.annotations)} annotation(s) (indices 0-{len(recording.annotations)-1})"
)
removed_ann = recording.annotations[index]
recording._annotations.pop(index)
if not quiet:
click.echo(f"\nRemoving annotation [{index}]:")
click.echo(
f" Removed: samples {format_sample_count(removed_ann.sample_start)}-"
f"{format_sample_count(removed_ann.sample_start + removed_ann.sample_count)} ({removed_ann.label})"
)
try:
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"Failed to save: {e}")
# ============================================================================
# Clear subcommand
# ============================================================================
@annotate.command(context_settings={"max_content_width": 175})
@click.argument("input", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--force", is_flag=True, help="Skip confirmation")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def clear(input, output, overwrite, force, quiet):
"""Clear all annotations.
\b
Examples:
ria annotate clear signal.sigmf-data
ria annotate clear file.npy --force
"""
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
count_before = len(recording.annotations)
if count_before == 0:
if not quiet:
click.echo("No annotations to clear")
return
# Confirm unless --force
if not force and not quiet:
click.echo(f"\nWarning: This will remove all {count_before} annotation(s)")
click.confirm("Continue?", abort=True)
recording._annotations = []
if not quiet:
click.echo(f"\nCleared {count_before} annotation(s)")
try:
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"Failed to save: {e}")
# ============================================================================
# Energy detection subcommand
# ============================================================================
@annotate.command(context_settings={"max_content_width": 200})
@click.argument("input", type=click.Path(exists=True))
@click.option("--label", type=str, default="signal", help="Annotation label")
@click.option("--threshold", type=float, default=1.2, help="Threshold multiplier above noise floor")
@click.option("--segments", type=int, default=10, help="Number of segments for noise estimation")
@click.option("--window-size", type=int, default=200, help="Smoothing window size")
@click.option("--min-distance", type=int, default=5000, help="Min distance between detections")
@click.option(
"--freq-method",
type=click.Choice(["nbw", "obw", "full-detected", "full-bandwidth"]),
default="nbw",
help="Frequency bounding method",
)
@click.option("--nfft", type=int, default=65536, help="FFT size for frequency calculation")
@click.option("--obw-power", type=float, default=0.9999, help="Power percentage for OBW/NBW (0.99-0.9999)")
@click.option(
"--type",
"annotation_type",
type=click.Choice(["standalone", "parallel", "intersection"]),
default="standalone",
help="Annotation type",
)
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def energy(
input,
label,
threshold,
segments,
window_size,
min_distance,
freq_method,
nfft,
obw_power,
annotation_type,
output,
overwrite,
quiet,
):
"""Auto-detect signals using energy-based method.
Detects bursts based on energy above noise floor. Best for bursty signals
and intermittent transmissions.
\b
Frequency Bounding Methods:
nbw - Nominal bandwidth (default, best for real signals)
obw - Occupied bandwidth (more conservative, includes sidelobes)
full-detected - Lowest to highest spectral component
full-bandwidth - Entire Nyquist span
\b
Examples:
ria annotate energy capture.sigmf-data --label burst
ria annotate energy signal.npy --threshold 1.5 --min-distance 10000
ria annotate energy signal.sigmf-data --freq-method obw
ria annotate energy signal.sigmf-data --freq-method full-detected
"""
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
if not quiet:
click.echo("\nDetecting signals using energy-based method...")
click.echo(" Time detection:")
click.echo(f" Segments: {segments}")
click.echo(f" Threshold: {threshold}x noise floor")
click.echo(f" Window size: {window_size} samples")
click.echo(f" Min distance: {min_distance} samples")
click.echo(f" Frequency bounds: {freq_method}")
try:
initial_count = len(recording.annotations)
recording = detect_signals_energy(
recording,
k=segments,
threshold_factor=threshold,
window_size=window_size,
min_distance=min_distance,
label=label,
annotation_type=annotation_type,
freq_method=freq_method,
nfft=nfft,
obw_power=obw_power,
)
added = len(recording.annotations) - initial_count
if not quiet:
click.echo(f" ✓ Added {added} annotation(s)")
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"Energy detection failed: {e}")
# ============================================================================
# CUSUM detection subcommand
# ============================================================================
@annotate.command()
@click.argument("input", type=click.Path(exists=True))
@click.option("--label", type=str, default="segment", help="Annotation label")
@click.option("--min-duration", type=float, default=5.0, help="Min duration in ms (prevents over-segmentation)")
@click.option("--window-size", type=int, default=1, help="Smoothing window size")
@click.option("--tolerance", type=int, default=-1, help="Sample tolerance for merging")
@click.option(
"--type",
"annotation_type",
type=click.Choice(["standalone", "parallel", "intersection"]),
default="standalone",
help="Annotation type",
)
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def cusum(input, label, min_duration, window_size, tolerance, annotation_type, output, overwrite, quiet):
"""Auto-detect segments using CUSUM method.
Detects signal state changes (on/off, amplitude transitions). Best for
segmenting continuous signals.
IMPORTANT: Always specify --min-duration to prevent excessive segmentation.
\b
Examples:
ria annotate cusum signal.sigmf-data --min-duration 5.0
ria annotate cusum data.npy --min-duration 10.0 --label state
"""
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
if not quiet:
click.echo("\nDetecting segments using CUSUM...")
click.echo(f" Min duration: {min_duration} ms")
if window_size != 1:
click.echo(f" Window size: {window_size} samples")
try:
initial_count = len(recording.annotations)
recording = annotate_with_cusum(
recording,
label=label,
window_size=window_size,
min_duration=min_duration,
tolerance=tolerance,
annotation_type=annotation_type,
)
added = len(recording.annotations) - initial_count
if not quiet:
click.echo(f" ✓ Added {added} annotation(s)")
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"CUSUM detection failed: {e}")
# ============================================================================
# Threshold detection subcommand
# ============================================================================
@annotate.command()
@click.argument("input", type=click.Path(exists=True))
@click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)")
@click.option("--label", type=str, default="signal", help="Annotation label")
@click.option("--window-size", type=int, default=1024, help="Smoothing window size")
@click.option(
"--type",
"annotation_type",
type=click.Choice(["standalone", "parallel", "intersection"]),
default="standalone",
help="Annotation type",
)
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def threshold(input, threshold, label, window_size, annotation_type, output, overwrite, quiet):
"""Auto-detect signals using threshold method.
Detects samples above a percentage of maximum magnitude. Best for simple
power-based detection.
\b
Examples:
ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
ria annotate threshold data.npy --threshold 0.5 --window-size 2048
"""
if not (0.0 <= threshold <= 1.0):
raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}")
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
if not quiet:
click.echo("\nDetecting signals using threshold qualifier...")
click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude")
click.echo(f" Window size: {window_size} samples")
try:
initial_count = len(recording.annotations)
recording = threshold_qualifier(
recording,
threshold=threshold,
window_size=window_size,
label=label,
annotation_type=annotation_type,
)
added = len(recording.annotations) - initial_count
if not quiet:
click.echo(f" ✓ Added {added} annotation(s)")
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"Threshold detection failed: {e}")
# ============================================================================
# Separate subcommand (Phase 2: Parallel signal separation)
# ============================================================================
@annotate.command()
@click.argument("input", type=click.Path(exists=True))
@click.option("--indices", type=str, help="Comma-separated annotation indices to split (default: all)")
@click.option("--nfft", type=int, default=65536, help="FFT size for spectral analysis")
@click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)")
@click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz")
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
@click.option("--verbose", is_flag=True, help="Verbose output (show detected components)")
def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output, overwrite, quiet, verbose):
"""Split annotations by frequency components (Phase 2).
Detects multiple frequency components within single annotations and splits
them into separate annotations. Uses spectral peak detection with dual
bandwidth estimation.
\b
Key Features:
- Spectral peak detection for frequency components
- Auto noise floor estimation (or user-specified)
- Dual bandwidth estimation: -3dB primary, cumulative power fallback
- Handles narrowband and wide signals (OFDM)
\b
Examples:
ria annotate separate capture.sigmf-data
ria annotate separate signal.npy --indices 0,1,2
ria annotate separate data.sigmf-data --noise-threshold-db -70
ria annotate separate signal.npy --min-component-bw 100000
"""
try:
recording = load_recording(input)
if not quiet:
click.echo(f"Loaded: {input}")
except Exception as e:
raise click.ClickException(f"Failed to load recording: {e}")
# Parse indices if specified
indices_list = get_indices_list(indices=indices, recording=recording)
if len(recording.annotations) == 0:
if not quiet:
click.echo("No annotations to split")
return
if not quiet:
click.echo("\nSplitting annotations by frequency components...")
click.echo(f" Input annotations: {len(recording.annotations)}")
if indices_list:
click.echo(f" Splitting indices: {indices_list}")
click.echo(f" FFT size: {nfft}")
if noise_threshold_db is not None:
click.echo(f" Noise threshold: {noise_threshold_db} dB")
else:
click.echo(" Noise threshold: auto-estimated")
click.echo(f" Min component BW: {format_frequency(min_component_bw)}")
try:
initial_count = len(recording.annotations)
recording = split_recording_annotations(
recording,
indices=indices_list,
nfft=nfft,
noise_threshold_db=noise_threshold_db,
min_component_bw=min_component_bw,
)
final_count = len(recording.annotations)
added = final_count - initial_count
if not quiet:
click.echo(f" ✓ Output annotations: {final_count} ({'+' if added >= 0 else ''}{added} change)")
if verbose and added > 0:
click.echo("\n Details:")
for i in range(initial_count, final_count):
ann = recording.annotations[i]
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
click.echo(
f" [{i}] samples {format_sample_count(ann.sample_start)}-"
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}"
)
save_recording_auto(recording, output, input, quiet, overwrite)
if not quiet:
click.echo(" ✓ Saved")
except Exception as e:
raise click.ClickException(f"Spectral separation failed: {e}")

View File

@ -1,54 +0,0 @@
"""
The annotations package contains tools and utilities for creating, managing, and processing annotations.
Provides automatic annotation generation using various signal detection algorithms:
- Energy-based detection (detect_signals_energy)
- CUSUM-based segmentation (annotate_with_cusum)
- Threshold-based qualification (threshold_qualifier)
- Signal isolation and extraction (isolate_signal)
- Occupied bandwidth analysis (calculate_occupied_bandwidth, calculate_nominal_bandwidth)
All detection functions return Recording objects with added annotations.
"""
__all__ = [
# Energy-based detection
"detect_signals_energy",
"calculate_occupied_bandwidth",
"calculate_nominal_bandwidth",
"calculate_full_detected_bandwidth",
"annotate_with_obw",
# CUSUM detection
"annotate_with_cusum",
# Threshold detection
"threshold_qualifier",
# Parallel signal separation (Phase 2)
"find_spectral_components",
"split_annotation_by_components",
"split_recording_annotations",
# Signal isolation
"isolate_signal",
# Annotation transforms
"remove_contained_boxes",
"is_annotation_contained",
# Dataset creation
"qualify_slice_from_annotations",
]
from .annotation_transforms import is_annotation_contained, remove_contained_boxes
from .cusum_annotator import annotate_with_cusum
from .energy_detector import (
annotate_with_obw,
calculate_full_detected_bandwidth,
calculate_nominal_bandwidth,
calculate_occupied_bandwidth,
detect_signals_energy,
)
from .parallel_signal_separator import (
find_spectral_components,
split_annotation_by_components,
split_recording_annotations,
)
from .qualify_slice import qualify_slice_from_annotations
from .signal_isolation import isolate_signal
from .threshold_qualifier import threshold_qualifier

View File

@ -1,55 +0,0 @@
from ria_toolkit_oss.datatypes.annotation import Annotation
# TODO figure out how to transfer labels in the merge case
def remove_contained_boxes(annotations: list[Annotation]):
"""
Remove all annotations (bounding boxes) that are entirely contained within other boxes in the list.
:param annotations: A list of Annotation objects.
:type annotations: list[Annotation]
:returns: A new list of Annotation objects.
:rtype: list[Annotation]"""
output_boxes = []
for i in range(len(annotations)):
contained = False
for j in range(len(annotations)):
if i != j and is_annotation_contained(annotations[i], annotations[j]):
contained = True
break
if not contained:
output_boxes.append(annotations[i])
return output_boxes
def is_annotation_contained(inner: Annotation, outer: Annotation) -> bool:
"""
Check if an annotation box is entirely contained within another annotation bounding box.
:param inner: The inner box.
:type inner: Annotation.
:param outer: The outer box.
:type outer: Annotation.
:returns: True if inner is within outer, false otherwise.
:rtype: bool
"""
inner_sample_stop = inner.sample_start + inner.sample_count
outer_sample_stop = outer.sample_start + outer.sample_count
if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop:
if inner.freq_lower_edge > outer.freq_lower_edge and inner.freq_upper_edge < outer.freq_upper_edge:
return True
return False
def merge_annotations(annotations: list[Annotation], overlap_threshold) -> list[Annotation]:
raise NotImplementedError

View File

@ -1,197 +0,0 @@
import itertools
import json
from itertools import groupby
from typing import Optional
import numpy as np
from ria_toolkit_oss.datatypes import Annotation, Recording
def annotate_with_cusum(
recording: Recording,
label: Optional[str] = "segment",
window_size: Optional[int] = 1,
min_duration: Optional[float] = None,
tolerance: Optional[int] = -1,
annotation_type: Optional[str] = "standalone",
):
"""
Add annotations that divide the recording into distinct time segments.
This algorithm computes the cumulative sum of the sample magnitudes and
determines break points in the signal.
This tool can be used to find points where a signal turns on or off, or
changes between a low and high amplitude.
:param recording: A ``Recording`` object to annotate.
:type recording: ``utils.data.Recording``
:param label: Label for the detected segments.
:type label: str
:param window_size: The length (in samples) of the moving average window.
:type window_size: int
:param min_duration: The minimum duration (in ms) of a segment.
The algorithm will not produce annotations shorter than this length.
:type min_duration: float
:param tolerance: The minimum length (in samples) of a segment.
:type tolerance: int
:param annotation_type: Annotation type (standalone, parallel, intersection).
:type annotation_type: str
"""
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0)
# Create an object of the time segmenter
time_segmenter = TimeSegmenter(sample_rate, min_duration, window_size, tolerance)
# TODO refactor time segmentor such that the _ s are not required.
_, _, change_points, _ = time_segmenter.apply(recording.data[0])
time_segments_indices = np.append(np.insert(change_points, 0, 0), len(recording.data[0]))
annotations = []
for i in range(len(time_segments_indices) - 1):
# Build comment JSON with type metadata
comment_data = {
"type": annotation_type,
"generator": "cusum_annotator",
"params": {
"window_size": window_size,
"min_duration": min_duration,
"tolerance": tolerance,
},
}
annotations.append(
Annotation(
sample_start=time_segments_indices[i],
sample_count=time_segments_indices[i + 1] - time_segments_indices[i],
freq_lower_edge=center_frequency - (sample_rate / 2),
freq_upper_edge=center_frequency + (sample_rate / 2),
label=label,
comment=json.dumps(comment_data),
detail={"generator": "cusum_annotator"},
)
)
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations)
def _compute_cusum(_signal, sample_rate: int, tolerance: int = None, min_duration: float = -1):
"""
This function efficiently computes the cumulative sum of a give list (_signal), with an optional tolerance.
Args:
- _signal: array of iq samples.
- Tolerance: the least acceptable length of a block, Defaults to None.
Returns:
- cusum (array): Array of the cumulative sum of the given list
- sample_rate (int): __description_
- change_points (array): Array of the indices at which a change in the CUSUM direction happens.
- min_duration (float): The least acceptable time width of each segment (in ms). Defaults to -1.
"""
# efficiently calculate the running sum of the signal
cusum = list(itertools.accumulate((_signal - np.mean(_signal))))
# 'diff' computes the differences between the consecutive values,
# then 'sign' determines if it is +ve or -ve.
change_indicators = np.sign(np.diff(cusum))
# TODO: Tolerance is not useful, we can get rid of it.
"""
To add the tolerance:
1. Group the consecutive values, count them and mark their start index
"""
if tolerance:
groups_list, index = [], 0
# 1
for key, group in groupby(change_indicators):
group_len = len(list(group))
groups_list.append((key, group_len, index))
index += group_len
# 2
for val, n, idx in groups_list:
if n <= tolerance:
change_indicators[idx : idx + n] = -1 * val
change_points = np.where(np.diff(change_indicators))[0] + 1
# Limit the change_points
# Reject those whose number of samples < minimum accepted #n of samples in (min duration) ms.
if min_duration is not None and min_duration > 0:
min_samples_wide = min_duration * sample_rate / 1000
segments_lengths = np.diff(change_points)
segments_lengths = np.insert(segments_lengths, 0, change_points[0])
change_points = change_points[np.where(segments_lengths > min_samples_wide)[0]]
return cusum, change_points
class TimeSegmenter:
"""Time Segmenter class, it creates a segmenter object with certain\
characteristics to easily split an input signal to segments based on\
the cumulative sum of deviations (of the signal mean)
"""
def __init__(
self, sample_rate: int, min_duration: float = 1, moving_average_window: int = 3, tolerance: int = None
):
"""_summary_
Args:
sample_rate (int): _description_
min_duration (float, optional): _description_. Defaults to 1.
moving_average_window (int, optional): _description_. Defaults to 3.
tolerance (int, optional): _description_. Defaults to None.
"""
self.sample_rate = sample_rate
self.min_duration = min_duration
self.moving_average_window = moving_average_window
self._moving_avg_filter = self._init_filter()
self.tolerance = tolerance
def _init_filter(self):
"""_summary_
Returns:
_type_: _description_
"""
return np.ones(self.moving_average_window) / self.moving_average_window
def _apply_filter(self, iqsignal: np.array):
"""_summary_
Args:
iqsignal (np.array): _description_
Returns:
_type_: _description_
"""
return np.convolve(abs(iqsignal), self._moving_avg_filter, mode="same")
def _create_segments(self, iq_signal: np.array, change_points: np.array):
"""_summary_
Args:
iq_signal (np.array): _description_
change_points (np.array): _description_
Returns:
_type_: _description_
"""
return np.split(iq_signal, change_points)
def apply(self, iq_signal: np.array):
"""_summary_
Args:
iq_signal (np.array): _description_
Returns:
_type_: _description_
"""
smoothed_signal = self._apply_filter(iq_signal)
cusum, change_points = _compute_cusum(smoothed_signal, self.sample_rate, self.tolerance, self.min_duration)
segments = self._create_segments(iq_signal, change_points)
return smoothed_signal, cusum, change_points, segments

View File

@ -1,520 +0,0 @@
"""
Energy-based signal detection and bandwidth analysis.
Provides automatic annotation generation using energy-based signal detection
and occupied bandwidth calculation following ITU-R SM.328 standard.
"""
import json
from typing import Tuple
import numpy as np
from ria_toolkit_oss.datatypes import Annotation, Recording
def detect_signals_energy(
recording: Recording,
k: int = 10,
threshold_factor: float = 1.2,
window_size: int = 200,
min_distance: int = 5000,
label: str = "signal",
annotation_type: str = "standalone",
freq_method: str = "nbw",
nfft: int = 65536,
obw_power: float = 0.9999,
) -> Recording:
"""
Detect signal bursts using energy-based method with adaptive noise floor estimation.
This algorithm smooths the signal with a moving average filter, estimates the noise
floor from k segments, applies a threshold to detect regions above noise, and merges
nearby detections. Detected time boundaries are then assigned frequency bounds based
on the selected frequency method.
Time Detection Algorithm:
1. Smooth signal using moving average (envelope detection)
2. Divide smoothed signal into k segments
3. Estimate noise floor as median of segment mean powers
4. Detect regions where power exceeds threshold_factor * noise_floor
5. Merge regions closer than min_distance samples
Frequency Bounding (freq_method):
- 'nbw': Nominal bandwidth (OBW + center frequency) - DEFAULT
- 'obw': Occupied bandwidth (99.99% power, includes siedelobes)
- 'full-detected': Lowest to highest spectral component
- 'full-bandwidth': Entire Nyquist span (center_freq ± sample_rate/2)
:param recording: Recording to analyze
:type recording: Recording
:param k: Number of segments for noise floor estimation (default: 10)
:type k: int
:param threshold_factor: Threshold multiplier above noise floor (typical: 1.2-2.0, default: 1.2)
:type threshold_factor: float
:param window_size: Moving average window size in samples (default: 200)
:type window_size: int
:param min_distance: Minimum distance between separate signals in samples (default: 5000)
:type min_distance: int
:param label: Label for detected annotations (default: "signal")
:type label: str
:param annotation_type: Annotation type (standalone, parallel, intersection, default: standalone)
:type annotation_type: str
:param freq_method: How to calculate frequency bounds (default: 'nbw')
:type freq_method: str
:param nfft: FFT size for frequency calculations (default: 65536)
:type nfft: int
:param obw_power: Power percentage for OBW (0.9999 = 99.99%, default: 0.9999)
:type obw_power: float
:returns: New Recording with added annotations
:rtype: Recording
**Example**::
>>> from utils.io import load_recording
>>> from utils.annotations import detect_signals_energy
>>> recording = load_recording("capture.sigmf")
>>> # Detect with NBW frequency bounds (default, best for real signals)
>>> annotated = detect_signals_energy(recording, label="burst")
>>> # Detect with OBW (more conservative, includes siedelobes)
>>> annotated = detect_signals_energy(
... recording, label="burst", freq_method="obw"
... )
>>> # Detect with full detected range (captures all spectral components)
>>> annotated = detect_signals_energy(
... recording, label="burst", freq_method="full-detected"
... )
"""
# Extract signal data (use first channel only)
signal = recording.data[0]
# Smooth signal using moving average filter (envelope detection)
smoothed_signal = np.convolve(np.abs(signal), np.ones(window_size) / window_size, mode="valid")
# Estimate noise floor using segment-based median (robust to signal presence)
segment_length = len(smoothed_signal) // k
segment_powers = [np.mean(smoothed_signal[i * segment_length : (i + 1) * segment_length] ** 2) for i in range(k)]
noise_floor = np.median(segment_powers)
threshold = noise_floor * threshold_factor
# Detect signal boundaries (regions above threshold)
boundaries = []
start = None
for i, power in enumerate(smoothed_signal**2):
if power > threshold and start is None:
# Signal starts
start = i
elif power <= threshold and start is not None:
# Signal ends
boundaries.append((start, i - start))
start = None
# Handle case where signal extends to end
if start is not None:
boundaries.append((start, len(smoothed_signal) - start))
# Merge boundaries that are closer than min_distance
merged_boundaries = []
if boundaries:
start, length = boundaries[0]
for next_start, next_length in boundaries[1:]:
if next_start - (start + length) < min_distance:
# Merge with current boundary
length = next_start + next_length - start
else:
# Save current and start new boundary
merged_boundaries.append((start, length))
start, length = next_start, next_length
# Add final boundary
merged_boundaries.append((start, length))
# Create annotations from detected boundaries
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0)
# Validate frequency method
valid_freq_methods = ["nbw", "obw", "full-detected", "full-bandwidth"]
if freq_method not in valid_freq_methods:
raise ValueError(f"Invalid freq_method '{freq_method}'. " f"Must be one of: {', '.join(valid_freq_methods)}")
annotations = []
for start_sample, sample_count in merged_boundaries:
# Calculate frequency bounds based on method
freq_lower, freq_upper = calculate_frequency_bounds(
freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power
)
# Build comment JSON with type metadata
comment_data = {
"type": annotation_type,
"generator": "energy_detector",
"freq_method": freq_method,
"params": {
"threshold_factor": threshold_factor,
"window_size": window_size,
"noise_floor": float(noise_floor),
"threshold": float(threshold),
},
}
anno = Annotation(
sample_start=start_sample,
sample_count=sample_count,
freq_lower_edge=freq_lower,
freq_upper_edge=freq_upper,
label=label,
comment=json.dumps(comment_data),
detail={"generator": "energy_detector", "freq_method": freq_method},
)
annotations.append(anno)
# Return new Recording with combined annotations
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations)
def calculate_occupied_bandwidth(
signal: np.ndarray,
sampling_rate: float,
nfft: int = 65536,
power_percentage: float = 0.9999,
start_offset: int = 1000,
) -> Tuple[float, float, float]:
"""
Calculate occupied bandwidth following ITU-R SM.328 standard.
Occupied bandwidth (OBW) is defined as the bandwidth containing a specified
percentage of the total signal power. This implementation uses FFT-based
power spectral density analysis.
The default power_percentage of 99.99% (0.9999) captures the main lobe and
first sidelobe, providing a realistic measure of actual spectral occupancy.
ITU-R SM.328 defines OBW as bandwidth containing 99% of power, but this
implementation allows customization.
:param signal: Complex IQ signal samples
:type signal: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param nfft: FFT size (larger = better frequency resolution)
:type nfft: int
:param power_percentage: Fraction of power to contain (0.99 = 99%, 0.9999 = 99.99%)
:type power_percentage: float
:param start_offset: Skip this many samples at start (avoid transients)
:type start_offset: int
:returns: Tuple of (occupied_bandwidth_hz, lower_edge_hz, upper_edge_hz)
:rtype: Tuple[float, float, float]
:raises ValueError: If signal is too short for nfft + start_offset
**Example**::
>>> import numpy as np
>>> from utils.annotations import calculate_occupied_bandwidth
>>> # Generate 1 MHz QPSK signal at 10 Msps
>>> signal = np.random.randn(100000) + 1j * np.random.randn(100000)
>>> obw, f_lower, f_upper = calculate_occupied_bandwidth(
... signal, sampling_rate=10e6, power_percentage=0.99
... )
>>> print(f"99% OBW: {obw/1e6:.3f} MHz")
>>> print(f"Range: {f_lower/1e6:.3f} to {f_upper/1e6:.3f} MHz")
**References**:
ITU-R SM.328: "Spectra and bandwidth of emissions"
FCC Part 15: Uses 99% power containment for bandwidth limits
"""
# Validate input
if len(signal) < nfft + start_offset:
raise ValueError(
f"Signal too short: need {nfft + start_offset} samples, "
f"got {len(signal)}. Reduce nfft or start_offset."
)
# Extract segment (skip transients at start)
signal_segment = signal[start_offset : nfft + start_offset]
# Compute FFT and power spectral density
freq_spectrum = np.fft.fft(signal_segment, n=nfft)
psd = np.abs(freq_spectrum) ** 2
# Shift to center DC at middle (makes freq interpretation easier)
psd_shifted = np.fft.fftshift(psd)
freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate))
# Compute total power
total_power = np.sum(psd_shifted)
# Find frequency range containing specified power percentage
# Use cumulative sum to find edges
cumulative_power = np.cumsum(psd_shifted)
# Lower edge: where cumulative power reaches (1 - power_percentage) / 2
# Upper edge: where cumulative power reaches 1 - (1 - power_percentage) / 2
# This centers the power percentage window
lower_threshold = total_power * (1 - power_percentage) / 2
upper_threshold = total_power * (1 - (1 - power_percentage) / 2)
# Find indices
lower_idx = np.where(cumulative_power >= lower_threshold)[0][0]
upper_idx = np.where(cumulative_power <= upper_threshold)[0][-1]
# Get frequency values
lower_freq = freq_bins[lower_idx]
upper_freq = freq_bins[upper_idx]
# Calculate occupied bandwidth
occupied_bandwidth = upper_freq - lower_freq
return occupied_bandwidth, lower_freq, upper_freq
def calculate_nominal_bandwidth(
signal: np.ndarray,
sampling_rate: float,
nfft: int = 65536,
power_percentage: float = 0.9999,
start_offset: int = 1000,
) -> Tuple[float, float]:
"""
Calculate nominal bandwidth and center frequency.
Nominal bandwidth (NBW) is the occupied bandwidth along with the center
frequency of the signal's spectral occupancy. Useful for characterizing
signals with unknown or drifting center frequencies.
:param signal: Complex IQ signal samples
:type signal: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param nfft: FFT size
:type nfft: int
:param power_percentage: Fraction of power to contain
:type power_percentage: float
:param start_offset: Skip samples at start
:type start_offset: int
:returns: Tuple of (nominal_bandwidth_hz, center_frequency_hz)
:rtype: Tuple[float, float]
**Example**::
>>> from utils.annotations import calculate_nominal_bandwidth
>>> nbw, center = calculate_nominal_bandwidth(signal, sampling_rate=10e6)
>>> print(f"NBW: {nbw/1e6:.3f} MHz, Center: {center/1e6:.3f} MHz")
"""
obw, lower_freq, upper_freq = calculate_occupied_bandwidth(
signal, sampling_rate, nfft, power_percentage, start_offset
)
# Center frequency is midpoint of occupied band
center_freq = (lower_freq + upper_freq) / 2
return obw, center_freq
def calculate_full_detected_bandwidth(
signal: np.ndarray,
sampling_rate: float,
nfft: int = 65536,
start_offset: int = 1000,
) -> Tuple[float, float, float]:
"""
Calculate frequency range from lowest to highest spectral component.
Unlike OBW/NBW which define a power-based bandwidth, this calculates
the absolute frequency span from the lowest non-zero spectral component
to the highest non-zero component.
Useful for:
- Signals with spectral gaps
- Multiple parallel signals (captures all of them)
- Understanding total occupied spectrum vs. actual bandwidth
:param signal: Complex IQ signal samples
:type signal: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param nfft: FFT size
:type nfft: int
:param start_offset: Skip samples at start
:type start_offset: int
:returns: Tuple of (bandwidth_hz, lower_freq_hz, upper_freq_hz)
:rtype: Tuple[float, float, float]
**Example**::
>>> # Signal with two components at different frequencies
>>> bw, f_low, f_high = calculate_full_detected_bandwidth(
... signal, sampling_rate=10e6, nfft=65536
... )
>>> print(f"Full span: {f_low/1e6:.3f} to {f_high/1e6:.3f} MHz")
"""
# Validate input
if len(signal) < nfft + start_offset:
raise ValueError(
f"Signal too short: need {nfft + start_offset} samples, "
f"got {len(signal)}. Reduce nfft or start_offset."
)
# Extract segment
signal_segment = signal[start_offset : nfft + start_offset]
# Compute FFT and power spectral density
freq_spectrum = np.fft.fft(signal_segment, n=nfft)
psd = np.abs(freq_spectrum) ** 2
# Shift to center DC
psd_shifted = np.fft.fftshift(psd)
freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate))
# Find noise floor (mean of lowest 10% of bins)
noise_floor = np.mean(np.sort(psd_shifted)[: int(len(psd_shifted) * 0.1)])
# Find all bins above noise floor
above_noise = np.where(psd_shifted > noise_floor * 1.5)[0]
if len(above_noise) == 0:
# No signal above noise, return zero bandwidth
return 0.0, 0.0, 0.0
# Get frequency range of signal components
lower_idx = above_noise[0]
upper_idx = above_noise[-1]
lower_freq = freq_bins[lower_idx]
upper_freq = freq_bins[upper_idx]
bandwidth = upper_freq - lower_freq
return bandwidth, lower_freq, upper_freq
def annotate_with_obw(
recording: Recording,
label: str = "signal",
annotation_type: str = "standalone",
nfft: int = 65536,
power_percentage: float = 0.9999,
start_offset: int = 1000,
) -> Recording:
"""
Create a single annotation spanning the occupied bandwidth of the entire recording.
Analyzes the full recording to find its occupied bandwidth and creates an annotation
covering that frequency range for the entire time duration.
:param recording: Recording to analyze
:type recording: Recording
:param label: Annotation label
:type label: str
:param annotation_type: Annotation type
:type annotation_type: str
:param nfft: FFT size
:type nfft: int
:param power_percentage: Power percentage for OBW calculation
:type power_percentage: float
:param start_offset: Sample offset
:type start_offset: int
:returns: Recording with OBW annotation added
:rtype: Recording
**Example**::
>>> from utils.annotations import annotate_with_obw
>>> annotated = annotate_with_obw(recording, label="signal_obw")
"""
signal = recording.data[0]
sample_rate = recording.metadata["sample_rate"]
center_freq = recording.metadata.get("center_frequency", 0)
# Calculate OBW
obw, lower_offset, upper_offset = calculate_occupied_bandwidth(
signal, sample_rate, nfft, power_percentage, start_offset
)
# Convert baseband offsets to absolute frequencies
freq_lower = center_freq + lower_offset
freq_upper = center_freq + upper_offset
# Create comment JSON
comment_data = {
"type": annotation_type,
"generator": "obw_annotator",
"obw_hz": float(obw),
"power_percentage": power_percentage,
"params": {"nfft": nfft, "start_offset": start_offset},
}
# Create annotation spanning entire recording
anno = Annotation(
sample_start=0,
sample_count=len(signal),
freq_lower_edge=freq_lower,
freq_upper_edge=freq_upper,
label=label,
comment=json.dumps(comment_data),
detail={"generator": "obw_annotator", "obw_hz": float(obw)},
)
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + [anno])
def calculate_frequency_bounds(
freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power
):
if freq_method == "full-bandwidth":
# Full Nyquist span
freq_lower = center_frequency - (sample_rate / 2)
freq_upper = center_frequency + (sample_rate / 2)
else:
# Extract segment for frequency analysis
segment_start = start_sample
segment_end = min(start_sample + sample_count, len(signal))
segment = signal[segment_start:segment_end]
if len(segment) >= nfft:
if freq_method == "nbw":
# Nominal bandwidth (OBW + center frequency)
try:
nbw_val, center = calculate_nominal_bandwidth(segment, sample_rate, nfft, obw_power)
freq_lower = center_frequency + center - (nbw_val / 2)
freq_upper = center_frequency + center + (nbw_val / 2)
except (ValueError, IndexError):
# Fallback if calculation fails
freq_lower = center_frequency - (sample_rate / 2)
freq_upper = center_frequency + (sample_rate / 2)
elif freq_method == "obw":
# Occupied bandwidth
try:
_, f_lower, f_upper = calculate_occupied_bandwidth(segment, sample_rate, nfft, obw_power)
freq_lower = center_frequency + f_lower
freq_upper = center_frequency + f_upper
except (ValueError, IndexError):
# Fallback if calculation fails
freq_lower = center_frequency - (sample_rate / 2)
freq_upper = center_frequency + (sample_rate / 2)
elif freq_method == "full-detected":
# Full detected range (lowest to highest component)
try:
_, f_lower, f_upper = calculate_full_detected_bandwidth(segment, sample_rate, nfft)
freq_lower = center_frequency + f_lower
freq_upper = center_frequency + f_upper
except (ValueError, IndexError):
# Fallback if calculation fails
freq_lower = center_frequency - (sample_rate / 2)
freq_upper = center_frequency + (sample_rate / 2)
else:
# Segment too short for FFT, use full bandwidth
freq_lower = center_frequency - (sample_rate / 2)
freq_upper = center_frequency + (sample_rate / 2)
return freq_lower, freq_upper

View File

@ -1,484 +0,0 @@
"""
Parallel signal separation for multi-component frequency-offset signals.
Provides methods to detect and separate overlapping frequency-domain signals
that occupy the same time window but different frequency bands.
This module implements **spectral peak detection** to identify distinct frequency
components and split single time-domain annotations into frequency-specific
sub-annotations.
**Key Design Decisions** (per Codex review):
1. **Complex IQ Support**: Uses `scipy.signal.welch` with `return_onesided=False`
for proper complex signal handling. Window length automatically adapts to
signal length via `nperseg=min(nfft, len(signal))` to handle bursts <nfft.
2. **Frequency Representation**: Components are detected in **relative** frequency
(baseband, centered at 0 Hz). Caller must add RF center_frequency_hz when
writing to SigMF annotations. This separation of concerns avoids the frequency
context bug where absolute Hz would be meaningless for baseband processing.
3. **Bandwidth Estimation**: Dual strategy avoids -3dB limitations:
- Primary: -3dB rolloff for typical narrowband signals
- Fallback: Cumulative power (99% like OBW) for wide/OFDM signals
- Auto-fallback when -3dB bandwidth is anomalous
4. **Noise Floor**: Auto-estimated via `np.percentile(psd_db, 10)` from data
to adapt across hardware (Pluto vs. ThinkRF). User can override if needed.
5. **Filter Sizing (Optional FIR extraction)**: When extracting components,
uses Kaiser window FIR with proper stopband specification. Auto-sizes
numtaps based on desired transition bandwidth. Includes downsampling
guidance for long captures.
6. **CLI Surface**: Single `separate` subcommand for all separation operations.
Can be chained after any detector or used standalone on existing annotations.
Example:
Two WiFi channels captured simultaneously:
>>> from utils.annotations import find_spectral_components
>>> # Detect the two distinct channels (returns relative frequencies)
>>> components = find_spectral_components(signal, sampling_rate=20e6)
>>> print(f"Found {len(components)} components")
Found 2 components
The module is designed to work with detected time-domain annotations,
allowing splitting of overlapping signals into separate training samples.
"""
import json
from typing import List, Optional, Tuple
import numpy as np
from scipy import ndimage
from scipy import signal as scipy_signal
from ria_toolkit_oss.datatypes import Annotation, Recording
def find_spectral_components(
signal_data: np.ndarray,
sampling_rate: float,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
power_threshold: float = 0.99,
) -> List[Tuple[float, float, float]]:
"""
Find distinct frequency components using spectral peak detection.
Identifies separate frequency components in a signal by analyzing the power
spectral density and finding peaks corresponding to distinct signals. This is
useful for separating parallel signals that occupy different frequency bands.
**Frequency Representation**: Returns frequencies in **baseband/relative** Hz
(centered at 0). To get absolute RF frequencies, add center_frequency_hz from
recording metadata to all returned values.
Algorithm:
1. Compute power spectral density using Welch (properly handles complex IQ)
2. Auto-estimate noise floor from data if not specified
3. Smooth PSD to reduce spurious peaks
4. Find local maxima above noise floor
5. Estimate bandwidth per peak using -3dB (fallback: cumulative power)
6. Filter components below minimum bandwidth threshold
:param signal_data: Complex IQ signal samples (np.complex64/128)
:type signal_data: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param nfft: FFT size / window length for Welch. Automatically capped at
signal length to handle bursts (default: 65536)
:type nfft: int
:param noise_threshold_db: Minimum SNR threshold in dB. If None (default),
auto-estimates as np.percentile(psd_db, 10).
Adapt this across hardware (Pluto: ~-100, ThinkRF: ~-60).
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz)
:type min_component_bw: float
:param power_threshold: Cumulative power threshold for fallback bandwidth
estimation (default: 0.99 = 99% power, like OBW)
:type power_threshold: float
:returns: List of (center_freq_hz, lower_freq_hz, upper_freq_hz) tuples.
**All frequencies are relative (baseband, 0-centered).**
Add recording metadata['center_frequency'] to get absolute RF frequencies.
:rtype: List[Tuple[float, float, float]]
:raises ValueError: If signal has fewer than 256 samples
**Example**::
>>> from utils.io import load_recording
>>> from utils.annotations import find_spectral_components
>>> recording = load_recording("capture.sigmf")
>>> segment = recording.data[0][start:end]
>>> # Components in relative (baseband) frequency
>>> components = find_spectral_components(segment, sampling_rate=20e6)
>>> for center_rel, lower_rel, upper_rel in components:
... # Convert to absolute RF frequency
... center_abs = recording.metadata['center_frequency'] + center_rel
... print(f"Component @ {center_abs/1e9:.3f} GHz")
**Key Implementation Notes**:
1. **Welch for complex signals**: Uses `return_onesided=False` to properly
handle complex IQ pairs. Window length auto-capped via
`nperseg = min(nfft, len(signal))` so bursts <nfft don't crash.
2. **Noise floor auto-estimation**: Without user override, estimates from
data using 10th percentile, adapting to hardware SNR characteristics.
3. **Dual bandwidth estimation**: -3dB heuristic works for narrowband signals
but fails on OFDM/wide modulation (skirts never drop 3dB). Fallback uses
cumulative power (like OBW) when -3dB estimate seems anomalous.
4. **No frequency context**: Returns relative frequencies because the function
doesn't know RF center frequency. Caller must add metadata['center_frequency']
when creating annotations. This prevents frequency bugs across baseband
and RF processing pipelines.
"""
# Validate input
min_samples = 256
if len(signal_data) < min_samples:
raise ValueError(f"Signal too short: need at least {min_samples} samples, " f"got {len(signal_data)}.")
# Compute PSD using Welch method for complex IQ signals
# CRITICAL: return_onesided=False for proper complex signal handling
nperseg = min(nfft, len(signal_data))
try:
freqs, psd = scipy_signal.welch(
signal_data,
fs=sampling_rate,
nperseg=nperseg,
window="hann",
scaling="density",
return_onesided=False, # REQUIRED for complex IQ
)
except Exception as e:
raise ValueError(
f"Welch PSD computation failed: {e}. " f"Check signal format (should be complex IQ, not real)."
)
# Shift zero-frequency to center (makes interpretation easier)
psd_shifted = np.fft.fftshift(psd)
freqs_shifted = np.fft.fftshift(freqs)
# Convert to dB
psd_db = 10 * np.log10(psd_shifted + 1e-10)
# Auto-estimate noise floor if not provided
if noise_threshold_db is None:
# Use 10th percentile as noise floor estimate
noise_threshold_db = np.percentile(psd_db, 10)
# Smooth PSD to reduce spurious peaks from noise
psd_smooth = ndimage.gaussian_filter1d(psd_db, sigma=5)
# Find peaks above threshold
peaks, properties = scipy_signal.find_peaks(
psd_smooth,
height=noise_threshold_db + 3, # At least 3dB above noise floor
distance=5, # Minimum 5 bins between peaks
prominence=3, # At least 3dB above surroundings
)
if len(peaks) == 0:
# No components found
return []
# Extract bandwidth for each component
components = []
for peak_idx in peaks:
peak_freq = freqs_shifted[peak_idx]
peak_power = psd_smooth[peak_idx]
threshold_3db = peak_power - 3
# Strategy 1: Try -3dB bandwidth estimation
left_indices = np.where(psd_smooth[:peak_idx] >= threshold_3db)[0]
right_indices = np.where(psd_smooth[peak_idx:] >= threshold_3db)[0]
if len(left_indices) > 0 and len(right_indices) > 0:
lower_idx = left_indices[0]
upper_idx = peak_idx + right_indices[-1]
lower_freq = freqs_shifted[lower_idx]
upper_freq = freqs_shifted[upper_idx]
bw_3db = upper_freq - lower_freq
# Sanity check: if -3dB BW is unreasonably wide (>sampling_rate/2),
# fallback to power-based estimation
if bw_3db > sampling_rate / 4:
# Strategy 2: Use cumulative power (like OBW) for wide signals
psd_around_peak = psd_smooth[max(0, peak_idx - 100) : peak_idx + 100]
if len(psd_around_peak) > 0:
total_power = np.sum(psd_around_peak)
cumulative = np.cumsum(psd_around_peak)
# Find edges where cumulative power = power_threshold * total
# threshold_power = power_threshold * total_power
# Crude but effective: find where we cross the threshold
left_local = np.where(cumulative >= (1 - power_threshold) / 2 * total_power)[0]
right_local = np.where(cumulative <= (1 + power_threshold) / 2 * total_power)[0]
if len(left_local) > 0 and len(right_local) > 0:
lower_idx_local = left_local[0]
upper_idx_local = right_local[-1]
lower_freq = freqs_shifted[max(0, peak_idx - 100) + lower_idx_local]
upper_freq = freqs_shifted[max(0, peak_idx - 100) + upper_idx_local]
else:
# Fallback: estimate bandwidth from spectral width heuristic
lower_freq = peak_freq - min_component_bw / 2
upper_freq = peak_freq + min_component_bw / 2
center_freq = (lower_freq + upper_freq) / 2
bw = upper_freq - lower_freq
# Filter by minimum bandwidth
if bw >= min_component_bw:
# All frequencies are relative (baseband)
components.append((center_freq, lower_freq, upper_freq))
return components
def split_annotation_by_components(
annotation: Annotation,
signal: np.ndarray,
sampling_rate: float,
center_frequency_hz: float = 0.0,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
) -> List[Annotation]:
"""
Split an annotation into multiple annotations by detected frequency components.
Takes an existing annotation spanning multiple frequency components and
analyzes the frequency content to create separate sub-annotations for
each distinct frequency component.
**Use case**: Energy detection found a time window with 2-3 parallel WiFi
channels. This function splits it into separate annotations per channel.
**Frequency Handling**: `find_spectral_components` returns relative (baseband)
frequencies. This function adds `center_frequency_hz` to convert to absolute
RF frequencies for SigMF annotation bounds. This ensures correct frequency
context across baseband and RF domains.
:param annotation: Original annotation to split
:type annotation: Annotation
:param signal: Full signal array (complex IQ)
:type signal: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param center_frequency_hz: RF center frequency to add to relative frequencies
from peak detection (default: 0.0 = baseband)
:type center_frequency_hz: float
:param nfft: FFT size for analysis (default: 65536, auto-capped at signal length)
:type nfft: int
:param noise_threshold_db: Noise floor threshold in dB. If None (default),
auto-estimates from data.
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz)
:type min_component_bw: float
:returns: List of new annotations (one per detected component).
Returns empty list if no components found or segment too short.
:rtype: List[Annotation]
**Example**::
>>> from utils.io import load_recording
>>> from utils.annotations import split_annotation_by_components
>>> recording = load_recording("capture.sigmf")
>>> # Original annotation spans multiple channels
>>> original = recording.annotations[0]
>>> # Split using RF center frequency from metadata
>>> components = split_annotation_by_components(
... original,
... recording.data[0],
... recording.metadata['sample_rate'],
... center_frequency_hz=recording.metadata.get('center_frequency', 0.0)
... )
>>> print(f"Split into {len(components)} components")
Split into 2 components
**Algorithm**:
1. Extract segment corresponding to annotation time bounds
2. Find frequency components in that segment (returns relative frequencies)
3. Add center_frequency_hz to get absolute RF frequencies
4. Create new annotation for each component
5. Preserve original metadata (label, type, etc.)
6. Add component info to comment JSON
**Notes**:
- Original annotation is not modified
- Returns empty list if segment too short (<256 samples)
- Segments <nfft get auto-downsampled to nfft (see find_spectral_components)
- Each component inherits label from original
- Component frequencies in comment JSON are absolute (RF) frequencies
"""
# Extract segment corresponding to annotation time bounds
start_sample = annotation.sample_start
end_sample = min(start_sample + annotation.sample_count, len(signal))
segment = signal[start_sample:end_sample]
# Validate segment length
if len(segment) < 256:
# Segment too short for spectral analysis
return []
# Find components in this segment (returns relative/baseband frequencies)
try:
components = find_spectral_components(segment, sampling_rate, nfft, noise_threshold_db, min_component_bw)
except ValueError:
# Spectral analysis failed (e.g., not complex IQ)
return []
if not components:
# No components found
return []
# Create annotations for each component
new_annotations = []
for center_freq_rel, lower_freq_rel, upper_freq_rel in components:
# Convert relative (baseband) frequencies to absolute (RF) frequencies
center_freq_abs = center_frequency_hz + center_freq_rel
lower_freq_abs = center_frequency_hz + lower_freq_rel
upper_freq_abs = center_frequency_hz + upper_freq_rel
# Parse original annotation metadata
try:
comment_data = json.loads(annotation.comment)
except (json.JSONDecodeError, TypeError):
comment_data = {"type": "standalone"}
# Add component information (with absolute RF frequencies)
comment_data["split_from_annotation"] = True
comment_data["original_freq_bounds"] = {
"lower": float(annotation.freq_lower_edge),
"upper": float(annotation.freq_upper_edge),
}
comment_data["component_freq_bounds_rf"] = {
"center": float(center_freq_abs),
"lower": float(lower_freq_abs),
"upper": float(upper_freq_abs),
}
# Create new annotation with absolute RF frequency bounds
new_anno = Annotation(
sample_start=annotation.sample_start,
sample_count=annotation.sample_count,
freq_lower_edge=lower_freq_abs,
freq_upper_edge=upper_freq_abs,
label=annotation.label,
comment=json.dumps(comment_data),
detail={
"generator": "parallel_signal_separator",
"center_freq_hz": float(center_freq_abs),
},
)
new_annotations.append(new_anno)
return new_annotations
def split_recording_annotations(
recording: Recording,
indices: Optional[List[int]] = None,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
) -> Recording:
"""
Split multiple annotations in a recording by frequency components.
Processes specified annotations (or all if indices=None), replacing each
with its frequency-separated components. Uses RF center_frequency from
recording metadata for proper absolute frequency conversion.
:param recording: Recording to process
:type recording: Recording
:param indices: Annotation indices to split (None = all, default: None).
Use indices=[] to skip splitting (returns unchanged recording).
:type indices: Optional[List[int]]
:param nfft: FFT size for spectral analysis (default: 65536,
auto-capped at signal segment length)
:type nfft: int
:param noise_threshold_db: Noise floor threshold in dB. If None (default),
auto-estimates from each segment.
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz).
Components narrower than this are filtered out.
:type min_component_bw: float
:returns: New Recording with split annotations
:rtype: Recording
**Example**::
>>> from utils.io import load_recording
>>> from utils.annotations import split_recording_annotations
>>> recording = load_recording("capture.sigmf")
>>> # Split all annotations
>>> split_rec = split_recording_annotations(recording)
>>> print(f"Original: {len(recording.annotations)} annotations")
>>> print(f"Split: {len(split_rec.annotations)} annotations")
Original: 5 annotations
Split: 9 annotations
**Algorithm**:
1. For each annotation in indices (or all if None):
2. Call split_annotation_by_components with RF center_frequency
3. If components found, replace annotation with components
4. If no components found, keep original annotation
5. Annotations not in indices are kept unchanged
**Notes**:
- Original recording is not modified
- Returns empty Recording.annotations if recording has no annotations
- RF center_frequency from metadata ensures correct absolute frequencies
- If an annotation can't be split (too short, wrong format), original kept
"""
if indices is None:
# Split all annotations
indices = list(range(len(recording.annotations)))
if not recording.annotations:
# No annotations to split
return recording
signal = recording.data[0]
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0.0)
# Build new annotation list
new_annotations = []
for i, anno in enumerate(recording.annotations):
if i in indices:
# Attempt to split this annotation
try:
components = split_annotation_by_components(
anno,
signal,
sample_rate,
center_frequency_hz=center_frequency,
nfft=nfft,
noise_threshold_db=noise_threshold_db,
min_component_bw=min_component_bw,
)
if components:
# Split successful, use components
new_annotations.extend(components)
else:
# No components found, keep original
new_annotations.append(anno)
except Exception:
# Split failed for any reason, keep original
new_annotations.append(anno)
else:
# Not in split list, keep as-is
new_annotations.append(anno)
return Recording(data=recording.data, metadata=recording.metadata, annotations=new_annotations)

View File

@ -1,35 +0,0 @@
import numpy as np
from ria_toolkit_oss.datatypes import Recording
def qualify_slice_from_annotations(recording: Recording, slice_length: int):
"""
Slice a recording into many smaller recordings,
discarding any slices which do not have annotations that apply to those samples.
Used together with an annotation based qualifier.
:param recording: The recording to slice.
:type recording: Recording
:param slice_length: The length in samples of a slice.
:type slice_length: int"""
if len(recording.annotations) == 0:
print("Warning, no annotations.")
annotation_mask = np.zeros(len(recording.data[0]))
for annotation in recording.annotations:
annotation_mask[annotation.sample_start : annotation.sample_start + annotation.sample_count] = 1
output_recordings = []
for i in range((len(recording.data[0]) // slice_length) - 1):
start_index = slice_length * i
end_index = slice_length * (i + 1)
if 1 in annotation_mask[start_index:end_index]:
sl = recording.data[:, start_index:end_index]
output_recordings.append(Recording(data=sl, metadata=recording.metadata))
return output_recordings

View File

@ -1,97 +0,0 @@
import numpy as np
from scipy.signal import butter, lfilter
from ria_toolkit_oss.datatypes.annotation import Annotation
from ria_toolkit_oss.datatypes.recording import Recording
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:
"""
Slice, filter and frequency shift the input recording according to the bounding box defined by the annotation.
:param recording: The input Recording to be sliced.
:type recording: Recording
:param annotation: The Annotation object defining the area of the recording to isolate.
:type annotation: Annotation
:param decimate: Decimate the input signal after filtering to reduce the sample rate.
:type decimate: bool
:returns: The subsection of the original recording defined by the annotation.
:rtype: Recording"""
sample_start = max(0, annotation.sample_start)
sample_stop = min(len(recording), annotation.sample_start + annotation.sample_count)
anno_base_center_freq = (annotation.freq_lower_edge + annotation.freq_upper_edge) / 2 - recording.metadata.get(
"center_frequency", 0
)
anno_bw = annotation.freq_upper_edge - annotation.freq_lower_edge
signal_slice = recording.data[0, sample_start:sample_stop]
# normalize
signal_slice = signal_slice / np.max(np.abs(signal_slice))
isolation_bw = anno_bw
# frequency shift the center of the box about zero
shifted_signal_slice = frequency_shift_iq_samples(
iq_samples=signal_slice,
sample_rate=recording.metadata["sample_rate"],
shift_frequency=-1 * anno_base_center_freq,
)
# filter
if isolation_bw < recording.metadata["sample_rate"] - 1:
filtered_signal = apply_complex_lowpass_filter(
signal=shifted_signal_slice, cutoff_frequency=isolation_bw, sample_rate=recording.metadata["sample_rate"]
)
else:
filtered_signal = shifted_signal_slice
output = Recording(data=[filtered_signal], metadata=recording.metadata)
return output
def frequency_shift_iq_samples(iq_samples, sample_rate, shift_frequency):
# Number of samples
num_samples = len(iq_samples)
# Create a time vector from 0 to the total duration in seconds
time_vector = np.arange(num_samples) / sample_rate
# Generate the complex exponential for the frequency shift
complex_exponential = np.exp(1j * 2 * np.pi * shift_frequency * time_vector)
# Apply the frequency shift to the IQ samples
shifted_samples = iq_samples * complex_exponential
return shifted_samples
# Function to apply a lowpass Butterworth filter to a complex signal
def apply_complex_lowpass_filter(signal, cutoff_frequency, sample_rate, order=5):
# Design the lowpass filter
b, a = design_complex_lowpass_filter(cutoff_frequency, sample_rate, order)
# Apply the lowpass filter
filtered_signal = lfilter(b, a, signal)
return filtered_signal
def design_complex_lowpass_filter(cutoff_frequency, sample_rate, order=5):
# Nyquist frequency for complex signals is the sample rate
nyquist = sample_rate
# Ensure the cutoff frequency is positive and within the Nyquist limit
if cutoff_frequency <= 0 or cutoff_frequency > nyquist:
raise ValueError("Cutoff frequency must be between 0 and the Nyquist frequency.")
# Normalize the cutoff frequency to the Nyquist frequency
cutoff_normalized = cutoff_frequency / nyquist
# Create a Butterworth lowpass filter
b, a = butter(order, cutoff_normalized, btype="low")
return b, a

View File

@ -1,114 +0,0 @@
import json
import warnings
from typing import Optional
import numpy as np
from ria_toolkit_oss.datatypes import Annotation, Recording
def _find_ranges(indices, window_size):
if len(indices) == 0:
return []
ranges = []
start = indices[0] # Initialize the start of the first range
in_range = False # Track if we are currently in a valid range
for i in range(1, len(indices)):
# Check if the current index is within `window_size` of the previous one
if indices[i] - indices[i - 1] <= window_size:
if not in_range:
# Start a new range
start = indices[i - 1]
in_range = True
else:
# If we were in a range, close it
if in_range:
ranges.append((start, indices[i - 1]))
in_range = False
# Handle the last range if it is still open
if in_range:
ranges.append((start, indices[-1]))
return ranges
def threshold_qualifier(
recording: Recording,
threshold: float,
window_size: Optional[int] = 1024,
label: Optional[str] = "signal",
annotation_type: Optional[str] = "standalone",
):
"""
Annotate a recording with bounding boxes labeled "signal" for regions above a threshold.
Threshold is defined as a fraction of the maximum sample magnitude.
This algorithm searches for samples above the threshold and combines them into ranges if they
are within window_size of each other.
:param recording: The recording to annotate.
:type recording: utils.data.Recording
:param threshold: The threshold value, range 0-1.
The actual threshold value used is this number multiplied by the max sample magnitude.
:type threshold: float
:param window_size: The size of the sliding window. Defaults to 1024 samples.
:type window_size: int
:param label: The label of the output annotations. Defaults to "signal".
:type label: str
:param annotation_type: Annotation type (standalone, parallel, intersection).
:type annotation_type: str
:returns: Recording with added annotations.
:rtype: Recording
"""
sample_data = recording.data
if sample_data.shape[0] > 1:
warnings.warn(
"Warning: Multichannel recording input to threshold_qualifier. "
"Only the first channel will be considered by this algorithm."
)
# do the absolute value calculation once for efficiency
abs_channel_data = np.abs(sample_data[0])
max_sample = np.max(abs_channel_data)
threshold_value = threshold * max_sample
indices = np.where(abs_channel_data > threshold_value)[0]
ranges = _find_ranges(indices=indices, window_size=window_size)
annotations = []
# to input freq upper and lower based on center frequency and sample rate
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0)
for range in ranges:
start, stop = range
# Build comment JSON with type metadata
comment_data = {
"type": annotation_type,
"generator": "threshold_qualifier",
"params": {
"threshold": threshold,
"threshold_value": float(threshold_value),
"max_sample": float(max_sample),
"window_size": window_size,
},
}
anno = Annotation(
sample_start=start,
sample_count=stop - start,
freq_lower_edge=center_frequency - (sample_rate / 2),
freq_upper_edge=center_frequency + (sample_rate / 2),
label=label,
comment=json.dumps(comment_data),
detail={"generator": "threshold_qualifier"},
)
annotations.append(anno)
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations)

View File

@ -0,0 +1,83 @@
"""
Utilities for common input/output operations.
"""
import os
import ria_toolkit_oss
def exists(fid: str | os.PathLike) -> bool:
"""Check if the file or directory exists.
.. todo::
This method is not yet implemented.
:param fid: The path to the file or directory to check for existence.
:type fid: str or os.PathLike
:return: True if the file or directory exists, False otherwise.
:rtype: bool
"""
raise NotImplementedError
def validate(fid: str | os.PathLike) -> bool:
"""Validate the contents of the file or directory to ensure it is not corrupted,
the correct format for its extension, and readable RIA.
.. todo::
This method is not yet implemented.
:param fid: The path to the file or directory to validate.
:type fid: str or os.PathLike
:return: True if the file or directory is valid and readable, False otherwise.
"""
raise NotImplementedError
def move(source_path: str | os.PathLike, destination_path: str | os.PathLike, copy: bool = False) -> None:
"""Recursively move a file or directory at source_path to destination_path.
.. todo::
This method is not yet implemented.
:param source_path: The path to the source file or directory.
:type source_path: str or os.PathLike
:param destination_path: The path to the destination directory.
:type destination_path: str or os.PathLike
:param copy: If True, perform a copy instead of a move. Default is False.
:type copy: bool, optional
:raises RuntimeError: If the move was unsuccessful.
:return: None
"""
if copy:
ria_toolkit_oss.io.common.copy(source_path=source_path, destination_path=destination_path)
return
raise NotImplementedError
def copy(source_path: str | os.PathLike, destination_path: str | os.PathLike) -> None:
"""Copy the file or directory at source_path to destination_path.
.. todo::
This function is not yet implemented.
:param source_path: The path to the source file or directory.
:type source_path: str or os.PathLike
:param destination_path: The path to the destination directory.
:type destination_path: str or os.PathLike
:raises RuntimeError: If the copy was unsuccessful.
:return: None
"""
raise NotImplementedError

View File

@ -4,7 +4,7 @@ This module contains the main group for the ria toolkit oss CLI.
import click import click
from ria_toolkit_oss_cli.ria_toolkit_oss import commands from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss import commands
@click.group() @click.group()

View File

@ -4,9 +4,9 @@ import os
import click import click
from utils.io import to_blue, to_npy, to_sigmf, to_wav from ria_toolkit_oss.io import to_blue, to_npy, to_sigmf, to_wav
from utils.io.recording import generate_filename from ria_toolkit_oss.io.recording import generate_filename
from utils.view.view_signal_simple import view_simple_sig from ria_toolkit_oss.view.view_signal_simple import view_simple_sig
from .common import ( from .common import (
echo_progress, echo_progress,

View File

@ -3,7 +3,6 @@
This module contains all the CLI bindings for the utils package. This module contains all the CLI bindings for the utils package.
""" """
from .annotate import annotate
from .capture import capture from .capture import capture
from .combine import combine from .combine import combine
from .convert import convert from .convert import convert

View File

@ -7,15 +7,15 @@ import click
import numpy as np import numpy as np
import yaml import yaml
import utils.signal.basic_signal_generator as basic_gen import ria_toolkit_oss.signal.basic_signal_generator as basic_gen
from utils.data import Recording from ria_toolkit_oss.datatypes import Recording
from utils.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator from ria_toolkit_oss.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator
from utils.signal.block_generator.basic import FrequencyShift from ria_toolkit_oss.signal.block_generator.basic import FrequencyShift
from utils.signal.block_generator.data_types import DataType from ria_toolkit_oss.signal.block_generator.data_types import DataType
from utils.signal.block_generator.mapping.apsk_mapper import _APSKMapper from ria_toolkit_oss.signal.block_generator.mapping.apsk_mapper import _APSKMapper
from utils.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper from ria_toolkit_oss.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper
from utils.signal.block_generator.mapping.mapper import Mapper from ria_toolkit_oss.signal.block_generator.mapping.mapper import Mapper
from utils.signal.block_generator.modulation import ( from ria_toolkit_oss.signal.block_generator.modulation import (
GMSKModulator, GMSKModulator,
OOKModulator, OOKModulator,
OQPSKModulator, OQPSKModulator,

View File

@ -4,9 +4,8 @@ recordings and radio datasets.
""" """
__all__ = [ __all__ = [
"view_annotations",
"view_channels", "view_channels",
"view_sig", "view_sig",
] ]
from .view_signal import view_annotations, view_channels, view_sig from .view_signal import view_channels, view_sig

View File

@ -4,7 +4,7 @@ import matplotlib.pyplot as plt
import numpy as np import numpy as np
from matplotlib.backends.backend_pdf import PdfPages from matplotlib.backends.backend_pdf import PdfPages
from utils.io.recording import from_npy from ria_toolkit_oss.io.recording import from_npy
def create_dataset_pdf(dataset_path, output_path, div=64, metadata_keys=None): def create_dataset_pdf(dataset_path, output_path, div=64, metadata_keys=None):

View File

@ -4,7 +4,7 @@ import scipy.signal as signal
from plotly.graph_objs import Figure from plotly.graph_objs import Figure
from scipy.fft import fft, fftshift from scipy.fft import fft, fftshift
from utils.data import Recording from ria_toolkit_oss.datatypes import Recording
def spectrogram(rec: Recording, thumbnail: bool = False) -> Figure: def spectrogram(rec: Recording, thumbnail: bool = False) -> Figure:

View File

@ -38,6 +38,66 @@ def set_spines(ax, spines):
ax.spines["bottom"].set_visible(False) ax.spines["bottom"].set_visible(False)
ax.spines["left"].set_visible(False) ax.spines["left"].set_visible(False)
def view_channels(
recording: Recording,
output_path: Optional[str] = "images/signal.png",
title: Optional[str] = "Multichannel Signal Plot",
) -> None:
"""Create a PNG of the recording samples, spectrogram, and constellation plot.
Plot is automatically saved to file at output_path.
:param recording: The recording object to plot
:type recording: Recording
:param output_path: The path to save the image. Defaults to "images/signal.png".
:type output_path: str, optional
:param title: The plot title. Defaults to "Multichannel Signal Plot".
:type title: str, optional
:return: None
**Examples:**
.. todo:: Usage examples coming soon.
"""
num_channels = recording.data.shape[0]
fig, axes = plt.subplots(nrows=num_channels, ncols=2)
fig.subplots_adjust(wspace=0.5, hspace=0.5)
plt.style.use("dark_background")
fig.suptitle(title, fontsize=16)
axes[0, 0].set_title("IQ Signal", color=COLORS["light"])
axes[0, 1].set_title("Spectrogram", color=COLORS["light"])
linewidth = 0.5
tick_fontsize = 4
center_frequency = recording.metadata.get("center_frequency", 0)
sample_rate = recording.metadata.get("sample_rate", 1)
sample_indexes = np.arange(0, len(recording.data[0]), 1)
t = sample_indexes / sample_rate
for i in range(num_channels):
axes[i, 0].plot(t, np.real(recording.data[i]), linewidth=linewidth)
axes[i, 0].plot(t, np.imag(recording.data[i]), linewidth=linewidth)
axes[i, 1].specgram(recording.data[i], Fs=sample_rate, Fc=center_frequency)
axes[i, 0].tick_params(labelsize=tick_fontsize, colors=COLORS["light"])
axes[i, 1].tick_params(labelsize=tick_fontsize, colors=COLORS["light"])
axes[i, 0].set_ylabel("Amplitude", fontsize=6, color=COLORS["light"])
axes[i, 1].set_ylabel("Freq (Hz)", fontsize=6, color=COLORS["light"])
if i != num_channels - 1:
axes[i, 0].set_xticks([])
axes[i, 1].set_xticks([])
else:
axes[i, 0].set_xlabel("Time (s)", fontsize=6, color=COLORS["light"])
axes[i, 1].set_xlabel("Time (s)", fontsize=6, color=COLORS["light"])
output_path, _ = set_path(output_path=output_path)
plt.savefig(output_path, dpi=1000)
print(f"Saved signal plot to {output_path}")
def view_sig( def view_sig(
recording: Recording, recording: Recording,