Compare commits

..

3 Commits

Author SHA1 Message Date
806fcf8293 added tests for cli 2025-12-11 15:59:08 -05:00
155b13928b changed file to a string 2025-12-11 15:12:01 -05:00
54b41c246e changed load_rec to load_recording 2025-12-11 13:37:16 -05:00
19 changed files with 4569 additions and 38 deletions

87
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. # This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]] [[package]]
name = "alabaster" name = "alabaster"
@ -1116,6 +1116,89 @@ files = [
{file = "pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3"}, {file = "pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3"},
] ]
[[package]]
name = "pyyaml"
version = "6.0.3"
description = "YAML parser and emitter for Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"},
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"},
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"},
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"},
{file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"},
{file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"},
{file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"},
{file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"},
{file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:66291b10affd76d76f54fad28e22e51719ef9ba22b29e1d7d03d6777a9174198"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9c7708761fccb9397fe64bbc0395abcae8c4bf7b0eac081e12b809bf47700d0b"},
{file = "pyyaml-6.0.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:418cf3f2111bc80e0933b2cd8cd04f286338bb88bdc7bc8e6dd775ebde60b5e0"},
{file = "pyyaml-6.0.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:5e0b74767e5f8c593e8c9b5912019159ed0533c70051e9cce3e8b6aa699fcd69"},
{file = "pyyaml-6.0.3-cp310-cp310-win32.whl", hash = "sha256:28c8d926f98f432f88adc23edf2e6d4921ac26fb084b028c733d01868d19007e"},
{file = "pyyaml-6.0.3-cp310-cp310-win_amd64.whl", hash = "sha256:bdb2c67c6c1390b63c6ff89f210c8fd09d9a1217a465701eac7316313c915e4c"},
{file = "pyyaml-6.0.3-cp311-cp311-macosx_10_13_x86_64.whl", hash = "sha256:44edc647873928551a01e7a563d7452ccdebee747728c1080d881d68af7b997e"},
{file = "pyyaml-6.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:652cb6edd41e718550aad172851962662ff2681490a8a711af6a4d288dd96824"},
{file = "pyyaml-6.0.3-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:10892704fc220243f5305762e276552a0395f7beb4dbf9b14ec8fd43b57f126c"},
{file = "pyyaml-6.0.3-cp311-cp311-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:850774a7879607d3a6f50d36d04f00ee69e7fc816450e5f7e58d7f17f1ae5c00"},
{file = "pyyaml-6.0.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b8bb0864c5a28024fac8a632c443c87c5aa6f215c0b126c449ae1a150412f31d"},
{file = "pyyaml-6.0.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1d37d57ad971609cf3c53ba6a7e365e40660e3be0e5175fa9f2365a379d6095a"},
{file = "pyyaml-6.0.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:37503bfbfc9d2c40b344d06b2199cf0e96e97957ab1c1b546fd4f87e53e5d3e4"},
{file = "pyyaml-6.0.3-cp311-cp311-win32.whl", hash = "sha256:8098f252adfa6c80ab48096053f512f2321f0b998f98150cea9bd23d83e1467b"},
{file = "pyyaml-6.0.3-cp311-cp311-win_amd64.whl", hash = "sha256:9f3bfb4965eb874431221a3ff3fdcddc7e74e3b07799e0e84ca4a0f867d449bf"},
{file = "pyyaml-6.0.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:7f047e29dcae44602496db43be01ad42fc6f1cc0d8cd6c83d342306c32270196"},
{file = "pyyaml-6.0.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:fc09d0aa354569bc501d4e787133afc08552722d3ab34836a80547331bb5d4a0"},
{file = "pyyaml-6.0.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9149cad251584d5fb4981be1ecde53a1ca46c891a79788c0df828d2f166bda28"},
{file = "pyyaml-6.0.3-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:5fdec68f91a0c6739b380c83b951e2c72ac0197ace422360e6d5a959d8d97b2c"},
{file = "pyyaml-6.0.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ba1cc08a7ccde2d2ec775841541641e4548226580ab850948cbfda66a1befcdc"},
{file = "pyyaml-6.0.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8dc52c23056b9ddd46818a57b78404882310fb473d63f17b07d5c40421e47f8e"},
{file = "pyyaml-6.0.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:41715c910c881bc081f1e8872880d3c650acf13dfa8214bad49ed4cede7c34ea"},
{file = "pyyaml-6.0.3-cp312-cp312-win32.whl", hash = "sha256:96b533f0e99f6579b3d4d4995707cf36df9100d67e0c8303a0c55b27b5f99bc5"},
{file = "pyyaml-6.0.3-cp312-cp312-win_amd64.whl", hash = "sha256:5fcd34e47f6e0b794d17de1b4ff496c00986e1c83f7ab2fb8fcfe9616ff7477b"},
{file = "pyyaml-6.0.3-cp312-cp312-win_arm64.whl", hash = "sha256:64386e5e707d03a7e172c0701abfb7e10f0fb753ee1d773128192742712a98fd"},
{file = "pyyaml-6.0.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:8da9669d359f02c0b91ccc01cac4a67f16afec0dac22c2ad09f46bee0697eba8"},
{file = "pyyaml-6.0.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:2283a07e2c21a2aa78d9c4442724ec1eb15f5e42a723b99cb3d822d48f5f7ad1"},
{file = "pyyaml-6.0.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ee2922902c45ae8ccada2c5b501ab86c36525b883eff4255313a253a3160861c"},
{file = "pyyaml-6.0.3-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a33284e20b78bd4a18c8c2282d549d10bc8408a2a7ff57653c0cf0b9be0afce5"},
{file = "pyyaml-6.0.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0f29edc409a6392443abf94b9cf89ce99889a1dd5376d94316ae5145dfedd5d6"},
{file = "pyyaml-6.0.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f7057c9a337546edc7973c0d3ba84ddcdf0daa14533c2065749c9075001090e6"},
{file = "pyyaml-6.0.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:eda16858a3cab07b80edaf74336ece1f986ba330fdb8ee0d6c0d68fe82bc96be"},
{file = "pyyaml-6.0.3-cp313-cp313-win32.whl", hash = "sha256:d0eae10f8159e8fdad514efdc92d74fd8d682c933a6dd088030f3834bc8e6b26"},
{file = "pyyaml-6.0.3-cp313-cp313-win_amd64.whl", hash = "sha256:79005a0d97d5ddabfeeea4cf676af11e647e41d81c9a7722a193022accdb6b7c"},
{file = "pyyaml-6.0.3-cp313-cp313-win_arm64.whl", hash = "sha256:5498cd1645aa724a7c71c8f378eb29ebe23da2fc0d7a08071d89469bf1d2defb"},
{file = "pyyaml-6.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:8d1fab6bb153a416f9aeb4b8763bc0f22a5586065f86f7664fc23339fc1c1fac"},
{file = "pyyaml-6.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:34d5fcd24b8445fadc33f9cf348c1047101756fd760b4dacb5c3e99755703310"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:501a031947e3a9025ed4405a168e6ef5ae3126c59f90ce0cd6f2bfc477be31b7"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:b3bc83488de33889877a0f2543ade9f70c67d66d9ebb4ac959502e12de895788"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c458b6d084f9b935061bc36216e8a69a7e293a2f1e68bf956dcd9e6cbcd143f5"},
{file = "pyyaml-6.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:7c6610def4f163542a622a73fb39f534f8c101d690126992300bf3207eab9764"},
{file = "pyyaml-6.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:5190d403f121660ce8d1d2c1bb2ef1bd05b5f68533fc5c2ea899bd15f4399b35"},
{file = "pyyaml-6.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:4a2e8cebe2ff6ab7d1050ecd59c25d4c8bd7e6f400f5f82b96557ac0abafd0ac"},
{file = "pyyaml-6.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:93dda82c9c22deb0a405ea4dc5f2d0cda384168e466364dec6255b293923b2f3"},
{file = "pyyaml-6.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:02893d100e99e03eda1c8fd5c441d8c60103fd175728e23e431db1b589cf5ab3"},
{file = "pyyaml-6.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:c1ff362665ae507275af2853520967820d9124984e0f7466736aea23d8611fba"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6adc77889b628398debc7b65c073bcb99c4a0237b248cacaf3fe8a557563ef6c"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a80cb027f6b349846a3bf6d73b5e95e782175e52f22108cfa17876aaeff93702"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:00c4bdeba853cc34e7dd471f16b4114f4162dc03e6b7afcc2128711f0eca823c"},
{file = "pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:66e1674c3ef6f541c35191caae2d429b967b99e02040f5ba928632d9a7f0f065"},
{file = "pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:16249ee61e95f858e83976573de0f5b2893b3677ba71c9dd36b9cf8be9ac6d65"},
{file = "pyyaml-6.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4ad1906908f2f5ae4e5a8ddfce73c320c2a1429ec52eafd27138b7f1cbe341c9"},
{file = "pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b"},
{file = "pyyaml-6.0.3-cp39-cp39-macosx_10_13_x86_64.whl", hash = "sha256:b865addae83924361678b652338317d1bd7e79b1f4596f96b96c77a5a34b34da"},
{file = "pyyaml-6.0.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c3355370a2c156cffb25e876646f149d5d68f5e0a3ce86a5084dd0b64a994917"},
{file = "pyyaml-6.0.3-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3c5677e12444c15717b902a5798264fa7909e41153cdf9ef7ad571b704a63dd9"},
{file = "pyyaml-6.0.3-cp39-cp39-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:5ed875a24292240029e4483f9d4a4b8a1ae08843b9c54f43fcc11e404532a8a5"},
{file = "pyyaml-6.0.3-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0150219816b6a1fa26fb4699fb7daa9caf09eb1999f3b70fb6e786805e80375a"},
{file = "pyyaml-6.0.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:fa160448684b4e94d80416c0fa4aac48967a969efe22931448d853ada8baf926"},
{file = "pyyaml-6.0.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:27c0abcb4a5dac13684a37f76e701e054692a9b2d3064b70f5e4eb54810553d7"},
{file = "pyyaml-6.0.3-cp39-cp39-win32.whl", hash = "sha256:1ebe39cb5fc479422b83de611d14e2c0d3bb2a18bbcb01f229ab3cfbd8fee7a0"},
{file = "pyyaml-6.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:2e71d11abed7344e42a8849600193d15b6def118602c4c176f748e4583246007"},
{file = "pyyaml-6.0.3.tar.gz", hash = "sha256:d76623373421df22fb4cf8817020cbb7ef15c725b9d5e45f17e189bfc384190f"},
]
[[package]] [[package]]
name = "pyzmq" name = "pyzmq"
version = "27.1.0" version = "27.1.0"
@ -2136,4 +2219,4 @@ files = [
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.10" python-versions = ">=3.10"
content-hash = "546dd85a2ad750359310ff22acfe7bfd3ca764f025d19e3fd48a50cd431e64e5" content-hash = "65a8b8214ef247a1f9b8e936e1e2a8253d9a2c21517138f3bf41b5289d1208a0"

View File

@ -47,6 +47,7 @@ dependencies = [
"h5py (>=3.14.0,<4.0.0)", "h5py (>=3.14.0,<4.0.0)",
"pandas (>=2.3.2,<3.0.0)", "pandas (>=2.3.2,<3.0.0)",
"pyzmq (>=27.1.0,<28.0.0)", "pyzmq (>=27.1.0,<28.0.0)",
"pyyaml (>=6.0.3,<7.0.0)",
] ]
# [project.optional-dependencies] Commented out to prevent Tox tests from failing # [project.optional-dependencies] Commented out to prevent Tox tests from failing

View File

@ -326,6 +326,7 @@ def from_sigmf(file: os.PathLike | str) -> Recording:
:rtype: ria_toolkit_oss.datatypes.Recording :rtype: ria_toolkit_oss.datatypes.Recording
""" """
file = str(file)
if len(file) > 11: if len(file) > 11:
if file[-11:-5] != ".sigmf": if file[-11:-5] != ".sigmf":
file = file + ".sigmf-data" file = file + ".sigmf-data"

View File

@ -6,6 +6,7 @@ This module contains all the CLI bindings for the ria package.
from .capture import capture from .capture import capture
from .combine import combine from .combine import combine
from .convert import convert from .convert import convert
from .generate import generate
# Import all command functions # Import all command functions
from .discover import discover from .discover import discover
@ -18,7 +19,7 @@ from .transmit import transmit
from .view import view from .view import view
# Aliases # Aliases
# synth = generate synth = generate
# All commands will be automatically registered by cli.py # All commands will be automatically registered by cli.py
# Commands must be click.Command instances # Commands must be click.Command instances

View File

@ -5,58 +5,42 @@ from typing import Optional
import click import click
import numpy as np import numpy as np
import utils.signal.basic_signal_generator as basic_gen import ria_toolkit_oss.signal.basic_signal_generator as basic_gen
import yaml import yaml
from utils.data import Recording from ria_toolkit_oss.datatypes import Recording
from utils.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator from ria_toolkit_oss.signal.block_generator.continuous_modulation.fsk_modulator import FSKModulator
from utils.signal.block_generator.basic import FrequencyShift from ria_toolkit_oss.signal.block_generator.basic import FrequencyShift
from utils.signal.block_generator.data_types import DataType from ria_toolkit_oss.signal.block_generator.data_types import DataType
from utils.signal.block_generator.mapping.apsk_mapper import _APSKMapper from ria_toolkit_oss.signal.block_generator.mapping.apsk_mapper import _APSKMapper
from utils.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper from ria_toolkit_oss.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper
from utils.signal.block_generator.mapping.mapper import Mapper from ria_toolkit_oss.signal.block_generator.mapping.mapper import Mapper
from utils.signal.block_generator.modulation import ( from ria_toolkit_oss.signal.block_generator.symbol_modulation import (
GMSKModulator, GMSKModulator,
OOKModulator, OOKModulator,
OQPSKModulator, OQPSKModulator,
) )
from utils.signal.block_generator.pulse_shaping import ( from ria_toolkit_oss.signal.block_generator.pulse_shaping import (
RaisedCosineFilter, RaisedCosineFilter,
RootRaisedCosineFilter, RootRaisedCosineFilter,
Upsampling, Upsampling,
) )
from utils.signal.block_generator.source import ( from ria_toolkit_oss.signal.block_generator.source import (
LFMChirpSource, LFMChirpSource,
RandomBinarySource, BinarySource,
RecordingSource, RecordingSource,
SawtoothSource, SawtoothSource,
SquareSource, SquareSource,
) )
# Block Generator Imports # Block Generator Imports
from utils.signal.block_generator.source_block import SourceBlock from ria_toolkit_oss.signal.block_generator.source_block import SourceBlock
# Transforms for impairments from ria_toolkit_oss.transforms.iq_impairments import (
from utils.transforms.iq_channel_models import (
complex_multipath_rayleigh_channel,
rician_fading_channel,
)
from utils.transforms.iq_impairments import (
add_compression,
add_doppler,
add_gain_fluctuation,
add_phase_noise,
iq_imbalance, iq_imbalance,
) )
# NR 5G Import
try:
from utils.signal.block_gen.nr_5g.nr_5g_generator import NR5GGenerator
HAS_NR5G = True from ria_toolkit_oss_cli.ria_toolkit_oss.common import (
except ImportError:
HAS_NR5G = False
from utils_cli.utils.common import (
echo_progress, echo_progress,
echo_verbose, echo_verbose,
format_frequency, format_frequency,
@ -64,7 +48,7 @@ from utils_cli.utils.common import (
parse_metadata_args, parse_metadata_args,
save_recording, save_recording,
) )
from utils_cli.utils.config import load_user_config from ria_toolkit_oss_cli.ria_toolkit_oss.config import load_user_config
# Extend Mapper to support new types # Extend Mapper to support new types

View File

@ -210,7 +210,7 @@ def generate_recording(generate, input_file, sample_rate, verbose, legacy):
# Generate signal or load from file # Generate signal or load from file
if generate or input_file is None: if generate or input_file is None:
# Generate signal instead of loading from file # Generate signal instead of loading from file
from utils.signal.basic_signal_generator import ( from ria_toolkit_oss.signal.basic_signal_generator import (
chirp, chirp,
lfm_chirp_complex, lfm_chirp_complex,
sine, sine,

0
tests/__init__.py Normal file
View File

View File

@ -4,7 +4,7 @@ from ria_toolkit_oss.datatypes import Annotation, Recording
from ria_toolkit_oss.io.recording import ( from ria_toolkit_oss.io.recording import (
from_npy, from_npy,
from_sigmf, from_sigmf,
load_rec, load_recording,
to_npy, to_npy,
to_sigmf, to_sigmf,
) )
@ -116,7 +116,7 @@ def test_load_recording_npy(tmp_path):
recording1.to_npy(path=tmp_path, filename=filename.name, overwrite=True) recording1.to_npy(path=tmp_path, filename=filename.name, overwrite=True)
# Load from tmp_path # Load from tmp_path
recording2 = load_rec(filename) recording2 = load_recording(filename)
assert recording1.annotations == recording2.annotations assert recording1.annotations == recording2.annotations

View File

@ -0,0 +1,126 @@
# CLI Tests
Comprehensive test suite for the utils CLI commands.
## Test Structure
- `test_common.py` - Tests for common CLI utilities (YAML loading, metadata parsing, frequency formatting)
- `test_discover.py` - Tests for device discovery functionality
- `test_capture.py` - Tests for the capture command
- `test_transmit.py` - Tests for the transmit command
## Running Tests
### Run all CLI tests:
```bash
poetry run pytest tests/utils_cli/ -v
```
### Run specific test file:
```bash
poetry run pytest tests/utils_cli/test_common.py -v
poetry run pytest tests/utils_cli/test_discover.py -v
poetry run pytest tests/utils_cli/test_capture.py -v
```
### Run specific test class or function:
```bash
poetry run pytest tests/utils_cli/test_capture.py::TestCaptureCommand::test_capture_basic -v
poetry run pytest tests/utils_cli/test_common.py::test_parse_frequency -v
```
### Run with coverage:
```bash
poetry run pytest tests/utils_cli/ --cov=utils_cli --cov-report=html
```
## Test Coverage
### test_common.py
- YAML configuration file loading
- Metadata KEY=VALUE parsing
- Frequency parsing (scientific notation, suffixes)
- Frequency and sample rate formatting
### test_discover.py
- Device discovery for all supported SDR types (PlutoSDR, HackRF, BladeRF, USRP, RTL-SDR, ThinkRF)
- Device auto-selection logic
- Device connection testing
- CLI command options (--verbose, --json-output, --test, --type)
- Error handling for missing devices and multiple devices
### test_capture.py
- Basic capture functionality
- Parameter validation (sample rate, center frequency, duration/num-samples)
- Device auto-selection
- Multiple output formats (SigMF, NPY, WAV, Blue)
- Format auto-detection from file extension
- YAML configuration file loading
- Custom metadata handling
- Gain and bandwidth configuration
- Visualization saving (--save-image)
- Chunked capture for large recordings
- Verbose and quiet output modes
- Proper device cleanup on errors
### test_transmit.py
- TX device initialization (PlutoSDR, HackRF, BladeRF, USRP only)
- RX-only device rejection (RTL-SDR, ThinkRF)
- TX device auto-selection
- Input file loading (SigMF, NPY, WAV, Blue)
- Legacy NPY format support
- TX gain validation and range checking
- Sample rate mismatch warnings
- Transmission modes:
- Single transmission
- Repeat mode with delays
- Continuous transmission with safety warnings
- YAML configuration file loading
- Safety confirmations for continuous mode
- Proper device cleanup on errors
- Verbose and quiet output modes
## Mock Strategy
Tests use `unittest.mock` to:
- Mock SDR device instances to avoid requiring actual hardware
- Mock file I/O operations
- Mock discovery functions to simulate different device scenarios
- Verify proper function calls and parameters
## Adding New Tests
When adding new CLI commands, follow this pattern:
1. Create `test_<command>.py` in this directory
2. Use Click's `CliRunner` for testing CLI commands
3. Mock external dependencies (SDR devices, file I/O)
4. Test both success and error cases
5. Verify proper resource cleanup (device.close(), file handles, etc.)
Example:
```python
from click.testing import CliRunner
from unittest.mock import patch, MagicMock
def test_new_command():
runner = CliRunner()
with patch('module.dependency') as mock_dep:
mock_dep.return_value = expected_value
result = runner.invoke(command, ['--option', 'value'])
assert result.exit_code == 0
assert 'expected output' in result.output
```
## CI/CD Integration
These tests are designed to run in CI/CD pipelines without requiring actual SDR hardware. All hardware interactions are mocked.
## Notes
- Tests use temporary directories for file operations (cleaned up automatically)
- Device mocks simulate real SDR behavior without hardware dependencies
- Tests verify both command-line interface and underlying function behavior

View File

@ -0,0 +1 @@
"""Tests for utils CLI commands."""

View File

@ -0,0 +1,963 @@
"""Tests for the combine command."""
import tempfile
from pathlib import Path
import numpy as np
import pytest
from click.testing import CliRunner
from ria_toolkit_oss.datatypes import Annotation, Recording
from ria_toolkit_oss.io import load_recording, to_npy, to_sigmf
from ria_toolkit_oss_cli.cli import cli
class TestCombineHelp:
"""Test help and basic command structure."""
def test_help(self):
"""Test combine help."""
runner = CliRunner()
result = runner.invoke(cli, ["combine", "--help"])
assert result.exit_code == 0
assert "Combine multiple recordings" in result.output
assert "--mode" in result.output
assert "--align-mode" in result.output
def test_no_inputs(self):
"""Test error with no inputs."""
runner = CliRunner()
result = runner.invoke(cli, ["combine"])
assert result.exit_code != 0
def test_single_input(self):
"""Test error with only one input."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as tmpdir:
# Create test file
signal = np.arange(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6})
to_npy(recording, filename=str(Path(tmpdir) / "test.npy"), overwrite=True)
result = runner.invoke(cli, ["combine", str(Path(tmpdir) / "test.npy"), str(Path(tmpdir) / "output.npy")])
assert result.exit_code != 0
class TestCombineConcat:
"""Test concatenate mode."""
@pytest.fixture
def test_recordings(self):
"""Create multiple test recording files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create 3 recordings with different data
for i in range(3):
signal = np.arange(i * 1000, (i + 1) * 1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6})
to_npy(recording, filename=str(Path(tmpdir) / f"chunk{i}.npy"), overwrite=True)
yield tmpdir
def test_concat_basic(self, test_recordings):
"""Test basic concatenation."""
runner = CliRunner()
tmpdir = test_recordings
output_path = str(Path(tmpdir) / "combined.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "chunk0.npy"),
str(Path(tmpdir) / "chunk1.npy"),
str(Path(tmpdir) / "chunk2.npy"),
output_path,
],
)
assert result.exit_code == 0
assert Path(output_path).exists()
# Verify result
combined = load_recording(output_path)
assert combined.data.shape[1] == 3000
assert combined._metadata["combine_mode"] == "concat"
assert combined._metadata["num_inputs"] == 3
# Check data is correctly concatenated
assert np.allclose(combined.data[0, :1000], np.arange(0, 1000))
assert np.allclose(combined.data[0, 1000:2000], np.arange(1000, 2000))
assert np.allclose(combined.data[0, 2000:3000], np.arange(2000, 3000))
def test_concat_verbose(self, test_recordings):
"""Test concatenation with verbose output."""
runner = CliRunner()
tmpdir = test_recordings
output_path = str(Path(tmpdir) / "combined.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "chunk0.npy"),
str(Path(tmpdir) / "chunk1.npy"),
str(Path(tmpdir) / "chunk2.npy"),
output_path,
"--verbose",
],
)
assert result.exit_code == 0
assert "Combining 3 recordings" in result.output
assert "concat mode" in result.output
assert "Concatenating..." in result.output
def test_concat_with_annotations(self):
"""Test that annotations are preserved and shifted in concat mode."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create recordings with annotations
rec1 = Recording(data=np.ones(1000, dtype=np.complex64), metadata={"sample_rate": 2e6})
rec1._annotations.append(
Annotation(
sample_start=100, sample_count=200, freq_lower_edge=900e6, freq_upper_edge=920e6, label="test1"
)
)
rec2 = Recording(data=np.ones(1000, dtype=np.complex64) * 2, metadata={"sample_rate": 2e6})
rec2._annotations.append(
Annotation(
sample_start=100, sample_count=200, freq_lower_edge=900e6, freq_upper_edge=920e6, label="test2"
)
)
to_sigmf(rec1, filename="rec1", path=tmpdir, overwrite=True)
to_sigmf(rec2, filename="rec2", path=tmpdir, overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "combined.sigmf-data")
result = runner.invoke(
cli,
["combine", str(Path(tmpdir) / "rec1.sigmf-data"), str(Path(tmpdir) / "rec2.sigmf-data"), output_path],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert len(combined._annotations) == 2
# First annotation unchanged
assert combined._annotations[0].sample_start == 100
assert combined._annotations[0].label == "test1"
# Second annotation shifted by 1000 samples
assert combined._annotations[1].sample_start == 1100
assert combined._annotations[1].label == "test2"
class TestCombineAddSameLength:
"""Test add mode with same-length recordings."""
def test_add_basic(self):
"""Test basic add with same-length recordings."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create two recordings with same length
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "added.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code == 0
# Verify result
combined = load_recording(output_path)
assert combined.data.shape[1] == 1000
assert np.allclose(combined.data, 3 + 0j)
assert combined._metadata["combine_mode"] == "add"
assert combined._metadata["align_mode"] == "error"
def test_add_three_recordings(self):
"""Test adding three same-length recordings."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create three recordings
for i in range(1, 4):
sig = np.ones(1000, dtype=np.complex64) * i
rec = Recording(data=sig, metadata={"sample_rate": 2e6})
to_npy(rec, filename=str(Path(tmpdir) / f"sig{i}.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "added.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
str(Path(tmpdir) / "sig3.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
# 1 + 2 + 3 = 6
assert np.allclose(combined.data, 6 + 0j)
class TestCombineAddAlignError:
"""Test add mode with error alignment (default)."""
def test_different_length_error(self):
"""Test that different lengths cause error by default."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create recordings with different lengths
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "different lengths" in result.output
assert "--align-mode" in result.output
class TestCombineAddAlignTruncate:
"""Test add mode with truncate alignment."""
def test_truncate(self):
"""Test truncate to shortest recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "truncated.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"truncate",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 5000
assert np.allclose(combined.data, 3 + 0j)
class TestCombineAddAlignPad:
"""Test add mode with pad alignment."""
def test_pad(self):
"""Test zero-padding to longest recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "padded.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# First 5000: 1 + 2 = 3
assert np.allclose(combined.data[0, :5000], 3 + 0j)
# Last 5000: 1 + 0 = 1
assert np.allclose(combined.data[0, 5000:], 1 + 0j)
class TestCombineAddAlignPadStart:
"""Test add mode with pad-start alignment."""
def test_pad_start(self):
"""Test pad-start at specific sample."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "pad_start.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-start",
"--pad-start-sample",
"3000",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# Before 3000: 1 + 0 = 1
assert np.allclose(combined.data[0, :3000], 1 + 0j)
# 3000-8000: 1 + 2 = 3
assert np.allclose(combined.data[0, 3000:8000], 3 + 0j)
# After 8000: 1 + 0 = 1
assert np.allclose(combined.data[0, 8000:], 1 + 0j)
def test_pad_start_invalid(self):
"""Test invalid pad-start-sample."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-start",
"--pad-start-sample",
"7000", # Too large
],
)
assert result.exit_code != 0
assert "exceeds max length" in result.output
class TestCombineAddAlignPadCenter:
"""Test add mode with pad-center alignment."""
def test_pad_center(self):
"""Test centering shorter recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "pad_center.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-center",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# Before 2500: 1 + 0 = 1
assert np.allclose(combined.data[0, :2500], 1 + 0j)
# 2500-7500: 1 + 2 = 3
assert np.allclose(combined.data[0, 2500:7500], 3 + 0j)
# After 7500: 1 + 0 = 1
assert np.allclose(combined.data[0, 7500:], 1 + 0j)
class TestCombineAddAlignPadEnd:
"""Test add mode with pad-end alignment."""
def test_pad_end(self):
"""Test aligning end of recordings."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "pad_end.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-end",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# First 5000: 1 + 0 = 1
assert np.allclose(combined.data[0, :5000], 1 + 0j)
# Last 5000: 1 + 2 = 3
assert np.allclose(combined.data[0, 5000:], 3 + 0j)
class TestCombineAddAlignRepeat:
"""Test add mode with repeat alignment."""
def test_repeat(self):
"""Test repeating shorter recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "repeated.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# Entire recording: 1 + 2 = 3 (pattern repeated)
assert np.allclose(combined.data, 3 + 0j)
def test_repeat_partial(self):
"""Test repeat with non-exact multiple."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.arange(3000, dtype=np.complex64)
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "repeated.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
# Check pattern repeats correctly
# First 3000: 1 + [0,1,2,...,2999]
assert np.allclose(combined.data[0, :3000], 1 + np.arange(3000))
# Next 3000: 1 + [0,1,2,...,2999]
assert np.allclose(combined.data[0, 3000:6000], 1 + np.arange(3000))
# Next 3000: 1 + [0,1,2,...,2999]
assert np.allclose(combined.data[0, 6000:9000], 1 + np.arange(3000))
# Last 1000: 1 + [0,1,2,...,999]
assert np.allclose(combined.data[0, 9000:10000], 1 + np.arange(1000))
class TestCombineAddAlignRepeatSpaced:
"""Test add mode with repeat-spaced alignment."""
def test_repeat_spaced(self):
"""Test repeating with spacing."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(2000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "repeat_spaced.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat-spaced",
"--repeat-spacing",
"1000",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# First 2000: 1 + 2 = 3
assert np.allclose(combined.data[0, :2000], 3 + 0j)
# Next 1000 (gap): 1 + 0 = 1
assert np.allclose(combined.data[0, 2000:3000], 1 + 0j)
# Next 2000: 1 + 2 = 3
assert np.allclose(combined.data[0, 3000:5000], 3 + 0j)
# Next 1000 (gap): 1 + 0 = 1
assert np.allclose(combined.data[0, 5000:6000], 1 + 0j)
def test_repeat_spaced_missing_spacing(self):
"""Test error when spacing not provided."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat-spaced",
# Missing --repeat-spacing
],
)
assert result.exit_code != 0
assert "requires --repeat-spacing" in result.output
class TestCombineValidation:
"""Test validation and error handling."""
def test_sample_rate_mismatch(self):
"""Test error on sample rate mismatch in add mode."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 1e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "different sample rates" in result.output
def test_channel_count_mismatch(self):
"""Test error on channel count mismatch."""
with tempfile.TemporaryDirectory() as tmpdir:
# Single channel
sig1 = np.ones((1, 1000), dtype=np.complex64)
# Two channels
sig2 = np.ones((2, 1000), dtype=np.complex64)
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "different channel counts" in result.output
def test_overwrite_protection(self):
"""Test overwrite protection."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test recordings
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
# Create existing output file
existing = Recording(data=np.zeros(100, dtype=np.complex64), metadata={})
output_path = str(Path(tmpdir) / "output.npy")
to_npy(existing, filename=output_path, overwrite=True)
runner = CliRunner()
# Should fail without --overwrite
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "already exists" in result.output
# Should succeed with --overwrite
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--overwrite",
],
)
assert result.exit_code == 0
class TestCombineOutputOptions:
"""Test output format and options."""
def test_output_formats(self):
"""Test different output formats."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test recordings
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
# Test SigMF output
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
str(Path(tmpdir) / "output.sigmf-data"),
"--mode",
"add",
],
)
assert result.exit_code == 0
assert Path(tmpdir, "output.sigmf-data").exists()
assert Path(tmpdir, "output.sigmf-meta").exists()
def test_normalize(self):
"""Test normalize option."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64) * 10
sig2 = np.ones(1000, dtype=np.complex64) * 20
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "normalized.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--normalize",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
# Should be normalized to max magnitude 1
assert np.allclose(np.max(np.abs(combined.data)), 1.0)
assert combined._metadata.get("normalized") is True
def test_custom_metadata(self):
"""Test adding custom metadata."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--metadata",
"test_id=test123",
"--metadata",
"author=tester",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined._metadata["test_id"] == "test123"
assert combined._metadata["author"] == "tester"
class TestCombineVerboseQuiet:
"""Test verbose and quiet modes."""
def test_verbose(self):
"""Test verbose output."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--verbose",
],
)
assert result.exit_code == 0
assert "Loading" in result.output
assert "Done" in result.output
def test_quiet(self):
"""Test quiet output."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--quiet",
],
)
assert result.exit_code == 0
assert result.output == ""

View File

@ -0,0 +1,165 @@
# flake8: noqa
"""Tests for capture command."""
import os
import tempfile
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
import yaml
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture import (
auto_select_device,
capture,
get_sdr_device,
save_visualization,
)
class TestGetSdrDevice:
"""Tests for get_sdr_device function."""
def test_get_pluto_device(self):
"""Test getting PlutoSDR device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.pluto": MagicMock(Pluto=mock_sdr_class)}):
device = get_sdr_device("pluto")
assert device is mock_sdr_instance
def test_get_hackrf_device(self):
"""Test getting HackRF device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.hackrf": MagicMock(HackRF=mock_sdr_class)}):
device = get_sdr_device("hackrf")
assert device is mock_sdr_instance
def test_get_unknown_device(self):
"""Test getting unknown device type."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
get_sdr_device("unknown_device")
assert "Unknown device type" in str(exc_info.value)
class TestAutoSelectDevice:
"""Tests for auto_select_device function."""
def test_auto_select_no_devices(self):
"""Test auto-select with no devices found."""
from click.exceptions import ClickException
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
mock_discover.return_value = []
with pytest.raises(ClickException) as exc_info:
auto_select_device()
assert "No SDR devices found" in str(exc_info.value)
def test_auto_select_single_device(self):
"""Test auto-select with single device."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
mock_discover.return_value = [{"type": "HackRF", "serial": "123456"}]
device_type = auto_select_device(quiet=True)
assert device_type == "hackrf"
def test_auto_select_single_device_with_warning(self):
"""Test auto-select shows warning when not quiet."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.click.echo") as mock_echo,
):
mock_discover.return_value = [{"type": "PlutoSDR", "uri": "ip:pluto.local"}]
device_type = auto_select_device(quiet=False)
assert device_type == "pluto"
# Should have called echo twice (warning + hint)
assert mock_echo.call_count == 2
def test_auto_select_multiple_devices(self):
"""Test auto-select with multiple devices raises error."""
from click.exceptions import ClickException
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
mock_discover.return_value = [
{"type": "HackRF", "serial": "123456"},
{"type": "PlutoSDR", "uri": "ip:pluto.local"},
]
with pytest.raises(ClickException) as exc_info:
auto_select_device()
assert "Multiple devices found" in str(exc_info.value)
def test_auto_select_device_name_mapping(self):
"""Test device name mapping."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
# Test various device name formats
test_cases = [
("PlutoSDR", "pluto"),
("HackRF", "hackrf"),
("BladeRF", "bladerf"),
("RTL-SDR", "rtlsdr"),
]
for device_name, expected_type in test_cases:
mock_discover.return_value = [{"type": device_name}]
device_type = auto_select_device(quiet=True)
assert device_type == expected_type
class TestSaveVisualization:
"""Tests for save_visualization function."""
def test_save_visualization_success(self):
"""Test successful visualization save."""
mock_recording = MagicMock()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.view_simple_sig") as mock_view:
save_visualization(mock_recording, "test.png", quiet=True)
mock_view.assert_called_once_with(
mock_recording, output_path="test.png", saveplot=True, fast_mode=False, labels_mode=True
)
def test_save_visualization_import_error(self):
"""Test visualization save with import error."""
mock_recording = MagicMock()
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.view_simple_sig", side_effect=ImportError("Module not found")),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.click.echo") as mock_echo,
):
save_visualization(mock_recording, "test.png", quiet=True)
# Should catch error and echo warning
mock_echo.assert_called_once()
assert "Warning" in str(mock_echo.call_args)
def test_save_visualization_general_error(self):
"""Test visualization save with general error."""
mock_recording = MagicMock()
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.view_simple_sig", side_effect=Exception("Failed to plot")),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.click.echo") as mock_echo,
):
save_visualization(mock_recording, "test.png", quiet=True)
mock_echo.assert_called_once()
assert "Failed to save visualization" in str(mock_echo.call_args)

View File

@ -0,0 +1,118 @@
"""Tests for common CLI utilities."""
import os
import tempfile
import pytest
import yaml
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.common import (
format_frequency,
format_sample_rate,
load_yaml_config,
parse_frequency,
parse_metadata_args,
)
def test_load_yaml_config():
"""Test loading YAML configuration files."""
config_data = {
"device": "pluto",
"sample_rate": 2e6,
"center_frequency": "915e6",
"gain": 30,
"metadata": {"location": "test_lab", "experiment": "test_001"},
}
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
yaml.dump(config_data, f)
config_file = f.name
try:
loaded_config = load_yaml_config(config_file)
assert loaded_config == config_data
assert loaded_config["device"] == "pluto"
assert loaded_config["sample_rate"] == 2e6
assert loaded_config["metadata"]["location"] == "test_lab"
finally:
os.unlink(config_file)
def test_load_yaml_config_empty():
"""Test loading empty YAML file."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write("")
config_file = f.name
try:
loaded_config = load_yaml_config(config_file)
assert loaded_config == {}
finally:
os.unlink(config_file)
def test_parse_metadata_args():
"""Test parsing metadata KEY=VALUE arguments."""
metadata_args = ["location=test_lab", "experiment=001", "power=30", "frequency=2.4e9", "description=Test Signal"]
result = parse_metadata_args(metadata_args)
assert result["location"] == "test_lab"
assert result["experiment"] == "001" # String because doesn't parse as number
assert result["power"] == 30 # Integer
assert result["frequency"] == 2.4e9 # Float
assert result["description"] == "Test Signal"
def test_parse_metadata_args_invalid():
"""Test invalid metadata format raises error."""
from click.exceptions import ClickException
with pytest.raises(ClickException):
parse_metadata_args(["invalid_format"])
with pytest.raises(ClickException):
parse_metadata_args(["key1=value1", "invalid", "key2=value2"])
def test_parse_frequency():
"""Test frequency parsing with different formats."""
# Scientific notation
assert parse_frequency("915e6") == 915e6
assert parse_frequency("2.4e9") == 2.4e9
assert parse_frequency("433e6") == 433e6
# With suffixes
assert parse_frequency("915M") == 915e6
assert parse_frequency("2.4G") == 2.4e9
assert parse_frequency("433M") == 433e6
assert parse_frequency("100k") == 100e3
assert parse_frequency("100K") == 100e3
# Plain numbers
assert parse_frequency("915000000") == 915e6
assert parse_frequency("2400000000") == 2.4e9
# Edge cases
assert parse_frequency("0.915G") == 915e6
assert parse_frequency("915.0M") == 915e6
def test_format_frequency():
"""Test frequency formatting."""
assert format_frequency(915e6) == "915.00 MHz"
assert format_frequency(2.4e9) == "2.40 GHz"
assert format_frequency(433e6) == "433.00 MHz"
assert format_frequency(100e3) == "100.00 kHz"
assert format_frequency(1e3) == "1.00 kHz"
assert format_frequency(500) == "500.00 Hz"
def test_format_sample_rate():
"""Test sample rate formatting."""
assert format_sample_rate(20e6) == "20.00 MS/s"
assert format_sample_rate(2e6) == "2.00 MS/s"
assert format_sample_rate(100e3) == "100.00 kS/s"
assert format_sample_rate(1e3) == "1.00 kS/s"
assert format_sample_rate(500) == "500.00 S/s"

