"""Generate command - Generate synthetic signals.""" from pathlib import Path from typing import Optional import click import numpy as np import yaml import utils.signal.basic_signal_generator as basic_gen from utils.data import Recording from utils.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator from utils.signal.block_generator.basic import FrequencyShift from utils.signal.block_generator.data_types import DataType from utils.signal.block_generator.mapping.apsk_mapper import _APSKMapper from utils.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper from utils.signal.block_generator.mapping.mapper import Mapper from utils.signal.block_generator.modulation import ( GMSKModulator, OOKModulator, OQPSKModulator, ) from utils.signal.block_generator.pulse_shaping import ( RaisedCosineFilter, RootRaisedCosineFilter, Upsampling, ) from utils.signal.block_generator.source import ( LFMJammingSource, RandomBinarySource, RecordingSource, SawtoothSource, SquareSource, ) # Block Generator Imports from utils.signal.block_generator.source_block import SourceBlock # Transforms for impairments from utils.transforms.iq_channel_models import ( complex_multipath_rayleigh_channel, rician_fading_channel, ) from utils.transforms.iq_impairments import ( add_compression, add_doppler, add_gain_fluctuation, add_phase_noise, iq_imbalance, ) # NR 5G Import try: from utils.signal.block_gen.nr_5g.nr_5g_generator import NR5GGenerator HAS_NR5G = True except ImportError: HAS_NR5G = False from utils_cli.utils.common import ( echo_progress, echo_verbose, format_frequency, format_sample_rate, parse_metadata_args, save_recording, ) from utils_cli.utils.config import load_user_config # Extend Mapper to support new types def _create_extended_mapper(self): if self.constellation_type.upper() == "APSK": return _APSKMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code) elif self.constellation_type.upper() == "CROSS_QAM": return _CrossQAMMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code) else: # Original factory return self._original_create_constellation_mapper() # Monkey patch Mapper to support new types without modifying original file Mapper._original_create_constellation_mapper = Mapper._create_constellation_mapper Mapper._create_constellation_mapper = _create_extended_mapper def load_config_options(ctx, param, value): """Callback to load options from YAML config file.""" if not value: return None try: with open(value, "r") as f: config = yaml.safe_load(f) # Store config in context for other commands to access ctx.default_map = config return value except Exception as e: raise click.BadParameter(f"Error loading config file: {e}") def apply_user_config_metadata(metadata_tuple): """Apply user config metadata and merge with CLI metadata. Args: metadata_tuple: Tuple of metadata KEY=VALUE strings from CLI Returns: dict: Merged metadata dictionary """ # Load user config user_config = load_user_config() metadata_dict = {} # Apply user config metadata (if user config exists) if user_config: # Add standard metadata fields from config for key in ["author", "organization", "project", "location", "testbed"]: if key in user_config: metadata_dict[key] = user_config[key] # Add SigMF fields from config if "sigmf" in user_config: sigmf = user_config["sigmf"] for key in ["license", "hw", "dataset"]: if key in sigmf: metadata_dict[key] = sigmf[key] # CLI metadata overrides everything if metadata_tuple: metadata_dict.update(parse_metadata_args(metadata_tuple)) return metadata_dict def get_output_format(output: Optional[str], format_opt: Optional[str]) -> str: """Determine output format from filename or option.""" if format_opt: return format_opt if not output: return "sigmf" # Default to sigmf for better metadata support ext = Path(output).suffix.lower() if ext in [".sigmf", ".sigmf-data", ".sigmf-meta"]: return "sigmf" elif ext == ".npy": return "npy" elif ext == ".wav": return "wav" elif ext == ".blue": return "blue" else: return "sigmf" class FileSourceBlock(SourceBlock): """Generates bits from a file or bytes.""" def __init__(self, data: bytes, repeat: bool = True): self.data = data self.repeat = repeat # Convert to bits bits = np.unpackbits(np.frombuffer(data, dtype=np.uint8)) self.bits = bits.astype(np.float32) # SourceBlock expects float32 bits (0.0, 1.0) self.idx = 0 @property def output_type(self) -> DataType: return DataType.BITS def __call__(self, num_samples: int) -> np.ndarray: out = np.zeros(num_samples, dtype=np.float32) filled = 0 while filled < num_samples: remaining = num_samples - filled available = len(self.bits) - self.idx take = min(remaining, available) out[filled : filled + take] = self.bits[self.idx : self.idx + take] self.idx += take filled += take if self.idx >= len(self.bits): if self.repeat: self.idx = 0 else: # Pad with zeros if not repeating break return out def apply_post_processing( recording: Recording, frequency_shift: float, channel_type: str, channel_params: dict, verbose: bool ) -> Recording: """Apply frequency shift and channel models to a recording.""" # 1. Frequency Shift (Pre-channel) if frequency_shift != 0: echo_verbose(f"Applying frequency shift: {format_frequency(frequency_shift)}", verbose) # Use simple phase shift if only 1 block? No, basic gen FrequencyShift # We can use RecordingSource + FrequencyShift + record() source = RecordingSource(recording) fs_block = FrequencyShift(shift_frequency=frequency_shift, sampling_rate=recording.sample_rate) fs_block.input = [source] num = len(recording.data[0]) if recording.n_chan > 0 else len(recording.data) # get_samples processed = fs_block.get_samples(num) recording = Recording(data=processed, metadata=recording.metadata) # 2. Dynamic Impairments (Transforms) # Rician / Rayleigh if channel_type == "rayleigh": # Use improved complex multipath if available echo_verbose("Applying Multipath Rayleigh Channel", verbose) recording = complex_multipath_rayleigh_channel( recording, num_paths=channel_params.get("multipath_paths") or 3, max_delay=channel_params.get("multipath_max_delay") or 2.6e-6, sample_rate=recording.sample_rate, snr_db=None, # We handle noise separately ) elif channel_type == "rician": echo_verbose(f"Applying Rician Channel (K={channel_params.get('rician_k', 2.0)})", verbose) recording = rician_fading_channel( recording, k_factor=channel_params.get("rician_k", 2.0), num_paths=channel_params.get("multipath_paths") or 3, max_delay=channel_params.get("multipath_max_delay") or 1.2e-6, sample_rate=recording.sample_rate, snr_db=None, ) # Doppler doppler_freq = channel_params.get("doppler_freq") if doppler_freq: echo_verbose(f"Applying Doppler (Shift={doppler_freq} Hz)", verbose) # add_doppler expects velocity. Convert freq to velocity assuming 1GHz carrier or pass freq directly? # dynamic_channel wrapper handles this conversion. # Or use add_doppler directly if we have velocity. # User supplied doppler_freq. # Let's use a simple transform or dynamic_channel # We need to reuse dynamic_channel logic for freq->velocity conversion or assume carrier. # Or create add_doppler_freq(signal, freq_shift) # add_doppler takes satellite_velocity etc. # dynamic_channel takes doppler_hz. # We use dynamic_channel logic here but just for Doppler part c_light = 299792458 f_carrier = 1e9 # Assumption for conversion velocity = doppler_freq * c_light / f_carrier recording = add_doppler( recording, satellite_velocity=velocity, satellite_initial_distance=1000, frequency=f_carrier, sample_rate=recording.sample_rate, ) # IQ Imbalance amp = channel_params.get("iq_amp_imbalance") phase = channel_params.get("iq_phase_imbalance") dc = channel_params.get("iq_dc_offset") if amp or phase or dc: echo_verbose(f"Applying IQ Imbalance (Amp={amp}dB, Phase={phase}rad, DC={dc})", verbose) recording = iq_imbalance( recording, amplitude_imbalance=( amp if amp is not None else 0 ), # iq_imbalance defaults to 1.5? We want 0 if not set but one of others is set. phase_imbalance=phase if phase is not None else 0, dc_offset=dc if dc is not None else 0, ) # Phase Noise pn = channel_params.get("phase_noise") if pn: echo_verbose(f"Applying Phase Noise (Var={pn})", verbose) recording = add_phase_noise(recording, phase_variance=pn) # Gain Fluctuation gf = channel_params.get("gain_fluctuation") if gf: echo_verbose(f"Applying Gain Fluctuation (Var={gf})", verbose) recording = add_gain_fluctuation(recording, amplitude_variance=gf) # Compression comp = channel_params.get("compression") if comp: echo_verbose(f"Applying Compression (Gain={comp})", verbose) recording = add_compression(recording, compression_gain=comp) # 3. AWGN (Final stage usually) if channel_type == "awgn" or channel_params.get("noise_power"): # If 'awgn' selected OR noise_power explicitly set (default is 0.1, so always set?) # If channel_type is NOT awgn/rayleigh/rician, and noise_power is default 0.1? # If user didn't specify noise_power, but did specify channel_type=none, do we add noise? # Default noise_power is 0.1. # If channel_type == 'none', we probably shouldn't add noise unless user asked for it. # But noise_power has default. # Let's check if channel_type is 'awgn'. # Or if user provided --noise-power? # (We can't distinguish default vs user provided easily with click unless we use ctx) # For now: only add noise if channel_type is set to something, or if noise_power > 0 and user intended it. # Simpler: If channel_type == 'awgn', definitely add. # If rayleigh/rician, they might want noise too. # If 'none', skip noise? should_add_noise = False if channel_type in ["awgn", "rayleigh", "rician"]: should_add_noise = True if should_add_noise: npow = channel_params.get("noise_power", 0.1) echo_verbose(f"Applying AWGN (Power={npow})", verbose) # Convert Power (variance) to SNR? # add_awgn_to_signal takes SNR. # AWGNChannel block takes Variance. # Use AWGNChannel block logic (additive noise with variance) # or utils.transforms.iq_channel_models.awgn_channel which takes SNR. # The user CLI says --noise-power (variance). # We should use a simple additive noise function with variance. # transforms.iq_augmentations.generate_awgn uses SNR. # Let's implement simple additive noise here or use AWGNChannel block. # Use AWGNChannel block logic directly noise_std = np.sqrt(npow / 2) noise = noise_std * (np.random.randn(*recording.data.shape) + 1j * np.random.randn(*recording.data.shape)) recording = Recording(data=recording.data + noise, metadata=recording.metadata) return recording @click.group() def generate(): """Generate synthetic signals. \b Examples: utils synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf utils synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf """ pass def common_options(f): """Decorator for common options.""" f = click.option("--sample-rate", "-s", type=float, required=True, help="Sample rate in Hz")(f) f = click.option("--num-samples", "-n", type=int, help="Number of samples")(f) f = click.option("--duration", "-t", type=float, help="Duration in seconds (alternative to --num-samples)")(f) f = click.option("--frequency-shift", type=float, default=0.0, help="Digital frequency shift from baseband (Hz)")( f ) f = click.option("--center-frequency", "-fc", type=float, help="Metadata center frequency (Hz)")(f) f = click.option( "--channel-type", type=click.Choice(["none", "awgn", "rayleigh"]), default="none", help="Channel model" )(f) f = click.option("--noise-power", type=float, default=0.1, help="Noise power (variance) for AWGN")(f) f = click.option("--path-gain", type=float, default=0.0, help="Path gain (dB) for Rayleigh")(f) f = click.option("--output", "-o", required=True, help="Output filename")(f) f = click.option("--format", "-F", type=click.Choice(["npy", "sigmf", "wav", "blue"]), help="Output format")(f) # Impairment options f = click.option("--rician-k", type=float, help="Rician K-factor")(f) f = click.option("--multipath-paths", type=int, help="Multipath: Number of paths")(f) f = click.option("--multipath-max-delay", type=float, help="Multipath: Max delay (s)")(f) f = click.option("--doppler-freq", type=float, help="Doppler: Frequency shift (Hz)")(f) f = click.option("--iq-amp-imbalance", type=float, help="IQ Imbalance: Amplitude (dB)")(f) f = click.option("--iq-phase-imbalance", type=float, help="IQ Imbalance: Phase (rad)")(f) f = click.option("--iq-dc-offset", type=float, help="IQ Imbalance: DC Offset")(f) f = click.option("--phase-noise", type=float, help="Phase Noise: Variance")(f) f = click.option("--gain-fluctuation", type=float, help="Gain Fluctuation: Variance")(f) f = click.option("--compression", type=float, help="Compression: Gain")(f) f = click.option( "--config", "-c", callback=load_config_options, is_eager=True, expose_value=False, type=click.Path(exists=True), help="Load parameters from YAML", )(f) f = click.option("--overwrite", "-w", is_flag=True, help="Overwrite existing file")(f) f = click.option("--metadata", "-m", multiple=True, help="Add metadata KEY=VALUE")(f) f = click.option("--verbose", "-v", is_flag=True, help="Verbose output")(f) f = click.option("--quiet", "-q", is_flag=True, help="Suppress output")(f) return f def resolve_length(sample_rate, num_samples, duration, symbols=None, sps=None): """Resolve generation length.""" if symbols is not None and sps is not None: # Modulation specific if num_samples: # If both provided, check consistency or prefer num_samples? # We'll treat symbols as the driver if provided. pass return int(symbols * sps) if num_samples: return int(num_samples) if duration: return int(duration * sample_rate) # Default return 10000 @generate.command() @click.option("--frequency", "-f", type=float, default=1000.0, help="Tone frequency relative to carrier (Hz)") @click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude (0.0-1.0)") @click.option("--phase", "-p", type=float, default=0.0, help="Initial phase in radians") @common_options def tone( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, frequency, amplitude, phase, **kwargs, ): """Generate a complex tone.""" ns = resolve_length(sample_rate, num_samples, duration) echo_progress(f"Generating tone: {format_frequency(frequency)} at {format_sample_rate(sample_rate)}", quiet) # Use basic_gen for core tone recording = basic_gen.sine( sample_rate=int(sample_rate), length=ns, frequency=frequency, amplitude=amplitude, baseband_phase=phase ) if center_frequency: recording._metadata["center_frequency"] = center_frequency echo_verbose(f"Center Frequency: {format_frequency(center_frequency)}", verbose) # Post processing chan_params = {"noise_power": noise_power, "path_gain": path_gain} recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) # User metadata metadata = apply_user_config_metadata(metadata) metadata["signal_type"] = "tone" for key, value in metadata.items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--noise-type", "-T", type=click.Choice(["gaussian", "uniform"]), default="gaussian", help="Noise type") @click.option("--power", "-p", type=float, default=1.0, help="Signal power/variance") @common_options def noise( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, noise_type, power, **kwargs, ): """Generate random noise.""" ns = resolve_length(sample_rate, num_samples, duration) echo_progress(f"Generating {noise_type} noise...", quiet) if noise_type == "gaussian": # AWGN rms = np.sqrt(power) recording = basic_gen.noise(sample_rate=int(sample_rate), length=ns, rms_power=rms) else: # Uniform real = np.random.uniform(-1, 1, ns) imag = np.random.uniform(-1, 1, ns) a = np.sqrt(3 * power / 2) data = a * (real + 1j * imag) recording = Recording(data=data, metadata={"sample_rate": sample_rate}) recording._metadata["signal_type"] = "noise" recording._metadata["noise_type"] = noise_type if center_frequency: recording._metadata["center_frequency"] = center_frequency # Post processing chan_params = {"noise_power": noise_power, "path_gain": path_gain} recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--bandwidth", "-b", type=float, required=True, help="Chirp bandwidth (Hz)") @click.option("--period", "-p", type=float, required=True, help="Chirp period (seconds)") @click.option("--type", "chirp_type", type=click.Choice(["up", "down", "up_down"]), default="up", help="Chirp type") @common_options def chirp( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, bandwidth, period, chirp_type, **kwargs, ): """Generate LFM Chirp signal.""" ns = resolve_length(sample_rate, num_samples, duration) echo_progress(f"Generating {chirp_type} chirp ({format_frequency(bandwidth)}, {period}s)...", quiet) source = LFMJammingSource(sample_rate=sample_rate, bandwidth=bandwidth, chirp_period=period, chirp_type=chirp_type) recording = source.record(ns) recording._metadata["signal_type"] = "chirp" recording._metadata["chirp_type"] = chirp_type recording._metadata["bandwidth"] = bandwidth recording._metadata["period"] = period if center_frequency: recording._metadata["center_frequency"] = center_frequency # Post processing chan_params = {"noise_power": noise_power, "path_gain": path_gain} recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)") @click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude") @click.option("--duty-cycle", "-d", type=float, default=0.5, help="Duty cycle (0.0-1.0)") @click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)") @common_options def square( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, frequency, amplitude, duty_cycle, phase, **kwargs, ): """Generate Square wave.""" ns = resolve_length(sample_rate, num_samples, duration) echo_progress(f"Generating square wave: {format_frequency(frequency)}...", quiet) source = SquareSource( frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, duty_cycle=duty_cycle, phase_shift=phase ) recording = source.record(ns) recording._metadata["signal_type"] = "square" if center_frequency: recording._metadata["center_frequency"] = center_frequency chan_params = {"noise_power": noise_power, "path_gain": path_gain} recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)") @click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude") @click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)") @common_options def sawtooth( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, frequency, amplitude, phase, **kwargs, ): """Generate Sawtooth wave.""" ns = resolve_length(sample_rate, num_samples, duration) echo_progress(f"Generating sawtooth wave: {format_frequency(frequency)}...", quiet) source = SawtoothSource(frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, phase_shift=phase) recording = source.record(ns) recording._metadata["signal_type"] = "sawtooth" if center_frequency: recording._metadata["center_frequency"] = center_frequency chan_params = {"noise_power": noise_power, "path_gain": path_gain} recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) def load_source(message_source, message_content, num_bits=None): if num_bits is not None: if message_source == "random": return RandomBinarySource()((1, num_bits)) elif message_source == "string": if not message_content: raise click.BadParameter("Message content required for string source") return FileSourceBlock(message_content.encode("utf-8"), repeat=True)(num_bits).reshape(1, -1) elif message_source == "file": if not message_content: raise click.BadParameter("File path required for file source") p = Path(message_content) if not p.exists(): raise click.BadParameter(f"File not found: {p}") return FileSourceBlock(p.read_bytes(), repeat=True)(num_bits).reshape(1, -1) else: if message_source == "random": return RandomBinarySource() # Infinite source elif message_source == "string": if not message_content: raise click.BadParameter("Message content required for string source") return FileSourceBlock(message_content.encode("utf-8"), repeat=True) elif message_source == "file": if not message_content: raise click.BadParameter("File path required for file source") p = Path(message_content) if not p.exists(): raise click.BadParameter(f"File not found: {p}") return FileSourceBlock(p.read_bytes(), repeat=True) def _run_mod_gen( mod_type, sample_rate, symbols, num_samples, duration, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, ): # Resolve length # If symbols provided, it drives. # If not, use num_samples/duration to calculate symbols if symbol_rate is None: # Try to infer? No, required. raise click.BadParameter("Symbol rate required") sps = sample_rate / symbol_rate if not sps.is_integer(): sps_int = int(round(sps)) if sps_int < 1: sps_int = 1 actual_sr = sps_int * symbol_rate echo_progress(f"Warning: Non-integer samples per symbol ({sps:.4f}). Rounding to {sps_int}.", quiet) echo_progress(f"Actual sample rate will be {format_sample_rate(actual_sr)}", quiet) sps = int(sps_int) sample_rate = actual_sr else: sps = int(sps) if symbols is None: # Calc from duration/samples ns = resolve_length(sample_rate, num_samples, duration) symbols = int(np.ceil(ns / sps)) echo_progress(f"Generating {mod_type}-{order} ({symbols} symbols)...", quiet) echo_verbose(f" Sample Rate: {format_sample_rate(sample_rate)} (SPS={sps})", verbose) bps = int(np.log2(order)) total_samples = symbols * sps # Source source = load_source(message_source, message_content, None) # Mapper and Pulse Shaping mapper = Mapper(constellation_type=mod_type, num_bits_per_symbol=bps) upsampler = Upsampling(factor=sps) # Filter if filter_type == "rrc": filter_block = RootRaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta) elif filter_type == "rc": filter_block = RaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta) elif filter_type == "gaussian": raise click.ClickException("Gaussian filter not supported yet") else: filter_block = None # Generate base signal mapper.connect_input([source]) upsampler.connect_input([mapper]) if filter_block: filter_block.connect_input([upsampler]) base_recording = filter_block.record(total_samples) else: base_recording = upsampler.record(total_samples) # Update metadata for key, value in { "modulation": mod_type, "order": order, "symbol_rate": symbol_rate, "symbols": symbols, "filter": filter_type, }.items(): base_recording.update_metadata(key, value) if center_frequency: base_recording.update_metadata("center_frequency", center_frequency) # Post Processing chan_params = {"noise_power": noise_power, "path_gain": path_gain} final_recording = apply_post_processing(base_recording, frequency_shift, channel_type, chan_params, verbose) # Trim if explicit num_samples was requested and we generated more (due to symbol alignment) target_ns = resolve_length(sample_rate, num_samples, duration) if target_ns and len(final_recording.data[0]) > target_ns: # Only trim if difference is significant? # User usually wants exact length if specified. if num_samples or duration: # If explicitly asked for length final_recording = final_recording.trim(target_ns) for key, value in apply_user_config_metadata(metadata).items(): final_recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(final_recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--symbols", "-N", type=int, help="Number of symbols") @click.option("--order", "-M", type=int, required=True, help="QAM Order (4, 16, 32, 64, 128, 256, 1024)") @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option( "--filter", "filter_type", type=click.Choice(["rrc", "rc", "gaussian", "none"]), default="rrc", help="Pulse shaping filter", ) @click.option("--filter-span", type=int, default=6, help="Filter span in symbols") @click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def qam( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbols, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, **kwargs, ): """Generate QAM modulated signal.""" # Determine modulation type (Normal QAM vs Cross QAM) if order in [32, 128]: mod_type = "CROSS_QAM" else: mod_type = "QAM" _run_mod_gen( mod_type, sample_rate, symbols, num_samples, duration, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, ) @generate.command() @click.option("--symbols", "-N", type=int, help="Number of symbols") @click.option("--order", "-M", type=int, required=True, help="APSK Order (16, 32, 64, 128, 256)") @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option( "--filter", "filter_type", type=click.Choice(["rrc", "rc", "gaussian", "none"]), default="rrc", help="Pulse shaping filter", ) @click.option("--filter-span", type=int, default=6, help="Filter span in symbols") @click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def apsk( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbols, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, **kwargs, ): """Generate APSK modulated signal.""" _run_mod_gen( "APSK", sample_rate, symbols, num_samples, duration, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, ) @generate.command() @click.option("--symbols", "-N", type=int, help="Number of symbols") @click.option("--order", "-M", type=int, required=True, help="PAM Order (4, 8, 16)") @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option( "--filter", "filter_type", type=click.Choice(["rrc", "rc", "gaussian", "none"]), default="rrc", help="Pulse shaping filter", ) @click.option("--filter-span", type=int, default=6, help="Filter span in symbols") @click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def pam( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbols, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, **kwargs, ): """Generate PAM modulated signal.""" _run_mod_gen( "PAM", sample_rate, symbols, num_samples, duration, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, ) @generate.command() @click.option("--symbols", "-N", type=int, help="Number of symbols") @click.option("--order", "-M", type=int, default=2, help="FSK Order (2, 4, 8)") @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option("--freq-spacing", type=float, help="Frequency spacing (Hz)") @click.option("--modulation-index", "-h", type=float, help="Modulation Index (alternative to spacing)") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def fsk( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbols, order, symbol_rate, freq_spacing, modulation_index, message_source, message_content, **kwargs, ): """Generate FSK modulated signal.""" # FSK uses FSKModulator which is a standalone Source/Modulator block? No, it's a Modulator. # Takes bits input. # Determine spacing if freq_spacing is None: if modulation_index is None: modulation_index = 1.0 # Default freq_spacing = modulation_index * symbol_rate # Samples per symbol sps = sample_rate / symbol_rate # FSKModulator takes sampling_freq and symbol_duration (1/rate) symbol_duration = 1.0 / symbol_rate # Resolve length ns = resolve_length(sample_rate, num_samples, duration, symbols, sps) if symbols is None: symbols = int(np.ceil(ns / sps)) echo_progress(f"Generating {order}-FSK (Spacing={format_frequency(freq_spacing)})...", quiet) # Bits bps = int(np.log2(order)) num_bits = symbols * bps # Source source_bits = load_source(message_source, message_content, num_bits) # Modulator mod = FSKModulator( num_bits_per_symbol=bps, frequency_spacing=freq_spacing, symbol_duration=symbol_duration, sampling_frequency=sample_rate, ) # Generate samples = mod(source_bits) # Flatten samples = samples.flatten()[:ns] recording = Recording(data=samples, metadata={"sample_rate": sample_rate}) recording._metadata.update( { "modulation": "FSK", "order": order, "symbol_rate": symbol_rate, "freq_spacing": freq_spacing, "mod_index": modulation_index if modulation_index else freq_spacing / symbol_rate, } ) if center_frequency: recording._metadata["center_frequency"] = center_frequency chan_params = { "noise_power": noise_power, "path_gain": path_gain, "rician_k": rician_k, "multipath_paths": multipath_paths, "multipath_max_delay": multipath_max_delay, "doppler_freq": doppler_freq, "iq_amp_imbalance": iq_amp_imbalance, "iq_phase_imbalance": iq_phase_imbalance, "iq_dc_offset": iq_dc_offset, "phase_noise": phase_noise, "gain_fluctuation": gain_fluctuation, "compression": compression, } recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def ook( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbol_rate, message_source, message_content, **kwargs, ): """Generate On-Off Keying (OOK) signal.""" sps = int(sample_rate / symbol_rate) ns = resolve_length(sample_rate, num_samples, duration) echo_progress("Generating OOK...", quiet) # Source Block source = load_source(message_source, message_content, None) # OOK Modulator mod = OOKModulator(source, samples_per_symbol=sps) recording = mod.record(ns) recording._metadata["sample_rate"] = sample_rate recording._metadata["modulation"] = "OOK" if center_frequency: recording._metadata["center_frequency"] = center_frequency chan_params = { "noise_power": noise_power, "path_gain": path_gain, "rician_k": rician_k, "multipath_paths": multipath_paths, "multipath_max_delay": multipath_max_delay, "doppler_freq": doppler_freq, "iq_amp_imbalance": iq_amp_imbalance, "iq_phase_imbalance": iq_phase_imbalance, "iq_dc_offset": iq_dc_offset, "phase_noise": phase_noise, "gain_fluctuation": gain_fluctuation, "compression": compression, } recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def oqpsk( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbol_rate, message_source, message_content, **kwargs, ): """Generate Offset QPSK (OQPSK) signal.""" sps = int(sample_rate / symbol_rate) ns = resolve_length(sample_rate, num_samples, duration) echo_progress("Generating OQPSK...", quiet) # Source Block source = load_source(message_source, message_content, None) # OQPSK Modulator mod = OQPSKModulator(source, samples_per_symbol=sps) recording = mod.record(ns) recording._metadata["sample_rate"] = sample_rate recording._metadata["modulation"] = "OQPSK" if center_frequency: recording._metadata["center_frequency"] = center_frequency chan_params = { "noise_power": noise_power, "path_gain": path_gain, "rician_k": rician_k, "multipath_paths": multipath_paths, "multipath_max_delay": multipath_max_delay, "doppler_freq": doppler_freq, "iq_amp_imbalance": iq_amp_imbalance, "iq_phase_imbalance": iq_phase_imbalance, "iq_dc_offset": iq_dc_offset, "phase_noise": phase_noise, "gain_fluctuation": gain_fluctuation, "compression": compression, } recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option("--bt", type=float, default=0.3, help="Bandwidth-Time product (e.g., 0.3, 0.5)") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def gmsk( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, rician_k, multipath_paths, multipath_max_delay, doppler_freq, iq_amp_imbalance, iq_phase_imbalance, iq_dc_offset, phase_noise, gain_fluctuation, compression, symbol_rate, bt, message_source, message_content, **kwargs, ): """Generate GMSK modulated signal.""" sps = int(sample_rate / symbol_rate) ns = resolve_length(sample_rate, num_samples, duration) echo_progress(f"Generating GMSK (BT={bt})...", quiet) # Source Block source = load_source(message_source, message_content, None) # GMSK Modulator mod = GMSKModulator(source, samples_per_symbol=sps, bt=bt) recording = mod.record(ns) recording._metadata["sample_rate"] = sample_rate recording._metadata["modulation"] = "GMSK" recording._metadata["bt_product"] = bt if center_frequency: recording._metadata["center_frequency"] = center_frequency chan_params = { "noise_power": noise_power, "path_gain": path_gain, "rician_k": rician_k, "multipath_paths": multipath_paths, "multipath_max_delay": multipath_max_delay, "doppler_freq": doppler_freq, "iq_amp_imbalance": iq_amp_imbalance, "iq_phase_imbalance": iq_phase_imbalance, "iq_dc_offset": iq_dc_offset, "phase_noise": phase_noise, "gain_fluctuation": gain_fluctuation, "compression": compression, } recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose) @generate.command() @click.option("--symbols", "-N", type=int, help="Number of symbols") @click.option("--order", "-M", type=int, required=True, help="PSK Order (2, 4, 8)") @click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") @click.option( "--filter", "filter_type", type=click.Choice(["rrc", "rc", "gaussian", "none"]), default="rrc", help="Pulse shaping filter", ) @click.option("--filter-span", type=int, default=6, help="Filter span in symbols") @click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") @click.option( "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" ) @click.option("--message-content", help="File path or string content") @common_options def psk( sample_rate, num_samples, duration, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, symbols, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, **kwargs, ): """Generate PSK modulated signal.""" _run_mod_gen( "PSK", sample_rate, symbols, num_samples, duration, order, symbol_rate, filter_type, filter_span, filter_beta, message_source, message_content, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, ) @generate.command() @click.option("--bandwidth", "-b", type=int, required=True, help="Bandwidth in MHz (e.g. 10, 20)") @click.option("--mu", "-u", type=int, default=1, help="Numerology (0-3)") @click.option("--frames", type=int, default=1, help="Number of 10ms frames") @click.option("--ssb/--no-ssb", default=True, help="Enable SSB") @common_options def nr5g( sample_rate, frequency_shift, center_frequency, channel_type, noise_power, path_gain, output, format, overwrite, metadata, verbose, quiet, bandwidth, mu, frames, ssb, **kwargs, ): """Generate 5G NR frame.""" if not HAS_NR5G: raise click.ClickException("5G NR Generator not available (missing dependencies or module)") echo_progress(f"Generating 5G NR ({bandwidth} MHz, mu={mu}, {frames} frames)...", quiet) # NR5GGenerator parameters # It determines sample rate based on bandwidth/mu/fr? # nr_ofdm_params(bandwidth_mhz, mu, fr) returns fs. # We should verify if user supplied sample_rate matches or we should ignore user sample_rate? # Or we resample? # The generator has fixed fs for a given BW/mu config usually. # Let's instantiate it and see its fs. gen = NR5GGenerator(bandwidth_mhz=bandwidth, mu=mu, frames_per_recording=frames, ssb=ssb) native_fs = gen.fs if sample_rate and abs(sample_rate - native_fs) > 1.0: echo_progress( message=( f"Warning: Requested sample rate {format_sample_rate(sample_rate)} " f"differs from native NR rate {format_sample_rate(native_fs)}." ), quiet=quiet, ) echo_progress("Output will be at native rate.", quiet) # If we really wanted to support arbitrary rate, we'd need resampling. # For now, just warn and use native. recording = gen.record(batch_size=1) recording._metadata["signal_type"] = "nr5g" if center_frequency: recording._metadata["center_frequency"] = center_frequency # Post processing chan_params = {"noise_power": noise_power, "path_gain": path_gain} recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) for key, value in apply_user_config_metadata(metadata).items(): recording.update_metadata(key, value) fmt = get_output_format(output, format) save_recording(recording, output, fmt, overwrite, verbose)