from dataclasses import dataclass from typing import Optional import numpy as np from utils.data.recording import Recording from utils.signal import block_generator from utils.signal.basic_signal_generator import complex_sine @dataclass(frozen=True) class ModulationInfo: bps: int constellation: str class ModulationRegistry: _registry = { "qam16": ModulationInfo(bps=4, constellation="qam"), "qam32": ModulationInfo(bps=5, constellation="qam"), "qam64": ModulationInfo(bps=6, constellation="qam"), "qam256": ModulationInfo(bps=8, constellation="qam"), "qam1024": ModulationInfo(bps=10, constellation="qam"), "pam4": ModulationInfo(bps=2, constellation="pam"), "pam8": ModulationInfo(bps=3, constellation="pam"), "bpsk": ModulationInfo(bps=1, constellation="psk"), "qpsk": ModulationInfo(bps=2, constellation="psk"), "psk8": ModulationInfo(bps=3, constellation="psk"), "psk16": ModulationInfo(bps=4, constellation="psk"), "psk32": ModulationInfo(bps=5, constellation="psk"), } @classmethod def get(cls, mod_type: str) -> ModulationInfo: return cls._registry[mod_type] def create_modulated_signal( modulation: str, sps: int, beta: float, length: int ) -> Recording: """Produces a modulated signal Recording.""" mod_info = ModulationRegistry.get(modulation) if mod_info is None: raise ValueError(f"Modulation {modulation} not in registry.") while length % sps != 0 or length % mod_info.bps != 0: length = length + 1 # needs to be multiple of bits per symbol and sps num_bits = int(length * sps) mapper = block_generator.Mapper( constellation_type=mod_info.constellation, num_bits_per_symbol=mod_info.bps, ) upsampler = block_generator.Upsampling(factor=sps) filter = block_generator.RootRaisedCosineFilter( span_in_symbols=10, upsampling_factor=sps, beta=beta ) bits = [(np.random.rand(num_bits) > 0.5).astype(np.float32)] long_bits = np.concatenate([bits, bits, bits], axis=1) symbols = mapper(long_bits) upsampled_symbols = upsampler([symbols]) filtered_samples = filter([upsampled_symbols]) start = (len(filtered_samples) - length) // 2 end = start + length output_recording = filtered_samples[start:end] metadata = { "modulation": modulation, "bits_per_symbol": mod_info.bps, "constellation": mod_info.constellation, "sps": sps, "beta": beta, "source": "signal.block_generator", } return Recording(data=output_recording, metadata=metadata) def create_lfm_recording( sample_rate: int, width: Optional[int], chirp_period: Optional[float], chirp_type: str, length: int, ) -> Recording: """Produces a Recording of Linear Frequency Modulation (LFM) Jamming.""" lfm_jamming_source = block_generator.LFMJammingSource( sample_rate=sample_rate, bandwidth=width, chirp_period=chirp_period, chirp_type=chirp_type, ) return lfm_jamming_source.record(num_samples=length) def create_noise_recording( rms_power: float, length: int, counter: int, ) -> Recording: """Generate a Recording of Additive White Gaussian Noise (AWGN).""" # 1. Create a repeating pseudo-random envelope np.random.seed(256 + counter) chunk = np.random.rand(length // 4) tiled = np.tile(chunk, 4) amplitude_envelope = np.sqrt(tiled) # 2. Generate complex Gaussian noise with unit power real = np.random.normal(0, 1, length) imag = np.random.normal(0, 1, length) complex_noise = real + 1j * imag # 3. Scale noise by desired power and envelope scaled_noise = complex_noise * amplitude_envelope * np.sqrt(rms_power) metadata = {"interference": "wb", "signal_type": "noise"} return Recording(data=scaled_noise, metadata=metadata) def create_ctnb_recording(length: int) -> Recording: ones_source = block_generator.ConstantSource() return ones_source.record(num_samples=length) def create_birdie_recording( sample_rate: int, length: int, wave_number: int, sps: int = 1 ) -> Recording: recording_data = np.zeros(int(length)) for _ in range(wave_number): frequency = np.random.choice( np.arange(-sample_rate / (2 * sps), sample_rate / (2 * sps)) ) recording = complex_sine( sample_rate=int(sample_rate), length=int(length), frequency=int(frequency) ) recording_data = recording_data + recording.data return Recording(data=recording_data, metadata=recording.metadata) def frequency_shift( recording: Recording, freq_shift: float, sample_rate: int ) -> Recording: """Applies a frequency shift the input recording.""" source = block_generator.RecordingSource(recording=recording) frequency_shift_block = block_generator.FrequencyShift( shift_frequency=freq_shift, sampling_rate=sample_rate ) frequency_shift_block.connect_input([source]) return frequency_shift_block.record(num_samples=len(recording))