"""Common utilities for CLI commands.""" import os from pathlib import Path from typing import Any, Dict, List, Optional import click import yaml from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.io.recording import to_blue, to_npy, to_sigmf, to_wav def load_yaml_config(config_file: str) -> Dict[str, Any]: """Load YAML configuration file. Args: config_file: Path to YAML file Returns: Dictionary of configuration parameters Raises: click.ClickException: If file cannot be loaded """ try: with open(config_file, "r") as f: config = yaml.safe_load(f) return config or {} except FileNotFoundError: raise click.ClickException(f"Config file not found: {config_file}") except yaml.YAMLError as e: raise click.ClickException(f"Error parsing YAML config: {e}") def detect_file_format(filepath): """Detect file format from extension. Args: filepath: Path to file Returns: str: Format name ('sigmf', 'npy', 'wav', 'blue') Raises: click.ClickException: If format cannot be determined """ filepath = Path(filepath) ext = filepath.suffix.lower() if ext in [".sigmf", ".sigmf-data", ".sigmf-meta"]: return "sigmf" elif ext == ".npy": return "npy" elif ext == ".wav": return "wav" elif ext == ".blue": return "blue" else: raise click.ClickException( f"Unknown format for '{filepath}'\n" f"Supported extensions: .sigmf, .npy, .wav, .blue" ) def parse_metadata_args(metadata_args: List[str]) -> Dict[str, Any]: """Parse metadata KEY=VALUE arguments. Args: metadata_args: List of "KEY=VALUE" strings Returns: Dictionary of parsed metadata Raises: click.ClickException: If metadata format is invalid """ metadata = {} for arg in metadata_args: if "=" not in arg: raise click.ClickException(f"Invalid metadata format: '{arg}'. Expected KEY=VALUE") key, value = arg.split("=", 1) if key in ["experiment", "campaign", "project"]: metadata[key] = value else: # Try to parse numeric values try: # Try float first (handles both int and float) if "." in value or "e" in value.lower(): metadata[key] = float(value) else: metadata[key] = int(value) except ValueError: # Keep as string metadata[key] = value return metadata def parse_frequency(freq_str: str) -> float: """Parse frequency string with suffixes (k, M, G). Args: freq_str: Frequency string (e.g., "915e6", "2.4G", "433M") Returns: Frequency in Hz Raises: click.ClickException: If frequency format is invalid """ try: # Handle scientific notation and plain numbers if "e" in freq_str.lower() or freq_str.replace(".", "").replace("-", "").isdigit(): return float(freq_str) # Handle suffix notation (k, M, G) multipliers = {"k": 1e3, "K": 1e3, "M": 1e6, "G": 1e9} for suffix, mult in multipliers.items(): if freq_str.endswith(suffix): return float(freq_str[:-1]) * mult # No suffix, try as plain number return float(freq_str) except ValueError: raise click.ClickException( f"Invalid frequency format: '{freq_str}'. " "Use formats like: 915e6, 2.4G, 433M, 100k" ) def format_frequency(freq_hz: float) -> str: """Format frequency in human-readable form. Args: freq_hz: Frequency in Hz Returns: Formatted string (e.g., "915.0 MHz") """ if freq_hz >= 1e9: return f"{freq_hz/1e9:.2f} GHz" elif freq_hz >= 1e6: return f"{freq_hz/1e6:.2f} MHz" elif freq_hz >= 1e3: return f"{freq_hz/1e3:.2f} kHz" else: return f"{freq_hz:.2f} Hz" def format_sample_rate(rate_hz: float) -> str: """Format sample rate in human-readable form. Args: rate_hz: Sample rate in Hz Returns: Formatted string (e.g., "2.0 MSPS") """ if rate_hz >= 1e6: return f"{rate_hz/1e6:.2f} MS/s" elif rate_hz >= 1e3: return f"{rate_hz/1e3:.2f} kS/s" else: return f"{rate_hz:.2f} S/s" def format_sample_count(count): """Format sample count with thousands separator.""" return f"{count:,}" def get_output_path(filename: Optional[str], path: Optional[str], default_dir: str = "recordings") -> str: """Generate full output path. Args: filename: Output filename (can be None for auto-generated) path: Output directory path default_dir: Default directory if path not specified Returns: Full path for output file """ if path is None: path = default_dir # Create directory if it doesn't exist if not os.path.exists(path): os.makedirs(path) if filename: return os.path.join(path, filename) else: return path def save_recording(recording: Recording, output_path=None, output_format=None, overwrite=False, verbose=False): """Save recording to file with format-specific handling. Args: recording: Recording object to save output_path: Output file path output_format: Optional format override overwrite: Whether to overwrite existing files verbose: Verbose output Raises: click.ClickException: If save fails """ if output_path is None: # Auto-generate filename timestamp = recording.timestamp rec_id = recording.rec_id[:8] signal_type = recording.metadata.get("signal_type", "signal") output_path = f"{signal_type}_{rec_id}_{int(timestamp)}" output_path = Path(output_path) # Detect format if not specified if output_format is None: output_format = detect_file_format(output_path) # For sigmf, strip extension to get base name if output_format == "sigmf" and output_path.suffix not in [".sigmf-data", ".sigmf-meta", ".sigmf"]: base_name = output_path.name else: base_name = output_path.stem output_dir = output_path.parent # Create output directory if needed if output_dir and not output_dir.exists(): output_dir.mkdir(parents=True, exist_ok=True) echo_verbose(f"Created directory: {output_dir}", verbose) # Check for overwriting check_for_overwriting(overwrite, output_format, output_path) # Save based on format try: if output_format == "sigmf": to_sigmf(recording, filename=base_name, path=str(output_dir), overwrite=overwrite) elif output_format == "npy": to_npy(recording, filename=str(output_path), overwrite=overwrite) elif output_format == "wav": to_wav(recording, filename=str(output_path), overwrite=overwrite) elif output_format == "blue": to_blue(recording, filename=str(output_path), overwrite=overwrite) else: raise click.ClickException(f"Unsupported output format: {output_format}") except Exception as e: raise click.ClickException(f"Failed to save output: {e}") def echo_verbose(message: str, verbose: bool): """Print message only in verbose mode. Args: message: Message to print verbose: Whether verbose mode is enabled """ if verbose: click.echo(message) def echo_progress(message: str, quiet: bool = False): """Print progress message unless in quiet mode. Args: message: Progress message quiet: Whether quiet mode is enabled """ if not quiet: click.echo(message, err=True) def confirm_dangerous_operation(message: str, skip_confirm: bool = False) -> bool: """Ask for confirmation of potentially dangerous operation. Args: message: Warning message skip_confirm: Skip confirmation (for automation) Returns: True if user confirmed, False otherwise """ if skip_confirm: return True click.echo(click.style("WARNING: ", fg="yellow", bold=True) + message, err=True) return click.confirm("Continue?", default=False) def check_for_overwriting(overwrite, output_format, output_path): # Check if output exists (unless overwriting) if not overwrite: output_path = Path(output_path) if output_format == "sigmf": data_file = output_path.with_suffix(".sigmf-data") meta_file = output_path.with_suffix(".sigmf-meta") if data_file.exists() or meta_file.exists(): raise click.ClickException( f"Output files exist: {data_file.name}, {meta_file.name}\n" f"Use --overwrite to replace" ) elif output_path.exists(): raise click.ClickException(f"Output file '{output_path}' already exists\n" f"Use --overwrite to replace") def parse_ident(ident: Optional[str]) -> tuple[Optional[str], Optional[str]]: """ Parse device identifier into IP address or name. Args: ident: Device identifier (IP address or name=value) Returns: Tuple of (ip_address, name) where one will be None """ if not ident: return None, None if "=" in ident: key, value = ident.split("=", 1) if key.lower() == "name": return None, value else: return ident, None else: return ident, None def get_sdr_device(device_type: str, ident: Optional[str] = None, tx=False): """ Get TX-capable SDR device instance. Args: device_type: Type of device (pluto, hackrf, bladerf, usrp) ident: Device identifier (IP address or name=value) Returns: SDR device instance Raises: click.ClickException: If device cannot be initialized or doesn't support TX """ TX_CAPABLE_DEVICES = ["pluto", "hackrf", "bladerf", "usrp"] if tx and device_type not in TX_CAPABLE_DEVICES: raise click.ClickException( f"Device '{device_type}' does not support transmission (RX only)\n" f"TX-capable devices: {', '.join(TX_CAPABLE_DEVICES)}" ) ip_addr, name = parse_ident(ident) try: if device_type == "pluto": from ria_toolkit_oss.sdr.pluto import Pluto if ip_addr: return Pluto(identifier=ip_addr) else: return Pluto() elif device_type == "hackrf": from ria_toolkit_oss.sdr.hackrf import HackRF return HackRF() elif device_type == "bladerf": from ria_toolkit_oss.sdr.blade import Blade return Blade() elif device_type == "usrp": from ria_toolkit_oss.sdr.usrp import USRP if ip_addr: return USRP(identifier=f"addr={ip_addr}") elif name: return USRP(identifier=f"name={name}") else: return USRP() elif device_type == "rtlsdr": from ria_toolkit_oss.sdr.rtlsdr import RTLSDR return RTLSDR() elif device_type == "thinkrf": from ria_toolkit_oss.sdr.thinkrf import ThinkRF if ip_addr: return ThinkRF(identifier=ip_addr) else: return ThinkRF() else: raise click.ClickException(f"Unknown device type: {device_type}") except ImportError as e: raise click.ClickException( f"Failed to import {device_type} driver: {e}\n" f"Ensure required dependencies are installed" ) except Exception as e: raise click.ClickException(f"Failed to initialize {device_type}: {e}")