Compare commits

..

2 Commits

Author SHA1 Message Date
c2b47ead95 Updated, edited, and cleaned up SDR files 2025-10-16 15:22:07 -04:00
34faa57ea4 Applied updates, linting, etc. 2025-10-16 15:02:09 -04:00
10 changed files with 765 additions and 547 deletions

View File

@ -168,6 +168,10 @@ Additional usage information is provided in the project documentation: [RIA Tool
Kindly report any issues on RIA Hub: [RIA Toolkit OSS Issues Board](https://riahub.ai/qoherent/ria-toolkit-oss/issues).
### Upcoming Changes
The ThinkRF package is currently pending further testing and potential updates.
## 🤝 Contribution
Contributions are always welcome! Whether it's an enhancement, bug fix, or new example, your input is valuable. If you'd like to contribute to the project, please reach out to the project maintainers.

View File

@ -11,6 +11,7 @@ authors = [
maintainers = [
{ name = "Benjamin Chinnery", email = "ben@qoherent.ai" },
{ name = "Ashkan Beigi", email = "ash@qoherent.ai" },
{ name = "Madrigal Weersink", email = "madrigal@qoherent.ai" },
]
keywords = [
"radio",

View File

@ -13,8 +13,8 @@ Run this after installing pyrf:
python scripts/fix_pyrf_python3.py
"""
from pathlib import Path
from lib2to3.refactor import RefactoringTool, get_fixers_from_package
from pathlib import Path
try:
import pyrf
@ -36,7 +36,7 @@ print(f"Found pyrf ThinkRF module at: {thinkrf_path}")
# Apply lib2to3 fixes
print("Applying Python 3 compatibility fixes...")
fixers = get_fixers_from_package('lib2to3.fixes')
fixers = get_fixers_from_package("lib2to3.fixes")
tool = RefactoringTool(fixers)
tool.refactor_file(str(thinkrf_path), write=True)

View File

@ -1,3 +1,5 @@
import time
import warnings
from typing import Optional
import numpy as np
@ -35,22 +37,6 @@ class Blade(SDR):
super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
if channel is None:
channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))
try:
bladerf_channel = _bladerf.CHANNEL_RX(channel)
self.device.set_bias_tee(bladerf_channel, bool(enable))
except AttributeError as exc: # pragma: no cover - depends on libbladeRF version
raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc
state = "enabled" if enable else "disabled"
print(f"BladeRF bias tee {state} on channel {channel}.")
def _shutdown(self, error=0, board=None):
print("Shutting down with error code: " + str(error))
if board is not None:
@ -83,9 +69,6 @@ class Blade(SDR):
print("FPGA version:\t\t" + str(device.get_fpga_version()))
return 0
def close(self):
self.device.close()
def init_rx(
self,
sample_rate: int | float,
@ -108,6 +91,9 @@ class Blade(SDR):
:type channel: int
:param buffer_size: The buffer size during receive. Defaults to 8192.
:type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60).
:type gain_mode: str
"""
print("Initializing RX")
@ -128,6 +114,93 @@ class Blade(SDR):
self._rx_initialized = True
self._tx_initialized = False
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
print("Blade Starting RX...")
self._enable_rx = True
while self._enable_rx:
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
self.buffer = buffer
# send callback complex signal
callback(buffer=signal, metadata=None)
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
self._num_samples_to_record = num_samples
elif rx_time is not None:
self._num_samples_to_record = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
print("Blade Starting RX...")
self._enable_rx = True
store_array = np.zeros(
(1, (self._num_samples_to_record // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64
)
for i in range(self._num_samples_to_record // self.rx_buffer_size + 1):
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = signal
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array[:, : self._num_samples_to_record], metadata=metadata)
def init_tx(
self,
sample_rate: int | float,
@ -150,6 +223,9 @@ class Blade(SDR):
:type channel: int
:param buffer_size: The buffer size during transmission. Defaults to 8192.
:type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60).
:type gain_mode: str
"""
# Configure BladeRF
@ -178,87 +254,36 @@ class Blade(SDR):
self._rx_initialized = False
return 0
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
def _stream_tx(self, callback):
# Setup synchronous stream
# Setup stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
layout=_bladerf.ChannelLayout.TX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
buffer_size=8192,
num_transfers=8,
stream_timeout=3500000000,
stream_timeout=3500,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
# Enable module
self.tx_ch.enable = True
self._enable_tx = True
print("Blade Starting RX...")
self._enable_rx = True
print("Blade Starting TX...")
while self._enable_rx:
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
# samples = convert_to_2xn(signal)
self.buffer = buffer
# send callback complex signal
callback(buffer=signal, metadata=None)
while self._enable_tx:
buffer = callback(self.tx_buffer_size) # [0]
byte_array = self._convert_tx_samples(buffer)
self.device.sync_tx(byte_array, len(buffer))
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
def record(self, num_samples):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
print("Blade Starting RX...")
self._enable_rx = True
store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64)
for i in range(num_samples // self.rx_buffer_size + 1):
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
# samples = convert_to_2xn(signal)
store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = signal
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array[:, :num_samples], metadata=metadata)
print("Blade TX Completed.")
self.tx_ch.enable = False
def tx_recording(
self,
recording: "Recording | np.ndarray",
recording: Recording | np.ndarray,
num_samples: Optional[int] = None,
tx_time: Optional[int | float] = None,
):
@ -275,9 +300,6 @@ class Blade(SDR):
recording to this length. Defaults to None.
:type tx_time: int or float, optional
"""
import warnings
import time
from ria_toolkit_oss.datatypes.recording import Recording
if num_samples is not None and tx_time is not None:
raise ValueError("Only input one of num_samples or tx_time")
@ -327,7 +349,7 @@ class Blade(SDR):
sample_index = 0
chunk_size = min(self.tx_buffer_size, len(samples))
chunk = samples[sample_index:sample_index + chunk_size]
chunk = samples[sample_index : sample_index + chunk_size]
sample_index += chunk_size
# Convert and transmit
@ -341,33 +363,6 @@ class Blade(SDR):
print("Blade TX Completed.")
self.tx_ch.enable = False
def _stream_tx(self, callback):
# Setup stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.TX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=8192,
num_transfers=8,
stream_timeout=3500,
)
# Enable module
self.tx_ch.enable = True
self._enable_tx = True
print("Blade Starting TX...")
while self._enable_tx:
buffer = callback(self.tx_buffer_size) # [0]
byte_array = self._convert_tx_samples(buffer)
self.device.sync_tx(byte_array, len(buffer))
# Disable module
print("Blade TX Completed.")
self.tx_ch.enable = False
def _convert_rx_samples(self, samples):
samples = np.frombuffer(samples, dtype=np.int16).astype(np.float32)
samples /= 2048
@ -486,3 +481,22 @@ class Blade(SDR):
print(f"Clock source set to {self.device.get_clock_select()}")
print(f"PLL Reference set to {self.device.get_pll_refclk()}")
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
if channel is None:
channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))
try:
bladerf_channel = _bladerf.CHANNEL_RX(channel)
self.device.set_bias_tee(bladerf_channel, bool(enable))
except AttributeError as exc: # pragma: no cover - depends on libbladeRF version
raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc
state = "enabled" if enable else "disabled"
print(f"BladeRF bias tee {state} on channel {channel}.")
def close(self):
self.device.close()

View File

@ -1,6 +1,5 @@
import time
import warnings
import math
from typing import Optional
import numpy as np
@ -36,16 +35,14 @@ class HackRF(SDR):
super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
try:
self.radio.set_antenna_enable(bool(enable))
except AttributeError as exc: # pragma: no cover - defensive
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the HackRF for receiving.
@ -58,11 +55,12 @@ class HackRF(SDR):
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The total gain set for receiving on the HackRF (distributed across stages)
:param gain: The LNA gain set for receiving on the HackRF
:type gain: int
:param channel: The channel the HackRF is set to. (Not actually used)
:type channel: int
:param gain_mode: Gain mode setting. Currently only "absolute" is supported.
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (40).
:type gain_mode: str
"""
print("Initializing RX")
@ -77,7 +75,7 @@ class HackRF(SDR):
# Distribute gain across amplifier stages
rx_gain_min = 0
rx_gain_max = 116 # 14 (amp) + 40 (LNA) + 62 (VGA)
rx_gain_max = 40 # (LNA)
if gain_mode == "relative":
if gain > 0:
@ -95,42 +93,61 @@ class HackRF(SDR):
print(f"Gain {gain} out of range for HackRF.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
# Distribute gain using the signal-testbed algorithm
enable_amp = False
remaining_gain = abs_gain
self.set_gain_amp(False)
self.set_rx_vga_gain(45)
self.set_rx_lna_gain(abs_gain)
# Enable 14 dB pre-amp if gain is high enough
if remaining_gain > 30:
remaining_gain = remaining_gain - 14
enable_amp = True
print("HackRF: 14dB front-end amplifier enabled.")
print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB")
print("To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().")
# Distribute remaining gain between LNA and VGA
# LNA gets 60% of remaining gain, rounded down to 8 dB steps
lna_gain = math.floor(remaining_gain * 0.6)
lna_gain = lna_gain - (lna_gain % 8) # Round to 8 dB steps
if lna_gain > 40:
lna_gain = 40
# VGA gets the rest
vga_gain = remaining_gain - lna_gain
if vga_gain > 62:
vga_gain = 62
# Apply gain settings
if enable_amp:
self.radio.enable_amp()
else:
self.radio.disable_amp()
self.radio.set_lna_gain(lna_gain)
self.radio.set_vga_gain(vga_gain)
self.rx_gain = abs_gain
print(f"HackRF gain distribution: Amp={enable_amp}, LNA={lna_gain}dB, VGA={vga_gain}dB")
self._rx_initialized = True
self._tx_initialized = False
self._rx_initialized = True
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the SDR.
HackRF uses block capture mode, which is more reliable than streaming for USB2 connections.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
self._num_samples_to_record = num_samples
elif rx_time is not None:
self._num_samples_to_record = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
print("HackRF Starting RX...")
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(self._num_samples_to_record)
print("HackRF RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, self._num_samples_to_record), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def init_tx(
self,
@ -164,8 +181,6 @@ class HackRF(SDR):
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
self.radio.enable_amp()
tx_gain_min = 0
tx_gain_max = 47
if gain_mode == "relative":
@ -184,8 +199,10 @@ class HackRF(SDR):
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
self.radio.txvga_gain = abs_gain
print(f"HackRF gain = {self.radio.txvga_gain}")
self.set_gain_amp(True)
self.set_tx_vga_gain(abs_gain)
print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB")
print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().")
self._tx_initialized = True
self._rx_initialized = False
@ -236,46 +253,41 @@ class HackRF(SDR):
self.radio.stop_tx()
print("HackRF Tx Completed.")
def set_clock_source(self, source):
def set_gain_amp(self, enable):
if enable:
self.radio.enable_amp()
self.amp_enabled = True
else:
self.radio.disable_amp()
self.amp_enabled = False
def set_rx_lna_gain(self, lna_gain):
self.radio.set_lna_gain(lna_gain)
self.rx_lna_gain = lna_gain
def set_rx_vga_gain(self, vga_gain):
self.radio.set_vga_gain(vga_gain)
self.rx_vga_gain = vga_gain
def set_tx_vga_gain(self, vga_gain):
self.radio.set_txvga_gain(vga_gain)
self.tx_vga_gain = vga_gain
def set_clock_source(self, source):
self.radio.set_clock_source(source)
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
try:
self.radio.set_antenna_enable(bool(enable))
except AttributeError as exc: # pragma: no cover - defensive
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
def close(self):
self.radio.close()
def record(self, num_samples):
"""
Record a specified number of samples from the HackRF using block capture mode.
This is more reliable than streaming for USB2 connections.
:param num_samples: Number of samples to capture
:type num_samples: int
:return: Recording object containing the captured data
:rtype: Recording
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record()")
print("HackRF Starting RX...")
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(num_samples)
print("HackRF RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, num_samples), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def _stream_rx(self, callback):
"""
Stream samples from the HackRF using a callback function.
@ -300,6 +312,7 @@ class HackRF(SDR):
# Use ctypes string_at to safely copy the buffer
from ctypes import string_at
byte_data = string_at(c.buffer, c.valid_length)
# Convert bytes to int8, then to float32, then view as complex64

View File

@ -48,6 +48,7 @@ class Pluto(SDR):
print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].")
else:
# Non-MIMO hardware (Rev B) - use standard Pluto driver
del test_radio
self.radio = adi.Pluto(uri)
self._mimo_capable = False
print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].")
@ -75,8 +76,9 @@ class Pluto(SDR):
:type gain: int
:param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels.
:type channel: int
:param buffer_size: The buffer size during receive. Defaults to 10000.
:type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (74).
:type gain_mode: str
"""
print("Initializing RX")
@ -100,36 +102,20 @@ class Pluto(SDR):
else:
raise ValueError("Channel must be either 0 or 1.")
rx_gain_min = 0
rx_gain_max = 74
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets \
the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
self.set_rx_gain(gain=abs_gain, channel=channel)
self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode)
if channel == 0:
print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}")
elif channel == 1:
self.set_rx_gain(gain=abs_gain, channel=0)
self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode)
print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}")
self.radio.rx_buffer_size = 1024 # TODO deal with this for zmq
self.set_rx_buffer_size(getattr(self, "rx_buffer_size", 1024))
self._rx_initialized = True
self._tx_initialized = False
return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
def init_tx(
self,
sample_rate: int | float,
@ -150,8 +136,9 @@ class Pluto(SDR):
:type gain: int
:param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels.
:type channel: int
:param buffer_size: The buffer size during transmit. Defaults to 10000.
:type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (0).
:type gain_mode: str
"""
print("Initializing TX")
@ -162,7 +149,10 @@ class Pluto(SDR):
self.set_tx_center_frequency(center_frequency=int(center_frequency))
print(f"Pluto center frequency = {self.radio.tx_lo}")
if channel == 1:
if channel == 0:
self.radio.tx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 1:
if not self._mimo_capable:
raise ValueError(
"Dual TX channel requested (channel=1) but hardware is not MIMO-capable. "
@ -170,41 +160,21 @@ class Pluto(SDR):
)
self.radio.tx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 0:
self.radio.tx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
else:
raise ValueError("Channel must be either 0 or 1.")
tx_gain_min = -89
tx_gain_max = 0
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
abs_gain = tx_gain_max + gain
else:
abs_gain = gain
if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
self.set_tx_gain(gain=abs_gain, channel=channel)
self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode)
if channel == 0:
print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}")
elif channel == 1:
self.set_tx_gain(gain=abs_gain, channel=0)
self.set_tx_gain(gain=gain, channel=0, gain_mode=gain_mode)
print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}, {self.radio.tx_hardwaregain_chan1}")
self._tx_initialized = True
self._rx_initialized = False
return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain}
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
@ -323,11 +293,6 @@ class Pluto(SDR):
self.radio.tx_cyclic_buffer = False
print("Pluto TX Completed.")
def close(self):
if self.radio.tx_cyclic_buffer:
self.radio.tx_destroy_buffer()
del self.radio
def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"):
"""
Transmit the given iq samples from the provided recording.
@ -407,28 +372,47 @@ class Pluto(SDR):
except ValueError as e:
_handle_OSError(e)
def set_rx_gain(self, gain, channel=0):
self.rx_gain = gain
def set_rx_gain(self, gain, channel=0, gain_mode="absolute"):
rx_gain_min = 0
rx_gain_max = 74
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets \
the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
self.rx_gain = abs_gain
try:
if channel == 0:
if gain is None:
if abs_gain is None:
self.radio.gain_control_mode_chan0 = "automatic"
print("Using Pluto Automatic Gain Control.")
else:
self.radio.gain_control_mode_chan0 = "manual"
self.radio.rx_hardwaregain_chan0 = gain # dB
self.radio.rx_hardwaregain_chan0 = abs_gain # dB
elif channel == 1:
try:
if gain is None:
if abs_gain is None:
self.radio.gain_control_mode_chan1 = "automatic"
print("Using Pluto Automatic Gain Control.")
else:
self.radio.gain_control_mode_chan1 = "manual"
self.radio.rx_hardwaregain_chan1 = gain # dB
self.radio.rx_hardwaregain_chan1 = abs_gain # dB
except Exception as e:
print("Failed to use channel 1 on the PlutoSDR. \nThis is only available for revC versions.")
@ -443,10 +427,31 @@ class Pluto(SDR):
_handle_OSError(e)
def set_rx_channel(self, channel):
self.rx_channel = channel
if channel == 0:
self.radio.rx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
elif channel == 1:
self.radio.rx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
else:
raise ValueError("Channel must be either 0 or 1.")
def set_rx_buffer_size(self, buffer_size):
raise NotImplementedError
if buffer_size is None:
raise ValueError("Buffer_size must be provided.")
buffer_size = int(buffer_size)
if buffer_size <= 0:
raise ValueError("Buffer_size must be a positive integer.")
self.rx_buffer_size = buffer_size
if hasattr(self, "radio"):
try:
self.radio.rx_buffer_size = buffer_size
except OSError as e:
_handle_OSError(e)
except ValueError as e:
_handle_OSError(e)
def set_tx_center_frequency(self, center_frequency):
try:
@ -468,14 +473,33 @@ class Pluto(SDR):
except ValueError as e:
_handle_OSError(e)
def set_tx_gain(self, gain, channel=0):
def set_tx_gain(self, gain, channel=0, gain_mode="absolute"):
tx_gain_min = -89
tx_gain_max = 0
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
abs_gain = tx_gain_max + gain
else:
abs_gain = gain
if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
try:
self.tx_gain = gain
self.tx_gain = abs_gain
if channel == 0:
self.radio.tx_hardwaregain_chan0 = int(gain)
self.radio.tx_hardwaregain_chan0 = int(abs_gain)
elif channel == 1:
self.radio.tx_hardwaregain_chan1 = int(gain)
self.radio.tx_hardwaregain_chan1 = int(abs_gain)
else:
raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.")
@ -485,11 +509,23 @@ class Pluto(SDR):
_handle_OSError(e)
def set_tx_channel(self, channel):
raise NotImplementedError
if channel == 1:
self.radio.tx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 0:
self.radio.tx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
else:
raise ValueError("Channel must be either 0 or 1.")
def set_tx_buffer_size(self, buffer_size):
raise NotImplementedError
def close(self):
if self.radio.tx_cyclic_buffer:
self.radio.tx_destroy_buffer()
del self.radio
def shutdown(self):
del self.radio

View File

@ -11,35 +11,42 @@ try:
except ImportError as exc: # pragma: no cover - dependency provided by end user
raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.sdr import SDR
class RTLSDR(SDR):
"""SDR interface for RTL-SDR dongles using pyrtlsdr."""
def __init__(self, identifier: Optional[int | str] = None):
super().__init__()
def __init__(self, identifier: Optional[str] = None):
"""
Initialize a Pluto SDR device object and connect to the SDR hardware.
This software supports the ADALM Pluto SDR created by Analog Devices.
:param identifier: The value of the parameter that identifies the device.
:type identifier: str = "192.168.3.1", "pluto.local", etc
If no identifier is provided, it will select the first device found, with a warning.
If more than one device is found with the identifier, it will select the first of those devices.
"""
print(f"Initializing Pluto radio with identifier [{identifier}].")
try:
super().__init__()
if identifier is None:
self.radio = RtlSdr()
else:
self.radio = RtlSdr(identifier)
self.rx_buffer_size = 256_000
self.rx_channel = 0
print(f"Initialized RTL-SDR with identifier [{identifier}].")
except Exception as exc:
print(f"Failed to initialize RTL-SDR with identifier [{identifier}].")
raise exc
self.rx_buffer_size = 256_000
self.rx_channel = 0
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
self.radio.set_bias_tee(bool(enable))
state = "enabled" if enable else "disabled"
print(f"RTL-SDR bias tee {state}.")
except Exception as e:
print(f"Failed to find RTL-SDR with identifier [{identifier}].")
raise e
def init_rx(
self,
@ -54,43 +61,9 @@ class RTLSDR(SDR):
if channel not in (0, None):
raise ValueError("RTL-SDR supports only channel 0 for RX.")
self.radio.sample_rate = float(sample_rate)
self.rx_sample_rate = self.radio.sample_rate
self.radio.center_freq = float(center_frequency)
self.rx_center_frequency = self.radio.center_freq
available_gains = getattr(self.radio, "gains", [])
if gain is None:
self.radio.gain = "auto"
self.rx_gain = "auto"
else:
if not available_gains:
warnings.warn(
"No gain table reported by RTL-SDR; applying requested gain directly.",
RuntimeWarning,
)
target_gain = gain
else:
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets the gain relative to the maximum."
)
target_gain = max(available_gains) + gain
else:
target_gain = gain
min_gain = min(available_gains)
max_gain = max(available_gains)
if target_gain < min_gain or target_gain > max_gain:
print(f"Requested gain {target_gain} dB out of range; clamping to valid span {min_gain}-{max_gain} dB.")
target_gain = min(max(target_gain, min_gain), max_gain)
target_gain = min(available_gains, key=lambda g: abs(g - target_gain))
self.radio.gain = target_gain
self.rx_gain = self.radio.gain
self.set_rx_sample_rate(sample_rate=sample_rate)
self.set_rx_center_frequency(center_frequency=center_frequency)
self.set_rx_gain(gain=gain, gain_mode=gain_mode)
self.rx_buffer_size = int(buffer_size or self.rx_buffer_size)
self.rx_channel = 0
@ -102,25 +75,112 @@ class RTLSDR(SDR):
self._rx_initialized = True
self._tx_initialized = False
def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
def record(self, num_samples):
def get_rx_sample_rate(self):
"""
Record a fixed number of samples from RTL-SDR.
Args:
num_samples: Number of samples to capture
Retrieve the current sample rate of the receiver.
Returns:
Recording object with captured samples
float: The receiver's sample rate in samples per second (Hz).
"""
return self.rx_sample_rate
def get_rx_center_frequency(self):
"""
Retrieve the current center frequency of the receiver.
Returns:
float: The receiver's center frequency in Hertz (Hz).
"""
return self.rx_center_frequency
def get_rx_gain(self):
"""
Retrieve the current gain setting of the receiver.
Returns:
float: The receiver's gain in decibels (dB).
"""
return self.rx_gain
def set_rx_sample_rate(self, sample_rate):
self.radio.sample_rate = float(sample_rate)
self.rx_sample_rate = self.radio.sample_rate
print(f"RTL RX Sample Rate = {self.radio.get_sample_rate()}")
def set_rx_center_frequency(self, center_frequency):
self.radio.center_freq = float(center_frequency)
self.rx_center_frequency = self.radio.center_freq
print(f"RTL RX Center Frequency = {self.radio.get_center_freq()}")
def set_rx_gain(self, gain, gain_mode="absolute"):
available_gains = self.radio.get_gains()
if gain is None:
self.radio.gain = "auto"
self.rx_gain = "auto"
else:
if not available_gains:
warnings.warn(
"No gain table reported by RTL-SDR; applying requested gain directly.",
RuntimeWarning,
)
target_gain = gain
else:
min_gain = min(available_gains)
max_gain = max(available_gains)
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
target_gain = max_gain + gain
else:
target_gain = gain
if target_gain < min_gain or target_gain > max_gain:
print(
f"Requested gain {target_gain} dB out of range;\
clamping to valid span {min_gain}-{max_gain} dB."
)
target_gain = min(max(target_gain, min_gain), max_gain)
target_gain = min(available_gains, key=lambda g: abs(g - target_gain))
self.radio.set_gain(target_gain)
self.rx_gain = self.radio.get_gain()
print(f"RTL RX Gain = {self.radio.get_gain()}")
print(f"Available RTL RX Gains: {available_gains}")
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the RTL-SDR.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
"""
from ria_toolkit_oss.datatypes.recording import Recording
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record().")
print("RTL-SDR Starting RX...")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
pass
elif rx_time is not None:
num_samples = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
# RTL-SDR has USB buffer limitations - use consistent 256k chunks
# Always read full chunks to avoid USB overflow issues with partial reads
@ -129,8 +189,10 @@ class RTLSDR(SDR):
remainder = num_samples % max_samples_per_read
signal = np.array([], dtype=np.complex64)
print("RTL-SDR Starting RX...")
# Read full chunks
for i in range(num_full_reads):
for _ in range(num_full_reads):
try:
chunk = self.radio.read_samples(max_samples_per_read)
signal = np.append(signal, chunk)
@ -150,10 +212,6 @@ class RTLSDR(SDR):
print("RTL-SDR RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, len(signal)), dtype=np.complex64)
store_array[0, :] = signal
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
@ -161,7 +219,7 @@ class RTLSDR(SDR):
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
return Recording(data=signal, metadata=metadata)
def _stream_rx(self, callback):
if not self._rx_initialized:
@ -179,12 +237,28 @@ class RTLSDR(SDR):
def _stream_tx(self, callback): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def tx_recording(
self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None
): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
self.radio.set_bias_tee(bool(enable))
state = "enabled" if enable else "disabled"
print(f"RTL-SDR bias tee {state}.")
def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR
raise NotImplementedError("RTL-SDR does not support external clock configuration")
def close(self):
try:
self.radio.close()
finally:
self._enable_rx = False
self._enable_tx = False
def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR
raise NotImplementedError("RTL-SDR does not support external clock configuration")

View File

@ -295,26 +295,27 @@ class SDR(ABC):
return samples
def supports_bias_tee(self) -> bool:
"""Return True when the radio supports bias-tee control."""
return False
def set_bias_tee(self, enable: bool):
"""Enable or disable bias-tee power when supported by the radio."""
raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control")
def pause_rx(self):
self._enable_rx = False
def pause_tx(self):
self._enable_tx = False
def stop(self):
self.pause_rx()
def supports_bias_tee(self) -> bool:
"""Return True when the radio supports bias-tee control."""
return False
def set_bias_tee(self, enable: bool):
"""Enable or disable bias-tee power when supported by the radio."""
raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control")
@abstractmethod
def close(self):
pass
def stop(self):
self.pause_rx()
self.pause_tx()
@abstractmethod
def close(self):
pass
@abstractmethod
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):

View File

@ -116,21 +116,7 @@ class ThinkRF(SDR):
raise ValueError("ThinkRF devices expose a single receive channel")
stream_mode = getattr(self, "_capture_mode", "block") == "stream"
# Enforce sample rate / decimation
# Note: decimation parameter takes precedence if provided
actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation)
if stream_mode and actual_decimation < self._min_stream_decimation:
enforced = self._min_stream_decimation
print(
"Requested ThinkRF sample rate exceeds typical GigE throughput; "
f"enforcing decimation {enforced} for streaming."
)
actual_decimation = enforced
actual_sample_rate = self.BASE_SAMPLE_RATE / actual_decimation
self._decimation = actual_decimation
actual_decimation, actual_sample_rate = self.set_rx_sample_rate(sample_rate=sample_rate, decimation=decimation)
self.radio.reset()
self.radio.scpiset(":SYSTEM:FLUSH")
@ -138,9 +124,11 @@ class ThinkRF(SDR):
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self.radio.rfe_mode(self._rfe_mode)
self.radio.freq(int(center_frequency))
attenuation = self._attenuation if gain is None else int(gain)
self.set_rx_center_frequency(center_frequency=center_frequency)
attenuation = self._attenuation if gain is None else int(gain) # gain
attenuation = max(0, min(attenuation, 30))
self.radio.attenuator(attenuation)
@ -159,12 +147,12 @@ class ThinkRF(SDR):
if stream_mode:
self._streaming_active = False
else:
print(f"ThinkRF: Configuring block capture - SPP={self._samples_per_packet}, PPB={self._packets_per_block}")
print(
f"ThinkRF: Configuring block capture - SPP={self._samples_per_packet}, PPB={self._packets_per_block}"
)
self.radio.scpiset(f":TRACE:BLOCK:PACKETS {self._packets_per_block}")
self.radio.scpiset(":TRACE:BLOCK:DATA?")
self.rx_sample_rate = actual_sample_rate
self.rx_center_frequency = center_frequency
self.rx_gain = {
"attenuation_dB": attenuation,
"profile": gain_profile,
@ -179,21 +167,35 @@ class ThinkRF(SDR):
self._rx_initialized = True
self._tx_initialized = False
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
raise NotImplementedError("ThinkRF devices do not support transmit operations")
def set_rx_sample_rate(self, sample_rate, decimation, stream_mode):
# Enforce sample rate / decimation
# Note: decimation parameter takes precedence if provided
actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation)
if stream_mode and actual_decimation < self._min_stream_decimation:
enforced = self._min_stream_decimation
print(
"Requested ThinkRF sample rate exceeds typical GigE throughput; "
f"enforcing decimation {enforced} for streaming."
)
actual_decimation = enforced
actual_sample_rate = self.BASE_SAMPLE_RATE / actual_decimation
self._decimation = actual_decimation
self.rx_sample_rate = actual_sample_rate
print(f"ThinkRF RX Sample Rate = {actual_sample_rate}")
return actual_decimation, actual_sample_rate
def set_rx_center_frequency(self, center_frequency):
self.radio.freq(int(center_frequency))
self.rx_center_frequency = self.radio.freq
print(f"ThinkRF RX Center Frequency = {self.radio.freq}")
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().")
print("ThinkRF Starting RX...")
self._enable_rx = True
packets_processed = 0
stream_mode = getattr(self, "_capture_mode", "block") == "stream"
@ -206,18 +208,9 @@ class ThinkRF(SDR):
print(f"Failed to start ThinkRF stream: {exc}")
return
print("ThinkRF Starting RX...")
while self._enable_rx:
try:
packet = self.radio.read()
except Exception as exc:
# In block mode, reaching end of block can cause exceptions
# This is normal - just stop reading
if not stream_mode and packets_processed > 0:
# Got some packets in block mode, finish gracefully
print(f"ThinkRF: Block read complete ({packets_processed} packets received)")
break
print(f"ThinkRF read error: {exc}")
break
packet = self._safe_read(stream_mode, packets_processed)
if packet is None:
# No more packets available
@ -234,32 +227,13 @@ class ThinkRF(SDR):
# Unknown packet type - skip
continue
# packet.data is an iterable IQData object that yields (I, Q) tuples
# Convert to numpy array: collect all [I, Q] pairs
try:
# Iterate through packet.data to get all IQ pairs
iq_pairs = list(packet.data) # List of (I, Q) tuples
if not iq_pairs:
continue
# Convert to numpy array [N, 2]
iq_array = np.array(iq_pairs, dtype=np.float32)
# Extract I and Q channels and create complex buffer
complex_buffer = (iq_array[:, 0] + 1j * iq_array[:, 1]).astype(np.complex64)
except Exception as e:
print(f"Error extracting IQ from packet.data: {e}")
metadata = metadata = self._extract_metadata(packet)
complex_buffer = self._extract_iq(packet)
if complex_buffer is None:
continue
metadata = None
if hasattr(packet, "fields"):
metadata = packet.fields
if metadata.get("sample_loss"):
print("\033[93mWarning: ThinkRF sample overflow detected\033[0m")
# Send packet data to callback (accumulation handled by parent)
callback(buffer=complex_buffer, metadata=metadata)
packets_processed += 1
# In block mode, stop after receiving all packets in the block
@ -269,14 +243,61 @@ class ThinkRF(SDR):
print("ThinkRF RX Completed.")
if stream_mode and self._streaming_active:
try:
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self._streaming_active = False
self._stop_stream()
self.radio.scpiset(":SYSTEM:FLUSH")
def _safe_read(self, stream_mode, packets_processed):
packet = None
try:
packet = self.radio.read()
except Exception as e:
# In block mode, reaching end of block can cause exceptions
if not stream_mode and packets_processed > 0:
# We got some packets in block mode, so finish gracefully
print(f"ThinkRF: Block read complete ({packets_processed} packets received)")
else:
print(f"ThinkRF read error: {e}")
return packet
def _extract_iq(self, packet):
# packet.data is an iterable IQData object that yields (I, Q) tuples
# Convert to numpy array: collect all [I, Q] pairs
try:
iq_pairs = list(packet.data)
if not iq_pairs:
return None
iq_array = np.array(iq_pairs, dtype=np.float32)
return (iq_array[:, 0] + 1j * iq_array[:, 1]).astype(np.complex64)
except Exception as e:
print(f"Error extracting IQ from packet.data: {e}")
return None
def _extract_metadata(self, packet):
if not hasattr(packet, "fields"):
return None
metadata = packet.fields
if metadata.get("sample_loss"):
print("\033[93mWarning: ThinkRF sample overflow detected\033[0m")
return metadata
def _stop_stream(self):
try:
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self._streaming_active = False
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
raise NotImplementedError("ThinkRF devices do not support transmit operations")
def _stream_tx(self, callback):
raise NotImplementedError("ThinkRF devices do not support transmit operations")
@ -333,7 +354,9 @@ class ThinkRF(SDR):
return int(best)
def enforce_sample_rate(self, requested_sample_rate: int | float, decimation: Optional[int] = None) -> tuple[int, float]:
def enforce_sample_rate(
self, requested_sample_rate: int | float, decimation: Optional[int] = None
) -> tuple[int, float]:
"""
Enforce valid sample rate and decimation.
@ -356,7 +379,10 @@ class ThinkRF(SDR):
actual_sample_rate = self.BASE_SAMPLE_RATE / decimation
if abs(actual_sample_rate - requested_sample_rate) > 1e3: # More than 1 kHz difference
print(f"ThinkRF: Requested {requested_sample_rate/1e6:.2f} MS/s → Using decimation={decimation} ({actual_sample_rate/1e6:.2f} MS/s)")
print(
f"ThinkRF: Requested {requested_sample_rate/1e6:.2f} MS/s → \
Using decimation={decimation} ({actual_sample_rate/1e6:.2f} MS/s)"
)
return decimation, actual_sample_rate
@ -391,7 +417,9 @@ class ThinkRF(SDR):
actual_samples = actual_spp * ppb
if actual_samples != num_samples:
print(f"ThinkRF: Requested {num_samples} samples → Capturing {actual_samples} (SPP={actual_spp}, PPB={ppb})")
print(
f"ThinkRF: Requested {num_samples} samples → Capturing {actual_samples} (SPP={actual_spp}, PPB={ppb})"
)
return actual_spp, ppb

View File

@ -17,11 +17,11 @@ class USRP(SDR):
This software supports all USRP SDRs created by Ettus Research.
:param identifier: Identifier of the device. Can be an IP address (e.g. "192.168.0.0"),
a device name (e.g. "MyB210"), or any name/address found via ``uhd_find_devices``.
If not provided, the first available device is selected with a warning.
If multiple devices match the identifier, the first one is selected.
:type identifier: str, optional
:param identifier: The value of the parameter that identifies the device.
:type identifier: str = "192.168.0.0", "MyB210", name or address found in uhd_find_devices
If no identifier is provided, it will select the first device found, with a warning.
If more than one device is found with the identifier, it will select the first of those devices.
"""
super().__init__()
@ -43,29 +43,23 @@ class USRP(SDR):
rx_buffer_size: int = 960000,
):
"""
Initialize the USRP for receiving.
Initializes the USRP for receiving.
:param sample_rate: The sample rate for receiving.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for receiving on the USRP
:type gain: int
:param channel: The channel the USRP is set to.
:type channel: int
:param gain: The gain set for receiving on the USRP.
:type gain: int
:param gain_mode: Gain mode setting. ``"absolute"`` passes gain directly to the SDR.
``"relative"`` means gain should be a negative value, which will be subtracted
from the maximum gain.
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain.
:type gain_mode: str
:param rx_buffer_size: Internal buffer size for receiving samples. Defaults to 960000.
:type rx_buffer_size: int
:return: Dictionary with the actual RX parameters after configuration.
:return: A dictionary with the actual RX parameters after configuration.
:rtype: dict
"""
@ -80,59 +74,12 @@ class USRP(SDR):
if channel + 1 > max_num_channels:
raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.")
# check if gain arg is valid
gain_range = self.usrp.get_rx_gain_range()
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
# set gain relative to max
abs_gain = gain_range.stop() + gain
else:
abs_gain = gain
if abs_gain < gain_range.start() or abs_gain > gain_range.stop():
print(f"Gain {abs_gain} out of range for this USRP.")
print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB")
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
self.usrp.set_rx_gain(abs_gain, channel)
self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel)
self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel)
self.set_rx_gain(gain=gain, gain_mode=gain_mode, channel=channel)
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_rx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()}\
to {sample_rate_range.stop()}."
)
self.usrp.set_rx_rate(sample_rate, channel)
center_frequency_range = self.usrp.get_rx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()} \
to {center_frequency_range.stop()}."
)
self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
# set internal variables for metadata
self.rx_sample_rate = self.usrp.get_rx_rate(channel)
self.rx_gain = self.usrp.get_rx_gain(channel)
self.rx_center_frequency = self.usrp.get_rx_freq(channel)
self.rx_channel = channel
print(f"USRP RX Sample Rate = {self.rx_sample_rate}")
print(f"USRP RX Center Frequency = {self.rx_center_frequency}")
print(f"USRP RX Channel = {self.rx_channel}")
print(f"USRP RX Gain = {self.rx_gain}")
# flag to prevent user from calling certain functions before this one.
self._rx_initialized = True
@ -167,6 +114,58 @@ class USRP(SDR):
"""
return self.rx_gain
def set_rx_sample_rate(self, sample_rate, channel=0):
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_rx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()}\
to {sample_rate_range.stop()}."
)
self.usrp.set_rx_rate(sample_rate, channel)
self.rx_sample_rate = self.usrp.get_rx_rate(channel)
print(f"USRP RX Sample Rate = {self.rx_sample_rate}")
def set_rx_center_frequency(self, center_frequency, channel=0):
center_frequency_range = self.usrp.get_rx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()} \
to {center_frequency_range.stop()}."
)
self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
self.rx_center_frequency = self.usrp.get_rx_freq(channel)
print(f"USRP RX Center Frequency = {self.rx_center_frequency}")
def set_rx_gain(self, gain, gain_mode="absolute", channel=0):
# check if gain arg is valid
gain_range = self.usrp.get_rx_gain_range()
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
# set gain relative to max
abs_gain = gain_range.stop() + gain
else:
abs_gain = gain
if abs_gain < gain_range.start() or abs_gain > gain_range.stop():
print(f"Gain {abs_gain} out of range for this USRP.")
print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB")
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
self.usrp.set_rx_gain(abs_gain, channel)
self.rx_gain = self.usrp.get_rx_gain(channel)
print(f"USRP RX Gain = {self.rx_gain}")
def _stream_rx(self, callback):
if not self._rx_initialized:
@ -211,10 +210,31 @@ class USRP(SDR):
del self.rx_stream
print("USRP RX Completed.")
def record(self, num_samples):
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the USRP.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
pass
elif rx_time is not None:
num_samples = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
stream_args.channels = [self.rx_channel]
@ -269,23 +289,18 @@ class USRP(SDR):
gain_mode: Optional[str] = "absolute",
):
"""
Initialize the USRP for transmitting.
Initializes the USRP for transmitting.
:param sample_rate: The sample rate for transmitting.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for transmitting on the USRP.
:param gain: The gain set for transmitting on the USRP
:type gain: int
:param channel: The channel the USRP is set to.
:type channel: int
:param gain_mode: Gain mode setting. ``"absolute"`` passes gain directly to the SDR.
``"relative"`` means gain should be a negative value, which will be subtracted
from the maximum gain.
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain.
:type gain_mode: str
"""
@ -301,6 +316,79 @@ class USRP(SDR):
if channel + 1 > max_num_channels:
raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.")
self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel)
self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel)
self.set_tx_gain(gain=gain, gain_mode=gain_mode, channel=channel)
self.tx_channel = channel
print(f"USRP TX Channel = {self.tx_channel}")
self.usrp.set_clock_source("internal")
self.usrp.set_time_source("internal")
self.usrp.set_tx_antenna("TX/RX", channel)
self._tx_initialized = True
self._rx_initialized = False
return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain}
def get_tx_sample_rate(self):
"""
Retrieve the current sample rate of the transmitter.
Returns:
float: The transmitter's sample rate in samples per second (Hz).
"""
return self.tx_sample_rate
def get_tx_center_frequency(self):
"""
Retrieve the current center frequency of the transmitter.
Returns:
float: The transmitter's center frequency in Hertz (Hz).
"""
return self.tx_center_frequency
def get_tx_gain(self):
"""
Retrieve the current gain setting of the transmitter.
Returns:
float: The transmitter's gain in decibels (dB).
"""
return self.tx_gain
def set_tx_sample_rate(self, sample_rate, channel=0):
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_tx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()} to {sample_rate_range.stop()}."
)
self.usrp.set_tx_rate(sample_rate, channel)
self.tx_sample_rate = self.usrp.get_tx_rate(channel)
print(f"USRP TX Sample Rate = {self.tx_sample_rate}")
def set_tx_center_frequency(self, center_frequency, channel=0):
center_frequency_range = self.usrp.get_tx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()}\
to {center_frequency_range.stop()}."
)
self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel)
self.tx_center_frequency = self.usrp.get_tx_freq(channel)
print(f"USRP TX Center Frequency = {self.tx_center_frequency}")
def set_tx_gain(self, gain, gain_mode="absolute", channel=0):
# Ensure gain is within valid range
gain_range = self.usrp.get_tx_gain_range()
if gain_mode == "relative":
@ -320,50 +408,9 @@ class USRP(SDR):
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
self.usrp.set_tx_gain(abs_gain, channel)
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_tx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()} to {sample_rate_range.stop()}."
)
self.usrp.set_tx_rate(sample_rate, channel)
center_frequency_range = self.usrp.get_tx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()}\
to {center_frequency_range.stop()}."
)
self.usrp.set_tx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
self.usrp.set_clock_source("internal")
self.usrp.set_time_source("internal")
self.usrp.set_tx_rate(sample_rate)
self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel)
self.usrp.set_tx_antenna("TX/RX", channel)
# set internal variables for metadata
self.tx_sample_rate = self.usrp.get_tx_rate(channel)
self.tx_gain = self.usrp.get_tx_gain(channel)
self.tx_center_frequency = self.usrp.get_tx_freq(channel)
self.tx_channel = channel
print(f"USRP TX Sample Rate = {self.tx_sample_rate}")
print(f"USRP TX Center Frequency = {self.tx_center_frequency}")
print(f"USRP TX Channel = {self.tx_channel}")
print(f"USRP TX Gain = {self.tx_gain}")
self._tx_initialized = True
self._rx_initialized = False
def close(self):
pass