Compare commits

..

2 Commits

Author SHA1 Message Date
c2b47ead95 Updated, edited, and cleaned up SDR files 2025-10-16 15:22:07 -04:00
34faa57ea4 Applied updates, linting, etc. 2025-10-16 15:02:09 -04:00
10 changed files with 765 additions and 547 deletions

View File

@ -168,6 +168,10 @@ Additional usage information is provided in the project documentation: [RIA Tool
Kindly report any issues on RIA Hub: [RIA Toolkit OSS Issues Board](https://riahub.ai/qoherent/ria-toolkit-oss/issues). Kindly report any issues on RIA Hub: [RIA Toolkit OSS Issues Board](https://riahub.ai/qoherent/ria-toolkit-oss/issues).
### Upcoming Changes
The ThinkRF package is currently pending further testing and potential updates.
## 🤝 Contribution ## 🤝 Contribution
Contributions are always welcome! Whether it's an enhancement, bug fix, or new example, your input is valuable. If you'd like to contribute to the project, please reach out to the project maintainers. Contributions are always welcome! Whether it's an enhancement, bug fix, or new example, your input is valuable. If you'd like to contribute to the project, please reach out to the project maintainers.

View File

@ -11,6 +11,7 @@ authors = [
maintainers = [ maintainers = [
{ name = "Benjamin Chinnery", email = "ben@qoherent.ai" }, { name = "Benjamin Chinnery", email = "ben@qoherent.ai" },
{ name = "Ashkan Beigi", email = "ash@qoherent.ai" }, { name = "Ashkan Beigi", email = "ash@qoherent.ai" },
{ name = "Madrigal Weersink", email = "madrigal@qoherent.ai" },
] ]
keywords = [ keywords = [
"radio", "radio",

View File

@ -13,8 +13,8 @@ Run this after installing pyrf:
python scripts/fix_pyrf_python3.py python scripts/fix_pyrf_python3.py
""" """
from pathlib import Path
from lib2to3.refactor import RefactoringTool, get_fixers_from_package from lib2to3.refactor import RefactoringTool, get_fixers_from_package
from pathlib import Path
try: try:
import pyrf import pyrf
@ -36,7 +36,7 @@ print(f"Found pyrf ThinkRF module at: {thinkrf_path}")
# Apply lib2to3 fixes # Apply lib2to3 fixes
print("Applying Python 3 compatibility fixes...") print("Applying Python 3 compatibility fixes...")
fixers = get_fixers_from_package('lib2to3.fixes') fixers = get_fixers_from_package("lib2to3.fixes")
tool = RefactoringTool(fixers) tool = RefactoringTool(fixers)
tool.refactor_file(str(thinkrf_path), write=True) tool.refactor_file(str(thinkrf_path), write=True)

View File

@ -1,3 +1,5 @@
import time
import warnings
from typing import Optional from typing import Optional
import numpy as np import numpy as np
@ -35,22 +37,6 @@ class Blade(SDR):
super().__init__() super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
if channel is None:
channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))
try:
bladerf_channel = _bladerf.CHANNEL_RX(channel)
self.device.set_bias_tee(bladerf_channel, bool(enable))
except AttributeError as exc: # pragma: no cover - depends on libbladeRF version
raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc
state = "enabled" if enable else "disabled"
print(f"BladeRF bias tee {state} on channel {channel}.")
def _shutdown(self, error=0, board=None): def _shutdown(self, error=0, board=None):
print("Shutting down with error code: " + str(error)) print("Shutting down with error code: " + str(error))
if board is not None: if board is not None:
@ -83,9 +69,6 @@ class Blade(SDR):
print("FPGA version:\t\t" + str(device.get_fpga_version())) print("FPGA version:\t\t" + str(device.get_fpga_version()))
return 0 return 0
def close(self):
self.device.close()
def init_rx( def init_rx(
self, self,
sample_rate: int | float, sample_rate: int | float,
@ -108,6 +91,9 @@ class Blade(SDR):
:type channel: int :type channel: int
:param buffer_size: The buffer size during receive. Defaults to 8192. :param buffer_size: The buffer size during receive. Defaults to 8192.
:type buffer_size: int :type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60).
:type gain_mode: str
""" """
print("Initializing RX") print("Initializing RX")
@ -128,6 +114,93 @@ class Blade(SDR):
self._rx_initialized = True self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
print("Blade Starting RX...")
self._enable_rx = True
while self._enable_rx:
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
self.buffer = buffer
# send callback complex signal
callback(buffer=signal, metadata=None)
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
self._num_samples_to_record = num_samples
elif rx_time is not None:
self._num_samples_to_record = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
print("Blade Starting RX...")
self._enable_rx = True
store_array = np.zeros(
(1, (self._num_samples_to_record // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64
)
for i in range(self._num_samples_to_record // self.rx_buffer_size + 1):
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = signal
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array[:, : self._num_samples_to_record], metadata=metadata)
def init_tx( def init_tx(
self, self,
sample_rate: int | float, sample_rate: int | float,
@ -150,6 +223,9 @@ class Blade(SDR):
:type channel: int :type channel: int
:param buffer_size: The buffer size during transmission. Defaults to 8192. :param buffer_size: The buffer size during transmission. Defaults to 8192.
:type buffer_size: int :type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60).
:type gain_mode: str
""" """
# Configure BladeRF # Configure BladeRF
@ -178,87 +254,36 @@ class Blade(SDR):
self._rx_initialized = False self._rx_initialized = False
return 0 return 0
def _stream_rx(self, callback): def _stream_tx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
# Setup synchronous stream # Setup stream
self.device.sync_config( self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1, layout=_bladerf.ChannelLayout.TX_X1,
fmt=_bladerf.Format.SC16_Q11, fmt=_bladerf.Format.SC16_Q11,
num_buffers=16, num_buffers=16,
buffer_size=self.rx_buffer_size, buffer_size=8192,
num_transfers=8, num_transfers=8,
stream_timeout=3500000000, stream_timeout=3500,
) )
self.rx_ch.enable = True # Enable module
self.bytes_per_sample = 4 self.tx_ch.enable = True
self._enable_tx = True
print("Blade Starting RX...") print("Blade Starting TX...")
self._enable_rx = True
while self._enable_rx: while self._enable_tx:
# Create receive buffer and read in samples to buffer buffer = callback(self.tx_buffer_size) # [0]
# Add them to a list to convert and save after stream is finished byte_array = self._convert_tx_samples(buffer)
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample) self.device.sync_tx(byte_array, len(buffer))
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
# samples = convert_to_2xn(signal)
self.buffer = buffer
# send callback complex signal
callback(buffer=signal, metadata=None)
# Disable module # Disable module
print("Blade RX Completed.") print("Blade TX Completed.")
self.rx_ch.enable = False self.tx_ch.enable = False
def record(self, num_samples):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
self.rx_ch.enable = True
self.bytes_per_sample = 4
print("Blade Starting RX...")
self._enable_rx = True
store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64)
for i in range(num_samples // self.rx_buffer_size + 1):
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
# samples = convert_to_2xn(signal)
store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = signal
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array[:, :num_samples], metadata=metadata)
def tx_recording( def tx_recording(
self, self,
recording: "Recording | np.ndarray", recording: Recording | np.ndarray,
num_samples: Optional[int] = None, num_samples: Optional[int] = None,
tx_time: Optional[int | float] = None, tx_time: Optional[int | float] = None,
): ):
@ -275,9 +300,6 @@ class Blade(SDR):
recording to this length. Defaults to None. recording to this length. Defaults to None.
:type tx_time: int or float, optional :type tx_time: int or float, optional
""" """
import warnings
import time
from ria_toolkit_oss.datatypes.recording import Recording
if num_samples is not None and tx_time is not None: if num_samples is not None and tx_time is not None:
raise ValueError("Only input one of num_samples or tx_time") raise ValueError("Only input one of num_samples or tx_time")
@ -327,7 +349,7 @@ class Blade(SDR):
sample_index = 0 sample_index = 0
chunk_size = min(self.tx_buffer_size, len(samples)) chunk_size = min(self.tx_buffer_size, len(samples))
chunk = samples[sample_index:sample_index + chunk_size] chunk = samples[sample_index : sample_index + chunk_size]
sample_index += chunk_size sample_index += chunk_size
# Convert and transmit # Convert and transmit
@ -341,33 +363,6 @@ class Blade(SDR):
print("Blade TX Completed.") print("Blade TX Completed.")
self.tx_ch.enable = False self.tx_ch.enable = False
def _stream_tx(self, callback):
# Setup stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.TX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=8192,
num_transfers=8,
stream_timeout=3500,
)
# Enable module
self.tx_ch.enable = True
self._enable_tx = True
print("Blade Starting TX...")
while self._enable_tx:
buffer = callback(self.tx_buffer_size) # [0]
byte_array = self._convert_tx_samples(buffer)
self.device.sync_tx(byte_array, len(buffer))
# Disable module
print("Blade TX Completed.")
self.tx_ch.enable = False
def _convert_rx_samples(self, samples): def _convert_rx_samples(self, samples):
samples = np.frombuffer(samples, dtype=np.int16).astype(np.float32) samples = np.frombuffer(samples, dtype=np.int16).astype(np.float32)
samples /= 2048 samples /= 2048
@ -486,3 +481,22 @@ class Blade(SDR):
print(f"Clock source set to {self.device.get_clock_select()}") print(f"Clock source set to {self.device.get_clock_select()}")
print(f"PLL Reference set to {self.device.get_pll_refclk()}") print(f"PLL Reference set to {self.device.get_pll_refclk()}")
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
if channel is None:
channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))
try:
bladerf_channel = _bladerf.CHANNEL_RX(channel)
self.device.set_bias_tee(bladerf_channel, bool(enable))
except AttributeError as exc: # pragma: no cover - depends on libbladeRF version
raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc
state = "enabled" if enable else "disabled"
print(f"BladeRF bias tee {state} on channel {channel}.")
def close(self):
self.device.close()

View File

@ -1,6 +1,5 @@
import time import time
import warnings import warnings
import math
from typing import Optional from typing import Optional
import numpy as np import numpy as np
@ -36,16 +35,14 @@ class HackRF(SDR):
super().__init__() super().__init__()
def supports_bias_tee(self) -> bool: def init_rx(
return True self,
sample_rate: int | float,
def set_bias_tee(self, enable: bool): center_frequency: int | float,
try: gain: int,
self.radio.set_antenna_enable(bool(enable)) channel: int,
except AttributeError as exc: # pragma: no cover - defensive gain_mode: Optional[str] = "absolute",
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc ):
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):
""" """
Initializes the HackRF for receiving. Initializes the HackRF for receiving.
@ -58,11 +55,12 @@ class HackRF(SDR):
:type sample_rate: int or float :type sample_rate: int or float
:param center_frequency: The center frequency of the recording. :param center_frequency: The center frequency of the recording.
:type center_frequency: int or float :type center_frequency: int or float
:param gain: The total gain set for receiving on the HackRF (distributed across stages) :param gain: The LNA gain set for receiving on the HackRF
:type gain: int :type gain: int
:param channel: The channel the HackRF is set to. (Not actually used) :param channel: The channel the HackRF is set to. (Not actually used)
:type channel: int :type channel: int
:param gain_mode: Gain mode setting. Currently only "absolute" is supported. :param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (40).
:type gain_mode: str :type gain_mode: str
""" """
print("Initializing RX") print("Initializing RX")
@ -77,7 +75,7 @@ class HackRF(SDR):
# Distribute gain across amplifier stages # Distribute gain across amplifier stages
rx_gain_min = 0 rx_gain_min = 0
rx_gain_max = 116 # 14 (amp) + 40 (LNA) + 62 (VGA) rx_gain_max = 40 # (LNA)
if gain_mode == "relative": if gain_mode == "relative":
if gain > 0: if gain > 0:
@ -95,42 +93,61 @@ class HackRF(SDR):
print(f"Gain {gain} out of range for HackRF.") print(f"Gain {gain} out of range for HackRF.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
# Distribute gain using the signal-testbed algorithm self.set_gain_amp(False)
enable_amp = False self.set_rx_vga_gain(45)
remaining_gain = abs_gain self.set_rx_lna_gain(abs_gain)
# Enable 14 dB pre-amp if gain is high enough print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB")
if remaining_gain > 30: print("To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().")
remaining_gain = remaining_gain - 14
enable_amp = True
print("HackRF: 14dB front-end amplifier enabled.")
# Distribute remaining gain between LNA and VGA
# LNA gets 60% of remaining gain, rounded down to 8 dB steps
lna_gain = math.floor(remaining_gain * 0.6)
lna_gain = lna_gain - (lna_gain % 8) # Round to 8 dB steps
if lna_gain > 40:
lna_gain = 40
# VGA gets the rest
vga_gain = remaining_gain - lna_gain
if vga_gain > 62:
vga_gain = 62
# Apply gain settings
if enable_amp:
self.radio.enable_amp()
else:
self.radio.disable_amp()
self.radio.set_lna_gain(lna_gain)
self.radio.set_vga_gain(vga_gain)
self.rx_gain = abs_gain
print(f"HackRF gain distribution: Amp={enable_amp}, LNA={lna_gain}dB, VGA={vga_gain}dB")
self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
self._rx_initialized = True
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the SDR.
HackRF uses block capture mode, which is more reliable than streaming for USB2 connections.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
self._num_samples_to_record = num_samples
elif rx_time is not None:
self._num_samples_to_record = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
print("HackRF Starting RX...")
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(self._num_samples_to_record)
print("HackRF RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, self._num_samples_to_record), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def init_tx( def init_tx(
self, self,
@ -164,8 +181,6 @@ class HackRF(SDR):
self.radio.center_freq = int(center_frequency) self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}") print(f"HackRF center frequency = {self.radio.center_freq}")
self.radio.enable_amp()
tx_gain_min = 0 tx_gain_min = 0
tx_gain_max = 47 tx_gain_max = 47
if gain_mode == "relative": if gain_mode == "relative":
@ -184,8 +199,10 @@ class HackRF(SDR):
print(f"Gain {gain} out of range for Pluto.") print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
self.radio.txvga_gain = abs_gain self.set_gain_amp(True)
print(f"HackRF gain = {self.radio.txvga_gain}") self.set_tx_vga_gain(abs_gain)
print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB")
print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().")
self._tx_initialized = True self._tx_initialized = True
self._rx_initialized = False self._rx_initialized = False
@ -236,46 +253,41 @@ class HackRF(SDR):
self.radio.stop_tx() self.radio.stop_tx()
print("HackRF Tx Completed.") print("HackRF Tx Completed.")
def set_clock_source(self, source): def set_gain_amp(self, enable):
if enable:
self.radio.enable_amp()
self.amp_enabled = True
else:
self.radio.disable_amp()
self.amp_enabled = False
def set_rx_lna_gain(self, lna_gain):
self.radio.set_lna_gain(lna_gain)
self.rx_lna_gain = lna_gain
def set_rx_vga_gain(self, vga_gain):
self.radio.set_vga_gain(vga_gain)
self.rx_vga_gain = vga_gain
def set_tx_vga_gain(self, vga_gain):
self.radio.set_txvga_gain(vga_gain)
self.tx_vga_gain = vga_gain
def set_clock_source(self, source):
self.radio.set_clock_source(source) self.radio.set_clock_source(source)
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
try:
self.radio.set_antenna_enable(bool(enable))
except AttributeError as exc: # pragma: no cover - defensive
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
def close(self): def close(self):
self.radio.close() self.radio.close()
def record(self, num_samples):
"""
Record a specified number of samples from the HackRF using block capture mode.
This is more reliable than streaming for USB2 connections.
:param num_samples: Number of samples to capture
:type num_samples: int
:return: Recording object containing the captured data
:rtype: Recording
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record()")
print("HackRF Starting RX...")
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(num_samples)
print("HackRF RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, num_samples), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def _stream_rx(self, callback): def _stream_rx(self, callback):
""" """
Stream samples from the HackRF using a callback function. Stream samples from the HackRF using a callback function.
@ -300,6 +312,7 @@ class HackRF(SDR):
# Use ctypes string_at to safely copy the buffer # Use ctypes string_at to safely copy the buffer
from ctypes import string_at from ctypes import string_at
byte_data = string_at(c.buffer, c.valid_length) byte_data = string_at(c.buffer, c.valid_length)
# Convert bytes to int8, then to float32, then view as complex64 # Convert bytes to int8, then to float32, then view as complex64

View File

@ -48,6 +48,7 @@ class Pluto(SDR):
print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].") print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].")
else: else:
# Non-MIMO hardware (Rev B) - use standard Pluto driver # Non-MIMO hardware (Rev B) - use standard Pluto driver
del test_radio
self.radio = adi.Pluto(uri) self.radio = adi.Pluto(uri)
self._mimo_capable = False self._mimo_capable = False
print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].") print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].")
@ -75,8 +76,9 @@ class Pluto(SDR):
:type gain: int :type gain: int
:param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels.
:type channel: int :type channel: int
:param buffer_size: The buffer size during receive. Defaults to 10000. :param gain_mode: 'absolute' passes gain directly to the sdr,
:type buffer_size: int 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (74).
:type gain_mode: str
""" """
print("Initializing RX") print("Initializing RX")
@ -100,36 +102,20 @@ class Pluto(SDR):
else: else:
raise ValueError("Channel must be either 0 or 1.") raise ValueError("Channel must be either 0 or 1.")
rx_gain_min = 0 self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode)
rx_gain_max = 74
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets \
the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
self.set_rx_gain(gain=abs_gain, channel=channel)
if channel == 0: if channel == 0:
print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}") print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}")
elif channel == 1: elif channel == 1:
self.set_rx_gain(gain=abs_gain, channel=0) self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode)
print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}") print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}")
self.radio.rx_buffer_size = 1024 # TODO deal with this for zmq self.set_rx_buffer_size(getattr(self, "rx_buffer_size", 1024))
self._rx_initialized = True self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
def init_tx( def init_tx(
self, self,
sample_rate: int | float, sample_rate: int | float,
@ -150,8 +136,9 @@ class Pluto(SDR):
:type gain: int :type gain: int
:param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels.
:type channel: int :type channel: int
:param buffer_size: The buffer size during transmit. Defaults to 10000. :param gain_mode: 'absolute' passes gain directly to the sdr,
:type buffer_size: int 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (0).
:type gain_mode: str
""" """
print("Initializing TX") print("Initializing TX")
@ -162,7 +149,10 @@ class Pluto(SDR):
self.set_tx_center_frequency(center_frequency=int(center_frequency)) self.set_tx_center_frequency(center_frequency=int(center_frequency))
print(f"Pluto center frequency = {self.radio.tx_lo}") print(f"Pluto center frequency = {self.radio.tx_lo}")
if channel == 1: if channel == 0:
self.radio.tx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 1:
if not self._mimo_capable: if not self._mimo_capable:
raise ValueError( raise ValueError(
"Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. "
@ -170,41 +160,21 @@ class Pluto(SDR):
) )
self.radio.tx_enabled_channels = [0, 1] self.radio.tx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 0:
self.radio.tx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
else: else:
raise ValueError("Channel must be either 0 or 1.") raise ValueError("Channel must be either 0 or 1.")
tx_gain_min = -89 self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode)
tx_gain_max = 0
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
abs_gain = tx_gain_max + gain
else:
abs_gain = gain
if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
self.set_tx_gain(gain=abs_gain, channel=channel)
if channel == 0: if channel == 0:
print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}") print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}")
elif channel == 1: elif channel == 1:
self.set_tx_gain(gain=abs_gain, channel=0) self.set_tx_gain(gain=gain, channel=0, gain_mode=gain_mode)
print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}, {self.radio.tx_hardwaregain_chan1}") print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}, {self.radio.tx_hardwaregain_chan1}")
self._tx_initialized = True self._tx_initialized = True
self._rx_initialized = False self._rx_initialized = False
return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain}
def _stream_rx(self, callback): def _stream_rx(self, callback):
if not self._rx_initialized: if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
@ -323,11 +293,6 @@ class Pluto(SDR):
self.radio.tx_cyclic_buffer = False self.radio.tx_cyclic_buffer = False
print("Pluto TX Completed.") print("Pluto TX Completed.")
def close(self):
if self.radio.tx_cyclic_buffer:
self.radio.tx_destroy_buffer()
del self.radio
def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"): def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"):
""" """
Transmit the given iq samples from the provided recording. Transmit the given iq samples from the provided recording.
@ -407,28 +372,47 @@ class Pluto(SDR):
except ValueError as e: except ValueError as e:
_handle_OSError(e) _handle_OSError(e)
def set_rx_gain(self, gain, channel=0): def set_rx_gain(self, gain, channel=0, gain_mode="absolute"):
self.rx_gain = gain rx_gain_min = 0
rx_gain_max = 74
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets \
the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
self.rx_gain = abs_gain
try: try:
if channel == 0: if channel == 0:
if gain is None: if abs_gain is None:
self.radio.gain_control_mode_chan0 = "automatic" self.radio.gain_control_mode_chan0 = "automatic"
print("Using Pluto Automatic Gain Control.") print("Using Pluto Automatic Gain Control.")
else: else:
self.radio.gain_control_mode_chan0 = "manual" self.radio.gain_control_mode_chan0 = "manual"
self.radio.rx_hardwaregain_chan0 = gain # dB self.radio.rx_hardwaregain_chan0 = abs_gain # dB
elif channel == 1: elif channel == 1:
try: try:
if gain is None: if abs_gain is None:
self.radio.gain_control_mode_chan1 = "automatic" self.radio.gain_control_mode_chan1 = "automatic"
print("Using Pluto Automatic Gain Control.") print("Using Pluto Automatic Gain Control.")
else: else:
self.radio.gain_control_mode_chan1 = "manual" self.radio.gain_control_mode_chan1 = "manual"
self.radio.rx_hardwaregain_chan1 = gain # dB self.radio.rx_hardwaregain_chan1 = abs_gain # dB
except Exception as e: except Exception as e:
print("Failed to use channel 1 on the PlutoSDR. \nThis is only available for revC versions.") print("Failed to use channel 1 on the PlutoSDR. \nThis is only available for revC versions.")
@ -443,10 +427,31 @@ class Pluto(SDR):
_handle_OSError(e) _handle_OSError(e)
def set_rx_channel(self, channel): def set_rx_channel(self, channel):
self.rx_channel = channel if channel == 0:
self.radio.rx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
elif channel == 1:
self.radio.rx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
else:
raise ValueError("Channel must be either 0 or 1.")
def set_rx_buffer_size(self, buffer_size): def set_rx_buffer_size(self, buffer_size):
raise NotImplementedError if buffer_size is None:
raise ValueError("Buffer_size must be provided.")
buffer_size = int(buffer_size)
if buffer_size <= 0:
raise ValueError("Buffer_size must be a positive integer.")
self.rx_buffer_size = buffer_size
if hasattr(self, "radio"):
try:
self.radio.rx_buffer_size = buffer_size
except OSError as e:
_handle_OSError(e)
except ValueError as e:
_handle_OSError(e)
def set_tx_center_frequency(self, center_frequency): def set_tx_center_frequency(self, center_frequency):
try: try:
@ -468,14 +473,33 @@ class Pluto(SDR):
except ValueError as e: except ValueError as e:
_handle_OSError(e) _handle_OSError(e)
def set_tx_gain(self, gain, channel=0): def set_tx_gain(self, gain, channel=0, gain_mode="absolute"):
tx_gain_min = -89
tx_gain_max = 0
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
abs_gain = tx_gain_max + gain
else:
abs_gain = gain
if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
print(f"Gain {gain} out of range for Pluto.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
try: try:
self.tx_gain = gain self.tx_gain = abs_gain
if channel == 0: if channel == 0:
self.radio.tx_hardwaregain_chan0 = int(gain) self.radio.tx_hardwaregain_chan0 = int(abs_gain)
elif channel == 1: elif channel == 1:
self.radio.tx_hardwaregain_chan1 = int(gain) self.radio.tx_hardwaregain_chan1 = int(abs_gain)
else: else:
raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.") raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.")
@ -485,11 +509,23 @@ class Pluto(SDR):
_handle_OSError(e) _handle_OSError(e)
def set_tx_channel(self, channel): def set_tx_channel(self, channel):
raise NotImplementedError if channel == 1:
self.radio.tx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 0:
self.radio.tx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
else:
raise ValueError("Channel must be either 0 or 1.")
def set_tx_buffer_size(self, buffer_size): def set_tx_buffer_size(self, buffer_size):
raise NotImplementedError raise NotImplementedError
def close(self):
if self.radio.tx_cyclic_buffer:
self.radio.tx_destroy_buffer()
del self.radio
def shutdown(self): def shutdown(self):
del self.radio del self.radio

View File

@ -11,35 +11,42 @@ try:
except ImportError as exc: # pragma: no cover - dependency provided by end user except ImportError as exc: # pragma: no cover - dependency provided by end user
raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.sdr import SDR from ria_toolkit_oss.sdr.sdr import SDR
class RTLSDR(SDR): class RTLSDR(SDR):
"""SDR interface for RTL-SDR dongles using pyrtlsdr.""" """SDR interface for RTL-SDR dongles using pyrtlsdr."""
def __init__(self, identifier: Optional[int | str] = None): def __init__(self, identifier: Optional[str] = None):
super().__init__() """
Initialize a Pluto SDR device object and connect to the SDR hardware.
This software supports the ADALM Pluto SDR created by Analog Devices.
:param identifier: The value of the parameter that identifies the device.
:type identifier: str = "192.168.3.1", "pluto.local", etc
If no identifier is provided, it will select the first device found, with a warning.
If more than one device is found with the identifier, it will select the first of those devices.
"""
print(f"Initializing Pluto radio with identifier [{identifier}].")
try: try:
super().__init__()
if identifier is None: if identifier is None:
self.radio = RtlSdr() self.radio = RtlSdr()
else: else:
self.radio = RtlSdr(identifier) self.radio = RtlSdr(identifier)
self.rx_buffer_size = 256_000
self.rx_channel = 0
print(f"Initialized RTL-SDR with identifier [{identifier}].") print(f"Initialized RTL-SDR with identifier [{identifier}].")
except Exception as exc:
print(f"Failed to initialize RTL-SDR with identifier [{identifier}].")
raise exc
self.rx_buffer_size = 256_000 except Exception as e:
self.rx_channel = 0 print(f"Failed to find RTL-SDR with identifier [{identifier}].")
raise e
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
self.radio.set_bias_tee(bool(enable))
state = "enabled" if enable else "disabled"
print(f"RTL-SDR bias tee {state}.")
def init_rx( def init_rx(
self, self,
@ -54,43 +61,9 @@ class RTLSDR(SDR):
if channel not in (0, None): if channel not in (0, None):
raise ValueError("RTL-SDR supports only channel 0 for RX.") raise ValueError("RTL-SDR supports only channel 0 for RX.")
self.radio.sample_rate = float(sample_rate) self.set_rx_sample_rate(sample_rate=sample_rate)
self.rx_sample_rate = self.radio.sample_rate self.set_rx_center_frequency(center_frequency=center_frequency)
self.set_rx_gain(gain=gain, gain_mode=gain_mode)
self.radio.center_freq = float(center_frequency)
self.rx_center_frequency = self.radio.center_freq
available_gains = getattr(self.radio, "gains", [])
if gain is None:
self.radio.gain = "auto"
self.rx_gain = "auto"
else:
if not available_gains:
warnings.warn(
"No gain table reported by RTL-SDR; applying requested gain directly.",
RuntimeWarning,
)
target_gain = gain
else:
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets the gain relative to the maximum."
)
target_gain = max(available_gains) + gain
else:
target_gain = gain
min_gain = min(available_gains)
max_gain = max(available_gains)
if target_gain < min_gain or target_gain > max_gain:
print(f"Requested gain {target_gain} dB out of range; clamping to valid span {min_gain}-{max_gain} dB.")
target_gain = min(max(target_gain, min_gain), max_gain)
target_gain = min(available_gains, key=lambda g: abs(g - target_gain))
self.radio.gain = target_gain
self.rx_gain = self.radio.gain
self.rx_buffer_size = int(buffer_size or self.rx_buffer_size) self.rx_buffer_size = int(buffer_size or self.rx_buffer_size)
self.rx_channel = 0 self.rx_channel = 0
@ -102,25 +75,112 @@ class RTLSDR(SDR):
self._rx_initialized = True self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
raise NotImplementedError("RTL-SDR does not support transmit operations")
def record(self, num_samples): def get_rx_sample_rate(self):
""" """
Record a fixed number of samples from RTL-SDR. Retrieve the current sample rate of the receiver.
Args:
num_samples: Number of samples to capture
Returns: Returns:
Recording object with captured samples float: The receiver's sample rate in samples per second (Hz).
"""
return self.rx_sample_rate
def get_rx_center_frequency(self):
"""
Retrieve the current center frequency of the receiver.
Returns:
float: The receiver's center frequency in Hertz (Hz).
"""
return self.rx_center_frequency
def get_rx_gain(self):
"""
Retrieve the current gain setting of the receiver.
Returns:
float: The receiver's gain in decibels (dB).
"""
return self.rx_gain
def set_rx_sample_rate(self, sample_rate):
self.radio.sample_rate = float(sample_rate)
self.rx_sample_rate = self.radio.sample_rate
print(f"RTL RX Sample Rate = {self.radio.get_sample_rate()}")
def set_rx_center_frequency(self, center_frequency):
self.radio.center_freq = float(center_frequency)
self.rx_center_frequency = self.radio.center_freq
print(f"RTL RX Center Frequency = {self.radio.get_center_freq()}")
def set_rx_gain(self, gain, gain_mode="absolute"):
available_gains = self.radio.get_gains()
if gain is None:
self.radio.gain = "auto"
self.rx_gain = "auto"
else:
if not available_gains:
warnings.warn(
"No gain table reported by RTL-SDR; applying requested gain directly.",
RuntimeWarning,
)
target_gain = gain
else:
min_gain = min(available_gains)
max_gain = max(available_gains)
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
target_gain = max_gain + gain
else:
target_gain = gain
if target_gain < min_gain or target_gain > max_gain:
print(
f"Requested gain {target_gain} dB out of range;\
clamping to valid span {min_gain}-{max_gain} dB."
)
target_gain = min(max(target_gain, min_gain), max_gain)
target_gain = min(available_gains, key=lambda g: abs(g - target_gain))
self.radio.set_gain(target_gain)
self.rx_gain = self.radio.get_gain()
print(f"RTL RX Gain = {self.radio.get_gain()}")
print(f"Available RTL RX Gains: {available_gains}")
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the RTL-SDR.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
""" """
from ria_toolkit_oss.datatypes.recording import Recording
if not self._rx_initialized: if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record().") raise RuntimeError("RX was not initialized. init_rx() must be called before record().")
print("RTL-SDR Starting RX...") if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
pass
elif rx_time is not None:
num_samples = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
# RTL-SDR has USB buffer limitations - use consistent 256k chunks # RTL-SDR has USB buffer limitations - use consistent 256k chunks
# Always read full chunks to avoid USB overflow issues with partial reads # Always read full chunks to avoid USB overflow issues with partial reads
@ -129,8 +189,10 @@ class RTLSDR(SDR):
remainder = num_samples % max_samples_per_read remainder = num_samples % max_samples_per_read
signal = np.array([], dtype=np.complex64) signal = np.array([], dtype=np.complex64)
print("RTL-SDR Starting RX...")
# Read full chunks # Read full chunks
for i in range(num_full_reads): for _ in range(num_full_reads):
try: try:
chunk = self.radio.read_samples(max_samples_per_read) chunk = self.radio.read_samples(max_samples_per_read)
signal = np.append(signal, chunk) signal = np.append(signal, chunk)
@ -150,10 +212,6 @@ class RTLSDR(SDR):
print("RTL-SDR RX Completed.") print("RTL-SDR RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, len(signal)), dtype=np.complex64)
store_array[0, :] = signal
metadata = { metadata = {
"source": self.__class__.__name__, "source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate, "sample_rate": self.rx_sample_rate,
@ -161,7 +219,7 @@ class RTLSDR(SDR):
"gain": self.rx_gain, "gain": self.rx_gain,
} }
return Recording(data=store_array, metadata=metadata) return Recording(data=signal, metadata=metadata)
def _stream_rx(self, callback): def _stream_rx(self, callback):
if not self._rx_initialized: if not self._rx_initialized:
@ -179,12 +237,28 @@ class RTLSDR(SDR):
def _stream_tx(self, callback): # pragma: no cover - RTL-SDR is RX only def _stream_tx(self, callback): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations") raise NotImplementedError("RTL-SDR does not support transmit operations")
def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def tx_recording(
self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None
): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
self.radio.set_bias_tee(bool(enable))
state = "enabled" if enable else "disabled"
print(f"RTL-SDR bias tee {state}.")
def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR
raise NotImplementedError("RTL-SDR does not support external clock configuration")
def close(self): def close(self):
try: try:
self.radio.close() self.radio.close()
finally: finally:
self._enable_rx = False self._enable_rx = False
self._enable_tx = False self._enable_tx = False
def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR
raise NotImplementedError("RTL-SDR does not support external clock configuration")

View File

@ -295,26 +295,27 @@ class SDR(ABC):
return samples return samples
def supports_bias_tee(self) -> bool:
"""Return True when the radio supports bias-tee control."""
return False
def set_bias_tee(self, enable: bool):
"""Enable or disable bias-tee power when supported by the radio."""
raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control")
def pause_rx(self): def pause_rx(self):
self._enable_rx = False self._enable_rx = False
def pause_tx(self): def pause_tx(self):
self._enable_tx = False self._enable_tx = False
def stop(self): def stop(self):
self.pause_rx() self.pause_rx()
self.pause_tx()
def supports_bias_tee(self) -> bool:
"""Return True when the radio supports bias-tee control.""" @abstractmethod
return False def close(self):
pass
def set_bias_tee(self, enable: bool):
"""Enable or disable bias-tee power when supported by the radio."""
raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control")
@abstractmethod
def close(self):
pass
@abstractmethod @abstractmethod
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode): def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):

