330 lines
12 KiB
Python
330 lines
12 KiB
Python
import time
|
|
import warnings
|
|
import math
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
|
|
from ria_toolkit_oss.datatypes.recording import Recording
|
|
from ria_toolkit_oss.sdr._external.libhackrf import HackRF as hrf
|
|
from ria_toolkit_oss.sdr.sdr import SDR
|
|
|
|
|
|
class HackRF(SDR):
|
|
def __init__(self, identifier=""):
|
|
"""
|
|
Initialize a HackRF device object and connect to the SDR hardware.
|
|
|
|
:param identifier: Not used for HackRF.
|
|
|
|
HackRF devices cannot currently be selected with and identifier value.
|
|
If there are multiple connected devices, the device in use may be selected randomly.
|
|
"""
|
|
|
|
if identifier != "":
|
|
print(f"Warning, radio identifier {identifier} provided for HackRF but will not be used.")
|
|
|
|
print("Initializing HackRF radio.")
|
|
try:
|
|
super().__init__()
|
|
|
|
self.radio = hrf()
|
|
print("Successfully found HackRF radio.")
|
|
except Exception as e:
|
|
print("Failed to find HackRF radio.")
|
|
raise e
|
|
|
|
super().__init__()
|
|
|
|
def supports_bias_tee(self) -> bool:
|
|
return True
|
|
|
|
def set_bias_tee(self, enable: bool):
|
|
try:
|
|
self.radio.set_antenna_enable(bool(enable))
|
|
except AttributeError as exc: # pragma: no cover - defensive
|
|
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
|
|
|
|
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):
|
|
"""
|
|
Initializes the HackRF for receiving.
|
|
|
|
HackRF has 3 gain stages:
|
|
- 14 dB front-end amplifier (on/off)
|
|
- LNA gain: 0-40 dB in 8 dB steps
|
|
- VGA gain: 0-62 dB in 2 dB steps
|
|
|
|
:param sample_rate: The sample rate for receiving.
|
|
:type sample_rate: int or float
|
|
:param center_frequency: The center frequency of the recording.
|
|
:type center_frequency: int or float
|
|
:param gain: The total gain set for receiving on the HackRF (distributed across stages)
|
|
:type gain: int
|
|
:param channel: The channel the HackRF is set to. (Not actually used)
|
|
:type channel: int
|
|
:param gain_mode: Gain mode setting. Currently only "absolute" is supported.
|
|
:type gain_mode: str
|
|
"""
|
|
print("Initializing RX")
|
|
|
|
self.rx_sample_rate = sample_rate
|
|
self.radio.sample_rate = int(sample_rate)
|
|
print(f"HackRF sample rate = {self.radio.sample_rate}")
|
|
|
|
self.rx_center_frequency = center_frequency
|
|
self.radio.center_freq = int(center_frequency)
|
|
print(f"HackRF center frequency = {self.radio.center_freq}")
|
|
|
|
# Distribute gain across amplifier stages
|
|
rx_gain_min = 0
|
|
rx_gain_max = 116 # 14 (amp) + 40 (LNA) + 62 (VGA)
|
|
|
|
if gain_mode == "relative":
|
|
if gain > 0:
|
|
raise ValueError(
|
|
"When gain_mode = 'relative', gain must be < 0. This "
|
|
"sets the gain relative to the maximum possible gain."
|
|
)
|
|
else:
|
|
abs_gain = rx_gain_max + gain
|
|
else:
|
|
abs_gain = gain
|
|
|
|
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
|
|
abs_gain = min(max(abs_gain, rx_gain_min), rx_gain_max)
|
|
print(f"Gain {gain} out of range for HackRF.")
|
|
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
|
|
|
|
# Distribute gain using the signal-testbed algorithm
|
|
enable_amp = False
|
|
remaining_gain = abs_gain
|
|
|
|
# Enable 14 dB pre-amp if gain is high enough
|
|
if remaining_gain > 30:
|
|
remaining_gain = remaining_gain - 14
|
|
enable_amp = True
|
|
print("HackRF: 14dB front-end amplifier enabled.")
|
|
|
|
# Distribute remaining gain between LNA and VGA
|
|
# LNA gets 60% of remaining gain, rounded down to 8 dB steps
|
|
lna_gain = math.floor(remaining_gain * 0.6)
|
|
lna_gain = lna_gain - (lna_gain % 8) # Round to 8 dB steps
|
|
if lna_gain > 40:
|
|
lna_gain = 40
|
|
|
|
# VGA gets the rest
|
|
vga_gain = remaining_gain - lna_gain
|
|
if vga_gain > 62:
|
|
vga_gain = 62
|
|
|
|
# Apply gain settings
|
|
if enable_amp:
|
|
self.radio.enable_amp()
|
|
else:
|
|
self.radio.disable_amp()
|
|
|
|
self.radio.set_lna_gain(lna_gain)
|
|
self.radio.set_vga_gain(vga_gain)
|
|
|
|
self.rx_gain = abs_gain
|
|
print(f"HackRF gain distribution: Amp={enable_amp}, LNA={lna_gain}dB, VGA={vga_gain}dB")
|
|
|
|
self._rx_initialized = True
|
|
self._tx_initialized = False
|
|
|
|
def init_tx(
|
|
self,
|
|
sample_rate: int | float,
|
|
center_frequency: int | float,
|
|
gain: int,
|
|
channel: int,
|
|
gain_mode: Optional[str] = "absolute",
|
|
):
|
|
"""
|
|
Initializes the HackRF for transmitting.
|
|
|
|
:param sample_rate: The sample rate for transmitting.
|
|
:type sample_rate: int or float
|
|
:param center_frequency: The center frequency of the recording.
|
|
:type center_frequency: int or float
|
|
:param gain: The gain set for transmitting on the HackRF
|
|
:type gain: int
|
|
:param channel: The channel the HackRF is set to. (Not actually used)
|
|
:type channel: int
|
|
:param buffer_size: The buffer size during transmit. Defaults to 10000.
|
|
:type buffer_size: int
|
|
"""
|
|
|
|
print("Initializing TX")
|
|
self.tx_sample_rate = sample_rate
|
|
self.radio.sample_rate = int(sample_rate)
|
|
print(f"HackRF sample rate = {self.radio.sample_rate}")
|
|
|
|
self.tx_center_frequency = center_frequency
|
|
self.radio.center_freq = int(center_frequency)
|
|
print(f"HackRF center frequency = {self.radio.center_freq}")
|
|
|
|
self.radio.enable_amp()
|
|
|
|
tx_gain_min = 0
|
|
tx_gain_max = 47
|
|
if gain_mode == "relative":
|
|
if gain > 0:
|
|
raise ValueError(
|
|
"When gain_mode = 'relative', gain must be < 0. This \
|
|
sets the gain relative to the maximum possible gain."
|
|
)
|
|
else:
|
|
abs_gain = tx_gain_max + gain
|
|
else:
|
|
abs_gain = gain
|
|
|
|
if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
|
|
abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
|
|
print(f"Gain {gain} out of range for Pluto.")
|
|
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
|
|
|
|
self.radio.txvga_gain = abs_gain
|
|
print(f"HackRF gain = {self.radio.txvga_gain}")
|
|
|
|
self._tx_initialized = True
|
|
self._rx_initialized = False
|
|
|
|
def tx_recording(
|
|
self,
|
|
recording: Recording | np.ndarray,
|
|
num_samples: Optional[int] = None,
|
|
tx_time: Optional[int | float] = None,
|
|
):
|
|
"""
|
|
Transmit the given iq samples from the provided recording.
|
|
init_tx() must be called before this function.
|
|
|
|
:param recording: The recording to transmit.
|
|
:type recording: Recording or np.ndarray
|
|
:param num_samples: The number of samples to transmit, will repeat or
|
|
truncate the recording to this length. Defaults to None.
|
|
:type num_samples: int, optional
|
|
:param tx_time: The time to transmit, will repeat or truncate the
|
|
recording to this length. Defaults to None.
|
|
:type tx_time: int or float, optional
|
|
"""
|
|
if num_samples is not None and tx_time is not None:
|
|
raise ValueError("Only input one of num_samples or tx_time")
|
|
elif num_samples is not None:
|
|
tx_time = num_samples / self.tx_sample_rate
|
|
elif tx_time is not None:
|
|
pass
|
|
else:
|
|
tx_time = len(recording) / self.tx_sample_rate
|
|
|
|
if isinstance(recording, np.ndarray):
|
|
samples = recording
|
|
elif isinstance(recording, Recording):
|
|
if len(recording.data) > 1:
|
|
warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
|
|
|
|
samples = recording.data[0]
|
|
|
|
samples = samples.astype(np.complex64, copy=False)
|
|
if np.max(np.abs(samples)) >= 1:
|
|
samples = samples / (np.max(np.abs(samples)) + 1e-12)
|
|
|
|
print("HackRF Starting TX...")
|
|
self.radio.start_tx(samples=samples, repeat=True)
|
|
time.sleep(tx_time)
|
|
self.radio.stop_tx()
|
|
print("HackRF Tx Completed.")
|
|
|
|
def set_clock_source(self, source):
|
|
|
|
self.radio.set_clock_source(source)
|
|
|
|
def close(self):
|
|
self.radio.close()
|
|
|
|
def record(self, num_samples):
|
|
"""
|
|
Record a specified number of samples from the HackRF using block capture mode.
|
|
This is more reliable than streaming for USB2 connections.
|
|
|
|
:param num_samples: Number of samples to capture
|
|
:type num_samples: int
|
|
:return: Recording object containing the captured data
|
|
:rtype: Recording
|
|
"""
|
|
if not self._rx_initialized:
|
|
raise RuntimeError("RX was not initialized. init_rx() must be called before record()")
|
|
|
|
print("HackRF Starting RX...")
|
|
|
|
# Use libhackrf's block capture method
|
|
all_samples = self.radio.read_samples(num_samples)
|
|
|
|
print("HackRF RX Completed.")
|
|
|
|
# Create 1xN array for single-channel recording
|
|
store_array = np.zeros((1, num_samples), dtype=np.complex64)
|
|
store_array[0, :] = all_samples
|
|
|
|
metadata = {
|
|
"source": self.__class__.__name__,
|
|
"sample_rate": self.rx_sample_rate,
|
|
"center_frequency": self.rx_center_frequency,
|
|
"gain": self.rx_gain,
|
|
}
|
|
|
|
return Recording(data=store_array, metadata=metadata)
|
|
|
|
def _stream_rx(self, callback):
|
|
"""
|
|
Stream samples from the HackRF using a callback function.
|
|
|
|
:param callback: Function to call for each buffer of samples
|
|
:type callback: callable
|
|
"""
|
|
if not self._rx_initialized:
|
|
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx()")
|
|
|
|
print("HackRF Starting RX stream...")
|
|
|
|
self._enable_rx = True
|
|
|
|
def rx_callback(hackrf_transfer):
|
|
"""Internal callback that wraps the user's callback"""
|
|
try:
|
|
if not self._enable_rx:
|
|
return 1 # Stop
|
|
|
|
c = hackrf_transfer.contents
|
|
|
|
# Use ctypes string_at to safely copy the buffer
|
|
from ctypes import string_at
|
|
byte_data = string_at(c.buffer, c.valid_length)
|
|
|
|
# Convert bytes to int8, then to float32, then view as complex64
|
|
samples = np.frombuffer(byte_data, dtype=np.int8).astype(np.float32).view(np.complex64)
|
|
|
|
# Call user's callback
|
|
callback(buffer=samples, metadata=None)
|
|
|
|
return 0 if self._enable_rx else 1
|
|
except Exception as e:
|
|
print(f"Error in rx_callback: {e}")
|
|
return 1 # Stop on error
|
|
|
|
# Start RX
|
|
self.radio.start_rx(rx_callback)
|
|
|
|
# Wait while streaming
|
|
while self._enable_rx:
|
|
time.sleep(0.1)
|
|
|
|
# Stop RX
|
|
self.radio.stop_rx()
|
|
|
|
print("HackRF RX stream completed.")
|
|
|
|
def _stream_tx(self, callback):
|
|
return super()._stream_tx(callback)
|