Compare commits
No commits in common. "24730850b07a0f040bcd14106c8ad93daf7cef91" and "77d1773370249c119446cbec1013cf8b0ca65a93" have entirely different histories.
24730850b0
...
77d1773370
|
|
@ -1,6 +1,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import datetime
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
@ -11,6 +12,7 @@ from typing import Any, Iterator, Optional
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from numpy.typing import ArrayLike
|
from numpy.typing import ArrayLike
|
||||||
|
from quantiphy import Quantity
|
||||||
|
|
||||||
from ria_toolkit_oss.datatypes.annotation import Annotation
|
from ria_toolkit_oss.datatypes.annotation import Annotation
|
||||||
|
|
||||||
|
|
@ -448,9 +450,7 @@ class Recording:
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
|
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
|
||||||
|
|
||||||
def to_sigmf(
|
def to_sigmf(self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> None:
|
||||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
|
||||||
) -> None:
|
|
||||||
"""Write recording to a set of SigMF files.
|
"""Write recording to a set of SigMF files.
|
||||||
|
|
||||||
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
||||||
|
|
@ -468,11 +468,9 @@ class Recording:
|
||||||
"""
|
"""
|
||||||
from ria_toolkit_oss.io.recording import to_sigmf
|
from ria_toolkit_oss.io.recording import to_sigmf
|
||||||
|
|
||||||
to_sigmf(filename=filename, path=path, recording=self, overwrite=overwrite)
|
to_sigmf(filename=filename, path=path, recording=self)
|
||||||
|
|
||||||
def to_npy(
|
def to_npy(self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> str:
|
||||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
|
||||||
) -> str:
|
|
||||||
"""Write recording to ``.npy`` binary file.
|
"""Write recording to ``.npy`` binary file.
|
||||||
|
|
||||||
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
:param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename.
|
||||||
|
|
@ -503,7 +501,7 @@ class Recording:
|
||||||
"""
|
"""
|
||||||
from ria_toolkit_oss.io.recording import to_npy
|
from ria_toolkit_oss.io.recording import to_npy
|
||||||
|
|
||||||
to_npy(recording=self, filename=filename, path=path, overwrite=overwrite)
|
to_npy(recording=self, filename=filename, path=path)
|
||||||
|
|
||||||
def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording:
|
def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording:
|
||||||
"""Trim Recording samples to a desired length, shifting annotations to maintain alignment.
|
"""Trim Recording samples to a desired length, shifting annotations to maintain alignment.
|
||||||
|
|
@ -596,6 +594,40 @@ class Recording:
|
||||||
scaled_data = self.data / np.max(abs(self.data))
|
scaled_data = self.data / np.max(abs(self.data))
|
||||||
return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations)
|
return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations)
|
||||||
|
|
||||||
|
def generate_filename(self, tag: Optional[str] = "rec"):
|
||||||
|
"""Generate a filename from metadata.
|
||||||
|
|
||||||
|
:param tag: The string at the beginning of the generated filename. Default is "rec".
|
||||||
|
:type tag: str, optional
|
||||||
|
|
||||||
|
:return: A filename without an extension.
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
# TODO: This method should be refactored to use the first 7 characters of the 'rec_id' field.
|
||||||
|
|
||||||
|
tag = tag + "_"
|
||||||
|
source = self.metadata.get("source", "")
|
||||||
|
if source != "":
|
||||||
|
source = source + "_"
|
||||||
|
|
||||||
|
# converts 1000 to 1k for example
|
||||||
|
center_frequency = str(Quantity(self.metadata.get("center_frequency", 0)))
|
||||||
|
if center_frequency != "0":
|
||||||
|
num = center_frequency[:-1]
|
||||||
|
suffix = center_frequency[-1]
|
||||||
|
num = int(np.round(float(num)))
|
||||||
|
else:
|
||||||
|
num = 0
|
||||||
|
suffix = ""
|
||||||
|
center_frequency = str(num) + suffix + "Hz_"
|
||||||
|
|
||||||
|
timestamp = int(self.timestamp)
|
||||||
|
timestamp = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"
|
||||||
|
|
||||||
|
# Add first seven characters of rec_id for uniqueness
|
||||||
|
rec_id = self.rec_id[0:7]
|
||||||
|
return tag + source + center_frequency + timestamp + rec_id
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
"""The length of a recording is defined by the number of complex samples in each channel of the recording."""
|
"""The length of a recording is defined by the number of complex samples in each channel of the recording."""
|
||||||
return self.shape[1]
|
return self.shape[1]
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@
|
||||||
Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording object.
|
Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording object.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import datetime
|
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import os
|
import os
|
||||||
from datetime import timezone
|
from datetime import timezone
|
||||||
|
|
@ -10,7 +9,6 @@ from typing import Optional
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import sigmf
|
import sigmf
|
||||||
from quantiphy import Quantity
|
|
||||||
from sigmf import SigMFFile, sigmffile
|
from sigmf import SigMFFile, sigmffile
|
||||||
from sigmf.utils import get_data_type_str
|
from sigmf.utils import get_data_type_str
|
||||||
|
|
||||||
|
|
@ -94,12 +92,7 @@ def convert_to_serializable(obj):
|
||||||
raise TypeError(f"Value of type {type(obj)} is not JSON serializable: {obj}")
|
raise TypeError(f"Value of type {type(obj)} is not JSON serializable: {obj}")
|
||||||
|
|
||||||
|
|
||||||
def to_sigmf(
|
def to_sigmf(recording: Recording, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> None:
|
||||||
recording: Recording,
|
|
||||||
filename: Optional[str] = None,
|
|
||||||
path: Optional[os.PathLike | str] = None,
|
|
||||||
overwrite: bool = False,
|
|
||||||
) -> None:
|
|
||||||
"""Write recording to a set of SigMF files.
|
"""Write recording to a set of SigMF files.
|
||||||
|
|
||||||
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
The SigMF io format is defined by the `SigMF Specification Project <https://github.com/sigmf/SigMF>`_
|
||||||
|
|
@ -128,7 +121,7 @@ def to_sigmf(
|
||||||
if filename is not None:
|
if filename is not None:
|
||||||
filename, _ = os.path.splitext(filename)
|
filename, _ = os.path.splitext(filename)
|
||||||
else:
|
else:
|
||||||
filename = generate_filename(recording=recording)
|
filename = recording.generate_filename()
|
||||||
|
|
||||||
if path is None:
|
if path is None:
|
||||||
path = "recordings"
|
path = "recordings"
|
||||||
|
|
@ -147,13 +140,6 @@ def to_sigmf(
|
||||||
samples = multichannel_samples[0]
|
samples = multichannel_samples[0]
|
||||||
|
|
||||||
data_file_path = os.path.join(path, f"{filename}.sigmf-data")
|
data_file_path = os.path.join(path, f"{filename}.sigmf-data")
|
||||||
meta_file_path = os.path.join(path, f"{filename}.sigmf-meta")
|
|
||||||
|
|
||||||
if not overwrite:
|
|
||||||
if os.path.isfile(data_file_path):
|
|
||||||
raise IOError("File already exists")
|
|
||||||
if os.path.isfile(meta_file_path):
|
|
||||||
raise IOError("File already exists")
|
|
||||||
|
|
||||||
samples.tofile(data_file_path)
|
samples.tofile(data_file_path)
|
||||||
global_info = {
|
global_info = {
|
||||||
|
|
@ -202,7 +188,7 @@ def to_sigmf(
|
||||||
meta_dict = sigMF_metafile.ordered_metadata()
|
meta_dict = sigMF_metafile.ordered_metadata()
|
||||||
meta_dict["ria"] = metadata
|
meta_dict["ria"] = metadata
|
||||||
|
|
||||||
sigMF_metafile.tofile(meta_file_path)
|
sigMF_metafile.tofile(f"{os.path.join(path, filename)}.sigmf-meta")
|
||||||
|
|
||||||
|
|
||||||
def from_sigmf(file: os.PathLike | str) -> Recording:
|
def from_sigmf(file: os.PathLike | str) -> Recording:
|
||||||
|
|
@ -264,12 +250,7 @@ def from_sigmf(file: os.PathLike | str) -> Recording:
|
||||||
return output_recording
|
return output_recording
|
||||||
|
|
||||||
|
|
||||||
def to_npy(
|
def to_npy(recording: Recording, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None) -> str:
|
||||||
recording: Recording,
|
|
||||||
filename: Optional[str] = None,
|
|
||||||
path: Optional[os.PathLike | str] = None,
|
|
||||||
overwrite: bool = False,
|
|
||||||
) -> str:
|
|
||||||
"""Write recording to ``.npy`` binary file.
|
"""Write recording to ``.npy`` binary file.
|
||||||
|
|
||||||
:param recording: The recording to be written to file.
|
:param recording: The recording to be written to file.
|
||||||
|
|
@ -296,7 +277,7 @@ def to_npy(
|
||||||
if filename is not None:
|
if filename is not None:
|
||||||
filename, _ = os.path.splitext(filename)
|
filename, _ = os.path.splitext(filename)
|
||||||
else:
|
else:
|
||||||
filename = generate_filename(recording=recording)
|
filename = recording.generate_filename()
|
||||||
filename = filename + ".npy"
|
filename = filename + ".npy"
|
||||||
|
|
||||||
if path is None:
|
if path is None:
|
||||||
|
|
@ -306,10 +287,6 @@ def to_npy(
|
||||||
os.makedirs(path)
|
os.makedirs(path)
|
||||||
fullpath = os.path.join(path, filename)
|
fullpath = os.path.join(path, filename)
|
||||||
|
|
||||||
if not overwrite:
|
|
||||||
if os.path.isfile(fullpath):
|
|
||||||
raise IOError("File already exists")
|
|
||||||
|
|
||||||
data = np.array(recording.data)
|
data = np.array(recording.data)
|
||||||
metadata = recording.metadata
|
metadata = recording.metadata
|
||||||
annotations = recording.annotations
|
annotations = recording.annotations
|
||||||
|
|
@ -353,37 +330,3 @@ def from_npy(file: os.PathLike | str) -> Recording:
|
||||||
|
|
||||||
recording = Recording(data=data, metadata=metadata, annotations=annotations)
|
recording = Recording(data=data, metadata=metadata, annotations=annotations)
|
||||||
return recording
|
return recording
|
||||||
|
|
||||||
|
|
||||||
def generate_filename(recording: Recording, tag: Optional[str] = "rec"):
|
|
||||||
"""Generate a filename from metadata.
|
|
||||||
|
|
||||||
:param tag: The string at the beginning of the generated filename. Default is "rec".
|
|
||||||
:type tag: str, optional
|
|
||||||
|
|
||||||
:return: A filename without an extension.
|
|
||||||
:rtype: str
|
|
||||||
"""
|
|
||||||
|
|
||||||
tag = tag + "_"
|
|
||||||
source = recording.metadata.get("source", "")
|
|
||||||
if source != "":
|
|
||||||
source = source + "_"
|
|
||||||
|
|
||||||
# converts 1000 to 1k for example
|
|
||||||
center_frequency = str(Quantity(recording.metadata.get("center_frequency", 0)))
|
|
||||||
if center_frequency != "0":
|
|
||||||
num = center_frequency[:-1]
|
|
||||||
suffix = center_frequency[-1]
|
|
||||||
num = int(np.round(float(num)))
|
|
||||||
else:
|
|
||||||
num = 0
|
|
||||||
suffix = ""
|
|
||||||
center_frequency = str(num) + suffix + "Hz_"
|
|
||||||
|
|
||||||
timestamp = int(recording.timestamp)
|
|
||||||
timestamp = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"
|
|
||||||
|
|
||||||
# Add first seven characters of rec_id for uniqueness
|
|
||||||
rec_id = recording.rec_id[0:7]
|
|
||||||
return tag + source + center_frequency + timestamp + rec_id
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ def test_npy_save_1(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path
|
# Save to tmp_path
|
||||||
filename = tmp_path / "test"
|
filename = tmp_path / "test"
|
||||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||||
|
|
||||||
# Reload
|
# Reload
|
||||||
recording2 = from_npy(filename)
|
recording2 = from_npy(filename)
|
||||||
|
|
@ -44,7 +44,7 @@ def test_npy_save_2(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path
|
# Save to tmp_path
|
||||||
filename = tmp_path / "test"
|
filename = tmp_path / "test"
|
||||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||||
|
|
||||||
# Reload
|
# Reload
|
||||||
recording2 = from_npy(filename)
|
recording2 = from_npy(filename)
|
||||||
|
|
@ -63,7 +63,7 @@ def test_npy_save_3(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path
|
# Save to tmp_path
|
||||||
filename = tmp_path / "test"
|
filename = tmp_path / "test"
|
||||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||||
|
|
||||||
# Reload
|
# Reload
|
||||||
recording2 = from_npy(filename)
|
recording2 = from_npy(filename)
|
||||||
|
|
@ -73,15 +73,6 @@ def test_npy_save_3(tmp_path):
|
||||||
assert recording1.metadata == recording2.metadata
|
assert recording1.metadata == recording2.metadata
|
||||||
|
|
||||||
|
|
||||||
def test_npy_save_4(tmp_path):
|
|
||||||
recording1 = Recording(data=nd_complex_data_1)
|
|
||||||
try:
|
|
||||||
filename = tmp_path / "test"
|
|
||||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
|
||||||
except IOError as e:
|
|
||||||
assert str(e) == "File already exists"
|
|
||||||
|
|
||||||
|
|
||||||
def test_npy_annotations(tmp_path):
|
def test_npy_annotations(tmp_path):
|
||||||
# Create annotations
|
# Create annotations
|
||||||
annotation1 = Annotation(sample_start=0, sample_count=100, freq_lower_edge=0, freq_upper_edge=100)
|
annotation1 = Annotation(sample_start=0, sample_count=100, freq_lower_edge=0, freq_upper_edge=100)
|
||||||
|
|
@ -93,7 +84,7 @@ def test_npy_annotations(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path
|
# Save to tmp_path
|
||||||
filename = tmp_path / "test"
|
filename = tmp_path / "test"
|
||||||
to_npy(filename=filename.name, path=tmp_path, recording=recording1, overwrite=True)
|
to_npy(filename=filename.name, path=tmp_path, recording=recording1)
|
||||||
|
|
||||||
# Reload
|
# Reload
|
||||||
recording2 = from_npy(filename)
|
recording2 = from_npy(filename)
|
||||||
|
|
@ -113,7 +104,7 @@ def test_load_recording_npy(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path
|
# Save to tmp_path
|
||||||
filename = tmp_path / "test.npy"
|
filename = tmp_path / "test.npy"
|
||||||
recording1.to_npy(path=tmp_path, filename=filename.name, overwrite=True)
|
recording1.to_npy(path=tmp_path, filename=filename.name)
|
||||||
|
|
||||||
# Load from tmp_path
|
# Load from tmp_path
|
||||||
recording2 = load_rec(filename)
|
recording2 = load_rec(filename)
|
||||||
|
|
@ -139,7 +130,7 @@ def test_sigmf_1(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path in SigMF format
|
# Save to tmp_path in SigMF format
|
||||||
filename = tmp_path / "test"
|
filename = tmp_path / "test"
|
||||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name, overwrite=True)
|
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
||||||
|
|
||||||
# Reload
|
# Reload
|
||||||
recording2 = from_sigmf(filename)
|
recording2 = from_sigmf(filename)
|
||||||
|
|
@ -167,7 +158,7 @@ def test_sigmf_2(tmp_path):
|
||||||
|
|
||||||
# Save to tmp_path using the base name
|
# Save to tmp_path using the base name
|
||||||
filename = tmp_path / "test"
|
filename = tmp_path / "test"
|
||||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name, overwrite=True)
|
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
||||||
|
|
||||||
# Load from tmp_path; from_sigmf expects the base name
|
# Load from tmp_path; from_sigmf expects the base name
|
||||||
recording2 = from_sigmf(filename)
|
recording2 = from_sigmf(filename)
|
||||||
|
|
@ -180,12 +171,3 @@ def test_sigmf_2(tmp_path):
|
||||||
)
|
)
|
||||||
|
|
||||||
assert np.array_equal(recording1.data, recording2.data)
|
assert np.array_equal(recording1.data, recording2.data)
|
||||||
|
|
||||||
|
|
||||||
def test_sigmf_3(tmp_path):
|
|
||||||
recording1 = Recording(data=complex_data_1, metadata=sample_metadata)
|
|
||||||
try:
|
|
||||||
filename = tmp_path / "test"
|
|
||||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
|
||||||
except IOError as e:
|
|
||||||
assert str(e) == "File already exists"
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user