Compare commits

..

No commits in common. "5c9e50fa48da6f0496df72753e7750fd7327770a" and "d919e4666ca6ba1cac97184e994bcf691dd43858" have entirely different histories.

10 changed files with 411 additions and 1350 deletions

View File

@ -47,23 +47,6 @@ dependencies = [
"pyzmq (>=27.1.0,<28.0.0)", "pyzmq (>=27.1.0,<28.0.0)",
] ]
[project.optional-dependencies]
# SDR hardware-specific dependencies (optional installs)
rtlsdr = ["pyrtlsdr>=0.2.9"]
pluto = ["pyadi-iio>=0.0.14"]
usrp = [] # Requires system UHD installation
hackrf = ["pyhackrf>=0.2.0"]
bladerf = [] # Requires system libbladerf installation
thinkrf = ["pyrf>=2.8.0"] # NOTE: Requires lib2to3 post-install fix (see docs/)
# All SDR hardware support
all-sdr = [
"pyrtlsdr>=0.2.9",
"pyadi-iio>=0.0.14",
"pyhackrf>=0.2.0",
"pyrf>=2.8.0",
]
[tool.poetry] [tool.poetry]
packages = [ packages = [
{ include = "ria_toolkit_oss", from = "src" } { include = "ria_toolkit_oss", from = "src" }

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python3
"""
Fix pyrf Python 3 compatibility.
The pyrf library ships with Python 2 syntax in pyrf/devices/thinkrf.py.
This script uses lib2to3 to automatically convert it to Python 3.
Usage:
python scripts/fix_pyrf_python3.py
Run this after installing pyrf:
pip install ria-toolkit-oss[thinkrf]
python scripts/fix_pyrf_python3.py
"""
from pathlib import Path
from lib2to3.refactor import RefactoringTool, get_fixers_from_package
try:
import pyrf
except ImportError:
print("ERROR: pyrf is not installed.")
print("Install with: pip install pyrf")
print("Or install ria with ThinkRF support: pip install ria-toolkit-oss[thinkrf]")
exit(1)
# Find the thinkrf.py file in the pyrf package
thinkrf_path = Path(pyrf.__file__).resolve().parent / "devices" / "thinkrf.py"
if not thinkrf_path.exists():
print(f"ERROR: Could not find {thinkrf_path}")
print("Is pyrf installed correctly?")
exit(1)
print(f"Found pyrf ThinkRF module at: {thinkrf_path}")
# Apply lib2to3 fixes
print("Applying Python 3 compatibility fixes...")
fixers = get_fixers_from_package('lib2to3.fixes')
tool = RefactoringTool(fixers)
tool.refactor_file(str(thinkrf_path), write=True)
print(f"✅ Successfully patched {thinkrf_path} for Python 3 compatibility.")
print("\nYou can now use ria_toolkit_oss.sdr.thinkrf.ThinkRF")

View File

@ -325,8 +325,8 @@ f.argtypes = [p_hackrf_device, POINTER(read_partid_serialno_t)]
# libhackrf.hackrf_set_txvga_gain.argtypes = [POINTER(hackrf_device), c_uint32] # libhackrf.hackrf_set_txvga_gain.argtypes = [POINTER(hackrf_device), c_uint32]
## extern ADDAPI int ADDCALL hackrf_set_antenna_enable(hackrf_device* ## extern ADDAPI int ADDCALL hackrf_set_antenna_enable(hackrf_device*
## device, const uint8_t value); ## device, const uint8_t value);
libhackrf.hackrf_set_antenna_enable.restype = c_int # libhackrf.hackrf_set_antenna_enable.restype = c_int
libhackrf.hackrf_set_antenna_enable.argtypes = [p_hackrf_device, c_uint8] # libhackrf.hackrf_set_antenna_enable.argtypes = [POINTER(hackrf_device), c_uint8]
# #
## extern ADDAPI const char* ADDCALL hackrf_error_name(enum hackrf_error errcode); ## extern ADDAPI const char* ADDCALL hackrf_error_name(enum hackrf_error errcode);
## libhackrf.hackrf_error_name.restype = POINTER(c_char) ## libhackrf.hackrf_error_name.restype = POINTER(c_char)
@ -537,16 +537,6 @@ class HackRF(object):
raise IOError("error disabling amp") raise IOError("error disabling amp")
return 0 return 0
def set_antenna_enable(self, enable):
value = 1 if enable else 0
result = libhackrf.hackrf_set_antenna_enable(self.dev_p, value)
if result != 0:
error_name = get_error_name(result)
raise IOError(f"Error setting antenna bias tee: {error_name} (Code {result})")
state = "enabled" if enable else "disabled"
print(f"HackRF antenna bias tee {state}.")
return 0
# rounds down to multiple of 8 (15 -> 8, 39 -> 32), etc. # rounds down to multiple of 8 (15 -> 8, 39 -> 32), etc.
# internally, hackrf_set_lna_gain does the same thing # internally, hackrf_set_lna_gain does the same thing
# But we take care of it so we can keep track of the correct gain # But we take care of it so we can keep track of the correct gain
@ -592,75 +582,6 @@ class HackRF(object):
if result != 0: if result != 0:
raise IOError("stop_rx failure") raise IOError("stop_rx failure")
def _rx_capture_callback(self, hackrf_transfer):
"""Instance method callback for RX capture - prevents garbage collection"""
try:
c = hackrf_transfer.contents
# Append bytes to buffer using string_at
from ctypes import string_at
byte_chunk = string_at(c.buffer, c.valid_length)
self._capture_buffer.extend(byte_chunk)
# Check if we have enough
if len(self._capture_buffer) >= self._capture_target:
self._capture_done = True
return 1 # Stop streaming
return 0
except Exception as e:
print(f"Error in RX capture callback: {e}")
import traceback
traceback.print_exc()
self._capture_done = True
return 1
def read_samples(self, num_samples):
"""
Block capture mode for HackRF - captures exactly num_samples.
This is safer than streaming for USB2 and avoids buffer overflow issues.
:param num_samples: Number of complex samples to capture
:return: numpy array of complex64 samples
"""
# Initialize capture state as instance variables
self._capture_buffer = bytearray()
self._capture_target = num_samples * 2 # 2 bytes per complex sample (I+Q as int8)
self._capture_done = False
# Store callback as instance variable to prevent garbage collection (like TX does)
self._rx_cb = _callback(self._rx_capture_callback)
# Start RX with the callback
result = libhackrf.hackrf_start_rx(self.dev_p, self._rx_cb, None)
if result != 0:
raise IOError("start_rx failure during read_samples")
# Wait for capture to complete
import time
timeout = num_samples / self.sample_rate + 5.0 # Add 5 second buffer
start_time = time.time()
while not self._capture_done:
if time.time() - start_time > timeout:
print("HackRF capture timeout!")
break
time.sleep(0.01)
# Stop RX
self.stop_rx()
# Convert bytes to complex samples
byte_data = bytes(self._capture_buffer[:self._capture_target])
all_samples = np.frombuffer(byte_data, dtype=np.int8).astype(np.float32).view(np.complex64)
# Clean up instance variables
del self._capture_buffer
del self._capture_target
del self._capture_done
del self._rx_cb
return all_samples[:num_samples]
# Add transmit gain property # Add transmit gain property
def set_txvga_gain(self, gain): def set_txvga_gain(self, gain):
if gain < 0 or gain > 47: if gain < 0 or gain > 47:

View File

@ -35,22 +35,6 @@ class Blade(SDR):
super().__init__() super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
if channel is None:
channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))
try:
bladerf_channel = _bladerf.CHANNEL_RX(channel)
self.device.set_bias_tee(bladerf_channel, bool(enable))
except AttributeError as exc: # pragma: no cover - depends on libbladeRF version
raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc
state = "enabled" if enable else "disabled"
print(f"BladeRF bias tee {state} on channel {channel}.")
def _shutdown(self, error=0, board=None): def _shutdown(self, error=0, board=None):
print("Shutting down with error code: " + str(error)) print("Shutting down with error code: " + str(error))
if board is not None: if board is not None:
@ -256,91 +240,6 @@ class Blade(SDR):
return Recording(data=store_array[:, :num_samples], metadata=metadata) return Recording(data=store_array[:, :num_samples], metadata=metadata)
def tx_recording(
self,
recording: "Recording | np.ndarray",
num_samples: Optional[int] = None,
tx_time: Optional[int | float] = None,
):
"""
Transmit the given IQ samples from the provided recording.
init_tx() must be called before this function.
:param recording: The recording to transmit.
:type recording: Recording or np.ndarray
:param num_samples: The number of samples to transmit, will repeat or
truncate the recording to this length. Defaults to None.
:type num_samples: int, optional
:param tx_time: The time to transmit, will repeat or truncate the
recording to this length. Defaults to None.
:type tx_time: int or float, optional
"""
import warnings
import time
from ria_toolkit_oss.datatypes.recording import Recording
if num_samples is not None and tx_time is not None:
raise ValueError("Only input one of num_samples or tx_time")
elif num_samples is not None:
tx_time = num_samples / self.tx_sample_rate
elif tx_time is not None:
pass
else:
tx_time = len(recording) / self.tx_sample_rate
if isinstance(recording, np.ndarray):
samples = recording
elif isinstance(recording, Recording):
if len(recording.data) > 1:
warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
samples = recording.data[0]
else:
raise TypeError("recording must be np.ndarray or Recording")
samples = samples.astype(np.complex64, copy=False)
# Setup stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.TX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.tx_buffer_size,
num_transfers=8,
stream_timeout=3500,
)
# Enable module
self.tx_ch.enable = True
print("Blade Starting TX...")
# Transmit samples - repeat as needed for the duration
start_time = time.time()
sample_index = 0
try:
while time.time() - start_time < tx_time:
# Get next chunk
chunk_size = min(self.tx_buffer_size, len(samples) - sample_index)
if chunk_size == 0:
# Reached end, loop back
sample_index = 0
chunk_size = min(self.tx_buffer_size, len(samples))
chunk = samples[sample_index:sample_index + chunk_size]
sample_index += chunk_size
# Convert and transmit
byte_array = self._convert_tx_samples(chunk)
self.device.sync_tx(byte_array, len(chunk))
except KeyboardInterrupt:
print("\nTransmission interrupted by user")
# Disable module
print("Blade TX Completed.")
self.tx_ch.enable = False
def _stream_tx(self, callback): def _stream_tx(self, callback):
# Setup stream # Setup stream
@ -375,18 +274,14 @@ class Blade(SDR):
return samples return samples
def _convert_tx_samples(self, samples): def _convert_tx_samples(self, samples):
# Normalize to maximum amplitude to prevent overflow tx_samples = np.empty(samples.size * 2, dtype=np.float32)
max_val = np.max(np.abs(samples)) tx_samples[::2] = np.real(samples) # Real part
if max_val > 0: tx_samples[1::2] = np.imag(samples) # Imaginary part
samples = samples / max_val # Normalize to [-1, 1]
# Scale to Q11 format (use 2047 instead of 2048 to avoid overflow)
# and interleave I/Q samples
tx_samples = np.zeros(len(samples) * 2, dtype=np.int16)
tx_samples[0::2] = (np.real(samples) * 2047).astype(np.int16) # I samples
tx_samples[1::2] = (np.imag(samples) * 2047).astype(np.int16) # Q samples
tx_samples *= 2048
tx_samples = tx_samples.astype(np.int16)
byte_array = tx_samples.tobytes() byte_array = tx_samples.tobytes()
return byte_array return byte_array
def _set_rx_channel(self, channel): def _set_rx_channel(self, channel):

