Compare commits
2 Commits
f6173b4037
...
c4c7a7176a
| Author | SHA1 | Date | |
|---|---|---|---|
| c4c7a7176a | |||
| dcfad2e9e3 |
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -19,6 +19,7 @@ venv.bak/
|
||||||
.vscode/
|
.vscode/
|
||||||
|
|
||||||
# Generated files
|
# Generated files
|
||||||
|
*.dat
|
||||||
*.dot
|
*.dot
|
||||||
*.hdf5
|
*.hdf5
|
||||||
*.npy
|
*.npy
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,16 @@ import os
|
||||||
import random
|
import random
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
from utils.data.recording import Recording
|
||||||
from utils.io.recording import from_npy
|
from utils.io.recording import from_npy
|
||||||
|
|
||||||
from signal_generation import (create_birdie_recording, create_ctnb_recording,
|
from signal_generation import (
|
||||||
create_lfm_recording, create_modulated_signal,
|
create_birdie_recording,
|
||||||
create_noise_recording)
|
create_ctnb_recording,
|
||||||
|
create_lfm_recording,
|
||||||
|
create_modulated_signal,
|
||||||
|
create_noise_recording,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class RecordingGenerator:
|
class RecordingGenerator:
|
||||||
|
|
@ -28,7 +33,9 @@ class RecordingGenerator:
|
||||||
recording = create_modulated_signal(
|
recording = create_modulated_signal(
|
||||||
modulation=modulation, sps=sps, beta=roll_off, length=length
|
modulation=modulation, sps=sps, beta=roll_off, length=length
|
||||||
)
|
)
|
||||||
recording.to_npy(filename=f"{modulation}_{roll_off_str}")
|
recording.to_npy(
|
||||||
|
filename=f"{modulation}_{roll_off_str}", overwrite=True
|
||||||
|
)
|
||||||
print(f"{modulation}_{roll_off_str} file saved.")
|
print(f"{modulation}_{roll_off_str} file saved.")
|
||||||
|
|
||||||
def generate_lfm(
|
def generate_lfm(
|
||||||
|
|
@ -48,22 +55,21 @@ class RecordingGenerator:
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}")
|
print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}")
|
||||||
recording.to_npy(filename=f"{chirp_type}_chirp")
|
recording.to_npy(filename=f"{chirp_type}_chirp", overwrite=True)
|
||||||
print(f"{chirp_type}_chirp file saved.")
|
print(f"{chirp_type}_chirp file saved.")
|
||||||
|
|
||||||
def generate_wb(self, num: int = 2, length: int = 8192):
|
def generate_wb(self, num: int = 2, length: int = 8192):
|
||||||
for i in range(num):
|
for i in range(num):
|
||||||
recording = create_noise_recording(
|
recording = create_noise_recording(
|
||||||
length=length,
|
length=length, rms_power=0.2, counter=random.choice([2, 4, 8, 16, 32])
|
||||||
rms_power=0.2,
|
|
||||||
)
|
)
|
||||||
recording.to_npy(filename=f"wb{i + 1}")
|
recording.to_npy(filename=f"wb{i + 1}", overwrite=True)
|
||||||
print(f"wb{i + 1} file saved.")
|
print(f"wb{i + 1} file saved.")
|
||||||
|
|
||||||
def generate_ctnb(self, num: int = 2, length: int = 8192):
|
def generate_ctnb(self, num: int = 2, length: int = 8192):
|
||||||
for i in range(num):
|
for i in range(num):
|
||||||
recording = create_ctnb_recording(length=length)
|
recording = create_ctnb_recording(length=length)
|
||||||
recording.to_npy(filename=f"ctnb{i + 1}")
|
recording.to_npy(filename=f"ctnb{i + 1}", overwrite=True)
|
||||||
print(f"ctnb{i + 1} file saved.")
|
print(f"ctnb{i + 1} file saved.")
|
||||||
|
|
||||||
def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5):
|
def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5):
|
||||||
|
|
@ -73,9 +79,14 @@ class RecordingGenerator:
|
||||||
length=length,
|
length=length,
|
||||||
wave_number=int(wave_num + i),
|
wave_number=int(wave_num + i),
|
||||||
)
|
)
|
||||||
recording.to_npy(filename=f"birdie{i + 1}")
|
recording.to_npy(filename=f"birdie{i + 1}", overwrite=True)
|
||||||
print(f"birdie{i + 1} file saved.")
|
print(f"birdie{i + 1} file saved.")
|
||||||
|
|
||||||
|
def generate_zeros(self, length: int = 8192):
|
||||||
|
data = np.zeros(shape=length, dtype=np.complex64)
|
||||||
|
recording = Recording(data=data)
|
||||||
|
recording.to_npy(filename="zero", overwrite=True)
|
||||||
|
|
||||||
def convert_to_dat(
|
def convert_to_dat(
|
||||||
self,
|
self,
|
||||||
source_directory: str = "recordings",
|
source_directory: str = "recordings",
|
||||||
|
|
|
||||||
|
|
@ -34,13 +34,9 @@ class ModulationRegistry:
|
||||||
return cls._registry[mod_type]
|
return cls._registry[mod_type]
|
||||||
|
|
||||||
|
|
||||||
def periodic_random(length, divisor=4, seed=256):
|
def create_modulated_signal(
|
||||||
np.random.seed(seed)
|
modulation: str, sps: int, beta: float, length: int
|
||||||
chunk = np.random.rand(int(length / divisor))
|
) -> Recording:
|
||||||
return np.tile(chunk, divisor)
|
|
||||||
|
|
||||||
|
|
||||||
def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
|
|
||||||
"""Produces a modulated signal Recording."""
|
"""Produces a modulated signal Recording."""
|
||||||
mod_info = ModulationRegistry.get(modulation)
|
mod_info = ModulationRegistry.get(modulation)
|
||||||
if mod_info is None:
|
if mod_info is None:
|
||||||
|
|
@ -56,7 +52,7 @@ def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Rec
|
||||||
num_bits_per_symbol=mod_info.bps,
|
num_bits_per_symbol=mod_info.bps,
|
||||||
)
|
)
|
||||||
upsampler = block_generator.Upsampling(factor=sps)
|
upsampler = block_generator.Upsampling(factor=sps)
|
||||||
filter = block_generator.RaisedCosineFilter(
|
filter = block_generator.RootRaisedCosineFilter(
|
||||||
span_in_symbols=10, upsampling_factor=sps, beta=beta
|
span_in_symbols=10, upsampling_factor=sps, beta=beta
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -66,37 +62,20 @@ def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Rec
|
||||||
|
|
||||||
upsampled_symbols = upsampler([symbols])
|
upsampled_symbols = upsampler([symbols])
|
||||||
filtered_samples = filter([upsampled_symbols])
|
filtered_samples = filter([upsampled_symbols])
|
||||||
output_recording = filtered_samples[length : length * 2]
|
start = (len(filtered_samples) - length) // 2
|
||||||
|
end = start + length
|
||||||
|
output_recording = filtered_samples[start:end]
|
||||||
|
|
||||||
return Recording(data=output_recording)
|
metadata = {
|
||||||
|
"modulation": modulation,
|
||||||
|
"bits_per_symbol": mod_info.bps,
|
||||||
|
"constellation": mod_info.constellation,
|
||||||
|
"sps": sps,
|
||||||
|
"beta": beta,
|
||||||
|
"source": "signal.block_generator",
|
||||||
|
}
|
||||||
|
|
||||||
|
return Recording(data=output_recording, metadata=metadata)
|
||||||
# Old create_modulated_signal code
|
|
||||||
# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
|
|
||||||
# """Produces a modulated signal Recording."""
|
|
||||||
# mod_info = ModulationRegistry.get(modulation)
|
|
||||||
# if mod_info is None:
|
|
||||||
# raise ValueError(f"Modulation {modulation} not in registry.")
|
|
||||||
|
|
||||||
# source_block = block_generator.RandomBinarySource()
|
|
||||||
# mapper_block = block_generator.Mapper(
|
|
||||||
# constellation_type=mod_info.constellation,
|
|
||||||
# num_bits_per_symbol=mod_info.bps,
|
|
||||||
# )
|
|
||||||
# upsampler_block = block_generator.Upsampling(factor=sps)
|
|
||||||
# filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta)
|
|
||||||
|
|
||||||
# mapper_block.connect_input([source_block])
|
|
||||||
# upsampler_block.connect_input([mapper_block])
|
|
||||||
# filter_block.connect_input([upsampler_block])
|
|
||||||
|
|
||||||
# dividing_factor = sps * mod_info.bps
|
|
||||||
# while length % dividing_factor != 0:
|
|
||||||
# length = length + 1
|
|
||||||
# double_length = length * 2
|
|
||||||
|
|
||||||
# recording = filter_block.record(num_samples=double_length)
|
|
||||||
# return Recording(data=recording.data[:, :length], metadata=recording.metadata)
|
|
||||||
|
|
||||||
|
|
||||||
def create_lfm_recording(
|
def create_lfm_recording(
|
||||||
|
|
@ -120,10 +99,11 @@ def create_lfm_recording(
|
||||||
def create_noise_recording(
|
def create_noise_recording(
|
||||||
rms_power: float,
|
rms_power: float,
|
||||||
length: int,
|
length: int,
|
||||||
|
counter: int,
|
||||||
) -> Recording:
|
) -> Recording:
|
||||||
"""Generate a Recording of Additive White Gaussian Noise (AWGN)."""
|
"""Generate a Recording of Additive White Gaussian Noise (AWGN)."""
|
||||||
# 1. Create a repeating pseudo-random envelope
|
# 1. Create a repeating pseudo-random envelope
|
||||||
np.random.seed(256)
|
np.random.seed(256 + counter)
|
||||||
chunk = np.random.rand(length // 4)
|
chunk = np.random.rand(length // 4)
|
||||||
tiled = np.tile(chunk, 4)
|
tiled = np.tile(chunk, 4)
|
||||||
amplitude_envelope = np.sqrt(tiled)
|
amplitude_envelope = np.sqrt(tiled)
|
||||||
|
|
@ -145,11 +125,13 @@ def create_ctnb_recording(length: int) -> Recording:
|
||||||
|
|
||||||
|
|
||||||
def create_birdie_recording(
|
def create_birdie_recording(
|
||||||
sample_rate: int, length: int, wave_number: int
|
sample_rate: int, length: int, wave_number: int, sps: int = 1
|
||||||
) -> Recording:
|
) -> Recording:
|
||||||
recording_data = np.zeros(int(length))
|
recording_data = np.zeros(int(length))
|
||||||
for _ in range(wave_number):
|
for _ in range(wave_number):
|
||||||
frequency = np.random.choice(np.arange(-sample_rate / 2, sample_rate / 2))
|
frequency = np.random.choice(
|
||||||
|
np.arange(-sample_rate / (2 * sps), sample_rate / (2 * sps))
|
||||||
|
)
|
||||||
recording = complex_sine(
|
recording = complex_sine(
|
||||||
sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
|
sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user