View File

@ -0,0 +1,190 @@
"""Tests for convert command."""
import os
import tempfile
from pathlib import Path
import pytest
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.cli import cli
class TestConvert:
"""Test convert command functionality."""
def test_convert_help(self):
"""Test convert command help."""
runner = CliRunner()
result = runner.invoke(cli, ["convert", "--help"])
assert result.exit_code == 0
assert "Convert recordings between file formats" in result.output
assert "--format" in result.output
assert "--legacy" in result.output
assert "--wav-sample-rate" in result.output
assert "--blue-format" in result.output
assert "--overwrite" in result.output
assert "--metadata" in result.output
def test_missing_arguments(self):
"""Test that missing arguments show error."""
runner = CliRunner()
result = runner.invoke(cli, ["convert"])
assert result.exit_code != 0
assert "Missing argument" in result.output or "Error" in result.output
def test_invalid_input_format(self):
"""Test handling of invalid input format."""
runner = CliRunner()
with tempfile.NamedTemporaryFile(suffix=".xyz", delete=False) as f:
try:
result = runner.invoke(cli, ["convert", f.name, "output.npy"])
assert result.exit_code != 0
assert "Unknown format" in result.output or "Supported" in result.output
finally:
os.unlink(f.name)
def test_overwrite_protection(self):
"""Test that overwrite protection works."""
runner = CliRunner()
# Create a dummy input file (will use actual test data if available)
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "test.sigmf")
# First conversion should succeed
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "-q"])
assert result.exit_code == 0
# Second conversion without --overwrite should fail
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy"])
assert result.exit_code != 0
assert "exist" in result.output.lower()
assert "--overwrite" in result.output
# Third conversion with --overwrite should succeed
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "--overwrite", "-q"])
assert result.exit_code == 0
def test_metadata_override(self):
"""Test metadata override functionality."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "test.sigmf")
result = runner.invoke(
cli,
[
"convert",
test_input,
output_file,
"--legacy",
"--metadata",
"test_key=test_value",
"--metadata",
"number=42",
"--metadata",
"float_val=3.14",
"-v",
],
)
assert result.exit_code == 0
assert "test_key" in result.output
assert "number" in result.output
assert "float_val" in result.output
def test_format_detection(self):
"""Test that format detection works for different extensions."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
# Test NPY to SigMF
sigmf_out = os.path.join(tmpdir, "test.sigmf")
result = runner.invoke(cli, ["convert", test_input, sigmf_out, "--legacy", "-q"])
assert result.exit_code == 0
assert Path(sigmf_out).with_suffix(".sigmf-data").exists()
assert Path(sigmf_out).with_suffix(".sigmf-meta").exists()
# Test NPY to NPY
npy_out = os.path.join(tmpdir, "test.npy")
result = runner.invoke(cli, ["convert", test_input, npy_out, "--legacy", "-q"])
assert result.exit_code == 0
assert Path(npy_out).exists()
def test_wav_conversion_with_decimation(self):
"""Test WAV conversion with sample rate decimation."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
wav_out = os.path.join(tmpdir, "test.wav")
result = runner.invoke(
cli, ["convert", test_input, wav_out, "--legacy", "--wav-sample-rate", "48000", "--wav-bits", "16"]
)
assert result.exit_code == 0
assert "Decimation factor" in result.output
assert Path(wav_out).exists()
# Check file is non-empty
assert os.path.getsize(wav_out) > 0
def test_blue_format_conversion(self):
"""Test MIDAS Blue format conversion."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
# Test each Blue format
for blue_fmt in ["CI", "CF", "CD"]:
blue_out = os.path.join(tmpdir, f"test_{blue_fmt}.blue")
result = runner.invoke(
cli, ["convert", test_input, blue_out, "--legacy", "--blue-format", blue_fmt, "-q"]
)
assert result.exit_code == 0
assert Path(blue_out).exists()
# Check file is non-empty
assert os.path.getsize(blue_out) > 0
def test_quiet_and_verbose_modes(self):
"""Test quiet and verbose output modes."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
# Test verbose mode
output_file = os.path.join(tmpdir, "test_verbose.sigmf")
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "-v"])
assert result.exit_code == 0
assert "Reading input" in result.output
assert "Metadata preserved" in result.output
# Test quiet mode
output_file = os.path.join(tmpdir, "test_quiet.npy")
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "-q"])
assert result.exit_code == 0
# Should have minimal output
assert len(result.output) < 100 or result.output.strip() == ""

View File

@ -0,0 +1,287 @@
"""Tests for discover command."""
import json
import re
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover import ( # find_bladerf_devices,; find_thinkrf_devices,; find_uhd_devices,
discover,
discover_all_devices,
find_hackrf_devices,
find_pluto_devices,
find_rtlsdr_devices,
load_sdr_drivers,
)
def test_discover_pluto_no_devices():
"""Test PlutoSDR discovery with no devices."""
with (
patch.dict("sys.modules", {"iio": MagicMock()}) as mock_modules,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.get_usb_devices") as mock_usb,
):
mock_iio = mock_modules["iio"]
mock_iio.scan_contexts.return_value = {}
mock_usb.return_value = []
devices = find_pluto_devices()
assert devices == []
def test_discover_pluto_with_device():
"""Test PlutoSDR discovery with device present."""
with patch.dict("sys.modules", {"iio": MagicMock()}) as mock_modules:
mock_iio = mock_modules["iio"]
mock_ctx = MagicMock()
mock_ctx.attrs = {"hw_serial": "123456", "fw_version": "1.0"}
mock_ctx._destroy = MagicMock()
mock_iio.scan_contexts.return_value = {"ip:pluto.local": "PlutoSDR (ADALM-PLUTO)"}
mock_iio.Context.return_value = mock_ctx
devices = find_pluto_devices()
assert len(devices) == 1
assert devices[0]["type"] == "PlutoSDR"
assert devices[0]["serial"] == "123456"
assert devices[0]["firmware"] == "1.0"
assert devices[0]["uri"] == "ip:pluto.local"
def test_discover_hackrf_no_devices():
"""Test HackRF discovery with no devices."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = ""
devices = find_hackrf_devices()
assert devices == []
def test_discover_hackrf_with_devices():
"""Test HackRF discovery with devices present."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = """
hackrf_info version: 2023.01.1
libhackrf version: 2023.01.1 (0.8)
Found HackRF
Index: 0
Serial number: serial123
Board ID Number: 2 (HackRF One)
Firmware Version: v2.1.0 (API:1.08)
Part ID Number: 0xa000cb3c 0x005d4761
Index: 1
Serial number: serial456
Board ID Number: 2 (HackRF One)
Firmware Version: v2.1.0 (API:1.08)
Part ID Number: 0xa000cb3c 0x005d4761
"""
devices = find_hackrf_devices()
assert len(devices) == 2
assert devices[0]["type"] == "HackRF One"
assert devices[0]["serial"] == "serial123"
assert devices[0]["device_index"] == 0 or devices[0]["device_index"] == "0"
assert devices[1]["serial"] == "serial456"
assert devices[1]["device_index"] == 1 or devices[1]["device_index"] == "1"
def test_discover_rtlsdr_no_devices():
"""Test RTL-SDR discovery with no devices."""
with patch("ria_toolkit_oss.ria_toolkit_oss.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = ""
devices = find_rtlsdr_devices()
assert devices == []
def test_discover_rtlsdr_with_devices():
"""Test RTL-SDR discovery with devices present."""
with patch("ria_toolkit_oss.ria_toolkit_oss.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = """
Found 2 device(s):
0: RTLSDRBlog, Blog V4, SN: 00000001
1: RTLSDRBlog, Blog V4, SN: 00000002
Using device 0: Generic RTL2832U OEM
Found Rafael Micro R828D tuner
RTL-SDR Blog V4 Detected
"""
devices = find_rtlsdr_devices()
assert len(devices) == 2
assert devices[0]["type"] == "RTL-SDR"
assert devices[0]["serial"] == "00000001"
assert devices[0]["device_index"] == 0 or devices[0]["device_index"] == "0"
def test_discover_all_devices_filter():
"""Test discovering devices with type filter."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_pluto_devices") as mock_pluto,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_hackrf_devices") as mock_hackrf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_bladerf_devices") as mock_bladerf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_uhd_devices") as mock_usrp,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_rtlsdr_devices") as mock_rtlsdr,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_thinkrf_devices") as mock_thinkrf,
):
mock_pluto.return_value = [{"type": "PlutoSDR", "uri": "ip:pluto.local"}]
mock_hackrf.return_value = []
mock_bladerf.return_value = []
mock_usrp.return_value = []
mock_rtlsdr.return_value = []
mock_thinkrf.return_value = []
# Test filtering by pluto
load_sdr_drivers(verbose=False)
devices = discover_all_devices()
mock_pluto.assert_called_once()
mock_hackrf.assert_called_once()
mock_bladerf.assert_called_once()
assert len(devices["devices"]) == 1
assert len(devices["pluto_devices"]) == 1
def test_discover_all_devices_no_filter():
"""Test discovering all device types."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_pluto_devices") as mock_pluto,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_hackrf_devices") as mock_hackrf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_bladerf_devices") as mock_bladerf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_uhd_devices") as mock_usrp,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_rtlsdr_devices") as mock_rtlsdr,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_thinkrf_devices") as mock_thinkrf,
):
mock_pluto.return_value = [{"type": "PlutoSDR", "uri": "ip:pluto.local"}]
mock_hackrf.return_value = [{"type": "HackRF"}]
mock_bladerf.return_value = []
mock_usrp.return_value = []
mock_rtlsdr.return_value = []
mock_thinkrf.return_value = []
load_sdr_drivers(verbose=False)
devices = discover_all_devices()
mock_pluto.assert_called_once()
mock_hackrf.assert_called_once()
mock_bladerf.assert_called_once()
mock_usrp.assert_called_once()
mock_rtlsdr.assert_called_once()
assert len(devices["devices"]) == 2
assert len(devices["pluto_devices"]) == 1
assert len(devices["hackrf_devices"]) == 1
def test_discover_command_no_devices():
"""Test discover CLI command with no devices."""
runner = CliRunner()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.discover_all_devices") as mock_discover:
mock_discover.return_value = {
"loaded_drivers": [],
"failed_drivers": [],
"devices": [],
"total_devices": 0,
"uhd_devices": [],
"pluto_devices": [],
"rtlsdr_devices": [],
"bladerf_devices": [],
"hackrf_devices": [],
}
result = runner.invoke(discover)
assert result.exit_code == 0
assert "No devices detected" in result.output
def test_discover_command():
"""Test discover CLI command."""
runner = CliRunner()
result = runner.invoke(discover)
radios = ["USRP/UHD", "PlutoSDR", "RTL-SDR", "BladeRF", "HackRF", "ThinkRF"]
match = re.search(r"Detected devices: (\d+)", result.output)
if match:
total_devices = int(match.group(1))
else:
total_devices = 0
if result.exit_code == 0:
assert "Attached Devices" in result.output
assert "Discovery Summary" in result.output
if total_devices > 0:
assert any(radio in result.output for radio in radios)
else:
assert not any(radio in result.output for radio in radios)
else:
assert result.exit_code == 1
assert isinstance(result.exception, AttributeError)
assert "undefined symbol: iio_get_backends_count" in str(result.exception)
def test_discover_command_json_output():
"""Test discover CLI command with JSON output."""
runner = CliRunner()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.discover_all_devices") as mock_discover:
mock_discover.return_value = {
"loaded_drivers": [],
"failed_drivers": [],
"devices": [{"type": "HackRF", "serial": "123456", "status": "available"}],
"total_devices": 1,
"uhd_devices": [],
"pluto_devices": [],
"rtlsdr_devices": [],
"bladerf_devices": [],
"hackrf_devices": [{"type": "HackRF", "serial": "123456", "status": "available"}],
}
result = runner.invoke(discover, ["--json-output"])
output_data = json.loads(result.output)
assert result.exit_code == 0
assert output_data["total_devices"] == 1
assert len(output_data["devices"]) == 1
assert output_data["devices"][0]["type"] == "HackRF"
def test_discover_command_verbose():
"""Test discover CLI command with verbose output."""
runner = CliRunner()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.discover_all_devices") as mock_discover:
mock_discover.return_value = {
"loaded_drivers": [],
"failed_drivers": [],
"devices": [
{
"type": "PlutoSDR",
"serial": "123456",
"firmware": "1.0",
"uri": "ip:pluto.local",
"status": "available",
}
],
"total_devices": 1,
"uhd_devices": [],
"pluto_devices": [],
"rtlsdr_devices": [],
"bladerf_devices": [],
"hackrf_devices": [
{
"type": "PlutoSDR",
"serial": "123456",
"firmware": "1.0",
"uri": "ip:pluto.local",
"status": "available",
}
],
}
result = runner.invoke(discover, ["--verbose"])
assert result.exit_code == 0
assert "RTL-SDR devices: None found" in result.output or "\n rtlsdr:" in result.output

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,670 @@
"""Tests for split CLI command."""
import tempfile
from pathlib import Path
import numpy as np
import pytest
from click.testing import CliRunner
from ria_toolkit_oss.datatypes import Annotation, Recording
from ria_toolkit_oss.io import load_recording, to_sigmf
from ria_toolkit_oss.ria_toolkit_oss_cli.cli import cli
class TestSplitHelp:
"""Test split command help and basic functionality."""
def test_split_help(self):
"""Test split command help."""
runner = CliRunner()
result = runner.invoke(cli, ["split", "--help"])
assert result.exit_code == 0
assert "Split, trim, and extract portions of recordings" in result.output
assert "--split-at" in result.output
assert "--split-every" in result.output
assert "--split-duration" in result.output
assert "--trim" in result.output
assert "--extract-annotations" in result.output
def test_missing_arguments(self):
"""Test that missing arguments show error."""
runner = CliRunner()
result = runner.invoke(cli, ["split"])
assert result.exit_code != 0
assert "Missing argument" in result.output or "Error" in result.output
def test_no_operation_specified(self):
"""Test error when no operation is specified."""
runner = CliRunner()
# Create a test file
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.ones(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 1e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
test_file = str(Path(tmpdir) / "test.sigmf-data")
result = runner.invoke(cli, ["split", test_file])
assert result.exit_code != 0
assert "No operation specified" in result.output
class TestSplitTrim:
"""Test trim operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_trim_with_length(self, test_recording):
"""Test trim with --start and --length."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"1000",
"--length",
"5000",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify output file exists
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
# Verify output has correct length
output_rec = load_recording(str(output_files[0]))
assert output_rec.data.shape[1] == 5000
assert output_rec.metadata["original_start_sample"] == 1000
assert output_rec.metadata["original_end_sample"] == 6000
assert output_rec.metadata["split_operation"] == "trim"
def test_trim_with_end(self, test_recording):
"""Test trim with --start and --end."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
["split", test_recording, "--trim", "--start", "2000", "--end", "7000", "--output-dir", outdir, "-q"],
)
assert result.exit_code == 0
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
output_rec = load_recording(str(output_files[0]))
assert output_rec.data.shape[1] == 5000
def test_trim_without_length_or_end(self, test_recording):
"""Test that trim requires --length or --end."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--trim", "--start", "1000"])
assert result.exit_code != 0
assert "requires either --length or --end" in result.output
def test_trim_with_both_length_and_end(self, test_recording):
"""Test that trim rejects both --length and --end."""
runner = CliRunner()
result = runner.invoke(
cli, ["split", test_recording, "--trim", "--start", "1000", "--length", "5000", "--end", "6000"]
)
assert result.exit_code != 0
assert "Cannot specify both --length and --end" in result.output
def test_trim_invalid_range(self, test_recording):
"""Test trim with invalid range."""
runner = CliRunner()
result = runner.invoke(
cli,
["split", test_recording, "--trim", "--start", "1000", "--length", "50000"], # Exceeds recording length
)
assert result.exit_code != 0
assert "Invalid trim range" in result.output
def test_trim_end_before_start(self, test_recording):
"""Test trim with end < start."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--trim", "--start", "5000", "--end", "1000"])
assert result.exit_code != 0
assert "Invalid range" in result.output
class TestSplitAt:
"""Test split-at operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_split_at_middle(self, test_recording):
"""Test splitting at middle of recording."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(cli, ["split", test_recording, "--split-at", "5000", "--output-dir", outdir, "-q"])
assert result.exit_code == 0
# Verify two output files exist
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 2
# Verify part1
part1 = load_recording(str(output_files[0]))
assert part1.data.shape[1] == 5000
assert part1.metadata["original_start_sample"] == 0
assert part1.metadata["original_end_sample"] == 5000
# Verify part2
part2 = load_recording(str(output_files[1]))
assert part2.data.shape[1] == 5000
assert part2.metadata["original_start_sample"] == 5000
assert part2.metadata["original_end_sample"] == 10000
def test_split_at_invalid_point(self, test_recording):
"""Test split-at with invalid sample point."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--split-at", "50000"]) # Exceeds recording length
assert result.exit_code != 0
assert "Invalid split point" in result.output
class TestSplitEvery:
"""Test split-every operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_split_every_equal_chunks(self, test_recording):
"""Test splitting into equal chunks."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli, ["split", test_recording, "--split-every", "2500", "--output-dir", outdir, "-q"]
)
assert result.exit_code == 0
# Verify 4 chunks created
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 4
# Verify all chunks have correct size
for i, file in enumerate(output_files):
chunk = load_recording(str(file))
assert chunk.data.shape[1] == 2500
assert chunk.metadata["chunk_index"] == i + 1
assert chunk.metadata["total_chunks"] == 4
def test_split_every_unequal_chunks(self, test_recording):
"""Test splitting with remainder chunk."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli, ["split", test_recording, "--split-every", "3000", "--output-dir", outdir, "-q"]
)
assert result.exit_code == 0
# Verify 4 chunks created (3x3000 + 1x1000)
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 4
# Last chunk should be smaller
last_chunk = load_recording(str(output_files[-1]))
assert last_chunk.data.shape[1] == 1000
class TestSplitDuration:
"""Test split-duration operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file with known sample rate."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(
data=signal, metadata={"sample_rate": 10000, "center_frequency": 915e6} # 10kHz for easy math
)
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_split_duration_basic(self, test_recording):
"""Test splitting by duration."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--split-duration",
"0.25", # 0.25s = 2500 samples at 10kHz
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify chunks created
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 4
# Verify chunk sizes
for file in output_files[:-1]:
chunk = load_recording(str(file))
assert chunk.data.shape[1] == 2500
def test_split_duration_no_sample_rate(self):
"""Test that split-duration requires sample_rate in metadata."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as tmpdir:
# Create recording without sample_rate
signal = np.arange(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
test_file = str(Path(tmpdir) / "test.sigmf-data")
result = runner.invoke(cli, ["split", test_file, "--split-duration", "1.0"])
assert result.exit_code != 0
assert "Cannot split by duration" in result.output
assert "no sample_rate" in result.output
class TestExtractAnnotations:
"""Test extract-annotations operations."""
@pytest.fixture
def annotated_recording(self):
"""Create a test recording with annotations."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(100000, dtype=np.complex64)
annotations = [
Annotation(
sample_start=0, sample_count=10000, freq_lower_edge=914e6, freq_upper_edge=916e6, label="preamble"
),
Annotation(
sample_start=10000,
sample_count=50000,
freq_lower_edge=914e6,
freq_upper_edge=916e6,
label="payload",
),
Annotation(
sample_start=60000, sample_count=5000, freq_lower_edge=914e6, freq_upper_edge=916e6, label="crc"
),
]
recording = Recording(
data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6}, annotations=annotations
)
to_sigmf(recording, filename="annotated", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "annotated.sigmf-data")
def test_extract_all_annotations(self, annotated_recording):
"""Test extracting all annotations."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli, ["split", annotated_recording, "--extract-annotations", "--output-dir", outdir, "-q"]
)
assert result.exit_code == 0
# Verify 3 files created
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 3
# Verify each annotation was extracted
preamble = [f for f in output_files if "preamble" in str(f)][0]
payload = [f for f in output_files if "payload" in str(f)][0]
crc = [f for f in output_files if "crc" in str(f)][0]
preamble_rec = load_recording(str(preamble))
assert preamble_rec.data.shape[1] == 10000
assert preamble_rec.metadata["annotation_label"] == "preamble"
assert len(preamble_rec.annotations) == 0 # Annotations cleared
payload_rec = load_recording(str(payload))
assert payload_rec.data.shape[1] == 50000
assert payload_rec.metadata["annotation_label"] == "payload"
crc_rec = load_recording(str(crc))
assert crc_rec.data.shape[1] == 5000
assert crc_rec.metadata["annotation_label"] == "crc"
def test_extract_annotation_by_label(self, annotated_recording):
"""Test extracting annotations by label."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
annotated_recording,
"--extract-annotations",
"--annotation-label",
"payload",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify only 1 file created
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
assert "payload" in str(output_files[0])
def test_extract_annotation_by_index(self, annotated_recording):
"""Test extracting annotation by index."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
annotated_recording,
"--extract-annotations",
"--annotation-index",
"1",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify only 1 file created (payload at index 1)
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
assert "payload" in str(output_files[0])
def test_extract_annotations_invalid_label(self, annotated_recording):
"""Test error with non-existent label."""
runner = CliRunner()
result = runner.invoke(
cli, ["split", annotated_recording, "--extract-annotations", "--annotation-label", "nonexistent"]
)
assert result.exit_code != 0
assert "No annotations with label" in result.output
def test_extract_annotations_invalid_index(self, annotated_recording):
"""Test error with invalid index."""
runner = CliRunner()
result = runner.invoke(
cli, ["split", annotated_recording, "--extract-annotations", "--annotation-index", "99"]
)
assert result.exit_code != 0
assert "Invalid annotation index" in result.output
def test_extract_annotations_no_annotations(self):
"""Test error when recording has no annotations."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 1e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
test_file = str(Path(tmpdir) / "test.sigmf-data")
result = runner.invoke(cli, ["split", test_file, "--extract-annotations"])
assert result.exit_code != 0
assert "No annotations found" in result.output
class TestOutputOptions:
"""Test output-related options."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_output_prefix(self, test_recording):
"""Test custom output prefix."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--split-every",
"3000",
"--output-prefix",
"custom",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert all("custom" in str(f) for f in output_files)
def test_output_format_conversion(self, test_recording):
"""Test format conversion during split."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--split-every",
"5000",
"--output-format",
"npy",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify NPY files created
output_files = list(Path(outdir).glob("*.npy"))
assert len(output_files) == 2
def test_overwrite_protection(self, test_recording):
"""Test overwrite protection."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
# First split should succeed
result = runner.invoke(
cli,
["split", test_recording, "--trim", "--start", "0", "--length", "1000", "--output-dir", outdir, "-q"],
)
assert result.exit_code == 0
# Second split without --overwrite should fail
result = runner.invoke(
cli, ["split", test_recording, "--trim", "--start", "0", "--length", "1000", "--output-dir", outdir]
)
assert result.exit_code != 0
assert "exist" in result.output.lower()
# Third split with --overwrite should succeed
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"0",
"--length",
"1000",
"--output-dir",
outdir,
"--overwrite",
"-q",
],
)
assert result.exit_code == 0
class TestMultipleOperations:
"""Test that multiple operations are rejected."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_trim_and_split_at(self, test_recording):
"""Test that trim and split-at cannot be used together."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--trim", "--split-at", "5000"])
assert result.exit_code != 0
assert "Multiple operations specified" in result.output
def test_split_every_and_extract(self, test_recording):
"""Test that split-every and extract-annotations cannot be used together."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--split-every", "1000", "--extract-annotations"])
assert result.exit_code != 0
assert "Multiple operations specified" in result.output
class TestVerboseQuiet:
"""Test verbose and quiet modes."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_verbose_mode(self, test_recording):
"""Test verbose output."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"0",
"--length",
"1000",
"--output-dir",
outdir,
"--verbose",
],
)
assert result.exit_code == 0
assert "Input format: SIGMF" in result.output
assert "Output format: SIGMF" in result.output
def test_quiet_mode(self, test_recording):
"""Test quiet output (minimal output)."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"0",
"--length",
"1000",
"--output-dir",
outdir,
"--quiet",
],
)
assert result.exit_code == 0
# Output should be minimal in quiet mode
assert len(result.output.strip()) < 100 or result.output.strip() == ""

