reorganized file structure

liyuxiao2 2025-05-21 15:46:28 -04:00
parent 3df27cf012
commit ba7d0d9f67
32 changed files with 0 additions and 355 deletions

29 binary files not shown.

View File

@@ -1,69 +0,0 @@
from utils.data import Recording
import numpy as np
from utils.signal import block_generator

mods = {
    "bpsk": {"num_bits_per_symbol": 1, "constellation_type": "psk"},
    "qpsk": {"num_bits_per_symbol": 2, "constellation_type": "psk"},
    "qam16": {"num_bits_per_symbol": 4, "constellation_type": "qam"},
    "qam64": {"num_bits_per_symbol": 6, "constellation_type": "qam"},
}


def generate_modulated_signals():
    for modulation in ["bpsk", "qpsk", "qam16", "qam64"]:
        for snr in np.arange(-6, 13, 3):
            recording_length = 1024
            beta = 0.3  # the rolloff factor, can be changed to add variety
            sps = 4  # samples per symbol, or the relative bandwidth of the digital signal; can also be changed
            # blocks don't directly take the string 'qpsk', so we use the dict 'mods' to get parameters
            constellation_type = mods[modulation]["constellation_type"]
            num_bits_per_symbol = mods[modulation]["num_bits_per_symbol"]
            # construct the digital modulation blocks with these parameters:
            # bit source -> mapper -> upsampling -> pulse shaping
            bit_source = block_generator.RandomBinarySource()
            mapper = block_generator.Mapper(
                constellation_type=constellation_type,
                num_bits_per_symbol=num_bits_per_symbol,
            )
            upsampler = block_generator.Upsampling(factor=sps)
            pulse_shaping_filter = block_generator.RaisedCosineFilter(
                upsampling_factor=sps, beta=beta
            )
            pulse_shaping_filter.connect_input([upsampler])
            upsampler.connect_input([mapper])
            mapper.connect_input([bit_source])
            modulation_recording = pulse_shaping_filter.record(
                num_samples=recording_length
            )
            # add noise by calculating the power of the modulation recording
            # and generating AWGN from the snr parameter
            signal_power = np.mean(np.abs(modulation_recording.data[0] ** 2))
            awgn_source = block_generator.AWGNSource(
                variance=(signal_power / 2) * (10 ** ((-1 * snr) / 20))
            )
            noise = awgn_source.record(num_samples=recording_length)
            samples_with_noise = modulation_recording.data + noise.data
            output_recording = Recording(data=samples_with_noise)
            # add metadata for ML later
            output_recording.add_to_metadata(key="modulation", value=modulation)
            output_recording.add_to_metadata(key="snr", value=int(snr))
            output_recording.add_to_metadata(key="beta", value=beta)
            output_recording.add_to_metadata(key="sps", value=sps)
            # view if you want
            # output_recording.view()
            # save to file
            output_recording.to_npy()  # optionally add path and filename parameters


if __name__ == "__main__":
    generate_modulated_signals()
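
A minimal sketch of loading one of the saved recordings back, assuming the from_npy helper from utils.io (used by the dataset script below) returns a Recording with .data and .metadata; the file name here is hypothetical, since to_npy() picks its own default path:

# hypothetical file name; to_npy() chooses its own default path and name
from utils.io import from_npy

rec = from_npy("recordings/qpsk_snr3.npy")
print(rec.data.shape)                                    # complex baseband samples, e.g. (1, 1024)
print(rec.metadata["modulation"], rec.metadata["snr"])   # e.g. "qpsk", 3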

View File

@@ -1,159 +0,0 @@
import os

import h5py
import numpy as np

from utils.io import from_npy
from split_dataset import split
from helpers.app_settings import get_app_settings

meta_dtype = np.dtype(
    [
        ("rec_id", "S256"),
        ("snippet_idx", np.int32),
        ("modulation", "S32"),
        ("snr", np.int32),
        ("beta", np.float32),
        ("sps", np.int32),
    ]
)

info_dtype = np.dtype(
    [
        ("num_records", np.int32),
        ("dataset_name", "S64"),  # up to 64-byte UTF-8 strings
        ("creator", "S64"),
    ]
)


def write_hdf5_file(records, output_path, dataset_name="data"):
    """
    Writes a list of records to an HDF5 file.

    Parameters:
        records (list): List of (data, metadata) tuples to be written to the file
        output_path (str): Path to the output HDF5 file
        dataset_name (str): Name of the dataset in the HDF5 file (default: "data")

    Returns:
        str: Path to the created HDF5 file
    """
    meta_arr = np.empty(len(records), dtype=meta_dtype)
    for i, (_, md) in enumerate(records):
        meta_arr[i] = (
            md["rec_id"].encode("utf-8"),
            md["snippet_idx"],
            md["modulation"].encode("utf-8"),
            int(md["snr"]),
            float(md["beta"]),
            int(md["sps"]),
        )

    first_rec, _ = records[0]  # records[0] is a tuple of (data, md)
    sample = first_rec
    shape, dtype = sample.shape, sample.dtype

    with h5py.File(output_path, "w") as hf:
        dset = hf.create_dataset(
            dataset_name, shape=(len(records),) + shape, dtype=dtype, compression="gzip"
        )
        for idx, (snip, md) in enumerate(records):
            dset[idx, ...] = snip

        mg = hf.create_group("metadata")
        mg.create_dataset("metadata", data=meta_arr, compression="gzip")
        print(dset.shape, f"snippets created in {dataset_name}")

        info_arr = np.array(
            [
                (
                    len(records),
                    dataset_name.encode("utf-8"),
                    b"generate_dataset.py",  # already bytes
                )
            ],
            dtype=info_dtype,
        )
        mg.create_dataset("dataset_info", data=info_arr)

    return output_path


def split_recording(recording_list, num_snippets):
    """
    Splits a list of recordings into smaller chunks.

    Parameters:
        recording_list (list): List of (data, metadata) tuples to be split
        num_snippets (int): Number of snippets to cut each recording into

    Returns:
        list: List of (snippet, metadata) tuples
    """
    snippet_list = []
    for data, md in recording_list:
        C, N = data.shape
        L = N // num_snippets
        for i in range(num_snippets):
            start = i * L
            end = (i + 1) * L
            snippet = data[:, start:end]
            # copy the metadata, adding a snippet index
            snippet_md = md.copy()
            snippet_md["snippet_idx"] = i
            snippet_list.append((snippet, snippet_md))
    return snippet_list


def generate_datasets(cfg):
    """
    Generates train and validation datasets from a folder of .npy files and
    saves them to HDF5 files.

    Parameters:
        cfg: Dataset configuration with input_dir, output_dir, num_slices,
            train_split, and seed attributes

    Returns:
        tuple: Paths to the created train and validation HDF5 files
    """
    os.makedirs(cfg.output_dir, exist_ok=True)

    # we assume the recordings are in .npy format
    files = os.listdir(cfg.input_dir)
    if not files:
        raise ValueError("No files found in the specified directory.")

    records = []
    for fname in files:
        rec = from_npy(os.path.join(cfg.input_dir, fname))
        data = rec.data
        md = rec.metadata  # pull metadata from the recording
        md.setdefault("rec_id", str(len(records)))
        records.append((data, md))

    # split each recording into cfg.num_slices snippets
    records = split_recording(records, cfg.num_slices)

    train_records, val_records = split(records, cfg.train_split, cfg.seed)

    train_path = os.path.join(cfg.output_dir, "train.h5")
    val_path = os.path.join(cfg.output_dir, "val.h5")

    write_hdf5_file(train_records, train_path, "training_data")
    write_hdf5_file(val_records, val_path, "validation_data")

    return train_path, val_path


def main():
    settings = get_app_settings()
    dataset_cfg = settings.dataset
    train_path, val_path = generate_datasets(dataset_cfg)
    print(f"✅ Train: {train_path}\n✅ Val: {val_path}")


if __name__ == "__main__":
    main()
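
The files written above can be inspected with h5py directly; a minimal sketch, assuming cfg.output_dir is "dataset" (the real value comes from the app settings):

# minimal sketch; the path depends on cfg.output_dir in the app settings
import h5py

with h5py.File("dataset/train.h5", "r") as hf:
    snippets = hf["training_data"][:]        # shape: (num_records, C, L)
    meta = hf["metadata"]["metadata"][:]     # structured array with meta_dtype fields
    info = hf["metadata"]["dataset_info"][0]
    print(snippets.shape, meta["modulation"][:5], info["creator"])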

View File

@@ -1,43 +0,0 @@
import random
from collections import defaultdict


def split(dataset, train_frac=0.8, seed=42):
    """
    Splits a dataset into train and validation sets, keeping all snippets
    from the same recording in the same split.

    Parameters:
        dataset (list): List of (data, metadata) tuples to be split.
        train_frac (float): Fraction of the dataset to place in the train set.
        seed (int): Random seed used when shuffling recordings.

    Returns:
        tuple: (train_records, val_records)
    """
    N = len(dataset)
    target = int(N * train_frac)

    # group snippet indices by their source recording
    by_rec = defaultdict(list)
    for i, (_, md) in enumerate(dataset):
        by_rec[md["rec_id"]].append(i)

    rec_ids = list(by_rec.keys())
    random.seed(seed)
    random.shuffle(rec_ids)

    # greedily add whole recordings to the train set until the target size is reached
    train_set = set()
    count = 0
    for rec_id in rec_ids:
        indices = by_rec[rec_id]
        if count + len(indices) <= target:
            train_set.update(indices)
            count += len(indices)

    validation_set = set(range(N)) - train_set

    print(f"Train set: {len(train_set)}")
    print(f"Val set: {len(validation_set)}")

    train_records = [dataset[i] for i in sorted(train_set)]
    val_records = [dataset[i] for i in sorted(validation_set)]
    return train_records, val_records
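
A quick usage sketch with hypothetical toy data, showing that snippets from the same recording always land in the same split:

# toy data only; real callers pass the snippet list built by split_recording
import numpy as np

toy = [
    (np.zeros((1, 128)), {"rec_id": str(r), "snippet_idx": i})
    for r in range(4)
    for i in range(8)
]
train, val = split(toy, train_frac=0.75, seed=0)
train_ids = {md["rec_id"] for _, md in train}
val_ids = {md["rec_id"] for _, md in val}
assert train_ids.isdisjoint(val_ids)  # recordings never straddle the split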