View File

@ -116,21 +116,7 @@ class ThinkRF(SDR):
raise ValueError("ThinkRF devices expose a single receive channel") raise ValueError("ThinkRF devices expose a single receive channel")
stream_mode = getattr(self, "_capture_mode", "block") == "stream" stream_mode = getattr(self, "_capture_mode", "block") == "stream"
actual_decimation, actual_sample_rate = self.set_rx_sample_rate(sample_rate=sample_rate, decimation=decimation)
# Enforce sample rate / decimation
# Note: decimation parameter takes precedence if provided
actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation)
if stream_mode and actual_decimation < self._min_stream_decimation:
enforced = self._min_stream_decimation
print(
"Requested ThinkRF sample rate exceeds typical GigE throughput; "
f"enforcing decimation {enforced} for streaming."
)
actual_decimation = enforced
actual_sample_rate = self.BASE_SAMPLE_RATE / actual_decimation
self._decimation = actual_decimation
self.radio.reset() self.radio.reset()
self.radio.scpiset(":SYSTEM:FLUSH") self.radio.scpiset(":SYSTEM:FLUSH")
@ -138,9 +124,11 @@ class ThinkRF(SDR):
self.radio.scpiset(":TRACE:STREAM:STOP") self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception: except Exception:
pass pass
self.radio.rfe_mode(self._rfe_mode) self.radio.rfe_mode(self._rfe_mode)
self.radio.freq(int(center_frequency)) self.set_rx_center_frequency(center_frequency=center_frequency)
attenuation = self._attenuation if gain is None else int(gain)
attenuation = self._attenuation if gain is None else int(gain) # gain
attenuation = max(0, min(attenuation, 30)) attenuation = max(0, min(attenuation, 30))
self.radio.attenuator(attenuation) self.radio.attenuator(attenuation)
@ -159,12 +147,12 @@ class ThinkRF(SDR):
if stream_mode: if stream_mode:
self._streaming_active = False self._streaming_active = False
else: else:
print(f"ThinkRF: Configuring block capture - SPP={self._samples_per_packet}, PPB={self._packets_per_block}") print(
f"ThinkRF: Configuring block capture - SPP={self._samples_per_packet}, PPB={self._packets_per_block}"
)
self.radio.scpiset(f":TRACE:BLOCK:PACKETS {self._packets_per_block}") self.radio.scpiset(f":TRACE:BLOCK:PACKETS {self._packets_per_block}")
self.radio.scpiset(":TRACE:BLOCK:DATA?") self.radio.scpiset(":TRACE:BLOCK:DATA?")
self.rx_sample_rate = actual_sample_rate
self.rx_center_frequency = center_frequency
self.rx_gain = { self.rx_gain = {
"attenuation_dB": attenuation, "attenuation_dB": attenuation,
"profile": gain_profile, "profile": gain_profile,
@ -179,21 +167,35 @@ class ThinkRF(SDR):
self._rx_initialized = True self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
def init_tx( def set_rx_sample_rate(self, sample_rate, decimation, stream_mode):
self, # Enforce sample rate / decimation
sample_rate: int | float, # Note: decimation parameter takes precedence if provided
center_frequency: int | float, actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation)
gain: int,
channel: int, if stream_mode and actual_decimation < self._min_stream_decimation:
gain_mode: Optional[str] = "absolute", enforced = self._min_stream_decimation
): print(
raise NotImplementedError("ThinkRF devices do not support transmit operations") "Requested ThinkRF sample rate exceeds typical GigE throughput; "
f"enforcing decimation {enforced} for streaming."
)
actual_decimation = enforced
actual_sample_rate = self.BASE_SAMPLE_RATE / actual_decimation
self._decimation = actual_decimation
self.rx_sample_rate = actual_sample_rate
print(f"ThinkRF RX Sample Rate = {actual_sample_rate}")
return actual_decimation, actual_sample_rate
def set_rx_center_frequency(self, center_frequency):
self.radio.freq(int(center_frequency))
self.rx_center_frequency = self.radio.freq
print(f"ThinkRF RX Center Frequency = {self.radio.freq}")
def _stream_rx(self, callback): def _stream_rx(self, callback):
if not self._rx_initialized: if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().")
print("ThinkRF Starting RX...")
self._enable_rx = True self._enable_rx = True
packets_processed = 0 packets_processed = 0
stream_mode = getattr(self, "_capture_mode", "block") == "stream" stream_mode = getattr(self, "_capture_mode", "block") == "stream"
@ -206,18 +208,9 @@ class ThinkRF(SDR):
print(f"Failed to start ThinkRF stream: {exc}") print(f"Failed to start ThinkRF stream: {exc}")
return return
print("ThinkRF Starting RX...")
while self._enable_rx: while self._enable_rx:
try: packet = self._safe_read(stream_mode, packets_processed)
packet = self.radio.read()
except Exception as exc:
# In block mode, reaching end of block can cause exceptions
# This is normal - just stop reading
if not stream_mode and packets_processed > 0:
# Got some packets in block mode, finish gracefully
print(f"ThinkRF: Block read complete ({packets_processed} packets received)")
break
print(f"ThinkRF read error: {exc}")
break
if packet is None: if packet is None:
# No more packets available # No more packets available
@ -234,32 +227,13 @@ class ThinkRF(SDR):
# Unknown packet type - skip # Unknown packet type - skip
continue continue
# packet.data is an iterable IQData object that yields (I, Q) tuples metadata = metadata = self._extract_metadata(packet)
# Convert to numpy array: collect all [I, Q] pairs complex_buffer = self._extract_iq(packet)
try: if complex_buffer is None:
# Iterate through packet.data to get all IQ pairs
iq_pairs = list(packet.data) # List of (I, Q) tuples
if not iq_pairs:
continue
# Convert to numpy array [N, 2]
iq_array = np.array(iq_pairs, dtype=np.float32)
# Extract I and Q channels and create complex buffer
complex_buffer = (iq_array[:, 0] + 1j * iq_array[:, 1]).astype(np.complex64)
except Exception as e:
print(f"Error extracting IQ from packet.data: {e}")
continue continue
metadata = None
if hasattr(packet, "fields"):
metadata = packet.fields
if metadata.get("sample_loss"):
print("\033[93mWarning: ThinkRF sample overflow detected\033[0m")
# Send packet data to callback (accumulation handled by parent) # Send packet data to callback (accumulation handled by parent)
callback(buffer=complex_buffer, metadata=metadata) callback(buffer=complex_buffer, metadata=metadata)
packets_processed += 1 packets_processed += 1
# In block mode, stop after receiving all packets in the block # In block mode, stop after receiving all packets in the block
@ -269,14 +243,61 @@ class ThinkRF(SDR):
print("ThinkRF RX Completed.") print("ThinkRF RX Completed.")
if stream_mode and self._streaming_active: if stream_mode and self._streaming_active:
try: self._stop_stream()
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self._streaming_active = False
self.radio.scpiset(":SYSTEM:FLUSH") self.radio.scpiset(":SYSTEM:FLUSH")
def _safe_read(self, stream_mode, packets_processed):
packet = None
try:
packet = self.radio.read()
except Exception as e:
# In block mode, reaching end of block can cause exceptions
if not stream_mode and packets_processed > 0:
# We got some packets in block mode, so finish gracefully
print(f"ThinkRF: Block read complete ({packets_processed} packets received)")
else:
print(f"ThinkRF read error: {e}")
return packet
def _extract_iq(self, packet):
# packet.data is an iterable IQData object that yields (I, Q) tuples
# Convert to numpy array: collect all [I, Q] pairs
try:
iq_pairs = list(packet.data)
if not iq_pairs:
return None
iq_array = np.array(iq_pairs, dtype=np.float32)
return (iq_array[:, 0] + 1j * iq_array[:, 1]).astype(np.complex64)
except Exception as e:
print(f"Error extracting IQ from packet.data: {e}")
return None
def _extract_metadata(self, packet):
if not hasattr(packet, "fields"):
return None
metadata = packet.fields
if metadata.get("sample_loss"):
print("\033[93mWarning: ThinkRF sample overflow detected\033[0m")
return metadata
def _stop_stream(self):
try:
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self._streaming_active = False
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
raise NotImplementedError("ThinkRF devices do not support transmit operations")
def _stream_tx(self, callback): def _stream_tx(self, callback):
raise NotImplementedError("ThinkRF devices do not support transmit operations") raise NotImplementedError("ThinkRF devices do not support transmit operations")
@ -333,7 +354,9 @@ class ThinkRF(SDR):
return int(best) return int(best)
def enforce_sample_rate(self, requested_sample_rate: int | float, decimation: Optional[int] = None) -> tuple[int, float]: def enforce_sample_rate(
self, requested_sample_rate: int | float, decimation: Optional[int] = None
) -> tuple[int, float]:
""" """
Enforce valid sample rate and decimation. Enforce valid sample rate and decimation.
@ -356,7 +379,10 @@ class ThinkRF(SDR):
actual_sample_rate = self.BASE_SAMPLE_RATE / decimation actual_sample_rate = self.BASE_SAMPLE_RATE / decimation
if abs(actual_sample_rate - requested_sample_rate) > 1e3: # More than 1 kHz difference if abs(actual_sample_rate - requested_sample_rate) > 1e3: # More than 1 kHz difference
print(f"ThinkRF: Requested {requested_sample_rate/1e6:.2f} MS/s → Using decimation={decimation} ({actual_sample_rate/1e6:.2f} MS/s)") print(
f"ThinkRF: Requested {requested_sample_rate/1e6:.2f} MS/s → \
Using decimation={decimation} ({actual_sample_rate/1e6:.2f} MS/s)"
)
return decimation, actual_sample_rate return decimation, actual_sample_rate
@ -391,7 +417,9 @@ class ThinkRF(SDR):
actual_samples = actual_spp * ppb actual_samples = actual_spp * ppb
if actual_samples != num_samples: if actual_samples != num_samples:
print(f"ThinkRF: Requested {num_samples} samples → Capturing {actual_samples} (SPP={actual_spp}, PPB={ppb})") print(
f"ThinkRF: Requested {num_samples} samples → Capturing {actual_samples} (SPP={actual_spp}, PPB={ppb})"
)
return actual_spp, ppb return actual_spp, ppb

View File

@ -17,11 +17,11 @@ class USRP(SDR):
This software supports all USRP SDRs created by Ettus Research. This software supports all USRP SDRs created by Ettus Research.
:param identifier: Identifier of the device. Can be an IP address (e.g. "192.168.0.0"), :param identifier: The value of the parameter that identifies the device.
a device name (e.g. "MyB210"), or any name/address found via ``uhd_find_devices``. :type identifier: str = "192.168.0.0", "MyB210", name or address found in uhd_find_devices
If not provided, the first available device is selected with a warning.
If multiple devices match the identifier, the first one is selected. If no identifier is provided, it will select the first device found, with a warning.
:type identifier: str, optional If more than one device is found with the identifier, it will select the first of those devices.
""" """
super().__init__() super().__init__()
@ -43,29 +43,23 @@ class USRP(SDR):
rx_buffer_size: int = 960000, rx_buffer_size: int = 960000,
): ):
""" """
Initialize the USRP for receiving. Initializes the USRP for receiving.
:param sample_rate: The sample rate for receiving. :param sample_rate: The sample rate for receiving.
:type sample_rate: int or float :type sample_rate: int or float
:param center_frequency: The center frequency of the recording. :param center_frequency: The center frequency of the recording.
:type center_frequency: int or float :type center_frequency: int or float
:param gain: The gain set for receiving on the USRP
:type gain: int
:param channel: The channel the USRP is set to. :param channel: The channel the USRP is set to.
:type channel: int :type channel: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
:param gain: The gain set for receiving on the USRP. 'relative' means that gain should be a negative value, and it will be subtracted from the max gain.
:type gain: int
:param gain_mode: Gain mode setting. ``"absolute"`` passes gain directly to the SDR.
``"relative"`` means gain should be a negative value, which will be subtracted
from the maximum gain.
:type gain_mode: str :type gain_mode: str
:param rx_buffer_size: Internal buffer size for receiving samples. Defaults to 960000. :param rx_buffer_size: Internal buffer size for receiving samples. Defaults to 960000.
:type rx_buffer_size: int :type rx_buffer_size: int
:return: Dictionary with the actual RX parameters after configuration. :return: A dictionary with the actual RX parameters after configuration.
:rtype: dict :rtype: dict
""" """
@ -80,59 +74,12 @@ class USRP(SDR):
if channel + 1 > max_num_channels: if channel + 1 > max_num_channels:
raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.")
# check if gain arg is valid self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel)
gain_range = self.usrp.get_rx_gain_range() self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel)
if gain_mode == "relative": self.set_rx_gain(gain=gain, gain_mode=gain_mode, channel=channel)
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
# set gain relative to max
abs_gain = gain_range.stop() + gain
else:
abs_gain = gain
if abs_gain < gain_range.start() or abs_gain > gain_range.stop():
print(f"Gain {abs_gain} out of range for this USRP.")
print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB")
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
self.usrp.set_rx_gain(abs_gain, channel)
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_rx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()}\
to {sample_rate_range.stop()}."
)
self.usrp.set_rx_rate(sample_rate, channel)
center_frequency_range = self.usrp.get_rx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()} \
to {center_frequency_range.stop()}."
)
self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
# set internal variables for metadata
self.rx_sample_rate = self.usrp.get_rx_rate(channel)
self.rx_gain = self.usrp.get_rx_gain(channel)
self.rx_center_frequency = self.usrp.get_rx_freq(channel)
self.rx_channel = channel self.rx_channel = channel
print(f"USRP RX Sample Rate = {self.rx_sample_rate}")
print(f"USRP RX Center Frequency = {self.rx_center_frequency}")
print(f"USRP RX Channel = {self.rx_channel}") print(f"USRP RX Channel = {self.rx_channel}")
print(f"USRP RX Gain = {self.rx_gain}")
# flag to prevent user from calling certain functions before this one. # flag to prevent user from calling certain functions before this one.
self._rx_initialized = True self._rx_initialized = True
@ -167,6 +114,58 @@ class USRP(SDR):
""" """
return self.rx_gain return self.rx_gain
def set_rx_sample_rate(self, sample_rate, channel=0):
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_rx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()}\
to {sample_rate_range.stop()}."
)
self.usrp.set_rx_rate(sample_rate, channel)
self.rx_sample_rate = self.usrp.get_rx_rate(channel)
print(f"USRP RX Sample Rate = {self.rx_sample_rate}")
def set_rx_center_frequency(self, center_frequency, channel=0):
center_frequency_range = self.usrp.get_rx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()} \
to {center_frequency_range.stop()}."
)
self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
self.rx_center_frequency = self.usrp.get_rx_freq(channel)
print(f"USRP RX Center Frequency = {self.rx_center_frequency}")
def set_rx_gain(self, gain, gain_mode="absolute", channel=0):
# check if gain arg is valid
gain_range = self.usrp.get_rx_gain_range()
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
# set gain relative to max
abs_gain = gain_range.stop() + gain
else:
abs_gain = gain
if abs_gain < gain_range.start() or abs_gain > gain_range.stop():
print(f"Gain {abs_gain} out of range for this USRP.")
print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB")
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
self.usrp.set_rx_gain(abs_gain, channel)
self.rx_gain = self.usrp.get_rx_gain(channel)
print(f"USRP RX Gain = {self.rx_gain}")
def _stream_rx(self, callback): def _stream_rx(self, callback):
if not self._rx_initialized: if not self._rx_initialized:
@ -211,10 +210,31 @@ class USRP(SDR):
del self.rx_stream del self.rx_stream
print("USRP RX Completed.") print("USRP RX Completed.")
def record(self, num_samples): def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the USRP.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
"""
if not self._rx_initialized: if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time")
elif num_samples is not None:
pass
elif rx_time is not None:
num_samples = int(rx_time * self.rx_sample_rate)
else:
raise ValueError("Must provide input of one of num_samples or rx_time")
stream_args = uhd.usrp.StreamArgs("fc32", "sc16") stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
stream_args.channels = [self.rx_channel] stream_args.channels = [self.rx_channel]
@ -269,23 +289,18 @@ class USRP(SDR):
gain_mode: Optional[str] = "absolute", gain_mode: Optional[str] = "absolute",
): ):
""" """
Initialize the USRP for transmitting. Initializes the USRP for transmitting.
:param sample_rate: The sample rate for transmitting. :param sample_rate: The sample rate for transmitting.
:type sample_rate: int or float :type sample_rate: int or float
:param center_frequency: The center frequency of the recording. :param center_frequency: The center frequency of the recording.
:type center_frequency: int or float :type center_frequency: int or float
:param gain: The gain set for transmitting on the USRP
:param gain: The gain set for transmitting on the USRP.
:type gain: int :type gain: int
:param channel: The channel the USRP is set to. :param channel: The channel the USRP is set to.
:type channel: int :type channel: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
:param gain_mode: Gain mode setting. ``"absolute"`` passes gain directly to the SDR. 'relative' means that gain should be a negative value, and it will be subtracted from the max gain.
``"relative"`` means gain should be a negative value, which will be subtracted
from the maximum gain.
:type gain_mode: str :type gain_mode: str
""" """
@ -301,6 +316,79 @@ class USRP(SDR):
if channel + 1 > max_num_channels: if channel + 1 > max_num_channels:
raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.")
self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel)
self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel)
self.set_tx_gain(gain=gain, gain_mode=gain_mode, channel=channel)
self.tx_channel = channel
print(f"USRP TX Channel = {self.tx_channel}")
self.usrp.set_clock_source("internal")
self.usrp.set_time_source("internal")
self.usrp.set_tx_antenna("TX/RX", channel)
self._tx_initialized = True
self._rx_initialized = False
return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain}
def get_tx_sample_rate(self):
"""
Retrieve the current sample rate of the transmitter.
Returns:
float: The transmitter's sample rate in samples per second (Hz).
"""
return self.tx_sample_rate
def get_tx_center_frequency(self):
"""
Retrieve the current center frequency of the transmitter.
Returns:
float: The transmitter's center frequency in Hertz (Hz).
"""
return self.tx_center_frequency
def get_tx_gain(self):
"""
Retrieve the current gain setting of the transmitter.
Returns:
float: The transmitter's gain in decibels (dB).
"""
return self.tx_gain
def set_tx_sample_rate(self, sample_rate, channel=0):
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_tx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()} to {sample_rate_range.stop()}."
)
self.usrp.set_tx_rate(sample_rate, channel)
self.tx_sample_rate = self.usrp.get_tx_rate(channel)
print(f"USRP TX Sample Rate = {self.tx_sample_rate}")
def set_tx_center_frequency(self, center_frequency, channel=0):
center_frequency_range = self.usrp.get_tx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()}\
to {center_frequency_range.stop()}."
)
self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel)
self.tx_center_frequency = self.usrp.get_tx_freq(channel)
print(f"USRP TX Center Frequency = {self.tx_center_frequency}")
def set_tx_gain(self, gain, gain_mode="absolute", channel=0):
# Ensure gain is within valid range # Ensure gain is within valid range
gain_range = self.usrp.get_tx_gain_range() gain_range = self.usrp.get_tx_gain_range()
if gain_mode == "relative": if gain_mode == "relative":
@ -320,50 +408,9 @@ class USRP(SDR):
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
self.usrp.set_tx_gain(abs_gain, channel) self.usrp.set_tx_gain(abs_gain, channel)
# check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_tx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()} to {sample_rate_range.stop()}."
)
self.usrp.set_tx_rate(sample_rate, channel)
center_frequency_range = self.usrp.get_tx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop():
raise IOError(
f"Center frequency {center_frequency} out of range for USRP.\
\nValid range is {center_frequency_range.start()}\
to {center_frequency_range.stop()}."
)
self.usrp.set_tx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
self.usrp.set_clock_source("internal")
self.usrp.set_time_source("internal")
self.usrp.set_tx_rate(sample_rate)
self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel)
self.usrp.set_tx_antenna("TX/RX", channel)
# set internal variables for metadata
self.tx_sample_rate = self.usrp.get_tx_rate(channel)
self.tx_gain = self.usrp.get_tx_gain(channel) self.tx_gain = self.usrp.get_tx_gain(channel)
self.tx_center_frequency = self.usrp.get_tx_freq(channel)
self.tx_channel = channel
print(f"USRP TX Sample Rate = {self.tx_sample_rate}")
print(f"USRP TX Center Frequency = {self.tx_center_frequency}")
print(f"USRP TX Channel = {self.tx_channel}")
print(f"USRP TX Gain = {self.tx_gain}") print(f"USRP TX Gain = {self.tx_gain}")
self._tx_initialized = True
self._rx_initialized = False
def close(self): def close(self):
pass pass