View File

@ -0,0 +1,345 @@
"""Tests for transmit command."""
import os
import tempfile
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
import yaml
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.common import get_sdr_device
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit import (
auto_select_tx_device,
check_sample_rate_mismatch,
load_input_file,
transmit,
validate_tx_gain,
)
class TestGetTxDevice:
"""Tests for get_sdr_device function."""
def test_get_pluto_device(self):
"""Test getting PlutoSDR device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.pluto": MagicMock(Pluto=mock_sdr_class)}):
device = get_sdr_device("pluto")
assert device is mock_sdr_instance
def test_get_hackrf_device(self):
"""Test getting HackRF device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.hackrf": MagicMock(HackRF=mock_sdr_class)}):
device = get_sdr_device("hackrf")
assert device is mock_sdr_instance
def test_get_unknown_device(self):
"""Test getting unknown device type."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
get_sdr_device("unknown_device")
assert "Unknown device type" in str(exc_info.value)
class TestAutoSelectTxDevice:
"""Tests for auto_select_tx_device function."""
def test_auto_select_no_devices(self):
"""Test auto-select with no TX devices found."""
from click.exceptions import ClickException
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[]),
):
with pytest.raises(ClickException) as exc_info:
auto_select_tx_device()
assert "No TX-capable SDR devices found" in str(exc_info.value)
def test_auto_select_single_device(self):
"""Test auto-select with single TX device."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices", return_value=[]),
patch(
"ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices",
return_value=[{"type": "HackRF One", "serial": "123456"}],
),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[]),
):
device_type = auto_select_tx_device(quiet=True)
assert device_type == "hackrf"
def test_auto_select_multiple_devices(self):
"""Test auto-select with multiple TX devices raises error."""
from click.exceptions import ClickException
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch(
"ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices",
return_value=[{"type": "PlutoSDR", "uri": "ip:pluto.local"}],
),
patch(
"ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices",
return_value=[{"type": "HackRF One", "serial": "123456"}],
),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[]),
):
with pytest.raises(ClickException) as exc_info:
auto_select_tx_device()
assert "Multiple TX-capable devices found" in str(exc_info.value)
def test_auto_select_device_mapping(self):
"""Test device type name mapping."""
test_cases = [
("PlutoSDR", "pluto"),
("HackRF One", "hackrf"),
("BladeRF", "bladerf"),
("b200", "usrp"),
("B210", "usrp"),
]
for device_name, expected_type in test_cases:
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[{"type": device_name}]),
):
device_type = auto_select_tx_device(quiet=True)
assert device_type == expected_type
class TestLoadInputFile:
"""Tests for load_input_file function."""
def test_load_file_not_found(self):
"""Test loading non-existent file."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
load_input_file("nonexistent.sigmf")
assert "Input file not found" in str(exc_info.value)
def test_load_sigmf_file(self):
"""Test loading SigMF file."""
with tempfile.NamedTemporaryFile(suffix=".sigmf-data", delete=False) as f:
test_file = f.name
try:
mock_recording = MagicMock()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_recording", return_value=mock_recording):
recording = load_input_file(test_file, legacy=False)
assert recording == mock_recording
finally:
os.unlink(test_file)
def test_load_legacy_npy_file(self):
"""Test loading legacy NPY file."""
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
test_file = f.name
try:
mock_recording = MagicMock()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.from_npy_legacy", return_value=mock_recording):
recording = load_input_file(test_file, legacy=True)
assert recording == mock_recording
finally:
os.unlink(test_file)
def test_load_unsupported_format(self):
"""Test loading unsupported file format."""
from click.exceptions import ClickException
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as f:
test_file = f.name
try:
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_recording", side_effect=Exception("Unsupported format")):
with pytest.raises(ClickException) as exc_info:
load_input_file(test_file)
assert "Could not load" in str(exc_info.value)
assert "Supported formats" in str(exc_info.value)
finally:
os.unlink(test_file)
class TestValidateTxGain:
"""Tests for validate_tx_gain function."""
def test_valid_pluto_gain(self):
"""Test valid PlutoSDR gain."""
validate_tx_gain("pluto", -30)
validate_tx_gain("pluto", 0)
validate_tx_gain("pluto", -89)
def test_invalid_pluto_gain_too_high(self):
"""Test PlutoSDR gain too high."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
validate_tx_gain("pluto", 10)
assert "out of range" in str(exc_info.value)
def test_invalid_pluto_gain_too_low(self):
"""Test PlutoSDR gain too low."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
validate_tx_gain("pluto", -100)
assert "out of range" in str(exc_info.value)
def test_valid_hackrf_gain(self):
"""Test valid HackRF gain."""
validate_tx_gain("hackrf", 0)
validate_tx_gain("hackrf", 20)
validate_tx_gain("hackrf", 47)
def test_invalid_hackrf_gain(self):
"""Test invalid HackRF gain."""
from click.exceptions import ClickException
with pytest.raises(ClickException):
validate_tx_gain("hackrf", -10)
with pytest.raises(ClickException):
validate_tx_gain("hackrf", 50)
def test_high_gain_warning(self):
"""Test warning for high gain levels."""
import click
with patch.object(click, "echo") as mock_echo:
validate_tx_gain("hackrf", 45)
mock_echo.assert_called()
args = str(mock_echo.call_args)
assert "WARNING" in args
assert "high gain" in args.lower()
class TestCheckSampleRateMismatch:
"""Tests for check_sample_rate_mismatch function."""
def test_no_mismatch(self):
"""Test when sample rates match."""
mock_recording = MagicMock()
mock_recording.metadata = {"sample_rate": 2e6}
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=False)
mock_echo.assert_not_called()
def test_mismatch_warning(self):
"""Test warning when sample rates differ."""
mock_recording = MagicMock()
mock_recording.metadata = {"sample_rate": 1e6}
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=False)
mock_echo.assert_called_once()
args = str(mock_echo.call_args)
assert "Warning" in args
assert "differs" in args
def test_mismatch_quiet_mode(self):
"""Test no warning in quiet mode."""
mock_recording = MagicMock()
mock_recording.metadata = {"sample_rate": 1e6}
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=True)
mock_echo.assert_not_called()
def test_no_metadata(self):
"""Test when recording has no metadata."""
mock_recording = MagicMock()
mock_recording.metadata = None
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=False)
mock_echo.assert_not_called()
class TestTransmitCommand:
"""Tests for transmit CLI command."""
def setup_method(self):
"""Set up test fixtures."""
self.runner = CliRunner()
self.temp_dir = tempfile.mkdtemp()
def teardown_method(self):
"""Clean up test fixtures."""
import shutil
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def test_transmit_basic(self):
"""Test basic transmit command."""
test_file = os.path.join(self.temp_dir, "test.npy")
open(test_file, "w").close()
mock_sdr = MagicMock()
mock_recording = MagicMock()
mock_recording.data = np.array([[0.1 + 0.1j] * 1000])
mock_recording.metadata = {}
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.get_sdr_device", return_value=mock_sdr),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_input_file", return_value=mock_recording),
):
result = self.runner.invoke(
transmit,
[
"--device",
"hackrf",
"--sample-rate",
"2e6",
"--center-frequency",
"915M",
"--gain",
"10",
"--input",
test_file,
"--quiet",
],
)
assert result.exit_code == 0
mock_sdr.tx_recording.assert_called_once()
mock_sdr.close.assert_called_once()

View File

@ -0,0 +1,94 @@
"""Tests for transmit command signal generation."""
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.cli import cli
class TestTransmitGenerate:
"""Test signal generation in transmit command."""
def test_transmit_help(self):
"""Test transmit command help."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--help"])
assert result.exit_code == 0
assert "Generate signal instead of loading from file" in result.output
assert "lfm" in result.output
assert "chirp" in result.output
assert "sine" in result.output
assert "pulse" in result.output
def test_generate_lfm_chirp(self):
"""Test LFM chirp generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "lfm", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating LFM
# Error will be about device initialization, not about missing input file
assert "Generating LFM chirp signal" in result.output or "Failed to initialize" in result.output
def test_generate_sine_wave(self):
"""Test sine wave generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "sine", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating sine
assert "Generating sine wave signal" in result.output or "Failed to initialize" in result.output
def test_generate_chirp(self):
"""Test simple chirp generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "chirp", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating chirp
assert "Generating chirp signal" in result.output or "Failed to initialize" in result.output
def test_generate_pulse(self):
"""Test pulse generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "pulse", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating pulse
assert "Generating pulse signal" in result.output or "Failed to initialize" in result.output
def test_default_generates_lfm_when_no_input(self):
"""Test that default generates LFM chirp when no input file specified."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--device", "pluto", "-v"])
# Should default to LFM chirp when no input file or --generate specified
assert "Generating LFM chirp signal" in result.output or "Failed to initialize" in result.output
def test_generate_overrides_input_file(self):
"""Test that --generate overrides --input file."""
runner = CliRunner()
result = runner.invoke(
cli, ["transmit", "--device", "pluto", "--input", "nonexistent.sigmf", "--generate", "lfm", "-v"]
)
# Should generate LFM, not try to load nonexistent.sigmf
assert "Generating LFM chirp signal" in result.output or "Failed to initialize" in result.output
# Should NOT say "Input file not found"
assert "Input file not found" not in result.output
def test_signal_generation_parameters(self):
"""Test that signal generation uses correct parameters from CLI."""
runner = CliRunner()
result = runner.invoke(
cli,
[
"transmit",
"--device",
"pluto",
"--generate",
"lfm",
"--sample-rate",
"10e6",
"--center-frequency",
"915M",
"--gain",
"-20",
"-v",
],
)
# Check that parameters are shown in output
if "Failed to initialize" in result.output:
# Device initialization failed (expected without real device)
assert "10.00 MHz" in result.output or "10.000 MHz" in result.output or "10.00 MS/s" in result.output
assert "915" in result.output
assert "-20 dB" in result.output or "-20.0 dB" in result.output