Compare commits
No commits in common. "be32efeff280335ac7dccf60f5699d0088c9a44f" and "0f5f36b1035dedfa37e6ec0b1157fd77beff428f" have entirely different histories.
be32efeff2
...
0f5f36b103
|
|
@ -135,8 +135,6 @@ class HackRF(SDR):
|
||||||
samples = recording.data[0]
|
samples = recording.data[0]
|
||||||
|
|
||||||
samples = samples.astype(np.complex64, copy=False)
|
samples = samples.astype(np.complex64, copy=False)
|
||||||
if np.max(np.abs(samples)) >= 1:
|
|
||||||
samples = samples / (np.max(np.abs(samples)) + 1e-12)
|
|
||||||
|
|
||||||
print("HackRF Starting TX...")
|
print("HackRF Starting TX...")
|
||||||
self.radio.start_tx(samples=samples, repeat=True)
|
self.radio.start_tx(samples=samples, repeat=True)
|
||||||
|
|
|
||||||
|
|
@ -1,178 +0,0 @@
|
||||||
import subprocess
|
|
||||||
import numpy as np # type: ignore
|
|
||||||
import pytest # type: ignore
|
|
||||||
|
|
||||||
from ria_toolkit_oss.datatypes.recording import Recording
|
|
||||||
from ria_toolkit_oss.sdr.blade import Blade
|
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_RATE = int(1e6)
|
|
||||||
CENTER_FREQUENCY = int(3440e6)
|
|
||||||
CHANNEL = 0
|
|
||||||
ABS_GAIN = 10
|
|
||||||
REL_GAIN = -50
|
|
||||||
t = np.linspace(0, 1, int(1e6 * 1), endpoint=False)
|
|
||||||
angular_frequency = 2 * np.pi * 1
|
|
||||||
SINE_WAVE = 10 * np.exp(1j * angular_frequency * t)
|
|
||||||
|
|
||||||
|
|
||||||
def radio_connected() -> bool:
|
|
||||||
try:
|
|
||||||
# Example: check if a specific USB device is present
|
|
||||||
result = subprocess.run(
|
|
||||||
["lsusb"],
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=True
|
|
||||||
)
|
|
||||||
return "bladeRF" in result.stdout
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_rx_init():
|
|
||||||
try:
|
|
||||||
rx_radio = Blade()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
assert str(rx_radio.rx_ch) == "Channel RX1"
|
|
||||||
assert int(rx_radio.rx_ch.sample_rate) == SAMPLE_RATE
|
|
||||||
assert int(rx_radio.rx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5)
|
|
||||||
assert int(rx_radio.rx_ch.gain) == ABS_GAIN
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_tx_init():
|
|
||||||
try:
|
|
||||||
tx_radio = Blade()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
assert str(tx_radio.tx_ch) == "Channel TX1"
|
|
||||||
assert int(tx_radio.tx_ch.sample_rate) == SAMPLE_RATE
|
|
||||||
assert int(tx_radio.tx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5)
|
|
||||||
assert int(tx_radio.tx_ch.gain) == ABS_GAIN
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_rx_setters():
|
|
||||||
try:
|
|
||||||
rx_radio = Blade()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
rx_radio._set_rx_channel(channel=1)
|
|
||||||
assert str(rx_radio.rx_ch) == "Channel RX2"
|
|
||||||
rx_radio._set_rx_buffer_size(buffer_size=4096)
|
|
||||||
assert int(rx_radio.rx_buffer_size) == 4096
|
|
||||||
rx_radio._set_rx_center_frequency(center_frequency=int(3500e6))
|
|
||||||
assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5)
|
|
||||||
rx_radio._set_rx_gain(channel=1, gain=20, gain_mode='absolute')
|
|
||||||
assert int(rx_radio.rx_ch.gain) == 20
|
|
||||||
rx_radio._set_rx_sample_rate(sample_rate=int(2e6))
|
|
||||||
assert int(rx_radio.rx_ch.sample_rate) == int(2e6)
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_tx_setters():
|
|
||||||
try:
|
|
||||||
tx_radio = Blade()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
tx_radio._set_tx_channel(channel=1)
|
|
||||||
assert str(tx_radio.tx_ch) == "Channel TX2"
|
|
||||||
tx_radio._set_tx_buffer_size(buffer_size=4096)
|
|
||||||
assert int(tx_radio.tx_buffer_size) == 4096
|
|
||||||
tx_radio._set_tx_center_frequency(center_frequency=int(3500e6))
|
|
||||||
assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5)
|
|
||||||
tx_radio._set_tx_gain(channel=1, gain=20, gain_mode='absolute')
|
|
||||||
assert int(tx_radio.tx_ch.gain) == 20
|
|
||||||
tx_radio._set_tx_sample_rate(sample_rate=int(2e6))
|
|
||||||
assert int(tx_radio.tx_ch.sample_rate) == int(2e6)
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_relative_mode():
|
|
||||||
try:
|
|
||||||
radio = Blade()
|
|
||||||
radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
assert int(radio.rx_ch.gain) == ABS_GAIN
|
|
||||||
radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
assert int(radio.tx_ch.gain) == ABS_GAIN
|
|
||||||
finally:
|
|
||||||
radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_rx():
|
|
||||||
try:
|
|
||||||
print('Beginning test of Blade rx...')
|
|
||||||
rx_radio = Blade()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
recording = rx_radio.record(num_samples=SAMPLE_RATE)
|
|
||||||
assert type(recording) is Recording
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_blade_tx():
|
|
||||||
try:
|
|
||||||
tx_radio = Blade()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
recording = Recording(
|
|
||||||
data=SINE_WAVE,
|
|
||||||
metadata={'data': 'sine_wave'}
|
|
||||||
)
|
|
||||||
tx_radio.tx_recording(
|
|
||||||
recording=recording,
|
|
||||||
num_samples=SAMPLE_RATE
|
|
||||||
)
|
|
||||||
assert True
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
@ -1,91 +0,0 @@
|
||||||
import subprocess
|
|
||||||
import numpy as np # type: ignore
|
|
||||||
import pytest # type: ignore
|
|
||||||
|
|
||||||
from ria_toolkit_oss.datatypes.recording import Recording
|
|
||||||
from ria_toolkit_oss.sdr.hackrf import HackRF
|
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_RATE = int(1e6)
|
|
||||||
CENTER_FREQUENCY = int(3440e6)
|
|
||||||
CHANNEL = 0
|
|
||||||
ABS_GAIN = 10
|
|
||||||
REL_GAIN = -37
|
|
||||||
t = np.linspace(0, 1, int(1e6 * 1), endpoint=False)
|
|
||||||
angular_frequency = 2 * np.pi * 1
|
|
||||||
SINE_WAVE = 10 * np.exp(1j * angular_frequency * t)
|
|
||||||
|
|
||||||
|
|
||||||
def radio_connected() -> bool:
|
|
||||||
try:
|
|
||||||
# Example: check if a specific USB device is present
|
|
||||||
result = subprocess.run(
|
|
||||||
["lsusb"],
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=True
|
|
||||||
)
|
|
||||||
return "hackrf" in result.stdout.lower()
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_hackrf_relative_mode():
|
|
||||||
try:
|
|
||||||
radio = HackRF()
|
|
||||||
radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
assert int(radio.radio.txvga_gain) == ABS_GAIN
|
|
||||||
finally:
|
|
||||||
radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_hackrf_rx():
|
|
||||||
try:
|
|
||||||
rx_radio = HackRF()
|
|
||||||
try:
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
gain_mode='absolute'
|
|
||||||
)
|
|
||||||
except NotImplementedError:
|
|
||||||
assert True
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_hackrf_tx():
|
|
||||||
try:
|
|
||||||
tx_radio = HackRF()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
|
|
||||||
max_val = np.max(np.abs(SINE_WAVE))
|
|
||||||
data = SINE_WAVE / (max_val * 1.01)
|
|
||||||
recording = Recording(
|
|
||||||
data=SINE_WAVE,
|
|
||||||
metadata={'data': 'sine_wave'}
|
|
||||||
)
|
|
||||||
|
|
||||||
tx_radio.tx_recording(
|
|
||||||
recording=recording,
|
|
||||||
num_samples=SAMPLE_RATE
|
|
||||||
)
|
|
||||||
assert True
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
@ -1,185 +0,0 @@
|
||||||
import subprocess
|
|
||||||
import numpy as np # type: ignore
|
|
||||||
import pytest # type: ignore
|
|
||||||
|
|
||||||
from ria_toolkit_oss.datatypes.recording import Recording
|
|
||||||
from ria_toolkit_oss.sdr.pluto import Pluto
|
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_RATE = int(1e6)
|
|
||||||
CENTER_FREQUENCY = int(3440e6)
|
|
||||||
CHANNEL = 0
|
|
||||||
ABS_GAIN = 10
|
|
||||||
REL_GAIN = -50
|
|
||||||
t = np.linspace(0, 1, int(1e6 * 1), endpoint=False)
|
|
||||||
angular_frequency = 2 * np.pi * 1
|
|
||||||
SINE_WAVE = 10 * np.exp(1j * angular_frequency * t)
|
|
||||||
CONSTANT_TONE = np.ones((1000), dtype=np.complex64)
|
|
||||||
|
|
||||||
|
|
||||||
def radio_connected() -> bool:
|
|
||||||
try:
|
|
||||||
# Example: check if a specific USB device is present
|
|
||||||
result = subprocess.run(
|
|
||||||
["lsusb"],
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=True
|
|
||||||
)
|
|
||||||
return "pluto" in result.stdout.lower()
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_pluto_rx_setters():
|
|
||||||
try:
|
|
||||||
rx_radio = Pluto()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
assert int(rx_radio.radio.sample_rate) == SAMPLE_RATE
|
|
||||||
assert int(rx_radio.radio.rx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5)
|
|
||||||
assert rx_radio.radio.rx_enabled_channels == [0]
|
|
||||||
assert rx_radio.radio.rx_hardwaregain_chan0 == ABS_GAIN
|
|
||||||
|
|
||||||
# rx_radio.set_rx_channel(channel=1)
|
|
||||||
# assert rx_radio.radio.rx_enabled_channels == [0, 1]
|
|
||||||
rx_radio.set_rx_center_frequency(center_frequency=int(3500e6))
|
|
||||||
assert int(rx_radio.radio.rx_lo) == pytest.approx(int(3500e6), abs=5)
|
|
||||||
rx_radio.set_rx_gain(channel=0, gain=20)
|
|
||||||
assert rx_radio.radio.rx_hardwaregain_chan0 == 20
|
|
||||||
rx_radio.set_rx_sample_rate(sample_rate=int(2e6))
|
|
||||||
assert int(rx_radio.radio.sample_rate) == int(2e6)
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_pluto_tx_setters():
|
|
||||||
try:
|
|
||||||
print('Beginning test of Pluto tx setters...')
|
|
||||||
tx_radio = Pluto()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=-ABS_GAIN,
|
|
||||||
)
|
|
||||||
assert int(tx_radio.radio.sample_rate) == SAMPLE_RATE
|
|
||||||
assert int(tx_radio.radio.tx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5)
|
|
||||||
assert tx_radio.radio.tx_enabled_channels == [0]
|
|
||||||
assert tx_radio.radio.tx_hardwaregain_chan0 == -ABS_GAIN
|
|
||||||
|
|
||||||
try:
|
|
||||||
tx_radio.set_tx_channel(channel=1)
|
|
||||||
except NotImplementedError:
|
|
||||||
assert True
|
|
||||||
try:
|
|
||||||
tx_radio.set_tx_buffer_size(buffer_size=4096)
|
|
||||||
except NotImplementedError:
|
|
||||||
assert True
|
|
||||||
tx_radio.set_tx_center_frequency(center_frequency=int(3500e6))
|
|
||||||
assert int(tx_radio.radio.tx_lo) == pytest.approx(int(3500e6), abs=5)
|
|
||||||
tx_radio.set_tx_gain(channel=0, gain=-30)
|
|
||||||
assert int(tx_radio.radio.tx_hardwaregain_chan0) == -30
|
|
||||||
tx_radio.set_tx_sample_rate(sample_rate=int(2e6))
|
|
||||||
assert int(tx_radio.radio.sample_rate) == int(2e6)
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_pluto_relative_mode():
|
|
||||||
try:
|
|
||||||
radio = Pluto()
|
|
||||||
radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN)
|
|
||||||
radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN
|
|
||||||
finally:
|
|
||||||
radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_pluto_rx():
|
|
||||||
try:
|
|
||||||
rx_radio = Pluto()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
recording = rx_radio.record(num_samples=SAMPLE_RATE)
|
|
||||||
assert type(recording) is Recording
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_pluto_tx():
|
|
||||||
try:
|
|
||||||
tx_radio = Pluto()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
recording = Recording(
|
|
||||||
data=SINE_WAVE,
|
|
||||||
metadata={'data': 'sine_wave'}
|
|
||||||
)
|
|
||||||
tx_radio.tx_recording(
|
|
||||||
recording=recording,
|
|
||||||
num_samples=SAMPLE_RATE
|
|
||||||
)
|
|
||||||
assert True
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_pluto_dual_tx():
|
|
||||||
try:
|
|
||||||
tx_radio = Pluto()
|
|
||||||
try:
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=1,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
except AttributeError:
|
|
||||||
pytest.skip("Dual tx not available on connected Pluto device")
|
|
||||||
recording1 = Recording(
|
|
||||||
data=SINE_WAVE,
|
|
||||||
metadata={'data': 'sine_wave'}
|
|
||||||
)
|
|
||||||
recording2 = Recording(
|
|
||||||
data=CONSTANT_TONE,
|
|
||||||
metadata={'data': 'constant_tone'}
|
|
||||||
)
|
|
||||||
tx_radio.tx_recording(
|
|
||||||
recording=[recording1, recording2],
|
|
||||||
num_samples=SAMPLE_RATE,
|
|
||||||
)
|
|
||||||
assert True
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
|
|
@ -1,111 +0,0 @@
|
||||||
import subprocess
|
|
||||||
import numpy as np # type: ignore
|
|
||||||
import pytest # type: ignore
|
|
||||||
|
|
||||||
from ria_toolkit_oss.datatypes.recording import Recording
|
|
||||||
from ria_toolkit_oss.sdr.usrp import USRP
|
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_RATE = int(1e6)
|
|
||||||
CENTER_FREQUENCY = int(3440e6)
|
|
||||||
CHANNEL = 0
|
|
||||||
ABS_GAIN = 10
|
|
||||||
REL_GAIN = -25
|
|
||||||
t = np.linspace(0, 1, int(1e6 * 1), endpoint=False)
|
|
||||||
angular_frequency = 2 * np.pi * 1
|
|
||||||
SINE_WAVE = 10 * np.exp(1j * angular_frequency * t)
|
|
||||||
|
|
||||||
|
|
||||||
def radio_connected() -> bool:
|
|
||||||
try:
|
|
||||||
# Example: check if a specific USB device is present
|
|
||||||
result = subprocess.run(
|
|
||||||
["uhd_find_devices"],
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
check=True
|
|
||||||
)
|
|
||||||
return not "No UHD Devices Found" in result.stdout
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_usrp_clock_setter():
|
|
||||||
try:
|
|
||||||
rx_radio = USRP()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
rx_radio.set_clock_source(source='external')
|
|
||||||
assert rx_radio.usrp.get_clock_source(0) == "external"
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_usrp_relative_mode():
|
|
||||||
try:
|
|
||||||
radio = USRP()
|
|
||||||
radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
max_gain = radio.usrp.get_rx_gain_range().stop()
|
|
||||||
assert radio.rx_gain == (max_gain + REL_GAIN)
|
|
||||||
radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=REL_GAIN,
|
|
||||||
gain_mode='relative'
|
|
||||||
)
|
|
||||||
max_gain = radio.usrp.get_tx_gain_range().stop()
|
|
||||||
assert radio.tx_gain == (max_gain + REL_GAIN)
|
|
||||||
finally:
|
|
||||||
radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_usrp_rx():
|
|
||||||
try:
|
|
||||||
rx_radio = USRP()
|
|
||||||
rx_radio.init_rx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
recording = rx_radio.record(num_samples=SAMPLE_RATE)
|
|
||||||
assert type(recording) is Recording
|
|
||||||
finally:
|
|
||||||
rx_radio.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
|
|
||||||
def test_usrp_tx():
|
|
||||||
try:
|
|
||||||
tx_radio = USRP()
|
|
||||||
tx_radio.init_tx(
|
|
||||||
sample_rate=SAMPLE_RATE,
|
|
||||||
center_frequency=CENTER_FREQUENCY,
|
|
||||||
channel=CHANNEL,
|
|
||||||
gain=ABS_GAIN,
|
|
||||||
)
|
|
||||||
recording = Recording(
|
|
||||||
data=SINE_WAVE,
|
|
||||||
metadata={'data': 'sine_wave'}
|
|
||||||
)
|
|
||||||
tx_radio.tx_recording(
|
|
||||||
recording=recording,
|
|
||||||
num_samples=SAMPLE_RATE
|
|
||||||
)
|
|
||||||
assert True
|
|
||||||
finally:
|
|
||||||
tx_radio.close()
|
|
||||||
Loading…
Reference in New Issue
Block a user