Compare commits
No commits in common. "450fab6df2ab22a34c03f602b5d911500a6a1c6c" and "a0b46a35e28b13a8f289db41dea822fe775e16cb" have entirely different histories.
450fab6df2
...
a0b46a35e2
|
|
@ -1,6 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
|
|
@ -11,6 +12,7 @@ from typing import Any, Iterator, Optional
|
|||
|
||||
import numpy as np
|
||||
from numpy.typing import ArrayLike
|
||||
from quantiphy import Quantity
|
||||
|
||||
from ria_toolkit_oss.datatypes.annotation import Annotation
|
||||
|
||||
|
|
@ -448,9 +450,7 @@ class Recording:
|
|||
else:
|
||||
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
|
||||
|
||||
def to_sigmf(
|
||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
||||
) -> None:
|
||||
def to_sigmf(self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> None:
|
||||
"""Write recording to a set of SigMF files.
|
||||
|
||||
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
||||
|
|
@ -468,11 +468,9 @@ class Recording:
|
|||
"""
|
||||
from ria_toolkit_oss.io.recording import to_sigmf
|
||||
|
||||
to_sigmf(filename=filename, path=path, recording=self, overwrite=overwrite)
|
||||
to_sigmf(filename=filename, path=path, recording=self)
|
||||
|
||||
def to_npy(
|
||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
||||
) -> str:
|
||||
def to_npy(self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> str:
|
||||
"""Write recording to ``.npy`` binary file.
|
||||
|
||||
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
||||
|
|
@ -503,7 +501,7 @@ class Recording:
|
|||
"""
|
||||
from ria_toolkit_oss.io.recording import to_npy
|
||||
|
||||
to_npy(recording=self, filename=filename, path=path, overwrite=overwrite)
|
||||
to_npy(recording=self, filename=filename, path=path)
|
||||
|
||||
def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording:
|
||||
"""Trim Recording samples to a desired length, shifting annotations to maintain alignment.
|
||||
|
|
@ -596,6 +594,40 @@ class Recording:
|
|||
scaled_data = self.data / np.max(abs(self.data))
|
||||
return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations)
|
||||
|
||||
def generate_filename(self, tag: Optional[str] = "rec"):
|
||||
"""Generate a filename from metadata.
|
||||
|
||||
:param tag: The string at the beginning of the generated filename. Default is "rec".
|
||||
:type tag: str, optional
|
||||
|
||||
:return: A filename without an extension.
|
||||
:rtype: str
|
||||
"""
|
||||
# TODO: This method should be refactored to use the first 7 characters of the 'rec_id' field.
|
||||
|
||||
tag = tag + "_"
|
||||
source = self.metadata.get("source", "")
|
||||
if source != "":
|
||||
source = source + "_"
|
||||
|
||||
# converts 1000 to 1k for example
|
||||
center_frequency = str(Quantity(self.metadata.get("center_frequency", 0)))
|
||||
if center_frequency != "0":
|
||||
num = center_frequency[:-1]
|
||||
suffix = center_frequency[-1]
|
||||
num = int(np.round(float(num)))
|
||||
else:
|
||||
num = 0
|
||||
suffix = ""
|
||||
center_frequency = str(num) + suffix + "Hz_"
|
||||
|
||||
timestamp = int(self.timestamp)
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"
|
||||
|
||||
# Add first seven characters of rec_id for uniqueness
|
||||
rec_id = self.rec_id[0:7]
|
||||
return tag + source + center_frequency + timestamp + rec_id
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""The length of a recording is defined by the number of complex samples in each channel of the recording."""
|
||||
return self.shape[1]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording object.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import datetime as dt
|
||||
import os
|
||||
from datetime import timezone
|
||||
|
|
@ -10,7 +9,6 @@ from typing import Optional
|
|||
|
||||
import numpy as np
|
||||
import sigmf
|
||||
from quantiphy import Quantity
|
||||
from sigmf import SigMFFile, sigmffile
|
||||
from sigmf.utils import get_data_type_str
|
||||
|
||||
|
|
@ -94,12 +92,7 @@ def convert_to_serializable(obj):
|
|||
raise TypeError(f"Value of type {type(obj)} is not JSON serializable: {obj}")
|
||||
|
||||
|
||||
def to_sigmf(
|
||||
recording: Recording,
|
||||
filename: Optional[str] = None,
|
||||
path: Optional[os.PathLike | str] = None,
|
||||
overwrite: bool = False,
|
||||
) -> None:
|
||||
def to_sigmf(recording: Recording, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> None:
|
||||
"""Write recording to a set of SigMF files.
|
||||
|
||||
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
||||
|
|
@ -128,7 +121,7 @@ def to_sigmf(
|
|||
if filename is not None:
|
||||
filename, _ = os.path.splitext(filename)
|
||||
else:
|
||||
filename = generate_filename(recording=recording)
|
||||
filename = recording.generate_filename()
|
||||
|
||||
if path is None:
|
||||
path = "recordings"
|
||||
|
|
@ -147,13 +140,6 @@ def to_sigmf(
|
|||
samples = multichannel_samples[0]
|
||||
|
||||
data_file_path = os.path.join(path, f"{filename}.sigmf-data")
|
||||
meta_file_path = os.path.join(path, f"{filename}.sigmf-meta")
|
||||
|
||||
if not overwrite:
|
||||
if os.path.isfile(data_file_path):
|
||||
raise IOError("File already exists")
|
||||
if os.path.isfile(meta_file_path):
|
||||
raise IOError("File already exists")
|
||||
|
||||
samples.tofile(data_file_path)
|
||||
global_info = {
|
||||
|
|
@ -202,7 +188,7 @@ def to_sigmf(
|
|||
meta_dict = sigMF_metafile.ordered_metadata()
|
||||
meta_dict["ria"] = metadata
|
||||
|
||||
sigMF_metafile.tofile(meta_file_path)
|
||||
sigMF_metafile.tofile(f"{os.path.join(path, filename)}.sigmf-meta")
|
||||
|
||||
|
||||
def from_sigmf(file: os.PathLike | str) -> Recording:
|
||||
|
|
@ -264,12 +250,7 @@ def from_sigmf(file: os.PathLike | str) -> Recording:
|
|||
return output_recording
|
||||
|
||||
|
||||
def to_npy(
|
||||
recording: Recording,
|
||||
filename: Optional[str] = None,
|
||||
path: Optional[os.PathLike | str] = None,
|
||||
overwrite: bool = False,
|
||||
) -> str:
|
||||
def to_npy(recording: Recording, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> str:
|
||||
"""Write recording to ``.npy`` binary file.
|
||||
|
||||
:param recording: The recording to be written to file.
|
||||
|
|
@ -296,7 +277,7 @@ def to_npy(
|
|||
if filename is not None:
|
||||
filename, _ = os.path.splitext(filename)
|
||||
else:
|
||||
filename = generate_filename(recording=recording)
|
||||
filename = recording.generate_filename()
|
||||
filename = filename + ".npy"
|
||||
|
||||
if path is None:
|
||||
|
|
@ -306,10 +287,6 @@ def to_npy(
|
|||
os.makedirs(path)
|
||||
fullpath = os.path.join(path, filename)
|
||||
|
||||
if not overwrite:
|
||||
if os.path.isfile(fullpath):
|
||||
raise IOError("File already exists")
|
||||
|
||||
data = np.array(recording.data)
|
||||
metadata = recording.metadata
|
||||
annotations = recording.annotations
|
||||
|
|
@ -353,37 +330,3 @@ def from_npy(file: os.PathLike | str) -> Recording:
|
|||
|
||||
recording = Recording(data=data, metadata=metadata, annotations=annotations)
|
||||
return recording
|
||||
|
||||
|
||||
def generate_filename(recording: Recording, tag: Optional[str] = "rec"):
|
||||
"""Generate a filename from metadata.
|
||||
|
||||
:param tag: The string at the beginning of the generated filename. Default is "rec".
|
||||
:type tag: str, optional
|
||||
|
||||
:return: A filename without an extension.
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
tag = tag + "_"
|
||||
source = recording.metadata.get("source", "")
|
||||
if source != "":
|
||||
source = source + "_"
|
||||
|
||||
# converts 1000 to 1k for example
|
||||
center_frequency = str(Quantity(recording.metadata.get("center_frequency", 0)))
|
||||
if center_frequency != "0":
|
||||
num = center_frequency[:-1]
|
||||
suffix = center_frequency[-1]
|
||||
num = int(np.round(float(num)))
|
||||
else:
|
||||
num = 0
|
||||
suffix = ""
|
||||
center_frequency = str(num) + suffix + "Hz_"
|
||||
|
||||
timestamp = int(recording.timestamp)
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"
|
||||
|
||||
# Add first seven characters of rec_id for uniqueness
|
||||
rec_id = recording.rec_id[0:7]
|
||||
return tag + source + center_frequency + timestamp + rec_id
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ def test_npy_save_1(tmp_path):
|
|||
|
||||
# Save to tmp_path
|
||||
filename = tmp_path / "test"
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||
|
||||
# Reload
|
||||
recording2 = from_npy(filename)
|
||||
|
|
@ -44,7 +44,7 @@ def test_npy_save_2(tmp_path):
|
|||
|
||||
# Save to tmp_path
|
||||
filename = tmp_path / "test"
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||
|
||||
# Reload
|
||||
recording2 = from_npy(filename)
|
||||
|
|
@ -63,7 +63,7 @@ def test_npy_save_3(tmp_path):
|
|||
|
||||
# Save to tmp_path
|
||||
filename = tmp_path / "test"
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||
|
||||
# Reload
|
||||
recording2 = from_npy(filename)
|
||||
|
|
@ -73,15 +73,6 @@ def test_npy_save_3(tmp_path):
|
|||
assert recording1.metadata == recording2.metadata
|
||||
|
||||
|
||||
def test_npy_save_4(tmp_path):
|
||||
recording1 = Recording(data=nd_complex_data_1)
|
||||
try:
|
||||
filename = tmp_path / "test"
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||
except IOError as e:
|
||||
assert str(e) == "File already exists"
|
||||
|
||||
|
||||
def test_npy_annotations(tmp_path):
|
||||
# Create annotations
|
||||
annotation1 = Annotation(sample_start=0, sample_count=100, freq_lower_edge=0, freq_upper_edge=100)
|
||||
|
|
@ -93,7 +84,7 @@ def test_npy_annotations(tmp_path):
|
|||
|
||||
# Save to tmp_path
|
||||
filename = tmp_path / "test"
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||
|
||||
# Reload
|
||||
recording2 = from_npy(filename)
|
||||
|
|
@ -113,7 +104,7 @@ def test_load_recording_npy(tmp_path):
|
|||
|
||||
# Save to tmp_path
|
||||
filename = tmp_path / "test.npy"
|
||||
recording1.to_npy(path=tmp_path, filename=filename.name, overwrite=True)
|
||||
recording1.to_npy(path=tmp_path, filename=filename.name)
|
||||
|
||||
# Load from tmp_path
|
||||
recording2 = load_rec(filename)
|
||||
|
|
@ -139,7 +130,7 @@ def test_sigmf_1(tmp_path):
|
|||
|
||||
# Save to tmp_path in SigMF format
|
||||
filename = tmp_path / "test"
|
||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name, overwrite=True)
|
||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
||||
|
||||
# Reload
|
||||
recording2 = from_sigmf(filename)
|
||||
|
|
@ -167,7 +158,7 @@ def test_sigmf_2(tmp_path):
|
|||
|
||||
# Save to tmp_path using the base name
|
||||
filename = tmp_path / "test"
|
||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name, overwrite=True)
|
||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
||||
|
||||
# Load from tmp_path; from_sigmf expects the base name
|
||||
recording2 = from_sigmf(filename)
|
||||
|
|
@ -180,12 +171,3 @@ def test_sigmf_2(tmp_path):
|
|||
)
|
||||
|
||||
assert np.array_equal(recording1.data, recording2.data)
|
||||
|
||||
|
||||
def test_sigmf_3(tmp_path):
|
||||
recording1 = Recording(data=complex_data_1, metadata=sample_metadata)
|
||||
try:
|
||||
filename = tmp_path / "test"
|
||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
||||
except IOError as e:
|
||||
assert str(e) == "File already exists"
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user