View File

@ -1,6 +1,5 @@
import time import time
import warnings import warnings
import math
from typing import Optional from typing import Optional
import numpy as np import numpy as np
@ -36,101 +35,10 @@ class HackRF(SDR):
super().__init__() super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
try:
self.radio.set_antenna_enable(bool(enable))
except AttributeError as exc: # pragma: no cover - defensive
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode): def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):
"""
Initializes the HackRF for receiving.
HackRF has 3 gain stages:
- 14 dB front-end amplifier (on/off)
- LNA gain: 0-40 dB in 8 dB steps
- VGA gain: 0-62 dB in 2 dB steps
:param sample_rate: The sample rate for receiving.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The total gain set for receiving on the HackRF (distributed across stages)
:type gain: int
:param channel: The channel the HackRF is set to. (Not actually used)
:type channel: int
:param gain_mode: Gain mode setting. Currently only "absolute" is supported.
:type gain_mode: str
"""
print("Initializing RX")
self.rx_sample_rate = sample_rate
self.radio.sample_rate = int(sample_rate)
print(f"HackRF sample rate = {self.radio.sample_rate}")
self.rx_center_frequency = center_frequency
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
# Distribute gain across amplifier stages
rx_gain_min = 0
rx_gain_max = 116 # 14 (amp) + 40 (LNA) + 62 (VGA)
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This "
"sets the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(abs_gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for HackRF.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
# Distribute gain using the signal-testbed algorithm
enable_amp = False
remaining_gain = abs_gain
# Enable 14 dB pre-amp if gain is high enough
if remaining_gain > 30:
remaining_gain = remaining_gain - 14
enable_amp = True
print("HackRF: 14dB front-end amplifier enabled.")
# Distribute remaining gain between LNA and VGA
# LNA gets 60% of remaining gain, rounded down to 8 dB steps
lna_gain = math.floor(remaining_gain * 0.6)
lna_gain = lna_gain - (lna_gain % 8) # Round to 8 dB steps
if lna_gain > 40:
lna_gain = 40
# VGA gets the rest
vga_gain = remaining_gain - lna_gain
if vga_gain > 62:
vga_gain = 62
# Apply gain settings
if enable_amp:
self.radio.enable_amp()
else:
self.radio.disable_amp()
self.radio.set_lna_gain(lna_gain)
self.radio.set_vga_gain(vga_gain)
self.rx_gain = abs_gain
print(f"HackRF gain distribution: Amp={enable_amp}, LNA={lna_gain}dB, VGA={vga_gain}dB")
self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
self._rx_initialized = True
return NotImplementedError("RX not yet implemented for HackRF")
def init_tx( def init_tx(
self, self,
@ -243,87 +151,10 @@ class HackRF(SDR):
def close(self): def close(self):
self.radio.close() self.radio.close()
def record(self, num_samples):
"""
Record a specified number of samples from the HackRF using block capture mode.
This is more reliable than streaming for USB2 connections.
:param num_samples: Number of samples to capture
:type num_samples: int
:return: Recording object containing the captured data
:rtype: Recording
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record()")
print("HackRF Starting RX...")
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(num_samples)
print("HackRF RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, num_samples), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def _stream_rx(self, callback): def _stream_rx(self, callback):
"""
Stream samples from the HackRF using a callback function.
:param callback: Function to call for each buffer of samples
:type callback: callable
"""
if not self._rx_initialized: if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx()") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
return NotImplementedError("RX not yet implemented for HackRF")
print("HackRF Starting RX stream...")
self._enable_rx = True
def rx_callback(hackrf_transfer):
"""Internal callback that wraps the user's callback"""
try:
if not self._enable_rx:
return 1 # Stop
c = hackrf_transfer.contents
# Use ctypes string_at to safely copy the buffer
from ctypes import string_at
byte_data = string_at(c.buffer, c.valid_length)
# Convert bytes to int8, then to float32, then view as complex64
samples = np.frombuffer(byte_data, dtype=np.int8).astype(np.float32).view(np.complex64)
# Call user's callback
callback(buffer=samples, metadata=None)
return 0 if self._enable_rx else 1
except Exception as e:
print(f"Error in rx_callback: {e}")
return 1 # Stop on error
# Start RX
self.radio.start_rx(rx_callback)
# Wait while streaming
while self._enable_rx:
time.sleep(0.1)
# Stop RX
self.radio.stop_rx()
print("HackRF RX stream completed.")
def _stream_tx(self, callback): def _stream_tx(self, callback):
return super()._stream_tx(callback) return super()._stream_tx(callback)

View File

@ -17,7 +17,7 @@ class Pluto(SDR):
""" """
Initialize a Pluto SDR device object and connect to the SDR hardware. Initialize a Pluto SDR device object and connect to the SDR hardware.
This software supports the ADALM Pluto SDR created by Analog Devices. This software supports the ADALAM Pluto SDR created by Analog Devices.
:param identifier: The value of the parameter that identifies the device. :param identifier: The value of the parameter that identifies the device.
:type identifier: str = "192.168.3.1", "pluto.local", etc :type identifier: str = "192.168.3.1", "pluto.local", etc
@ -34,24 +34,8 @@ class Pluto(SDR):
else: else:
uri = f"ip:{identifier}" uri = f"ip:{identifier}"
# Detect MIMO capability by checking IIO channels (one-time, during init) self.radio = adi.ad9361(uri)
# Rev B: 2 channels (voltage0, voltage1) - single RX/TX only print(f"Successfully found Pluto radio with identifier [{identifier}].")
# Rev C/D: 4 channels (voltage0-3) - dual RX/TX capable
test_radio = adi.ad9361(uri)
ctx = test_radio.ctx
dev = ctx.find_device("cf-ad9361-lpc")
if dev and len(dev.channels) >= 4:
# MIMO-capable hardware (Rev C/D)
self.radio = test_radio
self._mimo_capable = True
print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].")
else:
# Non-MIMO hardware (Rev B) - use standard Pluto driver
self.radio = adi.Pluto(uri)
self._mimo_capable = False
print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].")
except Exception as e: except Exception as e:
print(f"Failed to find Pluto radio with identifier [{identifier}].") print(f"Failed to find Pluto radio with identifier [{identifier}].")
raise e raise e
@ -90,11 +74,6 @@ class Pluto(SDR):
self.radio.rx_enabled_channels = [0] self.radio.rx_enabled_channels = [0]
print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
elif channel == 1: elif channel == 1:
if not self._mimo_capable:
raise ValueError(
"Dual RX channel requested (channel=1) but hardware is not MIMO-capable. "
"Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)."
)
self.radio.rx_enabled_channels = [0, 1] self.radio.rx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
else: else:
@ -163,11 +142,6 @@ class Pluto(SDR):
print(f"Pluto center frequency = {self.radio.tx_lo}") print(f"Pluto center frequency = {self.radio.tx_lo}")
if channel == 1: if channel == 1:
if not self._mimo_capable:
raise ValueError(
"Dual TX channel requested (channel=1) but hardware is not MIMO-capable. "
"Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)."
)
self.radio.tx_enabled_channels = [0, 1] self.radio.tx_enabled_channels = [0, 1]
print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
elif channel == 0: elif channel == 0:

