Compare commits

..

2 Commits

3 changed files with 44 additions and 50 deletions

1
.gitignore vendored
View File

@ -19,6 +19,7 @@ venv.bak/
.vscode/ .vscode/
# Generated files # Generated files
*.dat
*.dot *.dot
*.hdf5 *.hdf5
*.npy *.npy

View File

@ -2,11 +2,16 @@ import os
import random import random
import numpy as np import numpy as np
from utils.data.recording import Recording
from utils.io.recording import from_npy from utils.io.recording import from_npy
from signal_generation import (create_birdie_recording, create_ctnb_recording, from signal_generation import (
create_lfm_recording, create_modulated_signal, create_birdie_recording,
create_noise_recording) create_ctnb_recording,
create_lfm_recording,
create_modulated_signal,
create_noise_recording,
)
class RecordingGenerator: class RecordingGenerator:
@ -28,7 +33,9 @@ class RecordingGenerator:
recording = create_modulated_signal( recording = create_modulated_signal(
modulation=modulation, sps=sps, beta=roll_off, length=length modulation=modulation, sps=sps, beta=roll_off, length=length
) )
recording.to_npy(filename=f"{modulation}_{roll_off_str}") recording.to_npy(
filename=f"{modulation}_{roll_off_str}", overwrite=True
)
print(f"{modulation}_{roll_off_str} file saved.") print(f"{modulation}_{roll_off_str} file saved.")
def generate_lfm( def generate_lfm(
@ -48,22 +55,21 @@ class RecordingGenerator:
) )
print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}") print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}")
recording.to_npy(filename=f"{chirp_type}_chirp") recording.to_npy(filename=f"{chirp_type}_chirp", overwrite=True)
print(f"{chirp_type}_chirp file saved.") print(f"{chirp_type}_chirp file saved.")
def generate_wb(self, num: int = 2, length: int = 8192): def generate_wb(self, num: int = 2, length: int = 8192):
for i in range(num): for i in range(num):
recording = create_noise_recording( recording = create_noise_recording(
length=length, length=length, rms_power=0.2, counter=random.choice([2, 4, 8, 16, 32])
rms_power=0.2,
) )
recording.to_npy(filename=f"wb{i + 1}") recording.to_npy(filename=f"wb{i + 1}", overwrite=True)
print(f"wb{i + 1} file saved.") print(f"wb{i + 1} file saved.")
def generate_ctnb(self, num: int = 2, length: int = 8192): def generate_ctnb(self, num: int = 2, length: int = 8192):
for i in range(num): for i in range(num):
recording = create_ctnb_recording(length=length) recording = create_ctnb_recording(length=length)
recording.to_npy(filename=f"ctnb{i + 1}") recording.to_npy(filename=f"ctnb{i + 1}", overwrite=True)
print(f"ctnb{i + 1} file saved.") print(f"ctnb{i + 1} file saved.")
def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5): def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5):
@ -73,9 +79,14 @@ class RecordingGenerator:
length=length, length=length,
wave_number=int(wave_num + i), wave_number=int(wave_num + i),
) )
recording.to_npy(filename=f"birdie{i + 1}") recording.to_npy(filename=f"birdie{i + 1}", overwrite=True)
print(f"birdie{i + 1} file saved.") print(f"birdie{i + 1} file saved.")
def generate_zeros(self, length: int = 8192):
data = np.zeros(shape=length, dtype=np.complex64)
recording = Recording(data=data)
recording.to_npy(filename="zero", overwrite=True)
def convert_to_dat( def convert_to_dat(
self, self,
source_directory: str = "recordings", source_directory: str = "recordings",

View File

@ -34,13 +34,9 @@ class ModulationRegistry:
return cls._registry[mod_type] return cls._registry[mod_type]
def periodic_random(length, divisor=4, seed=256): def create_modulated_signal(
np.random.seed(seed) modulation: str, sps: int, beta: float, length: int
chunk = np.random.rand(int(length / divisor)) ) -> Recording:
return np.tile(chunk, divisor)
def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
"""Produces a modulated signal Recording.""" """Produces a modulated signal Recording."""
mod_info = ModulationRegistry.get(modulation) mod_info = ModulationRegistry.get(modulation)
if mod_info is None: if mod_info is None:
@ -56,7 +52,7 @@ def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Rec
num_bits_per_symbol=mod_info.bps, num_bits_per_symbol=mod_info.bps,
) )
upsampler = block_generator.Upsampling(factor=sps) upsampler = block_generator.Upsampling(factor=sps)
filter = block_generator.RaisedCosineFilter( filter = block_generator.RootRaisedCosineFilter(
span_in_symbols=10, upsampling_factor=sps, beta=beta span_in_symbols=10, upsampling_factor=sps, beta=beta
) )
@ -66,37 +62,20 @@ def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Rec
upsampled_symbols = upsampler([symbols]) upsampled_symbols = upsampler([symbols])
filtered_samples = filter([upsampled_symbols]) filtered_samples = filter([upsampled_symbols])
output_recording = filtered_samples[length : length * 2] start = (len(filtered_samples) - length) // 2
end = start + length
output_recording = filtered_samples[start:end]
return Recording(data=output_recording) metadata = {
"modulation": modulation,
"bits_per_symbol": mod_info.bps,
"constellation": mod_info.constellation,
"sps": sps,
"beta": beta,
"source": "signal.block_generator",
}
return Recording(data=output_recording, metadata=metadata)
# Old create_modulated_signal code
# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
# """Produces a modulated signal Recording."""
# mod_info = ModulationRegistry.get(modulation)
# if mod_info is None:
# raise ValueError(f"Modulation {modulation} not in registry.")
# source_block = block_generator.RandomBinarySource()
# mapper_block = block_generator.Mapper(
# constellation_type=mod_info.constellation,
# num_bits_per_symbol=mod_info.bps,
# )
# upsampler_block = block_generator.Upsampling(factor=sps)
# filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta)
# mapper_block.connect_input([source_block])
# upsampler_block.connect_input([mapper_block])
# filter_block.connect_input([upsampler_block])
# dividing_factor = sps * mod_info.bps
# while length % dividing_factor != 0:
# length = length + 1
# double_length = length * 2
# recording = filter_block.record(num_samples=double_length)
# return Recording(data=recording.data[:, :length], metadata=recording.metadata)
def create_lfm_recording( def create_lfm_recording(
@ -120,10 +99,11 @@ def create_lfm_recording(
def create_noise_recording( def create_noise_recording(
rms_power: float, rms_power: float,
length: int, length: int,
counter: int,
) -> Recording: ) -> Recording:
"""Generate a Recording of Additive White Gaussian Noise (AWGN).""" """Generate a Recording of Additive White Gaussian Noise (AWGN)."""
# 1. Create a repeating pseudo-random envelope # 1. Create a repeating pseudo-random envelope
np.random.seed(256) np.random.seed(256 + counter)
chunk = np.random.rand(length // 4) chunk = np.random.rand(length // 4)
tiled = np.tile(chunk, 4) tiled = np.tile(chunk, 4)
amplitude_envelope = np.sqrt(tiled) amplitude_envelope = np.sqrt(tiled)
@ -145,11 +125,13 @@ def create_ctnb_recording(length: int) -> Recording:
def create_birdie_recording( def create_birdie_recording(
sample_rate: int, length: int, wave_number: int sample_rate: int, length: int, wave_number: int, sps: int = 1
) -> Recording: ) -> Recording:
recording_data = np.zeros(int(length)) recording_data = np.zeros(int(length))
for _ in range(wave_number): for _ in range(wave_number):
frequency = np.random.choice(np.arange(-sample_rate / 2, sample_rate / 2)) frequency = np.random.choice(
np.arange(-sample_rate / (2 * sps), sample_rate / (2 * sps))
)
recording = complex_sine( recording = complex_sine(
sample_rate=int(sample_rate), length=int(length), frequency=int(frequency) sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
) )