Compare commits
5 Commits
98407604ef
...
0f246e9c69
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f246e9c69 | |||
| 4ba0dc170e | |||
| 762eda9426 | |||
| 172f8e0ca3 | |||
| c035b990ef |
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -48,6 +48,7 @@ coverage.xml
|
|||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
tests/sdr/
|
||||
|
||||
# Sphinx documentation
|
||||
docs/build/
|
||||
|
|
|
|||
|
|
@ -448,6 +448,60 @@ class Recording:
|
|||
else:
|
||||
raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
|
||||
|
||||
def view(self, output_path: Optional[str] = "images/signal.png", **kwargs) -> None:
|
||||
"""Create a plot of various signal visualizations as a PNG image.
|
||||
|
||||
:param output_path: The output image path. Defaults to "images/signal.png".
|
||||
:type output_path: str, optional
|
||||
:param kwargs: Keyword arguments passed on to utils.view.view_sig.
|
||||
:type: dict of keyword arguments
|
||||
|
||||
**Examples:**
|
||||
|
||||
Create a recording and view it as a plot in a .png image:
|
||||
|
||||
>>> import numpy
|
||||
>>> from utils.data import Recording
|
||||
|
||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
||||
>>> metadata = {
|
||||
>>> "sample_rate": 1e6,
|
||||
>>> "center_frequency": 2.44e9,
|
||||
>>> }
|
||||
|
||||
>>> recording = Recording(data=samples, metadata=metadata)
|
||||
>>> recording.view()
|
||||
"""
|
||||
from ria_toolkit_oss.view.view_signal import view_sig
|
||||
|
||||
view_sig(recording=self, output_path=output_path, **kwargs)
|
||||
|
||||
def simple_view(self, **kwargs) -> None:
|
||||
"""Create a plot of various signal visualizations as a PNG or SVG image.
|
||||
|
||||
:param kwargs: Keyword arguments passed on to utils.view.view_signal_simple.create_plots.
|
||||
:type: dict of keyword arguments
|
||||
|
||||
**Examples:**
|
||||
|
||||
Create a recording and view it as a plot in a .png image:
|
||||
|
||||
>>> import numpy
|
||||
>>> from utils.data import Recording
|
||||
|
||||
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
|
||||
>>> metadata = {
|
||||
>>> "sample_rate": 1e6,
|
||||
>>> "center_frequency": 2.44e9,
|
||||
>>> }
|
||||
|
||||
>>> recording = Recording(data=samples, metadata=metadata)
|
||||
>>> recording.simple_view()
|
||||
"""
|
||||
from ria_toolkit_oss.view.view_signal_simple import view_simple_sig
|
||||
|
||||
view_simple_sig(recording=self, **kwargs)
|
||||
|
||||
def to_sigmf(
|
||||
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
|
||||
) -> None:
|
||||
|
|
|
|||
57
src/ria_toolkit_oss/view/tools.py
Normal file
57
src/ria_toolkit_oss/view/tools.py
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
import pathlib
|
||||
|
||||
MAX_PLOT_POINTS = 100_000
|
||||
COLORS = {
|
||||
"primary": "#6366f1",
|
||||
"secondary": "#8b5cf6",
|
||||
"accent": "#06b6d4",
|
||||
"dark": "#1e293b",
|
||||
"light": "#f8fafc",
|
||||
"text": "#334155",
|
||||
"muted": "#64748b",
|
||||
"success": "#10b981",
|
||||
"warning": "#f59e0b",
|
||||
"error": "#ef4444",
|
||||
"purple": "#8b5cf6",
|
||||
"magenta": "#d946ef",
|
||||
}
|
||||
|
||||
|
||||
def decimate(x, max_points=MAX_PLOT_POINTS):
|
||||
if len(x) <= max_points:
|
||||
return x
|
||||
step = len(x) // max_points
|
||||
return x[::step]
|
||||
|
||||
|
||||
def extract_metadata_fields(metadata):
|
||||
sample_rate = next((v for k, v in metadata.items() if "sample_rate" in k), 1)
|
||||
center_freq = next((v for k, v in metadata.items() if "center_freq" in k), 0)
|
||||
sdr = next((v for k, v in metadata.items() if "sdr" in k), "Unknown")
|
||||
return sample_rate, center_freq, sdr
|
||||
|
||||
|
||||
def set_path(output_path):
|
||||
split_path = output_path.split("/")
|
||||
|
||||
if len(split_path) == 1:
|
||||
folder = "images"
|
||||
file = split_path[0]
|
||||
elif len(split_path) > 2:
|
||||
file = split_path[-1]
|
||||
folder = "/".join(split_path[:-1])
|
||||
else:
|
||||
folder, file = split_path
|
||||
|
||||
split_file = file.split(".")
|
||||
if len(split_file) == 2:
|
||||
extension = split_file[1]
|
||||
else:
|
||||
extension = "no extension"
|
||||
if extension != "png" and extension != "svg":
|
||||
print(f"{extension} not supported, saving as .png.")
|
||||
extension = "png"
|
||||
file = file + ".png"
|
||||
|
||||
pathlib.Path(folder).mkdir(parents=True, exist_ok=True)
|
||||
return "/".join([folder, file]), extension
|
||||
257
src/ria_toolkit_oss/view/view_signal.py
Normal file
257
src/ria_toolkit_oss/view/view_signal.py
Normal file
|
|
@ -0,0 +1,257 @@
|
|||
import os
|
||||
import textwrap
|
||||
from typing import Optional
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from matplotlib import gridspec
|
||||
from PIL import Image
|
||||
from scipy.fft import fft, fftshift
|
||||
from scipy.signal import spectrogram
|
||||
|
||||
from ria_toolkit_oss.datatypes.recording import Recording
|
||||
from ria_toolkit_oss.view.tools import (
|
||||
COLORS,
|
||||
decimate,
|
||||
extract_metadata_fields,
|
||||
set_path,
|
||||
)
|
||||
|
||||
|
||||
def get_fft_size(plot_length):
|
||||
if plot_length < 2000:
|
||||
return int(64)
|
||||
elif plot_length < 10000:
|
||||
return int(256)
|
||||
elif plot_length < 1000000:
|
||||
return int(1024)
|
||||
else:
|
||||
return int(2048)
|
||||
|
||||
|
||||
def set_spines(ax, spines):
|
||||
if not spines:
|
||||
ax.spines["top"].set_visible(False)
|
||||
ax.spines["right"].set_visible(False)
|
||||
ax.spines["bottom"].set_visible(False)
|
||||
ax.spines["left"].set_visible(False)
|
||||
|
||||
|
||||
def view_sig(
|
||||
recording: Recording,
|
||||
output_path: Optional[str] = "images/signal.png",
|
||||
title: Optional[str] = "Signal Plot",
|
||||
dpi: Optional[int] = 250,
|
||||
plot_length: Optional[int] = None,
|
||||
plot_spectrogram: Optional[bool] = True,
|
||||
iq: Optional[bool] = True,
|
||||
frequency: Optional[bool] = True,
|
||||
constellation: Optional[bool] = True,
|
||||
metadata: Optional[bool] = True,
|
||||
logo: Optional[bool] = True,
|
||||
dark: Optional[bool] = True,
|
||||
spines: Optional[bool] = False,
|
||||
title_fontsize: Optional[int] = 40,
|
||||
subtitle_fontsize: Optional[int] = 20,
|
||||
) -> None:
|
||||
"""
|
||||
Create a plot of various signal visualizations as a png or svg image.
|
||||
|
||||
:param recording: The recording object to plot.
|
||||
:type recording: Recording
|
||||
:param output_path: The output image path. Defaults to "images/signal.png"
|
||||
:type output_path: str, optional
|
||||
:param title: The display title. Defaults to "Signal Plot"
|
||||
:type title: str, optional
|
||||
:param dpi: The dots per inch resolution. Defaults to 250
|
||||
:type dpi: int, optional
|
||||
:param plot_length: The number of samples to plot, default is the whole recording. Defaults to None
|
||||
:type plot_length: int, optional
|
||||
:param plot_spectrogram: Display the spectrogram. Defaults to True
|
||||
:type plot_spectrogram: bool, optional
|
||||
:param iq: Display the iq sample plot. Defaults to True
|
||||
:type iq: bool, optional
|
||||
:param frequency: Display the fft of the recording. Defaults to True
|
||||
:type frequency: bool, optional
|
||||
:param constellation: Display the constellation plot. Defaults to True
|
||||
:type constellation: bool, optional
|
||||
:param metadata: Display the metadata text. Defaults to True
|
||||
:type metadata: bool, optional
|
||||
:param logo: Display the Qoherent logo. Defaults to True
|
||||
:type logo: bool, optional
|
||||
:param dark: Use dark mode. Defaults to True
|
||||
:type dark: bool, optional
|
||||
:param spines: Display spines (bounding lines) around plots. Defaults to False
|
||||
:type spines: bool, optional
|
||||
:param title_fontsize: The font size of the main title text. Defaults to 40
|
||||
:type title_fontsize: int, optional
|
||||
:param subtitle_fontsize: The fontsize of the subplot titles. Defaults to 20
|
||||
:type subtitle_fontsize: int, optional
|
||||
|
||||
**Examples:**
|
||||
|
||||
.. todo:: Usage examples coming soon.
|
||||
"""
|
||||
|
||||
complex_signal = recording.data[0]
|
||||
sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
|
||||
|
||||
subplot_height = 2 * (plot_spectrogram + iq + frequency) + 3 * (constellation or metadata or logo)
|
||||
subplot_width = max((constellation + metadata or 1), logo * 3)
|
||||
|
||||
if dark:
|
||||
plt.style.use("dark_background")
|
||||
logo_path = os.path.dirname(__file__) + "/graphics/Qoherent-logo-white-transparent.png"
|
||||
else:
|
||||
plt.style.use("default")
|
||||
logo_path = os.path.dirname(__file__) + "/graphics/Qoherent-logo-black-transparent.png"
|
||||
|
||||
if plot_length is None:
|
||||
plot_length = len(recording.data[0])
|
||||
|
||||
# Plot preparation
|
||||
fig = plt.figure(figsize=(14, 12))
|
||||
fig.suptitle(title, fontsize=title_fontsize)
|
||||
gs = gridspec.GridSpec(subplot_height, subplot_width)
|
||||
|
||||
plot_y_indx = 0
|
||||
plot_x_indx = 0
|
||||
|
||||
if plot_spectrogram:
|
||||
spec_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
|
||||
plot_y_indx = plot_y_indx + 2
|
||||
fft_size = get_fft_size(plot_length=plot_length)
|
||||
|
||||
f, t_spec, Sxx = spectrogram(
|
||||
complex_signal[:plot_length],
|
||||
fs=sample_rate,
|
||||
nperseg=fft_size,
|
||||
noverlap=fft_size // 8,
|
||||
mode="magnitude",
|
||||
return_onesided=False,
|
||||
)
|
||||
|
||||
# shift frequencies so zero is centered
|
||||
Sxx = np.fft.fftshift(Sxx, axes=0)
|
||||
f = np.fft.fftshift(f) - sample_rate / 2 + center_frequency
|
||||
|
||||
spec_ax.imshow(
|
||||
10 * np.log10(Sxx + 1e-12),
|
||||
aspect="auto",
|
||||
origin="lower",
|
||||
extent=[t_spec[0], t_spec[-1], f[0], f[-1]],
|
||||
cmap="twilight",
|
||||
)
|
||||
|
||||
set_spines(spec_ax, spines)
|
||||
spec_ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize)
|
||||
spec_ax.set_ylabel("Frequency (Hz)")
|
||||
spec_ax.set_xlabel("Time (s)")
|
||||
|
||||
if iq:
|
||||
iq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
|
||||
plot_y_indx = plot_y_indx + 2
|
||||
|
||||
plot_iq = decimate(complex_signal[:plot_length])
|
||||
t = np.arange(len(plot_iq)) / sample_rate * (len(complex_signal[:plot_length]) / len(plot_iq))
|
||||
|
||||
iq_ax.plot(t, plot_iq.real, color=COLORS["purple"], linewidth=0.6, alpha=0.8, label="I")
|
||||
iq_ax.plot(t, plot_iq.imag, color=COLORS["magenta"], linewidth=0.6, alpha=0.8, label="Q")
|
||||
iq_ax.grid(False)
|
||||
|
||||
iq_ax.set_ylabel("Amplitude")
|
||||
iq_ax.set_xlim([min(t), max(t)])
|
||||
iq_ax.set_xlabel("Time (s)")
|
||||
iq_ax.set_title("IQ Sample Plot", fontsize=subtitle_fontsize)
|
||||
set_spines(iq_ax, spines)
|
||||
|
||||
if frequency:
|
||||
freq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
|
||||
plot_y_indx = plot_y_indx + 2
|
||||
|
||||
epsilon = 1e-10
|
||||
spectrum = np.abs(fftshift(fft(complex_signal[0:plot_length])))
|
||||
freqs = (
|
||||
np.linspace(-1 * (sample_rate / 2), (sample_rate / 2), len(complex_signal[0:plot_length]))
|
||||
+ center_frequency
|
||||
)
|
||||
|
||||
# Use semi-log for the y-axis
|
||||
freq_ax.semilogy(freqs, spectrum + epsilon, color=COLORS["accent"], linewidth=0.8)
|
||||
freq_ax.set_xlabel("Frequency")
|
||||
freq_ax.set_ylabel("Magnitude")
|
||||
freq_ax.set_title("Frequency Spectrum", fontsize=subtitle_fontsize)
|
||||
set_spines(freq_ax, spines)
|
||||
|
||||
if constellation:
|
||||
const_ax = plt.subplot(gs[plot_y_indx:, plot_x_indx])
|
||||
plot_x_indx = plot_x_indx + 1
|
||||
plot_const = decimate(complex_signal[:plot_length], 50_000)
|
||||
const_ax.scatter(plot_const.real, plot_const.imag, c=COLORS["purple"], s=1, linewidths=0.1)
|
||||
dimension = max(abs(complex_signal)) * 1.1
|
||||
const_ax.set_xlim([-1 * dimension, dimension])
|
||||
const_ax.set_ylim([-1 * dimension, dimension])
|
||||
const_ax.set_xlabel("In-phase (I)")
|
||||
const_ax.set_ylabel("Quadrature (Q)")
|
||||
const_ax.set_title("Constellation", fontsize=subtitle_fontsize)
|
||||
const_ax.set_aspect("equal")
|
||||
|
||||
if not spines:
|
||||
const_ax.spines["top"].set_visible(False)
|
||||
const_ax.spines["right"].set_visible(False)
|
||||
const_ax.spines["bottom"].set_visible(False)
|
||||
const_ax.spines["left"].set_visible(False)
|
||||
|
||||
# metadata text box
|
||||
if metadata:
|
||||
meta_ax = plt.subplot(gs[plot_y_indx:, plot_x_indx])
|
||||
plot_x_indx = plot_x_indx + 1
|
||||
metadata_text = "\n".join(
|
||||
[
|
||||
f"{key}: {textwrap.shorten(str(value), width=80, placeholder='...')}"
|
||||
for key, value in recording.metadata.items()
|
||||
]
|
||||
)
|
||||
|
||||
meta_ax.text(
|
||||
0.05,
|
||||
0.95,
|
||||
metadata_text,
|
||||
fontsize=10,
|
||||
va="top",
|
||||
ha="left",
|
||||
bbox=dict(facecolor="none", alpha=0.5, edgecolor="none"),
|
||||
)
|
||||
meta_ax.set_title("Metadata", fontsize=subtitle_fontsize)
|
||||
# Remove the tick labels
|
||||
meta_ax.xaxis.set_ticklabels([]) # Remove x-axis tick labels
|
||||
meta_ax.yaxis.set_ticklabels([]) # Remove y-axis tick labels
|
||||
meta_ax.set_xticks([])
|
||||
meta_ax.set_yticks([])
|
||||
set_spines(meta_ax, spines)
|
||||
|
||||
if logo and os.path.isfile(logo_path):
|
||||
logo_ax = plt.subplot(gs[plot_y_indx:, 2])
|
||||
plot_x_indx = plot_x_indx + 1
|
||||
logo_ax.axis("off")
|
||||
|
||||
try:
|
||||
image = Image.open(logo_path) # Open the PNG image using PIL
|
||||
logo_ax.imshow(image)
|
||||
|
||||
except FileNotFoundError:
|
||||
print(f"Warning, {logo_path} not found.")
|
||||
|
||||
fig.subplots_adjust(
|
||||
left=0.1, # Left margin
|
||||
right=0.9, # Right margin
|
||||
top=0.9, # Top margin
|
||||
bottom=0.1, # Bottom margin
|
||||
wspace=0.4, # Horizontal space between subplots
|
||||
hspace=2.5, # Vertical space between subplots
|
||||
)
|
||||
|
||||
# save path handling
|
||||
output_path, _ = set_path(output_path=output_path)
|
||||
plt.savefig(output_path, dpi=dpi)
|
||||
print(f"Saved signal plot to {output_path}")
|
||||
328
src/ria_toolkit_oss/view/view_signal_simple.py
Normal file
328
src/ria_toolkit_oss/view/view_signal_simple.py
Normal file
|
|
@ -0,0 +1,328 @@
|
|||
"""Shared plotting primitives for signal visualization."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from scipy.fft import fft, fftshift
|
||||
from scipy.signal.windows import hann
|
||||
|
||||
from ria_toolkit_oss.datatypes.recording import Recording
|
||||
from ria_toolkit_oss.view.tools import (
|
||||
COLORS,
|
||||
decimate,
|
||||
extract_metadata_fields,
|
||||
set_path,
|
||||
)
|
||||
|
||||
|
||||
def _get_nfft_size(signal, fast_mode):
|
||||
if len(signal) < 1000:
|
||||
nfft = 128
|
||||
elif len(signal) < 10_000:
|
||||
nfft = 256
|
||||
elif len(signal) < 100_000:
|
||||
nfft = 512
|
||||
elif len(signal) < 1_000_000:
|
||||
nfft = 1024
|
||||
else:
|
||||
nfft = 2048
|
||||
|
||||
if fast_mode:
|
||||
nfft = min(nfft, 512)
|
||||
overlap = nfft // 8 if fast_mode else nfft // 4
|
||||
return nfft, overlap
|
||||
|
||||
|
||||
def _get_plot_samples(signal, fast_mode, slow_max, fast_max):
|
||||
max_samples = fast_max if fast_mode else slow_max
|
||||
if len(signal) > max_samples:
|
||||
start_idx = len(signal) // 2 - max_samples // 2
|
||||
return signal[start_idx : start_idx + max_samples]
|
||||
else:
|
||||
return signal
|
||||
|
||||
|
||||
def _set_dpi(fast_mode, labels_mode, extension):
|
||||
if fast_mode:
|
||||
dpi = 75
|
||||
elif labels_mode:
|
||||
dpi = 200
|
||||
else:
|
||||
dpi = 150
|
||||
return dpi if extension == "png" else None
|
||||
|
||||
|
||||
def setup_style(*, labels_mode: bool = False, compact_mode: bool = False) -> None:
|
||||
"""Configure matplotlib with the signal-testbed styling."""
|
||||
|
||||
plt.style.use("dark_background")
|
||||
|
||||
if compact_mode:
|
||||
base_font = 8
|
||||
title_font = 10
|
||||
label_font = 8
|
||||
elif labels_mode:
|
||||
base_font = 12
|
||||
title_font = 16
|
||||
label_font = 14
|
||||
else:
|
||||
base_font = 10
|
||||
title_font = 12
|
||||
label_font = 10
|
||||
|
||||
matplotlib.rcParams.update(
|
||||
{
|
||||
"figure.facecolor": "#0f172a",
|
||||
"axes.facecolor": "#1e293b",
|
||||
"axes.edgecolor": COLORS["muted"],
|
||||
"axes.labelcolor": COLORS["light"],
|
||||
"text.color": COLORS["light"],
|
||||
"xtick.color": COLORS["muted"],
|
||||
"ytick.color": COLORS["muted"],
|
||||
"grid.color": COLORS["muted"],
|
||||
"grid.alpha": 0.3,
|
||||
"font.size": base_font,
|
||||
"axes.titlesize": title_font,
|
||||
"axes.labelsize": label_font,
|
||||
"figure.titlesize": title_font + 2,
|
||||
"legend.frameon": False,
|
||||
"legend.facecolor": "none",
|
||||
"xtick.labelsize": base_font,
|
||||
"ytick.labelsize": base_font,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def detect_constellation_symbols(signal: np.ndarray, method: str = "differential") -> np.ndarray:
|
||||
"""Heuristic symbol detector used for constellation highlighting."""
|
||||
|
||||
if len(signal) < 100:
|
||||
return np.ones(len(signal), dtype=bool)
|
||||
|
||||
if method == "differential":
|
||||
di = np.diff(signal.imag)
|
||||
dq = np.diff(signal.real)
|
||||
derivative_magnitude = np.sqrt(di**2 + dq**2)
|
||||
derivative_magnitude = np.append(derivative_magnitude, 0)
|
||||
threshold = np.percentile(derivative_magnitude, 15)
|
||||
return derivative_magnitude < threshold
|
||||
|
||||
if method == "amplitude":
|
||||
amplitude = np.abs(signal)
|
||||
amplitude_change = np.abs(np.diff(amplitude))
|
||||
amplitude_change = np.append(amplitude_change, 0)
|
||||
threshold = np.percentile(amplitude_change, 20)
|
||||
return amplitude_change < threshold
|
||||
|
||||
if method == "phase":
|
||||
phase = np.angle(signal)
|
||||
phase_diff = np.diff(np.unwrap(phase))
|
||||
phase_diff = np.append(phase_diff, 0)
|
||||
threshold = np.percentile(np.abs(phase_diff), 20)
|
||||
return np.abs(phase_diff) < threshold
|
||||
|
||||
if method == "combined":
|
||||
diff_stable = detect_constellation_symbols(signal, "differential")
|
||||
amp_stable = detect_constellation_symbols(signal, "amplitude")
|
||||
phase_stable = detect_constellation_symbols(signal, "phase")
|
||||
stability_count = diff_stable.astype(int) + amp_stable.astype(int) + phase_stable.astype(int)
|
||||
return stability_count >= 2
|
||||
|
||||
raise ValueError(f"Unknown method: {method}")
|
||||
|
||||
|
||||
def view_simple_sig(
|
||||
recording: Recording,
|
||||
output_path: Optional[str] = "images/signal.png",
|
||||
saveplot: Optional[bool] = True,
|
||||
fast_mode: Optional[bool] = False,
|
||||
compact_mode: Optional[bool] = False,
|
||||
horizontal_mode: Optional[bool] = False,
|
||||
constellation_mode: Optional[bool] = False,
|
||||
labels_mode: Optional[bool] = False,
|
||||
slice: Optional[tuple] = None,
|
||||
title: Optional[str] = "Signal",
|
||||
):
|
||||
"""
|
||||
Create a simple plot of various signal visualizations as a png or svg image.
|
||||
|
||||
:param recording: The recording object to plot.
|
||||
:type recording: Recording
|
||||
:param output_path: The output image path. Defaults to "images/signal.png"
|
||||
:type output_path: str, optional
|
||||
:param saveplot: Whether or not to save the plot. Defaults to True.
|
||||
:type saveplot: bool, optional
|
||||
:param fast_mode: Use fast mode for faster render. Defaults to False.
|
||||
:type fast_mode: bool, optional
|
||||
:param compact_mode: Use compact mode for compact plot. Defaults to False.
|
||||
:type compact_mode: bool, optional
|
||||
:param horizontal_mode: Display plots horizontally. Defaults to False.
|
||||
:type horizontal_mode: bool, optional
|
||||
:param constellation_mode: Display constellation plot and PSD if not using compact mode. Defaults to False.
|
||||
:type constellation_mode: bool, optional
|
||||
:param labels_mode: Display more thorough labels. Defaults to False.
|
||||
:type labels_mode: bool, optional
|
||||
:param slice: Slice of signal to display. Defaults to None.
|
||||
:type slice: tuple[int, int], optional
|
||||
:param title: Title of plot. Defaults to "Signal".
|
||||
:type title: str, optional
|
||||
|
||||
"""
|
||||
|
||||
signal = recording.data[0]
|
||||
sample_rate_hz, center_freq_hz, sdr = extract_metadata_fields(recording.metadata)
|
||||
|
||||
setup_style(labels_mode=labels_mode, compact_mode=compact_mode)
|
||||
|
||||
if slice:
|
||||
start_idx, end_idx = slice
|
||||
signal = signal[start_idx:end_idx]
|
||||
print(f"Using slice: samples {start_idx} to {end_idx} ({len(signal):,} samples)")
|
||||
|
||||
max_display_pixels = 100_000 if fast_mode else 250_000
|
||||
display_signal = decimate(signal, max_display_pixels) if len(signal) > max_display_pixels else signal
|
||||
spec_signal = signal
|
||||
|
||||
if compact_mode:
|
||||
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), gridspec_kw={"height_ratios": [1, 5]})
|
||||
show_title = False
|
||||
show_labels = False
|
||||
ax_constellation = ax_psd = None
|
||||
elif horizontal_mode:
|
||||
if constellation_mode:
|
||||
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 6))
|
||||
ax_constellation = ax3
|
||||
else:
|
||||
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
|
||||
ax_constellation = None
|
||||
show_title = True
|
||||
show_labels = labels_mode
|
||||
ax_psd = None
|
||||
else:
|
||||
if constellation_mode:
|
||||
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
|
||||
ax_constellation, ax_psd = ax3, ax4
|
||||
else:
|
||||
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
|
||||
ax_constellation = ax_psd = None
|
||||
show_title = True
|
||||
show_labels = labels_mode
|
||||
|
||||
if show_title:
|
||||
fig.suptitle(title, fontsize=16, color=COLORS["light"], y=0.96)
|
||||
fig.patch.set_facecolor("#0f172a")
|
||||
|
||||
total_duration_s = len(signal) / sample_rate_hz if sample_rate_hz else 0.0
|
||||
t_s = np.linspace(0, total_duration_s, len(display_signal)) if len(display_signal) else np.array([])
|
||||
|
||||
ax1.plot(t_s, display_signal.real, color=COLORS["purple"], linewidth=0.8, alpha=0.8, label="I")
|
||||
ax1.plot(t_s, display_signal.imag, color=COLORS["magenta"], linewidth=0.8, alpha=0.8, label="Q")
|
||||
ax1.set_xlim(0, total_duration_s)
|
||||
ax1.grid(True, alpha=0.3)
|
||||
|
||||
nfft, overlap = _get_nfft_size(signal=signal, fast_mode=fast_mode)
|
||||
|
||||
_, freqs, _, _ = ax2.specgram(
|
||||
spec_signal,
|
||||
NFFT=nfft,
|
||||
Fc=center_freq_hz,
|
||||
Fs=sample_rate_hz,
|
||||
noverlap=overlap,
|
||||
cmap="twilight",
|
||||
)
|
||||
|
||||
ax2.set_ylim(center_freq_hz - sample_rate_hz / 2, center_freq_hz + sample_rate_hz / 2)
|
||||
ax2.set_xlim(0, total_duration_s)
|
||||
|
||||
if show_labels:
|
||||
if horizontal_mode:
|
||||
ax1.set_xlabel("Time (s)")
|
||||
else:
|
||||
ax2.set_xlabel("Time (s)")
|
||||
|
||||
ax1.set_ylabel("Amplitude")
|
||||
ax1.set_title(f"Time Series - {sdr} SDR")
|
||||
ax1.legend(loc="upper right")
|
||||
|
||||
ax2.set_ylabel("Frequency (Hz)")
|
||||
ax2.set_title(f"Spectrogram - {center_freq_hz / 1e6:.1f} MHz ± {sample_rate_hz / 2e6:.1f} MHz")
|
||||
yticks = ax2.get_yticks()
|
||||
ax2.set_yticklabels([f"{y / 1e6:.1f}" for y in yticks])
|
||||
elif not compact_mode:
|
||||
ax1.set_title("Time Series")
|
||||
ax1.legend(loc="upper right", fontsize=8)
|
||||
|
||||
ax2.set_title("Spectrogram")
|
||||
|
||||
if ax_constellation is not None:
|
||||
constellation_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=50_000, fast_max=20_000)
|
||||
method = "differential" if fast_mode else "combined"
|
||||
stable_points = detect_constellation_symbols(constellation_samples, method=method)
|
||||
|
||||
ax_constellation.scatter(
|
||||
constellation_samples.real[~stable_points],
|
||||
constellation_samples.imag[~stable_points],
|
||||
c=COLORS["muted"],
|
||||
s=0.5,
|
||||
alpha=0.2,
|
||||
)
|
||||
ax_constellation.scatter(
|
||||
constellation_samples.real[stable_points],
|
||||
constellation_samples.imag[stable_points],
|
||||
c=COLORS["purple"],
|
||||
s=3,
|
||||
alpha=0.8,
|
||||
)
|
||||
ax_constellation.set_xlabel("In-phase (I)")
|
||||
ax_constellation.set_ylabel("Quadrature (Q)")
|
||||
ax_constellation.set_title("Constellation")
|
||||
ax_constellation.grid(True, alpha=0.3)
|
||||
ax_constellation.set_aspect("equal")
|
||||
|
||||
if ax_psd is not None:
|
||||
psd_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=65_536, fast_max=16_384)
|
||||
window = hann(len(psd_samples))
|
||||
spectrum = np.abs(fftshift(fft(psd_samples * window))) ** 2
|
||||
freqs = np.linspace(-sample_rate_hz / 2, sample_rate_hz / 2, len(psd_samples))
|
||||
freqs = freqs + center_freq_hz
|
||||
spectrum_db = 10 * np.log10(spectrum + 1e-12)
|
||||
|
||||
ax_psd.plot(freqs / 1e6, spectrum_db, color=COLORS["accent"], linewidth=1.0)
|
||||
ax_psd.set_xlabel("Frequency (MHz)")
|
||||
ax_psd.set_ylabel("Power (dB)")
|
||||
ax_psd.set_title("Power Spectral Density")
|
||||
ax_psd.grid(True, alpha=0.3)
|
||||
|
||||
if compact_mode:
|
||||
ax1.set_xticks([])
|
||||
ax1.set_yticks([])
|
||||
ax2.set_xticks([])
|
||||
ax2.set_yticks([])
|
||||
|
||||
plt.subplots_adjust(left=0, right=1, top=1, bottom=0, hspace=0)
|
||||
else:
|
||||
plt.tight_layout()
|
||||
if show_title:
|
||||
plt.subplots_adjust(top=0.92)
|
||||
|
||||
if saveplot:
|
||||
output_path, extension = set_path(output_path=output_path)
|
||||
dpi_value = _set_dpi(fast_mode=fast_mode, labels_mode=labels_mode, extension=extension)
|
||||
|
||||
plt.savefig(output_path, dpi=dpi_value, bbox_inches="tight", facecolor="#0f172a", edgecolor="none")
|
||||
print(f"Saved signal plot to {output_path}")
|
||||
return output_path
|
||||
|
||||
plt.show()
|
||||
return None
|
||||
|
||||
|
||||
__all__ = [
|
||||
"setup_style",
|
||||
"detect_constellation_symbols",
|
||||
"view_simple_sig",
|
||||
]
|
||||
Loading…
Reference in New Issue
Block a user