""" Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording object. """ import datetime import datetime as dt import numbers import os import re import struct from datetime import timezone from typing import Any, List, Optional import numpy as np import sigmf from quantiphy import Quantity from sigmf import SigMFFile, sigmffile from sigmf.utils import get_data_type_str from ria_toolkit_oss.datatypes import Annotation from ria_toolkit_oss.datatypes.recording import Recording _BLUE_META_PREFIX = "META_" _BLUE_META_TAG_MAX_LEN = 60 _BLUE_SKIP_METADATA_KEYS = {"blue_data_format", "blue_endian", "blue_keywords"} _BLUE_NUMERIC_DTYPE = { "B": "i1", "I": "i2", "L": "i4", "F": "f4", "D": "f8", } SIGMF_KEY_CONVERSION = { SigMFFile.AUTHOR_KEY: "author", SigMFFile.COLLECTION_KEY: "sigmf:collection", SigMFFile.DATASET_KEY: "sigmf:dataset", SigMFFile.DATATYPE_KEY: "datatype", SigMFFile.DATA_DOI_KEY: "data_doi", SigMFFile.DESCRIPTION_KEY: "description", SigMFFile.EXTENSIONS_KEY: "sigmf:extensions", SigMFFile.GEOLOCATION_KEY: "geolocation", SigMFFile.HASH_KEY: "sigmf:hash", SigMFFile.HW_KEY: "sdr", SigMFFile.LICENSE_KEY: "license", SigMFFile.META_DOI_KEY: "metadata", SigMFFile.METADATA_ONLY_KEY: "sigmf:metadata_only", SigMFFile.NUM_CHANNELS_KEY: "sigmf:num_channels", SigMFFile.RECORDER_KEY: "source_software", SigMFFile.SAMPLE_RATE_KEY: "sample_rate", SigMFFile.START_OFFSET_KEY: "sigmf:start_offset", SigMFFile.TRAILING_BYTES_KEY: "sigmf:trailing_bytes", SigMFFile.VERSION_KEY: "sigmf:version", } def to_npy( recording: Recording, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False, ) -> str: """Write recording to ``.npy`` binary file. :param recording: The recording to be written to file. :type recording: ria_toolkit_oss.datatypes.Recording :param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename. :type filename: os.PathLike or str, optional :param path: The directory path to where the recording is to be saved. Defaults to recordings/. :type path: os.PathLike or str, optional :raises IOError: If there is an issue encountered during the file writing process. :return: Path where the file was saved. :rtype: str **Examples:** >>> from ria_toolkit_oss.sdr import Synth >>> from ria_toolkit_oss.data import Recording >>> from ria_toolkit_oss.io import to_npy >>> sdr = Synth() >>> rec = sdr.record(center_frequency=2.4e9, sample_rate=20e6) >>> to_npy(recording=rec, file="sample_recording.npy") """ filename, path, fullpath = generate_fullpath( recording=recording, filename=filename, path=path, extension=".npy", overwrite=overwrite ) data = np.array(recording.data) metadata = recording.metadata annotations = recording.annotations with open(file=fullpath, mode="wb") as f: np.save(f, data) np.save(f, metadata) np.save(f, annotations) # print(f"Saved recording to {os.getcwd()}/{fullpath}") return str(fullpath) def from_npy(file: os.PathLike | str, legacy: bool = False) -> Recording: """Load a recording from a ``.npy`` binary file. :param file: The directory path to the recording file, with or without the ``.npy`` file extension. :type file: str or os.PathLike :param legacy: If True, load legacy format (iqdata, meta[4], extended_meta dict). If False, load current format (data, metadata dict, annotations list). Default is False. :type legacy: bool, optional :raises IOError: If there is an issue encountered during the file reading process. :return: The recording, as initialized from the ``.npy`` file. 
    """
    filename, extension = os.path.splitext(file)

    if extension != ".npy" and extension != "":
        raise ValueError("Cannot use from_npy if file extension is not .npy")

    # Rebuild with the .npy extension.
    filename = str(filename) + ".npy"

    if legacy:
        return from_npy_legacy(filename)

    with open(file=filename, mode="rb") as f:
        data = np.load(f, allow_pickle=True)
        metadata = np.load(f, allow_pickle=True)
        metadata = metadata.tolist()
        try:
            annotations = list(np.load(f, allow_pickle=True))
        except EOFError:
            # Older files may omit the annotations array.
            annotations = []

    recording = Recording(data=data, metadata=metadata, annotations=annotations)

    return recording


def from_npy_legacy(file: os.PathLike | str) -> Recording:
    """Load a recording from the legacy NPY format.

    The legacy format (pre-utils) stores three numpy arrays:

    1. iqdata: shape (2, N) with I and Q as separate rows (float32)
    2. meta: shape (4,) with [center_freq, rec_length, decimation, sample_rate]
    3. extended_meta: dict with additional metadata

    :param file: The directory path to the recording file, with or without the ``.npy`` file
        extension.
    :type file: str or os.PathLike
    :raises IOError: If there is an issue encountered during the file reading process.
    :return: The recording, as initialized from the legacy ``.npy`` file.
    :rtype: ria_toolkit_oss.datatypes.Recording

    **Examples:**

    Load legacy SRS recordings:

    >>> from ria_toolkit_oss.io import from_npy_legacy
    >>> rec = from_npy_legacy("~/sample_recs/srs/example_srs_recordings/bw40M_Youtube_sr46.08/iq3775MHz053601.npy")
    >>> print(rec.metadata.get('protocol'))
    5G40
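
    A file in the legacy layout can also be synthesized for testing. A minimal sketch (the three
    arrays follow the structure listed above; ``legacy.npy`` is a hypothetical path):

    >>> import numpy as np
    >>> with open("legacy.npy", "wb") as f:
    ...     np.save(f, np.zeros((2, 100), dtype=np.float32))
    ...     np.save(f, np.array([2.4e9, 100, 1, 20e6]))
    ...     np.save(f, np.array([{"protocol": "5G40"}], dtype=object))
    >>> rec = from_npy_legacy("legacy.npy")
    >>> rec.metadata["center_frequency"]
    2400000000.0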
    """
    filename, extension = os.path.splitext(file)

    if extension != ".npy" and extension != "":
        raise ValueError("Cannot use from_npy_legacy if file extension is not .npy")

    # Rebuild with the .npy extension.
    filename = str(filename) + ".npy"

    with open(filename, "rb") as f:
        # Read the IQ data in (2, N) format.
        iqdata = np.load(f)
        # Read the basic metadata array [center_freq, rec_length, decimation, sample_rate].
        meta = np.load(f)
        # Read the extended metadata dict.
        extended_meta = np.load(f, allow_pickle=True)[0]

    # Convert the IQ data from (2, N) to (N,) complex format.
    i_channel = iqdata[0, :]
    q_channel = iqdata[1, :]
    complex_data = i_channel + 1j * q_channel

    # Build the metadata dictionary.
    metadata = {}

    # Extract from the basic meta array, if available.
    if len(meta) >= 4:
        metadata["center_frequency"] = float(meta[0])
        metadata["legacy_rec_length"] = int(meta[1])
        metadata["legacy_decimation"] = int(meta[2])
        metadata["sample_rate"] = float(meta[3])

    # Merge the extended metadata.
    if isinstance(extended_meta, dict):
        for key, value in extended_meta.items():
            # Convert keys to lowercase.
            key_lower = key.lower()
            # Don't overwrite values that are already set.
            if key_lower not in metadata:
                metadata[key_lower] = value

    return Recording(data=complex_data, metadata=metadata)


def to_sigmf(
    recording: Recording,
    filename: Optional[str] = None,
    path: Optional[os.PathLike | str] = None,
    overwrite: bool = False,
) -> None:
    """Write recording to a set of SigMF files.

    The SigMF format is defined by the `SigMF Specification Project
    <https://github.com/sigmf/SigMF>`_.

    :param recording: The recording to be written to file.
    :type recording: ria_toolkit_oss.datatypes.Recording
    :param filename: The name of the file where the recording is to be saved. Defaults to an
        auto-generated filename.
    :type filename: os.PathLike or str, optional
    :param path: The directory path to where the recording is to be saved. Defaults to
        ``recordings/``.
    :type path: os.PathLike or str, optional
    :param overwrite: Whether to overwrite existing files. Default is False.
    :type overwrite: bool, optional
    :raises IOError: If there is an issue encountered during the file writing process.
    :return: None

    **Examples:**

    >>> from ria_toolkit_oss.sdr import Synth
    >>> from ria_toolkit_oss.data import Recording
    >>> from ria_toolkit_oss.io import to_sigmf
    >>> sdr = Synth()
    >>> rec = sdr.record(center_frequency=2.4e9, sample_rate=20e6)
    >>> to_sigmf(recording=rec, filename="sample_recording")
    """
    filename, path, _ = generate_fullpath(
        recording=recording, filename=filename, path=path, extension="", overwrite=True
    )

    multichannel_samples = recording.data
    metadata = recording.metadata
    annotations = recording.annotations

    if multichannel_samples.shape[0] > 1:
        raise NotImplementedError("SigMF File Saving Not Implemented for Multichannel Recordings")
    else:
        # Extract the single channel.
        samples = multichannel_samples[0]

    data_file_path = os.path.join(path, f"{filename}.sigmf-data")
    meta_file_path = os.path.join(path, f"{filename}.sigmf-meta")

    if not overwrite:
        if os.path.isfile(data_file_path):
            raise IOError("File already exists")
        if os.path.isfile(meta_file_path):
            raise IOError("File already exists")

    samples.tofile(data_file_path)

    global_info = {
        SigMFFile.DATATYPE_KEY: get_data_type_str(samples),
        SigMFFile.VERSION_KEY: sigmf.__version__,
        SigMFFile.RECORDER_KEY: "RIA",
    }

    converted_metadata = {
        sigmf_key: metadata[metadata_key]
        for sigmf_key, metadata_key in SIGMF_KEY_CONVERSION.items()
        if metadata_key in metadata
    }

    # Merge dictionaries, giving priority to the fixed global info.
    global_info = {**converted_metadata, **global_info}

    ria_metadata = {f"ria:{key}": value for key, value in metadata.items()}
    ria_metadata = convert_to_serializable(ria_metadata)
    global_info.update(ria_metadata)

    sigmf_metafile = SigMFFile(
        data_file=data_file_path,
        global_info=global_info,
    )

    for annotation_object in annotations:
        annotation_dict = annotation_object.to_sigmf_format()
        annotation_dict = convert_to_serializable(annotation_dict)
        sigmf_metafile.add_annotation(
            start_index=annotation_dict[SigMFFile.START_INDEX_KEY],
            length=annotation_dict[SigMFFile.LENGTH_INDEX_KEY],
            metadata=annotation_dict["metadata"],
        )

    sigmf_metafile.add_capture(
        0,
        metadata={
            SigMFFile.FREQUENCY_KEY: metadata.get("center_frequency", 0),
            SigMFFile.DATETIME_KEY: dt.datetime.fromtimestamp(float(metadata.get("timestamp", 0)), tz=timezone.utc)
            .isoformat()
            .replace("+00:00", "Z"),
        },
    )

    meta_dict = sigmf_metafile.ordered_metadata()
    meta_dict["ria"] = metadata

    sigmf_metafile.tofile(meta_file_path)


def from_sigmf(file: os.PathLike | str) -> Recording:
    """Load a recording from a set of SigMF files.

    :param file: The directory path to the SigMF recording files, without any file extension. The
        recording will be initialized from ``file_name.sigmf-data`` and ``file_name.sigmf-meta``.
        Both the data and meta files must be present for a successful read.
    :type file: str or os.PathLike
    :raises IOError: If there is an issue encountered during the file reading process.
    :return: The recording, as initialized from the SigMF files.
    :rtype: ria_toolkit_oss.datatypes.Recording
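
    **Examples:**

    A minimal sketch, assuming ``recordings/sample_recording.sigmf-data`` and
    ``recordings/sample_recording.sigmf-meta`` were previously written with ``to_sigmf``:

    >>> from ria_toolkit_oss.io import from_sigmf
    >>> rec = from_sigmf("recordings/sample_recording")  # doctest: +SKIP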
    """
    file = str(file)
    if len(file) > 11:
        if file[-11:-5] != ".sigmf":
            file = file + ".sigmf-data"

    sigmf_file = sigmffile.fromfile(file)
    data = sigmf_file.read_samples()
    global_metadata = sigmf_file.get_global_info()
    dict_annotations = sigmf_file.get_annotations()

    processed_metadata = {}
    for key, value in global_metadata.items():
        if key.startswith("core:"):
            # Process core keys. SIGMF_KEY_CONVERSION is keyed on the full "core:"-prefixed
            # constants, so look up the original key and fall back to the stripped name.
            base_key = key[5:]  # Remove the 'core:' prefix.
            converted_key = SIGMF_KEY_CONVERSION.get(key, base_key)
        elif key.startswith("ria:"):
            # Process ria keys.
            converted_key = key[4:]  # Remove the 'ria:' prefix.
        else:
            # Load non-core/ria keys as-is.
            converted_key = key
        processed_metadata[converted_key] = value

    annotations = []
    for annotation_dict in dict_annotations:
        annotations.append(
            Annotation(
                sample_start=annotation_dict[SigMFFile.START_INDEX_KEY],
                sample_count=annotation_dict[SigMFFile.LENGTH_INDEX_KEY],
                freq_lower_edge=annotation_dict.get(SigMFFile.FLO_KEY, None),
                freq_upper_edge=annotation_dict.get(SigMFFile.FHI_KEY, None),
                label=annotation_dict.get(SigMFFile.LABEL_KEY, None),
                comment=annotation_dict.get(SigMFFile.COMMENT_KEY, None),
                detail=annotation_dict.get("ria:detail", None),
            )
        )

    output_recording = Recording(data=data, metadata=processed_metadata, annotations=annotations)

    return output_recording


def to_wav(
    recording: Recording,
    filename: Optional[str] = None,
    path: Optional[os.PathLike | str] = None,
    target_sample_rate: Optional[int] = 48000,
    bits_per_sample: int = 32,
    overwrite: bool = False,
) -> str:
    """Write recording to a WAV file with embedded YAML metadata in a LIST INFO chunk.

    The WAV format uses stereo audio with I (in-phase) in the left channel and Q (quadrature) in
    the right channel. Metadata is stored in standard LIST INFO chunks, with RF-specific metadata
    encoded as YAML in the ICMT (comment) field for human readability.

    :param recording: The recording to be written to file.
    :type recording: ria_toolkit_oss.datatypes.Recording
    :param filename: The name of the file where the recording is to be saved. Defaults to an
        auto-generated filename.
    :type filename: str, optional
    :param path: The directory path to where the recording is to be saved. Defaults to
        ``recordings/``.
    :type path: os.PathLike or str, optional
    :param target_sample_rate: Sample rate written to the WAV header when the recording metadata
        does not specify one. Defaults to 48 kHz. No decimation is performed; IQ samples are
        written sample-for-sample exactly as provided.
    :type target_sample_rate: int, optional
    :param bits_per_sample: Bits per sample (32 for float32, 16 for int16). Default is 32
        (float32).
    :type bits_per_sample: int, optional
    :param overwrite: Whether to overwrite existing files. Default is False.
    :type overwrite: bool, optional
    :raises IOError: If the file already exists and overwrite is False.
    :raises ValueError: If the recording has multiple channels.
    :raises ValueError: If bits_per_sample is not 16 or 32.
    :raises ValueError: If 16-bit export is requested but samples fall outside [-1, 1).
    :return: Path where the file was saved.
    :rtype: str
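
    **Examples:**

    A minimal sketch, assuming ``rec`` is a single-channel recording (for instance one produced
    by ``Synth.record`` as in the ``to_npy`` example):

    >>> from ria_toolkit_oss.io import to_wav
    >>> to_wav(recording=rec, filename="sample_recording.wav")  # doctest: +SKIP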
    """
    import wave

    if recording.n_chan > 1:
        raise ValueError("WAV export not supported for multichannel recordings")

    if bits_per_sample not in [16, 32]:
        raise ValueError("bits_per_sample must be 16 or 32")

    # Generate the filename if not provided.
    filename, path, fullpath = generate_fullpath(
        recording=recording, filename=filename, path=path, extension=".wav", overwrite=overwrite
    )

    # Extract the single channel.
    iq_samples = np.asarray(recording.data[0])

    # Determine the WAV header sample rate (metadata only).
    wav_sample_rate = recording.sample_rate or target_sample_rate or 48000

    # Split the complex samples into the stereo pair (I and Q channels).
    i_channel = np.real(iq_samples)
    q_channel = np.imag(iq_samples)

    # Convert to the target data type.
    if bits_per_sample == 32:
        # 32-bit float.
        i_data = np.ascontiguousarray(i_channel, dtype=np.float32)
        q_data = np.ascontiguousarray(q_channel, dtype=np.float32)
        sample_width = 4
    else:
        # 16-bit int.
        max_mag = np.max(np.abs(np.concatenate([i_channel, q_channel])))
        if max_mag > 1.0:
            raise ValueError("16-bit WAV export requires samples within [-1, 1). Use float32 or normalize manually.")
        scale = np.iinfo(np.int16).max
        i_scaled = np.clip(i_channel, -1.0, 1.0 - (1.0 / scale))
        q_scaled = np.clip(q_channel, -1.0, 1.0 - (1.0 / scale))
        i_data = np.ascontiguousarray(np.round(i_scaled * scale).astype(np.int16))
        q_data = np.ascontiguousarray(np.round(q_scaled * scale).astype(np.int16))
        sample_width = 2

    # Interleave I and Q.
    stereo = np.empty(len(iq_samples) * 2, dtype=i_data.dtype)
    stereo[0::2] = i_data
    stereo[1::2] = q_data

    # Write the WAV file.
    with wave.open(fullpath, "wb") as wav:
        wav.setnchannels(2)  # Stereo (I and Q).
        wav.setsampwidth(sample_width)
        wav.setframerate(int(wav_sample_rate))
        if bits_per_sample == 32:
            wav.setcomptype("NONE", "not compressed")
        wav.writeframes(stereo.tobytes())

    # Prepare the metadata for the LIST INFO chunk.
    rf_metadata = recording.metadata.copy()

    # Record both the RF and WAV header sample rates for clarity.
    if recording.sample_rate:
        rf_metadata["rf_sample_rate_hz"] = float(recording.sample_rate)
    rf_metadata["wav_sample_rate_hz"] = float(wav_sample_rate)

    # Rename common keys to more descriptive versions.
    if "center_frequency" in rf_metadata:
        rf_metadata["center_frequency_hz"] = rf_metadata.pop("center_frequency")
    if "sample_rate" in rf_metadata and "rf_sample_rate_hz" not in rf_metadata:
        rf_metadata["rf_sample_rate_hz"] = rf_metadata.pop("sample_rate")

    # Append a LIST INFO chunk with the metadata.
    _append_wav_list_info_chunk(fullpath, rf_metadata)

    return fullpath


def from_wav(file: os.PathLike | str) -> Recording:
    """Load a recording from a WAV file and extract the RF metadata.

    :param file: The path to the WAV file to load.
    :type file: str or os.PathLike
    :raises IOError: If there is an issue reading the file.
    :raises ValueError: If the file is not stereo or has an unsupported format.
    :return: The recording, as initialized from the WAV file.
    :rtype: ria_toolkit_oss.datatypes.Recording
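
    **Examples:**

    A minimal sketch, assuming ``recordings/sample_recording.wav`` was previously written with
    ``to_wav``:

    >>> from ria_toolkit_oss.io import from_wav
    >>> rec = from_wav("recordings/sample_recording.wav")  # doctest: +SKIP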
    """
    import wave

    filename = str(file)
    if not filename.endswith(".wav"):
        filename = filename + ".wav"

    # Read the audio data.
    with wave.open(filename, "rb") as wav:
        n_channels = wav.getnchannels()
        sample_width = wav.getsampwidth()
        sample_rate = wav.getframerate()
        n_frames = wav.getnframes()
        comp_type = wav.getcomptype()
        audio_bytes = wav.readframes(n_frames)

    if n_channels != 2:
        raise ValueError(f"Expected stereo WAV file, got {n_channels} channels")

    # Determine the data type.
    if sample_width == 4 and comp_type == "NONE":
        # 32-bit float.
        dtype = np.float32
    elif sample_width == 2:
        # 16-bit int.
        dtype = np.int16
    else:
        raise ValueError(f"Unsupported WAV format: {sample_width} bytes per sample, comp_type={comp_type}")

    # Convert the raw bytes to a stereo array.
    stereo = np.frombuffer(audio_bytes, dtype=dtype)

    # De-interleave I and Q.
    i_channel = stereo[0::2]
    q_channel = stereo[1::2]

    # Normalize int16 to float.
    if dtype == np.int16:
        i_channel = i_channel.astype(np.float32) / 32767.0
        q_channel = q_channel.astype(np.float32) / 32767.0

    # Convert to complex samples.
    iq_samples = i_channel + 1j * q_channel

    # Extract the LIST INFO metadata.
    metadata = _extract_wav_list_info(filename)
    if metadata is None:
        metadata = {}

    # Ensure sample_rate is present in the metadata.
    if "sample_rate" not in metadata:
        # Prefer the RF sample rate if available, otherwise fall back to the WAV header.
        if "rf_sample_rate_hz" in metadata:
            metadata["sample_rate"] = metadata["rf_sample_rate_hz"]
        elif "wav_sample_rate_hz" in metadata:
            metadata["sample_rate"] = metadata["wav_sample_rate_hz"]
        else:
            metadata["sample_rate"] = float(sample_rate)

    # Restore the original keys for compatibility.
    if "center_frequency_hz" in metadata and "center_frequency" not in metadata:
        metadata["center_frequency"] = metadata["center_frequency_hz"]

    return Recording(data=iq_samples, metadata=metadata)


def to_blue(
    recording: Recording,
    filename: Optional[str] = None,
    path: Optional[os.PathLike | str] = None,
    data_format: str = "CI",
    overwrite: bool = False,
) -> str:
    """Write recording to the MIDAS Blue file format.

    MIDAS Blue is a legacy RF file format with a 512-byte binary header, commonly used with
    X-Midas and other RF/radar signal processing tools.

    :param recording: The recording to be written to file.
    :type recording: ria_toolkit_oss.datatypes.Recording
    :param filename: The name of the file where the recording is to be saved. Defaults to an
        auto-generated filename.
    :type filename: str, optional
    :param path: The directory path to where the recording is to be saved. Defaults to
        ``recordings/``.
    :type path: os.PathLike or str, optional
    :param data_format: Format code (default 'CI' = complex int16). Common formats: 'CI' (complex
        int16), 'CF' (complex float32), 'CD' (complex float64).
    :type data_format: str, optional
    :param overwrite: Whether to overwrite existing files. Default is False.
    :type overwrite: bool, optional
    :raises IOError: If the file already exists and overwrite is False.
    :raises ValueError: If the recording has multiple channels.
    :raises ValueError: If data_format is not supported.
    :raises ValueError: If an integer format is requested but samples fall outside [-1, 1).
    :return: Path where the file was saved.
    :rtype: str
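
    **Examples:**

    A minimal sketch, assuming ``rec`` is a single-channel recording whose metadata includes
    ``sample_rate``:

    >>> from ria_toolkit_oss.io import to_blue
    >>> to_blue(recording=rec, filename="sample_recording.blue", data_format="CF")  # doctest: +SKIP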
    """
    if recording.n_chan > 1:
        raise ValueError("MIDAS Blue export not supported for multichannel recordings")

    if recording.sample_rate is None:
        raise ValueError("Recording metadata must include 'sample_rate' for MIDAS Blue export.")

    # Generate the filename if not provided.
    filename, path, fullpath = generate_fullpath(
        recording=recording, filename=filename, path=path, extension=".blue", overwrite=overwrite
    )

    # Extract the single channel.
    iq_samples = np.asarray(recording.data[0])
    sample_rate = float(recording.sample_rate)
    metadata = recording.metadata or {}

    # Validate the data format.
    if data_format not in ["CI", "CF", "CD", "SI", "SF", "SD"]:
        raise ValueError(f"Unsupported data format: {data_format}. Use CI, CF, CD, SI, SF, or SD")

    # Convert the IQ samples to the specified format.
    dtype_map = {
        "CI": np.int16,
        "CF": np.float32,
        "CD": np.float64,
        "SI": np.int16,
        "SF": np.float32,
        "SD": np.float64,
    }
    dtype = dtype_map[data_format]

    if data_format.startswith("C"):
        # Complex formats: separate I and Q, converting to the requested data type.
        if np.issubdtype(dtype, np.integer):
            i_data = np.real(iq_samples)
            q_data = np.imag(iq_samples)
            max_mag = np.max(np.abs(np.concatenate([i_data, q_data])))
            if max_mag > 1.0:
                raise ValueError(
                    "Integer MIDAS Blue export requires samples within [-1, 1). "
                    "Normalize or export using a float format (CF/CD)."
                )
            max_val = np.iinfo(dtype).max
            eps = 1.0 / max_val
            i_scaled = np.clip(i_data, -1.0, 1.0 - eps)
            q_scaled = np.clip(q_data, -1.0, 1.0 - eps)
            i_converted = np.round(i_scaled * max_val).astype(dtype)
            q_converted = np.round(q_scaled * max_val).astype(dtype)
        else:
            i_converted = np.real(iq_samples).astype(dtype, copy=False)
            q_converted = np.imag(iq_samples).astype(dtype, copy=False)

        # Interleave I and Q.
        interleaved = np.empty(len(iq_samples) * 2, dtype=dtype)
        interleaved[0::2] = i_converted
        interleaved[1::2] = q_converted
    else:
        # Real-valued data (use only the I channel).
        if np.issubdtype(dtype, np.integer):
            real_channel = np.real(iq_samples)
            max_mag = np.max(np.abs(real_channel))
            if max_mag >= 1.0:
                raise ValueError(
                    "Integer MIDAS Blue export requires samples within [-1, 1). "
                    "Normalize or export using a float format (SF/SD)."
                )
            max_val = np.iinfo(dtype).max
            eps = 1.0 / max_val
            clipped = np.clip(real_channel, -1.0, 1.0 - eps)
            interleaved = np.round(clipped * max_val).astype(dtype)
        else:
            interleaved = np.real(iq_samples).astype(dtype, copy=False)

    # Create the 512-byte header.
    header = bytearray(512)
    header[0:4] = b"BLUE"
    header[4:8] = b"EEEI"
    header[8:12] = b"EEEI"
    header[52:54] = data_format.encode("ascii")

    # Pack the remaining header fields (little-endian, matching the "EEEI" representation;
    # offsets mirror those read back in from_blue).
    data_bytes = interleaved.tobytes()
    ext_bytes = _build_blue_extended_header(metadata)

    data_start = 512.0
    data_size = float(len(data_bytes))

    if ext_bytes:
        # The extended header starts on a 512-byte block boundary after the data.
        ext_start_offset = int(data_start) + len(data_bytes)
        padding = (512 - (ext_start_offset % 512)) % 512
        ext_start_block = (ext_start_offset + padding) // 512
        struct.pack_into("<i", header, 24, ext_start_block)
        struct.pack_into("<i", header, 28, len(ext_bytes))
    else:
        padding = 0

    struct.pack_into("<d", header, 32, data_start)
    struct.pack_into("<d", header, 40, data_size)

    # Timecode (seconds) and the sample interval in seconds.
    timecode = float(metadata.get("timestamp", 0) or 0)
    struct.pack_into("<d", header, 56, timecode)
    struct.pack_into("<d", header, 264, 1.0 / sample_rate)

    # Write the header, data, and extended header.
    with open(fullpath, "wb") as f:
        f.write(header)
        f.write(data_bytes)
        if ext_bytes:
            f.write(b"\x00" * padding)
            f.write(ext_bytes)

    return str(fullpath)


def from_blue(file: os.PathLike | str) -> Recording:
    """Load a recording from a MIDAS Blue file.

    :param file: The path to the MIDAS Blue file to load.
    :type file: str or os.PathLike
    :raises IOError: If there is an issue reading the file.
    :raises ValueError: If the file format is not valid or unsupported.
    :return: The recording, as initialized from the Blue file.
    :rtype: ria_toolkit_oss.datatypes.Recording
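
    **Examples:**

    A minimal sketch, assuming ``recordings/sample_recording.blue`` was previously written with
    ``to_blue``:

    >>> from ria_toolkit_oss.io import from_blue
    >>> rec = from_blue("recordings/sample_recording.blue")  # doctest: +SKIP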
    """
    filename = str(file)
    if not filename.endswith(".blue"):
        filename = filename + ".blue"

    with open(filename, "rb") as f:
        header_bytes = f.read(512)

    if len(header_bytes) < 512:
        raise ValueError("File too small to be a valid MIDAS Blue file")

    magic = header_bytes[0:4].decode("ascii", errors="ignore")
    if magic != "BLUE":
        raise ValueError(f"Not a Blue file (magic={magic})")

    header_rep = header_bytes[4:8].decode("ascii", errors="ignore")
    data_rep = header_bytes[8:12].decode("ascii", errors="ignore")
    header_endian = ">" if header_rep == "IEEE" else "<"
    data_endian = ">" if data_rep == "IEEE" else "<"

    ext_start = struct.unpack(f"{header_endian}i", header_bytes[24:28])[0]
    ext_size = struct.unpack(f"{header_endian}i", header_bytes[28:32])[0]
    data_start_offset = int(struct.unpack(f"{header_endian}d", header_bytes[32:40])[0])
    data_size_bytes = int(struct.unpack(f"{header_endian}d", header_bytes[40:48])[0])
    data_format = header_bytes[52:54].decode("ascii", errors="ignore")
    timecode = struct.unpack(f"{header_endian}d", header_bytes[56:64])[0]
    time_interval = struct.unpack(f"{header_endian}d", header_bytes[264:272])[0]
    sample_rate = 1.0 / time_interval if time_interval > 0 else 0

    file_size = os.path.getsize(filename)
    if data_start_offset <= 0:
        data_start_offset = 512
    if data_size_bytes <= 0:
        data_end = ext_start * 512 if ext_start > 0 else file_size
        data_size_bytes = max(0, data_end - data_start_offset)

    # Map the format code to a numpy dtype.
    dtype_map = {
        "CB": (np.int8, True),
        "CI": (np.int16, True),
        "CL": (np.int32, True),
        "CF": (np.float32, True),
        "CD": (np.float64, True),
        "SB": (np.int8, False),
        "SI": (np.int16, False),
        "SL": (np.int32, False),
        "SF": (np.float32, False),
        "SD": (np.float64, False),
    }
    base_dtype, is_complex = dtype_map.get(data_format, (None, False))
    if base_dtype is None:
        raise ValueError(f"Unsupported format: {data_format}")

    # Apply the endianness.
    dtype = np.dtype(base_dtype).newbyteorder(data_endian)

    ext_keywords: dict[str, Any] = {}
    with open(filename, "rb") as f:
        f.seek(data_start_offset)
        num_elements = data_size_bytes // dtype.itemsize if dtype.itemsize else 0
        data = np.fromfile(f, dtype=dtype, count=num_elements)
        if ext_start > 0 and ext_size > 0:
            f.seek(ext_start * 512)
            ext_bytes = f.read(ext_size)
            ext_keywords = _decode_blue_keywords(ext_bytes, header_rep)

    # Convert to complex if needed.
    if is_complex:
        # Interleaved IQ: [I0, Q0, I1, Q1, ...]
        i_samples = data[0::2]
        q_samples = data[1::2]

        # Normalize integer data.
        if np.issubdtype(base_dtype, np.integer):
            max_val = np.iinfo(base_dtype).max
            i_samples = i_samples.astype(np.float32) / max_val
            q_samples = q_samples.astype(np.float32) / max_val

        iq_samples = i_samples + 1j * q_samples
    else:
        # Real data: convert to complex.
        if np.issubdtype(base_dtype, np.integer):
            max_val = np.iinfo(base_dtype).max
            real_samples = data.astype(np.float32) / max_val
        else:
            real_samples = data.astype(np.float32)
        iq_samples = real_samples.astype(np.complex64)

    # Create the metadata.
    metadata = {
        "sample_rate": float(sample_rate),
        "blue_data_format": data_format,
        "blue_endian": data_rep,
    }
    if ext_keywords:
        metadata["blue_keywords"] = ext_keywords
        for tag, value in ext_keywords.items():
            meta_key = _meta_key_from_tag(tag)
            if meta_key and meta_key not in metadata:
                metadata[meta_key] = value
    if isinstance(timecode, numbers.Real) and timecode != 0:
        metadata.setdefault("timestamp", timecode)
        metadata["timecode"] = timecode

    return Recording(data=iq_samples, metadata=metadata)


def load_recording(file: os.PathLike) -> Recording:
    """Load a recording from file.

    :param file: The directory path to the file(s) to load, **with** the file extension. To load
        from SigMF, the file extension must be one of *sigmf*, *sigmf-data*, or *sigmf-meta*;
        either way, both the SigMF data and meta files must be present for a successful read.
    :type file: os.PathLike
    :raises IOError: If there is an issue encountered during the file reading process.
    :raises ValueError: If the inferred file extension is not supported.
    :return: The recording, as initialized from the file(s).
    :rtype: ria_toolkit_oss.datatypes.Recording
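
    **Examples:**

    A minimal sketch (the path is hypothetical; the loader is chosen from the extension):

    >>> from ria_toolkit_oss.io import load_recording
    >>> rec = load_recording("recordings/sample_recording.wav")  # doctest: +SKIP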
    """
    _, extension = os.path.splitext(file)
    extension = extension.lstrip(".")

    if extension.lower() in ["sigmf", "sigmf-data", "sigmf-meta"]:
        return from_sigmf(file=file)
    elif extension.lower() == "npy":
        return from_npy(file=file)
    elif extension.lower() == "wav":
        return from_wav(file=file)
    elif extension.lower() == "blue":
        return from_blue(file=file)
    else:
        raise ValueError(f"File extension {extension} not supported.")


def convert_to_serializable(obj):
    """Recursively convert a JSON-compatible structure into a fully JSON-serializable one.

    Handles cases like NumPy data types, nested dicts, lists, and sets.
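
    A small sketch of typical conversions:

    >>> convert_to_serializable({"freq": np.float32(1.5), "tags": {"lte"}})
    {'freq': 1.5, 'tags': ['lte']}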
    """
    if isinstance(obj, np.integer):
        return int(obj)  # Convert NumPy int to Python int.
    elif isinstance(obj, np.floating):
        return float(obj)  # Convert NumPy float to Python float.
    elif isinstance(obj, np.ndarray):
        return obj.tolist()  # Convert NumPy array to list.
    elif isinstance(obj, (list, tuple)):
        return [convert_to_serializable(item) for item in obj]  # Process list or tuple.
    elif isinstance(obj, dict):
        return {key: convert_to_serializable(value) for key, value in obj.items()}  # Process dict.
    elif isinstance(obj, set):
        return list(obj)  # Convert set to list.
    elif obj in [float("inf"), float("-inf"), None]:
        return None  # Map infinity and None to null.
    elif isinstance(obj, (str, int, float, bool)):
        return obj  # Base case: already serializable.
    else:
        raise TypeError(f"Value of type {type(obj)} is not JSON serializable: {obj}")


def generate_filename(recording: Recording, tag: Optional[str] = "rec"):
    """Generate a filename from recording metadata.

    :param recording: The recording for which to generate a filename.
    :type recording: ria_toolkit_oss.datatypes.Recording
    :param tag: The string at the beginning of the generated filename. Default is "rec".
    :type tag: str, optional
    :return: A filename without an extension.
    :rtype: str
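
    **Examples:**

    An illustrative sketch (the source, frequency, timestamp, and ID segments all depend on the
    recording's metadata):

    >>> generate_filename(rec)  # doctest: +SKIP
    'rec_synth_2GHz_2024-01-01_12-00-00_a1b2c3d'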
    """
    tag = tag + "_"

    source = recording.metadata.get("source", "")
    if source != "":
        source = source + "_"

    # Convert e.g. 1000 to 1k.
    center_frequency = str(Quantity(recording.metadata.get("center_frequency", 0)))
    if center_frequency != "0":
        num = center_frequency[:-1]
        suffix = center_frequency[-1]
        num = int(np.round(float(num)))
    else:
        num = 0
        suffix = ""
    center_frequency = str(num) + suffix + "Hz_"

    timestamp = int(recording.timestamp)
    timestamp = dt.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"

    # Add the first seven characters of rec_id for uniqueness.
    rec_id = recording.rec_id[0:7]

    return tag + source + center_frequency + timestamp + rec_id


def generate_fullpath(recording: Recording, filename: str, path: os.PathLike | str, extension: str, overwrite: bool):
    """Generate the filename, path, and fullpath of the given recording."""
    # Generate the filename if not provided.
    if filename is not None:
        filename, _ = os.path.splitext(filename)
    else:
        filename = generate_filename(recording=recording)
    filename = filename + extension

    if path is None:
        path = "recordings"
    if not os.path.exists(path):
        os.makedirs(path)

    fullpath = os.path.join(path, filename)

    if not overwrite and os.path.isfile(fullpath):
        raise IOError(f"File already exists: {fullpath}")

    return filename, path, fullpath


def _append_wav_list_info_chunk(filename: str, rf_metadata: dict) -> None:
    """Append a LIST INFO chunk to an existing WAV file.

    Uses the ICMT field for YAML-formatted RF metadata.

    :param filename: Path to the WAV file.
    :type filename: str
    :param rf_metadata: Dictionary of RF metadata to embed.
    :type rf_metadata: dict
    """
    import yaml

    # Convert the metadata to a YAML string.
    yaml_str = "# RF Recording Metadata\n"
    yaml_str += yaml.dump(rf_metadata, default_flow_style=False, sort_keys=False)

    # Create the LIST INFO chunk data.
    info_data = b""

    # Add the ICMT (comments) tag with the YAML metadata.
    icmt_value = yaml_str.encode("utf-8", errors="ignore")
    icmt_value += b"\x00"  # NULL terminator.
    # Pad to an even length (RIFF requirement).
    if len(icmt_value) % 2:
        icmt_value += b"\x00"
    info_data += b"ICMT"
    info_data += len(icmt_value).to_bytes(4, "little")
    info_data += icmt_value

    # Add the ISFT (software) tag.
    isft_value = b"riatoolkit oss SDR toolchain\x00"
    if len(isft_value) % 2:
        isft_value += b"\x00"
    info_data += b"ISFT"
    info_data += len(isft_value).to_bytes(4, "little")
    info_data += isft_value

    # Create the LIST chunk.
    list_chunk = b"LIST"
    list_chunk += (4 + len(info_data)).to_bytes(4, "little")  # Size includes the "INFO" tag.
    list_chunk += b"INFO"  # List type.
    list_chunk += info_data

    # Append to the WAV file.
    with open(filename, "r+b") as f:
        # Read the RIFF header.
        f.seek(0)
        riff_header = f.read(4)
        if riff_header != b"RIFF":
            raise ValueError("Not a valid RIFF/WAV file")
        old_size = int.from_bytes(f.read(4), "little")

        # Update the RIFF chunk size.
        f.seek(4)
        new_size = old_size + len(list_chunk)
        f.write(new_size.to_bytes(4, "little"))

        # Append the LIST INFO chunk at the end.
        f.seek(0, 2)  # End of file.
        f.write(list_chunk)


def _extract_wav_list_info(filename: str) -> Optional[dict]:
    """Extract the LIST INFO chunk and parse the ICMT field as YAML.

    :param filename: Path to the WAV file.
    :type filename: str
    :return: Dictionary of metadata from the ICMT field, or None if not found.
    :rtype: dict or None
    """
    with open(filename, "rb") as f:
        # Read the RIFF header.
        riff_header = f.read(4)
        if riff_header != b"RIFF":
            return None
        file_size = int.from_bytes(f.read(4), "little")
        wave_header = f.read(4)
        if wave_header != b"WAVE":
            return None

        # Skip to the chunks after the header (12 bytes = RIFF + size + WAVE).
        f.seek(12)

        while f.tell() < file_size + 8:
            chunk_id = f.read(4)
            if len(chunk_id) < 4:
                break
            chunk_size = int.from_bytes(f.read(4), "little")

            if chunk_id == b"LIST":
                list_type = f.read(4)
                if list_type == b"INFO":
                    # Read the INFO chunk data.
                    info_data = f.read(chunk_size - 4)
                    return _parse_wav_info_chunk(info_data)
                else:
                    # Skip this LIST chunk.
                    f.seek(chunk_size - 4, 1)
            else:
                # Skip the chunk (aligned to an even boundary).
                skip_size = chunk_size
                if chunk_size % 2:
                    skip_size += 1
                f.seek(skip_size, 1)

    return None


def _parse_wav_info_chunk(info_data: bytes) -> Optional[dict]:
    """Parse INFO chunk data and extract the ICMT field as YAML.

    :param info_data: Raw bytes from the INFO chunk.
    :type info_data: bytes
    :return: Dictionary parsed from the YAML in the ICMT field, or None.
    :rtype: dict or None
    """
    import yaml

    offset = 0
    while offset < len(info_data) - 8:
        tag = info_data[offset : offset + 4]
        size = int.from_bytes(info_data[offset + 4 : offset + 8], "little")
        value_bytes = info_data[offset + 8 : offset + 8 + size]

        if tag == b"ICMT":
            # Found the comments field: decode and parse the YAML.
            icmt_str = value_bytes.decode("utf-8", errors="ignore").rstrip("\x00")
            try:
                metadata = yaml.safe_load(icmt_str)
                # If YAML parsing returns a string (no YAML structure), wrap it.
                if isinstance(metadata, str):
                    return {"raw_comment": metadata}
                return metadata
            except yaml.YAMLError:
                # If YAML parsing fails, return the raw comment.
                return {"raw_comment": icmt_str}

        # Move to the next tag (aligned to an even boundary).
        offset += 8 + size
        if size % 2:
            offset += 1

    return None


def _blue_meta_tag_from_key(key: str) -> str:
    base = re.sub(r"[^0-9A-Za-z]+", "_", key).strip("_")
    if not base:
        return ""
    base = base.upper()[:_BLUE_META_TAG_MAX_LEN]
    return f"{_BLUE_META_PREFIX}{base}"


def _encode_blue_value(value: Any) -> Optional[tuple[str, bytes]]:
    if value is None:
        return None
    if isinstance(value, np.generic):
        value = value.item()
    if isinstance(value, bool):
        value = int(value)
    if isinstance(value, numbers.Integral):
        if -(2**31) <= int(value) < 2**31:
            return "L", struct.pack("<i", int(value))
        # Integers outside the int32 range are stored as doubles.
        return "D", struct.pack("<d", float(value))
    if isinstance(value, numbers.Real):
        return "D", struct.pack("<d", float(value))
    if isinstance(value, str):
        return "A", value.encode("utf-8", errors="ignore")
    return None


def _encode_blue_keyword(tag: str, value: Any) -> Optional[bytes]:
    encoded = _encode_blue_value(value)
    if encoded is None:
        return None
    type_char, value_bytes = encoded
    tag_bytes = tag.encode("ascii", errors="ignore")
    ltag = len(tag_bytes)
    value_length = len(value_bytes)
    base_length = 8 + value_length + ltag
    padding = (8 - (base_length % 8)) % 8
    lkey = base_length + padding
    lext = 8 + ltag + padding
    parts = [
        struct.pack("<i", lkey),
        struct.pack("<h", lext),
        struct.pack("<B", ltag),
        type_char.encode("ascii"),
        value_bytes,
        tag_bytes,
        b"\x00" * padding,
    ]
    return b"".join(parts)


def _build_blue_extended_header(metadata: dict) -> bytes:
    if not metadata:
        return b""
    keywords: List[bytes] = []
    for key in sorted(metadata.keys()):
        if key in _BLUE_SKIP_METADATA_KEYS:
            continue
        tag = _blue_meta_tag_from_key(key)
        if not tag:
            continue
        encoded = _encode_blue_keyword(tag, metadata[key])
        if encoded:
            keywords.append(encoded)
    return b"".join(keywords)


def _decode_blue_keyword_value(type_char: str, value_bytes: bytes, endian: str) -> Any:
    if type_char == "A":
        return value_bytes.decode("utf-8", errors="ignore").rstrip("\x00")
    dtype_code = _BLUE_NUMERIC_DTYPE.get(type_char)
    if dtype_code is None or not value_bytes:
        return value_bytes if value_bytes else None
    dtype = np.dtype(endian + dtype_code)
    array = np.frombuffer(value_bytes, dtype=dtype)
    if array.size == 0:
        return None
    if array.size == 1:
        return array[0].item()
    return array.tolist()
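

# Each extended-header keyword record is packed as:
#   int32 lkey  - total record length, padded to an 8-byte boundary
#   int16 lext  - 8 + tag length + padding
#   uint8 ltag  - tag length
#   char  type  - 'A' (ASCII), 'B'/'I'/'L' (int8/16/32), 'F'/'D' (float32/64)
#   value bytes, then tag bytes, then zero padding
# A round-trip sketch:
#   _decode_blue_keywords(_build_blue_extended_header({"gain": 10}), "EEEI")
#   returns {"META_GAIN": 10}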
def _decode_blue_keywords(data: bytes, endian: str) -> dict[str, Any]:
    if not data:
        return {}

    metadata: dict[str, Any] = {}
    offset = 0
    endian_prefix = "<" if endian in ["EEEI", "VAX", ""] else ">"

    while offset + 8 <= len(data):
        lkey = struct.unpack_from(f"{endian_prefix}i", data, offset)[0]
        if lkey <= 0 or offset + lkey > len(data):
            break
        lext = struct.unpack_from(f"{endian_prefix}h", data, offset + 4)[0]
        ltag = data[offset + 6]
        type_char = chr(data[offset + 7])

        value_len = lkey - lext
        value_start = offset + 8
        value_end = value_start + value_len
        tag_start = value_end
        tag_end = tag_start + ltag

        if value_len < 0 or tag_end > offset + lkey:
            break

        value_bytes = data[value_start:value_end]
        tag = data[tag_start:tag_end].decode("ascii", errors="ignore").strip()
        metadata[tag] = _decode_blue_keyword_value(type_char, value_bytes, endian_prefix)

        offset += lkey

    return metadata


def _meta_key_from_tag(tag: str) -> str:
    if not tag.startswith(_BLUE_META_PREFIX):
        return ""
    base = tag[len(_BLUE_META_PREFIX):].lower()
    base = re.sub(r"__+", "_", base)
    return base.strip("_")