"""Annotate command - Automatic detection and manual annotation management.""" import json from pathlib import Path import click from ria_toolkit_oss.annotations import ( annotate_with_cusum, detect_signals_energy, split_recording_annotations, threshold_qualifier, ) from ria_toolkit_oss.datatypes import Annotation from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav from ria_toolkit_oss_cli.ria_toolkit_oss.common import ( format_frequency, format_sample_count, ) def normalize_sigmf_path(filepath): """Normalize SigMF path to base name without extension.""" path = Path(filepath) # Handle .sigmf-data, .sigmf-meta, or .sigmf if ".sigmf" in path.suffix: # Remove the suffix to get base name return path.with_suffix("") else: return path def detect_input_format(filepath): """Detect file format from extension.""" path = Path(filepath) ext = path.suffix.lower() if ext in [".sigmf-data", ".sigmf-meta"]: return "sigmf" elif path.name.endswith(".sigmf"): return "sigmf" elif ext == ".npy": return "npy" elif ext == ".wav": return "wav" elif ext == ".blue": return "blue" else: raise click.ClickException(f"Unknown format for '{filepath}'. Supported: .sigmf, .npy, .wav, .blue") def determine_output_path(input_path, output_path, fmt, quiet, overwrite): input_path = Path(input_path) input_is_annotated = input_path.stem.endswith("_annotated") if output_path: target = Path(output_path) elif overwrite and input_is_annotated: # Write back in-place only when the input is already an _annotated file target = input_path else: target = input_path.with_name(f"{input_path.stem}_annotated{input_path.suffix}") if fmt == "sigmf": final_path = normalize_sigmf_path(target) if not quiet: click.echo(f"Saving SigMF metadata to: {final_path}") else: final_path = target if not quiet: click.echo(f"Saving to: {final_path}") # Always allow writing to _annotated files; guard against overwriting originals target_is_annotated = final_path.stem.endswith("_annotated") if final_path.exists() and not target_is_annotated and final_path != input_path: click.echo(f"Error: {final_path} is not an annotated file and cannot be overwritten.", err=True) return None return final_path def save_recording_auto(recording, output_path, input_path, quiet=False, overwrite=False): """Save recording, auto-detecting format from extension. For SigMF: Only overwrites metadata file, data file is unchanged For other formats: Creates _annotated copy by default, unless overwrite=True """ input_path = Path(input_path) fmt = detect_input_format(input_path) # Determine output path output_path = determine_output_path( input_path=input_path, output_path=output_path, fmt=fmt, quiet=quiet, overwrite=overwrite ) if fmt == "sigmf": # Normalize path for SigMF base_path = output_path stem = base_path.name parent = base_path.parent # For SigMF: only save metadata, copy data if needed meta_path = parent / f"{stem}.sigmf-meta" data_path = parent / f"{stem}.sigmf-data" # If output is different from input, copy data file input_base = normalize_sigmf_path(input_path) if input_base != base_path: import shutil # Construct input data path correctly # input_base is like /path/to/recording or /path/to/recording.sigmf # We need /path/to/recording.sigmf-data if str(input_base).endswith(".sigmf"): input_data = Path(str(input_base).replace(".sigmf", ".sigmf-data")) else: input_data = input_base.parent / f"{input_base.name}.sigmf-data" if not quiet: click.echo(f" Copying: {data_path}") shutil.copy2(input_data, data_path) # Always save metadata (this is the whole point) to_sigmf(recording, filename=stem, path=parent, overwrite=True) if not quiet: click.echo(f" Updated: {meta_path}") if input_base != base_path: click.echo(f" Created: {data_path}") elif fmt == "npy": to_npy(recording, filename=output_path.stem, path=output_path.parent, overwrite=True) if not quiet: click.echo(f" Created: {output_path}") elif fmt == "wav": to_wav(recording, filename=output_path.stem, path=output_path.parent, overwrite=True) if not quiet: click.echo(f" Created: {output_path}") elif fmt == "blue": to_blue(recording, filename=output_path.stem, path=output_path.parent, overwrite=True) if not quiet: click.echo(f" Created: {output_path}") def determine_frequency_bounds(recording: Recording, freq_lower, freq_upper): # Handle frequency bounds if (freq_lower is None) != (freq_upper is None): raise click.ClickException("Must specify both --freq-lower and --freq-upper, or neither") if freq_lower is None: # Default to full bandwidth sample_rate = recording.metadata.get("sample_rate", 1) center_freq = recording.metadata.get("center_frequency", 0) freq_lower = center_freq - (sample_rate / 2) freq_upper = center_freq + (sample_rate / 2) freq_default = True else: freq_default = False if freq_lower >= freq_upper: raise click.ClickException( f"Invalid frequency range: lower ({format_frequency(freq_lower)}) " f"must be < upper ({format_frequency(freq_upper)})" ) return freq_lower, freq_upper, freq_default def get_indices_list(indices, recording: Recording): if indices: try: indices_list = [int(idx.strip()) for idx in indices.split(",")] # Validate indices for idx in indices_list: if idx < 0 or idx >= len(recording.annotations): raise click.ClickException( f"Invalid index {idx}. Recording has {len(recording.annotations)} annotation(s)" ) except ValueError as e: raise click.ClickException(f"Invalid indices format. Expected comma-separated integers: {e}") return indices_list else: return None # ============================================================================ # Main command group # ============================================================================ @click.group() def annotate(): """Manage and auto-detect annotations on RF recordings. \b MANUAL MANAGEMENT: list - List all current annotations add - Manually add a specific annotation remove - Delete an annotation by its index clear - Remove all annotations from the recording \b DETECTION & SEPARATION: energy - Auto-detect using energy-based thresholding cusum - Auto-detect segments using signal state changes threshold - Auto-detect samples above magnitude percentage separate - Auto-detect parallel frequency-offset signals, split into sub-bands \b File Path Handling: - SigMF files: Pass .sigmf-data, .sigmf-meta, or base name - Other formats: .npy, .wav, .blue files \b Output Behavior: - SigMF: Updates .sigmf-meta only (data unchanged), in-place - Other: Creates _annotated copy unless --overwrite specified """ pass # ============================================================================ # List subcommand # ============================================================================ @annotate.command() @click.argument("input", type=click.Path(exists=True)) @click.option("--verbose", is_flag=True, help="Show detailed annotation info") def list(input, verbose): """List all annotations in a recording. \b Examples: ria annotate list recording.sigmf-data ria annotate list signal.npy --verbose """ try: recording = load_recording(input) except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") if len(recording.annotations) == 0: click.echo(f"No annotations in {Path(input).name}") return click.echo(f"\nAnnotations in {Path(input).name}:") for i, ann in enumerate(recording.annotations): # Parse type from comment JSON try: comment_data = json.loads(ann.comment) ann_type = comment_data.get("type", "unknown") user_comment = comment_data.get("user_comment", "") except (json.JSONDecodeError, TypeError): ann_type = "unknown" user_comment = ann.comment or "" # Basic info freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}" click.echo( f" [{i}] Samples {format_sample_count(ann.sample_start)}-" f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}" ) click.echo(f" Type: {ann_type}") if verbose: if user_comment: click.echo(f" Comment: {user_comment}") click.echo(f" Frequency: {freq_range}") if ann.detail: click.echo(f" Detail: {ann.detail}") click.echo(f"\nTotal: {len(recording.annotations)} annotation(s)") # ============================================================================ # Add subcommand # ============================================================================ @annotate.command(context_settings={"max_content_width": 200}) @click.argument("input", type=click.Path(exists=True)) @click.option("--start", type=int, required=True, help="Start sample index") @click.option("--count", type=int, required=True, help="Sample count") @click.option("--label", type=str, required=True, help="Annotation label") @click.option("--freq-lower", type=float, help="Lower frequency edge (Hz)") @click.option("--freq-upper", type=float, help="Upper frequency edge (Hz)") @click.option("--comment", type=str, help="Human-readable comment") @click.option( "--type", "annotation_type", type=click.Choice(["standalone", "parallel", "intersection"]), default="standalone", help="Annotation type", ) @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--quiet", is_flag=True, help="Quiet mode") def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_type, output, overwrite, quiet): """Add a manual annotation. \b Examples: ria annotate add file.npy --start 1000 --count 500 --label wifi ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal" """ try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") # Validate sample range n_samples = len(recording.data[0]) if start < 0: raise click.ClickException(f"--start must be >= 0, got {start}") if count <= 0: raise click.ClickException(f"--count must be > 0, got {count}") if start + count > n_samples: raise click.ClickException( f"Invalid annotation range:\n" f" Start: {start:,}\n" f" Count: {count:,}\n" f" End: {start + count:,}\n" f"Recording only has {n_samples:,} samples" ) # Handle frequency bounds freq_lower, freq_upper, freq_default = determine_frequency_bounds( recording=recording, freq_lower=freq_lower, freq_upper=freq_upper ) # Build comment JSON comment_data = {"type": annotation_type} if comment: comment_data["user_comment"] = comment # Create annotation ann = Annotation( sample_start=start, sample_count=count, freq_lower_edge=freq_lower, freq_upper_edge=freq_upper, label=label, comment=json.dumps(comment_data), detail={}, ) recording._annotations.append(ann) if not quiet: click.echo("\nAdding annotation:") click.echo(f" Start: {format_sample_count(start)}") click.echo(f" Count: {format_sample_count(count)} samples") freq_str = ( "full bandwidth" if freq_default else f"{format_frequency(freq_lower)} - {format_frequency(freq_upper)}" ) click.echo(f" Frequency: {freq_str}") click.echo(f" Label: {label}") click.echo(f" Type: {annotation_type}") if comment: click.echo(f" Comment: {comment}") try: save_recording_auto(recording, output, input, quiet, overwrite) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"Failed to save: {e}") # ============================================================================ # Remove subcommand # ============================================================================ @annotate.command(context_settings={"max_content_width": 200}) @click.argument("input", type=click.Path(exists=True)) @click.argument("index", type=int) @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--quiet", is_flag=True, help="Quiet mode") def remove(input, index, output, overwrite, quiet): """Remove annotation by index. Use 'ria annotate list' to see annotation indices. \b Examples: ria annotate remove signal.sigmf-data 2 ria annotate remove file.npy 0 """ try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") if index < 0 or index >= len(recording.annotations): raise click.ClickException( f"Cannot remove annotation at index {index}\n" f"Recording has {len(recording.annotations)} annotation(s) (indices 0-{len(recording.annotations)-1})" ) removed_ann = recording.annotations[index] recording._annotations.pop(index) if not quiet: click.echo(f"\nRemoving annotation [{index}]:") click.echo( f" Removed: samples {format_sample_count(removed_ann.sample_start)}-" f"{format_sample_count(removed_ann.sample_start + removed_ann.sample_count)} ({removed_ann.label})" ) try: save_recording_auto(recording, output_path=input, input_path=input, quiet=quiet, overwrite=True) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"Failed to save: {e}") # ============================================================================ # Clear subcommand # ============================================================================ @annotate.command(context_settings={"max_content_width": 175}) @click.argument("input", type=click.Path(exists=True)) @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--force", is_flag=True, help="Skip confirmation") @click.option("--quiet", is_flag=True, help="Quiet mode") def clear(input, output, overwrite, force, quiet): """Clear all annotations. \b Examples: ria annotate clear signal.sigmf-data ria annotate clear file.npy --force """ try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") count_before = len(recording.annotations) if count_before == 0: if not quiet: click.echo("No annotations to clear") return # Confirm unless --force if not force and not quiet: click.echo(f"\nWarning: This will remove all {count_before} annotation(s)") click.confirm("Continue?", abort=True) recording._annotations = [] if not quiet: click.echo(f"\nCleared {count_before} annotation(s)") recording._annotations = [] try: save_recording_auto(recording, output_path=input, input_path=input, quiet=quiet, overwrite=True) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"Failed to save: {e}") # ============================================================================ # Energy detection subcommand # ============================================================================ @annotate.command(context_settings={"max_content_width": 200}) @click.argument("input", type=click.Path(exists=True)) @click.option("--label", type=str, default="signal", help="Annotation label") @click.option("--threshold", type=float, default=1.2, help="Threshold multiplier above noise floor") @click.option("--segments", type=int, default=10, help="Number of segments for noise estimation") @click.option("--window-size", type=int, default=200, help="Smoothing window size") @click.option("--min-distance", type=int, default=5000, help="Min distance between detections") @click.option( "--freq-method", type=click.Choice(["nbw", "obw", "full-detected", "full-bandwidth"]), default="nbw", help="Frequency bounding method", ) @click.option("--nfft", type=int, default=None, help="FFT size for frequency calculation") @click.option("--obw-power", type=float, default=0.99, help="Power percentage for OBW/NBW (0.98-0.9999)") @click.option( "--type", "annotation_type", type=click.Choice(["standalone", "parallel", "intersection"]), default="standalone", help="Annotation type", ) @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--quiet", is_flag=True, help="Quiet mode") def energy( input, label, threshold, segments, window_size, min_distance, freq_method, nfft, obw_power, annotation_type, output, overwrite, quiet, ): """Auto-detect signals using energy-based method. Detects bursts based on energy above noise floor. Best for bursty signals and intermittent transmissions. \b Frequency Bounding Methods: nbw - Nominal bandwidth (default, best for real signals) obw - Occupied bandwidth (more conservative, includes sidelobes) full-detected - Lowest to highest spectral component full-bandwidth - Entire Nyquist span \b Examples: ria annotate energy capture.sigmf-data --label burst ria annotate energy signal.npy --threshold 1.5 --min-distance 10000 ria annotate energy signal.sigmf-data --freq-method obw ria annotate energy signal.sigmf-data --freq-method full-detected """ try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") if not quiet: click.echo("\nDetecting signals using energy-based method...") click.echo(" Time detection:") click.echo(f" Segments: {segments}") click.echo(f" Threshold: {threshold}x noise floor") click.echo(f" Window size: {window_size} samples") click.echo(f" Min distance: {min_distance} samples") click.echo(f" Frequency bounds: {freq_method}") try: initial_count = len(recording.annotations) recording = detect_signals_energy( recording, k=segments, threshold_factor=threshold, window_size=window_size, min_distance=min_distance, label=label, annotation_type=annotation_type, freq_method=freq_method, nfft=nfft, obw_power=obw_power, ) added = len(recording.annotations) - initial_count if not quiet: click.echo(f" ✓ Added {added} annotation(s)") save_recording_auto(recording, output, input, quiet, overwrite) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"Energy detection failed: {e}") # ============================================================================ # CUSUM detection subcommand # ============================================================================ @annotate.command() @click.argument("input", type=click.Path(exists=True)) @click.option("--label", type=str, default="segment", help="Annotation label") @click.option("--min-duration", type=float, default=5.0, help="Min duration in ms (prevents over-segmentation)") @click.option("--window-size", type=int, default=1, help="Smoothing window size") @click.option("--tolerance", type=int, default=-1, help="Sample tolerance for merging") @click.option( "--type", "annotation_type", type=click.Choice(["standalone", "parallel", "intersection"]), default="standalone", help="Annotation type", ) @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--quiet", is_flag=True, help="Quiet mode") def cusum(input, label, min_duration, window_size, tolerance, annotation_type, output, overwrite, quiet): """Auto-detect segments using CUSUM method. Detects signal state changes (on/off, amplitude transitions). Best for segmenting continuous signals. IMPORTANT: Always specify --min-duration to prevent excessive segmentation. \b Examples: ria annotate cusum signal.sigmf-data --min-duration 5.0 ria annotate cusum data.npy --min-duration 10.0 --label state """ try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") if not quiet: click.echo("\nDetecting segments using CUSUM...") click.echo(f" Min duration: {min_duration} ms") if window_size != 1: click.echo(f" Window size: {window_size} samples") try: initial_count = len(recording.annotations) recording = annotate_with_cusum( recording, label=label, window_size=window_size, min_duration=min_duration, tolerance=tolerance, annotation_type=annotation_type, ) added = len(recording.annotations) - initial_count if not quiet: click.echo(f" ✓ Added {added} annotation(s)") save_recording_auto(recording, output, input, quiet, overwrite) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"CUSUM detection failed: {e}") # ============================================================================ # Threshold detection subcommand # ============================================================================ @annotate.command() @click.argument("input", type=click.Path(exists=True)) @click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)") @click.option("--label", type=str, default=None, help="Annotation label") @click.option( "--window-size", type=int, default=None, help="Smoothing window size in samples (default: 1ms at recording sample rate)", ) @click.option( "--type", "annotation_type", type=click.Choice(["standalone", "parallel", "intersection"]), default="standalone", help="Annotation type", ) @click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)") @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--quiet", is_flag=True, help="Quiet mode") def threshold(input, threshold, label, window_size, annotation_type, channel, output, overwrite, quiet): """Auto-detect signals using threshold method. Detects samples above a percentage of maximum magnitude. Best for simple power-based detection. \b Examples: ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi ria annotate threshold data.npy --threshold 0.5 --window-size 2048 """ if not (0.0 <= threshold <= 1.0): raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}") try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") if not quiet: click.echo("\nDetecting signals using threshold qualifier...") click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude") click.echo(f" Window size: {'auto (1ms)' if window_size is None else f'{window_size} samples'}") click.echo(f" Channel: {channel}") try: initial_count = len(recording.annotations) recording = threshold_qualifier( recording, threshold=threshold, window_size=window_size, label=label, annotation_type=annotation_type, channel=channel, ) added = len(recording.annotations) - initial_count if not quiet: click.echo(f" ✓ Added {added} annotation(s)") save_recording_auto(recording, output, input, quiet, overwrite) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"Threshold detection failed: {e}") # ============================================================================ # Separate subcommand (Phase 2: Parallel signal separation) # ============================================================================ @annotate.command() @click.argument("input", type=click.Path(exists=True)) @click.option("--indices", type=str, help="Comma-separated annotation indices to split (default: all)") @click.option("--nfft", type=int, default=65536, help="FFT size for spectral analysis") @click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)") @click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz") @click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--quiet", is_flag=True, help="Quiet mode") @click.option("--verbose", is_flag=True, help="Verbose output (show detected components)") def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output, overwrite, quiet, verbose): """ Auto-detect parallel frequency-offset signals and split into sub-bands. Provides methods to detect and separate overlapping frequency-domain signals that occupy the same time window but different frequency bands. Detects multiple frequency components within single annotations and splits them into separate annotations. Uses spectral peak detection with dual bandwidth estimation. \b Key Features: - Spectral peak detection for frequency components - Auto noise floor estimation (or user-specified) - Dual bandwidth estimation: -3dB primary, cumulative power fallback - Handles narrowband and wide signals (OFDM) \b Examples: ria annotate separate capture.sigmf-data ria annotate separate signal.npy --indices 0,1,2 ria annotate separate data.sigmf-data --noise-threshold-db -70 ria annotate separate signal.npy --min-component-bw 100000 """ try: recording = load_recording(input) if not quiet: click.echo(f"Loaded: {input}") except Exception as e: raise click.ClickException(f"Failed to load recording: {e}") # Parse indices if specified indices_list = get_indices_list(indices=indices, recording=recording) if len(recording.annotations) == 0: if not quiet: click.echo("No annotations to split") return if not quiet: click.echo("\nSplitting annotations by frequency components...") click.echo(f" Input annotations: {len(recording.annotations)}") if indices_list: click.echo(f" Splitting indices: {indices_list}") click.echo(f" FFT size: {nfft}") if noise_threshold_db is not None: click.echo(f" Noise threshold: {noise_threshold_db} dB") else: click.echo(" Noise threshold: auto-estimated") click.echo(f" Min component BW: {format_frequency(min_component_bw)}") try: initial_count = len(recording.annotations) recording = split_recording_annotations( recording, indices=indices_list, nfft=nfft, noise_threshold_db=noise_threshold_db, min_component_bw=min_component_bw, ) final_count = len(recording.annotations) added = final_count - initial_count if not quiet: click.echo(f" ✓ Output annotations: {final_count} ({'+' if added >= 0 else ''}{added} change)") if verbose and added > 0: click.echo("\n Details:") for i in range(initial_count, final_count): ann = recording.annotations[i] freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}" click.echo( f" [{i}] samples {format_sample_count(ann.sample_start)}-" f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}" ) save_recording_auto(recording, output, input, quiet, overwrite) if not quiet: click.echo(" ✓ Saved") except Exception as e: raise click.ClickException(f"Spectral separation failed: {e}")