View File

@ -1,190 +0,0 @@
"""RTL-SDR device integration for the RIA Toolkit."""
import time
import warnings
from typing import Optional
import numpy as np
try:
from rtlsdr import RtlSdr
except ImportError as exc: # pragma: no cover - dependency provided by end user
raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc
from ria_toolkit_oss.sdr.sdr import SDR
class RTLSDR(SDR):
"""SDR interface for RTL-SDR dongles using pyrtlsdr."""
def __init__(self, identifier: Optional[int | str] = None):
super().__init__()
try:
if identifier is None:
self.radio = RtlSdr()
else:
self.radio = RtlSdr(identifier)
print(f"Initialized RTL-SDR with identifier [{identifier}].")
except Exception as exc:
print(f"Failed to initialize RTL-SDR with identifier [{identifier}].")
raise exc
self.rx_buffer_size = 256_000
self.rx_channel = 0
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
self.radio.set_bias_tee(bool(enable))
state = "enabled" if enable else "disabled"
print(f"RTL-SDR bias tee {state}.")
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: Optional[int],
channel: int,
gain_mode: Optional[str] = "absolute",
buffer_size: Optional[int] = 256_000,
bias_t: bool = False,
):
if channel not in (0, None):
raise ValueError("RTL-SDR supports only channel 0 for RX.")
self.radio.sample_rate = float(sample_rate)
self.rx_sample_rate = self.radio.sample_rate
self.radio.center_freq = float(center_frequency)
self.rx_center_frequency = self.radio.center_freq
available_gains = getattr(self.radio, "gains", [])
if gain is None:
self.radio.gain = "auto"
self.rx_gain = "auto"
else:
if not available_gains:
warnings.warn(
"No gain table reported by RTL-SDR; applying requested gain directly.",
RuntimeWarning,
)
target_gain = gain
else:
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets the gain relative to the maximum."
)
target_gain = max(available_gains) + gain
else:
target_gain = gain
min_gain = min(available_gains)
max_gain = max(available_gains)
if target_gain < min_gain or target_gain > max_gain:
print(f"Requested gain {target_gain} dB out of range; clamping to valid span {min_gain}-{max_gain} dB.")
target_gain = min(max(target_gain, min_gain), max_gain)
target_gain = min(available_gains, key=lambda g: abs(g - target_gain))
self.radio.gain = target_gain
self.rx_gain = self.radio.gain
self.rx_buffer_size = int(buffer_size or self.rx_buffer_size)
self.rx_channel = 0
if bias_t:
self.set_bias_tee(True)
time.sleep(1)
self._rx_initialized = True
self._tx_initialized = False
def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def record(self, num_samples):
"""
Record a fixed number of samples from RTL-SDR.
Args:
num_samples: Number of samples to capture
Returns:
Recording object with captured samples
"""
from ria_toolkit_oss.datatypes.recording import Recording
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record().")
print("RTL-SDR Starting RX...")
# RTL-SDR has USB buffer limitations - use consistent 256k chunks
# Always read full chunks to avoid USB overflow issues with partial reads
max_samples_per_read = 262144 # 256k samples = stable chunk size
num_full_reads = num_samples // max_samples_per_read
remainder = num_samples % max_samples_per_read
signal = np.array([], dtype=np.complex64)
# Read full chunks
for i in range(num_full_reads):
try:
chunk = self.radio.read_samples(max_samples_per_read)
signal = np.append(signal, chunk)
except Exception as e:
print(f"Error while reading samples: {e}")
break
# Read remainder if needed (round up to power of 2 for USB compatibility)
if remainder > 0 and len(signal) == num_full_reads * max_samples_per_read:
# Round up to next 16k boundary for USB stability
padded_remainder = ((remainder + 16383) // 16384) * 16384
try:
chunk = self.radio.read_samples(padded_remainder)
signal = np.append(signal, chunk[:remainder]) # Only keep what we need
except Exception as e:
print(f"Error while reading final chunk: {e}")
print("RTL-SDR RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, len(signal)), dtype=np.complex64)
store_array[0, :] = signal
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().")
print("RTL-SDR Starting RX...")
self._enable_rx = True
try:
while self._enable_rx:
samples = self.radio.read_samples(self.rx_buffer_size)
callback(buffer=np.asarray(samples, dtype=np.complex64), metadata=None)
finally:
print("RTL-SDR RX Completed.")
def _stream_tx(self, callback): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def close(self):
try:
self.radio.close()
finally:
self._enable_rx = False
self._enable_tx = False
def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR
raise NotImplementedError("RTL-SDR does not support external clock configuration")

View File

@ -304,14 +304,6 @@ class SDR(ABC):
def stop(self): def stop(self):
self.pause_rx() self.pause_rx()
def supports_bias_tee(self) -> bool:
"""Return True when the radio supports bias-tee control."""
return False
def set_bias_tee(self, enable: bool):
"""Enable or disable bias-tee power when supported by the radio."""
raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control")
@abstractmethod @abstractmethod
def close(self): def close(self):
pass pass

View File

@ -1,291 +0,0 @@
"""ThinkRF integration for the RIA toolkit."""
from typing import Any, Dict, Optional
import numpy as np
try:
from pyrf.devices.thinkrf import WSA
except ImportError as exc: # pragma: no cover - optional dependency
raise ImportError(
"pyrf is required to use the ThinkRF integration. "
"Install with: pip install ria-toolkit-oss[thinkrf]"
) from exc
except SyntaxError as exc: # pragma: no cover - Python 2/3 compatibility issue
import sys
from pathlib import Path
# pyrf ships with Python 2 syntax - try to auto-fix it
print("\033[93mWARNING: pyrf has Python 2 syntax. Attempting automatic fix...\033[0m")
try:
from lib2to3.refactor import RefactoringTool, get_fixers_from_package
import pyrf
thinkrf_path = Path(pyrf.__file__).resolve().parent / "devices" / "thinkrf.py"
print(f"Fixing: {thinkrf_path}")
fixers = get_fixers_from_package('lib2to3.fixes')
tool = RefactoringTool(fixers)
tool.refactor_file(str(thinkrf_path), write=True)
print("\033[92m✅ Fixed pyrf for Python 3. Please restart Python/reload the module.\033[0m")
print("Or run: python -m ria_toolkit_oss.sdr.thinkrf_fix")
sys.exit(1) # Exit so user can reload
except Exception as fix_exc:
print(f"\033[91m❌ Auto-fix failed: {fix_exc}\033[0m")
print("Manual fix: Run `python scripts/fix_pyrf_python3.py` from ria-toolkit-oss directory")
raise exc
from ria_toolkit_oss.sdr.sdr import SDR
class ThinkRF(SDR):
"""SDR adapter for ThinkRF analyzers using the PyRF API."""
BASE_SAMPLE_RATE = 125_000_000
SUPPORTED_DECIMATIONS = (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024)
def __init__(self, identifier: Optional[str] = None):
super().__init__()
if identifier is None:
raise ValueError("ThinkRF requires an IP address or hostname identifier")
self.identifier = identifier
try:
self.radio = WSA()
self.radio.connect(identifier)
self.radio.request_read_perm()
print(f"Connected to ThinkRF at [{identifier}].")
except Exception as exc:
print(f"Failed to connect to ThinkRF at [{identifier}].")
raise exc
self.configure_frontend()
self._last_context: Optional[Any] = None
def configure_frontend(
self,
*,
rfe_mode: str = "ZIF",
attenuation: int = 0,
gain_profile: str = "HIGH",
trigger_config: Optional[Dict[str, Any]] = None,
samples_per_packet: int = 65504,
packets_per_block: int = 1,
capture_mode: str = "block",
stream_id: int = 1,
min_stream_decimation: int = 16,
) -> None:
"""Persist settings applied during the next RX initialisation.
``capture_mode`` selects between buffered ``"block"`` captures that use
the analyser's onboard RAM and ``"stream"`` captures that push data over
GigE in real time. Streaming requires a sufficiently large decimation to
keep within the link budget; ``min_stream_decimation`` forms the lower
bound.
"""
mode = capture_mode.lower()
if mode not in {"block", "stream"}:
raise ValueError("capture_mode must be either 'block' or 'stream'")
self._rfe_mode = rfe_mode
self._attenuation = int(max(0, min(attenuation, 30)))
self._gain_profile = gain_profile.upper()
self._trigger_config = trigger_config
self._samples_per_packet = int(samples_per_packet)
self._packets_per_block = max(1, int(packets_per_block))
self._capture_mode = mode
self._stream_id = int(stream_id)
self._min_stream_decimation = max(1, int(min_stream_decimation))
self._streaming_active = False
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
if channel not in (0, None):
raise ValueError("ThinkRF devices expose a single receive channel")
stream_mode = getattr(self, "_capture_mode", "block") == "stream"
decimation = self._derive_decimation(sample_rate)
if stream_mode and decimation < self._min_stream_decimation:
enforced = self._min_stream_decimation
print(
"Requested ThinkRF sample rate exceeds typical GigE throughput; "
f"enforcing decimation {enforced} for streaming."
)
decimation = enforced
actual_sample_rate = self.BASE_SAMPLE_RATE / decimation
self._decimation = decimation
self.radio.reset()
self.radio.scpiset(":SYSTEM:FLUSH")
try:
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self.radio.rfe_mode(self._rfe_mode)
self.radio.freq(int(center_frequency))
attenuation = self._attenuation if gain is None else int(gain)
attenuation = max(0, min(attenuation, 30))
self.radio.attenuator(attenuation)
gain_profile = self._gain_profile
if gain_mode and isinstance(gain_mode, str) and gain_mode.upper() in {"LOW", "MEDIUM", "HIGH", "VLOW"}:
gain_profile = gain_mode.upper()
self.radio.psfm_gain(gain_profile)
self.radio.decimation(decimation)
if stream_mode:
self.radio.scpiset(f":SENSE:DECIMATION {decimation}")
trigger = self._trigger_config or self._default_trigger(center_frequency)
self.radio.trigger(trigger)
self.radio.scpiset(f":TRACE:SPP {self._samples_per_packet}")
if stream_mode:
self._streaming_active = False
else:
self.radio.scpiset(f":TRACE:BLOCK:PACKETS {self._packets_per_block}")
self.radio.scpiset(":TRACE:BLOCK:DATA?")
self.rx_sample_rate = actual_sample_rate
self.rx_center_frequency = center_frequency
self.rx_gain = {"attenuation_dB": attenuation, "profile": gain_profile}
self.rx_buffer_size = self._samples_per_packet
self.rx_channel = 0
self._rx_initialized = True
self._tx_initialized = False
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
raise NotImplementedError("ThinkRF devices do not support transmit operations")
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().")
print("ThinkRF Starting RX...")
self._enable_rx = True
packets_processed = 0
stream_mode = getattr(self, "_capture_mode", "block") == "stream"
if stream_mode and not self._streaming_active:
try:
self.radio.scpiset(f":TRACE:STREAM:START {self._stream_id}")
self._streaming_active = True
except Exception as exc:
print(f"Failed to start ThinkRF stream: {exc}")
return
while self._enable_rx:
try:
packet = self.radio.read()
except Exception as exc:
print(f"ThinkRF read error: {exc}")
break
if packet is None:
continue
if packet.is_context_packet():
self._last_context = packet
continue
if not packet.is_data_packet():
continue
iq_data = np.asarray(packet.data, dtype=np.float32)
if iq_data.ndim != 2 or iq_data.shape[1] != 2:
print("Unexpected ThinkRF packet format; skipping packet")
continue
complex_buffer = (iq_data[:, 0] + 1j * iq_data[:, 1]).astype(np.complex64, copy=False)
metadata = None
if hasattr(packet, "fields"):
metadata = packet.fields
if metadata.get("sample_loss"):
print("\033[93mWarning: ThinkRF sample overflow detected\033[0m")
callback(buffer=complex_buffer, metadata=metadata)
if stream_mode:
packets_processed += 1
else:
packets_processed += 1
if packets_processed >= self._packets_per_block:
packets_processed = 0
if self._enable_rx:
self.radio.scpiset(":TRACE:BLOCK:DATA?")
print("ThinkRF RX Completed.")
if stream_mode and self._streaming_active:
try:
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception:
pass
self._streaming_active = False
self.radio.scpiset(":SYSTEM:FLUSH")
def _stream_tx(self, callback):
raise NotImplementedError("ThinkRF devices do not support transmit operations")
def set_clock_source(self, source):
raise NotImplementedError("ThinkRF clock configuration is not implemented")
def close(self):
try:
self.radio.scpiset(":TRACE:STREAM:STOP")
except Exception: # pragma: no cover - best effort cleanup
pass
try:
self.radio.scpiset(":SYSTEM:FLUSH")
except Exception:
pass
try:
self.radio.disconnect()
finally:
self._enable_rx = False
self._enable_tx = False
print(f"Disconnected from ThinkRF at [{self.identifier}].")
def supports_bias_tee(self) -> bool:
return False
def set_bias_tee(self, enable: bool): # pragma: no cover - interface compliance
raise NotImplementedError("ThinkRF radios do not expose a controllable bias-tee")
def _derive_decimation(self, target_sample_rate: int | float) -> int:
if not target_sample_rate:
return 1
requested = float(target_sample_rate)
if requested >= self.BASE_SAMPLE_RATE:
return 1
desired = self.BASE_SAMPLE_RATE / requested
best = min(self.SUPPORTED_DECIMATIONS, key=lambda dec: abs(dec - desired))
return int(best)
def _default_trigger(self, center_frequency: int | float) -> Dict[str, Any]:
span = 40_000_000
half = span // 2
return {
"type": "NONE",
"fstart": int(center_frequency) - half,
"fstop": int(center_frequency) + half,
"amplitude": -100,
}

View File

@ -100,18 +100,13 @@ class USRP(SDR):
self.usrp.set_rx_gain(abs_gain, channel) self.usrp.set_rx_gain(abs_gain, channel)
# check if sample rate arg is valid # check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns sample_rate_range = self.usrp.get_rx_rates()
# the range for the CURRENT master clock, not the maximum possible range. if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
# Skip validation for B-series devices and let UHD handle it. raise IOError(
device_type = self.device_dict.get("type", "").lower() f"Sample rate {sample_rate} not valid for this USRP.\nValid\
if device_type not in ["b200", "b210"]: range is {sample_rate_range.start()}\
sample_rate_range = self.usrp.get_rx_rates() to {sample_rate_range.stop()}."
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): )
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()}\
to {sample_rate_range.stop()}."
)
self.usrp.set_rx_rate(sample_rate, channel) self.usrp.set_rx_rate(sample_rate, channel)
center_frequency_range = self.usrp.get_rx_freq_range() center_frequency_range = self.usrp.get_rx_freq_range()
@ -322,17 +317,12 @@ class USRP(SDR):
self.usrp.set_tx_gain(abs_gain, channel) self.usrp.set_tx_gain(abs_gain, channel)
# check if sample rate arg is valid # check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns sample_rate_range = self.usrp.get_tx_rates()
# the range for the CURRENT master clock, not the maximum possible range. if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
# Skip validation for B-series devices and let UHD handle it. raise IOError(
device_type = self.device_dict.get("type", "").lower() f"Sample rate {sample_rate} not valid for this USRP.\nValid\
if device_type not in ["b200", "b210"]: range is {sample_rate_range.start()} to {sample_rate_range.stop()}."
sample_rate_range = self.usrp.get_tx_rates() )
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop():
raise IOError(
f"Sample rate {sample_rate} not valid for this USRP.\nValid\
range is {sample_rate_range.start()} to {sample_rate_range.stop()}."
)
self.usrp.set_tx_rate(sample_rate, channel) self.usrp.set_tx_rate(sample_rate, channel)
center_frequency_range = self.usrp.get_tx_freq_range() center_frequency_range = self.usrp.get_tx_freq_range()