Added view folder and view methods in Recording

This commit is contained in:
M madrigal 2025-10-24 17:43:28 -04:00
parent c035b990ef
commit 172f8e0ca3
4 changed files with 672 additions and 0 deletions

View File

@ -448,6 +448,60 @@ class Recording:
else: else:
raise ValueError(f"Key {key} is protected and cannot be modified or removed.") raise ValueError(f"Key {key} is protected and cannot be modified or removed.")
def view(self, output_path: Optional[str] = "images/signal.png", **kwargs) -> None:
"""Create a plot of various signal visualizations as a PNG image.
:param output_path: The output image path. Defaults to "images/signal.png".
:type output_path: str, optional
:param kwargs: Keyword arguments passed on to utils.view.view_sig.
:type: dict of keyword arguments
**Examples:**
Create a recording and view it as a plot in a .png image:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
>>> "sample_rate": 1e6,
>>> "center_frequency": 2.44e9,
>>> }
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.view()
"""
from ria_toolkit_oss.view.view_signal import view_sig
view_sig(recording=self, output_path=output_path, **kwargs)
def simple_view(self, **kwargs) -> None:
"""Create a plot of various signal visualizations as a PNG or SVG image.
:param kwargs: Keyword arguments passed on to utils.view.view_signal_simple.create_plots.
:type: dict of keyword arguments
**Examples:**
Create a recording and view it as a plot in a .png image:
>>> import numpy
>>> from utils.data import Recording
>>> samples = numpy.ones(10000, dtype=numpy.complex64)
>>> metadata = {
>>> "sample_rate": 1e6,
>>> "center_frequency": 2.44e9,
>>> }
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.simple_view()
"""
from ria_toolkit_oss.view.view_signal_simple import view_simple_sig
view_simple_sig(recording=self, **kwargs)
def to_sigmf( def to_sigmf(
self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False self, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, overwrite: bool = False
) -> None: ) -> None:

View File

@ -0,0 +1,57 @@
import pathlib
MAX_PLOT_POINTS = 100_000
COLORS = {
"primary": "#6366f1",
"secondary": "#8b5cf6",
"accent": "#06b6d4",
"dark": "#1e293b",
"light": "#f8fafc",
"text": "#334155",
"muted": "#64748b",
"success": "#10b981",
"warning": "#f59e0b",
"error": "#ef4444",
"purple": "#8b5cf6",
"magenta": "#d946ef",
}
def decimate(x, max_points=MAX_PLOT_POINTS):
if len(x) <= max_points:
return x
step = len(x) // max_points
return x[::step]
def extract_metadata_fields(metadata):
sample_rate = next((v for k, v in metadata.items() if "sample_rate" in k), 1)
center_freq = next((v for k, v in metadata.items() if "center_freq" in k), 0)
sdr = next((v for k, v in metadata.items() if "sdr" in k), "Unknown")
return sample_rate, center_freq, sdr
def set_path(output_path):
split_path = output_path.split("/")
if len(split_path) == 1:
folder = "images"
file = split_path[0]
elif len(split_path) > 2:
file = split_path[-1]
folder = "/".join(split_path[:-1])
else:
folder, file = split_path
split_file = file.split(".")
if len(split_file) == 2:
extension = split_file[1]
else:
extension = "no extension"
if extension != "png" and extension != "svg":
print(f"{extension} not supported, saving as .png.")
extension = "png"
file = file + ".png"
pathlib.Path(folder).mkdir(parents=True, exist_ok=True)
return "/".join([folder, file]), extension

View File

@ -0,0 +1,257 @@
import os
import textwrap
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import gridspec
from PIL import Image
from scipy.fft import fft, fftshift
from scipy.signal import spectrogram
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.view.tools import (
COLORS,
decimate,
extract_metadata_fields,
set_path,
)
def get_fft_size(plot_length):
if plot_length < 2000:
return int(64)
elif plot_length < 10000:
return int(256)
elif plot_length < 1000000:
return int(1024)
else:
return int(2048)
def set_spines(ax, spines):
if not spines:
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
ax.spines["left"].set_visible(False)
def view_sig(
recording: Recording,
output_path: Optional[str] = "images/signal.png",
title: Optional[str] = "Signal Plot",
dpi: Optional[int] = 250,
plot_length: Optional[int] = None,
plot_spectrogram: Optional[bool] = True,
iq: Optional[bool] = True,
frequency: Optional[bool] = True,
constellation: Optional[bool] = True,
metadata: Optional[bool] = True,
logo: Optional[bool] = True,
dark: Optional[bool] = True,
spines: Optional[bool] = False,
title_fontsize: Optional[int] = 40,
subtitle_fontsize: Optional[int] = 20,
) -> None:
"""
Create a plot of various signal visualizations as a png image.
:param recording: The recording object to plot.
:type recording: Recording
:param output_path: The output image path. Defaults to "images/signal.png"
:type output_path: str, optional
:param title: The display title. Defaults to "Signal Plot"
:type title: str, optional
:param dpi: The dots per inch resolution. Defaults to 250
:type dpi: int, optional
:param plot_length: The number of samples to plot, default is the whole recording. Defaults to None
:type plot_length: int, optional
:param plot_spectrogram: Display the spectrogram. Defaults to True
:type plot_spectrogram: bool, optional
:param iq: Display the iq sample plot. Defaults to True
:type iq: bool, optional
:param frequency: Display the fft of the recording. Defaults to True
:type frequency: bool, optional
:param constellation: Display the constellation plot. Defaults to True
:type constellation: bool, optional
:param metadata: Display the metadata text. Defaults to True
:type metadata: bool, optional
:param logo: Display the Qoherent logo. Defaults to True
:type logo: bool, optional
:param dark: Use dark mode. Defaults to True
:type dark: bool, optional
:param spines: Display spines (bounding lines) around plots. Defaults to False
:type spines: bool, optional
:param title_fontsize: The font size of the main title text. Defaults to 40
:type title_fontsize: int, optional
:param subtitle_fontsize: The fontsize of the subplot titles. Defaults to 20
:type subtitle_fontsize: int, optional
**Examples:**
.. todo:: Usage examples coming soon.
"""
complex_signal = recording.data[0]
sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
subplot_height = 2 * (plot_spectrogram + iq + frequency) + 3 * (constellation or metadata or logo)
subplot_width = max((constellation + metadata or 1), logo * 3)
if dark:
plt.style.use("dark_background")
logo_path = os.path.dirname(__file__) + "/graphics/Qoherent-logo-white-transparent.png"
else:
plt.style.use("default")
logo_path = os.path.dirname(__file__) + "/graphics/Qoherent-logo-black-transparent.png"
if plot_length is None:
plot_length = len(recording.data[0])
# Plot preparation
fig = plt.figure(figsize=(14, 12))
fig.suptitle(title, fontsize=title_fontsize)
gs = gridspec.GridSpec(subplot_height, subplot_width)
plot_y_indx = 0
plot_x_indx = 0
if plot_spectrogram:
spec_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
plot_y_indx = plot_y_indx + 2
fft_size = get_fft_size(plot_length=plot_length)
f, t_spec, Sxx = spectrogram(
complex_signal[:plot_length],
fs=sample_rate,
nperseg=fft_size,
noverlap=fft_size // 8,
mode="magnitude",
return_onesided=False,
)
# shift frequencies so zero is centered
Sxx = np.fft.fftshift(Sxx, axes=0)
f = np.fft.fftshift(f) - sample_rate / 2 + center_frequency
spec_ax.imshow(
10 * np.log10(Sxx + 1e-12),
aspect="auto",
origin="lower",
extent=[t_spec[0], t_spec[-1], f[0], f[-1]],
cmap="twilight",
)
set_spines(spec_ax, spines)
spec_ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize)
spec_ax.set_ylabel("Frequency (Hz)")
spec_ax.set_xlabel("Time (s)")
if iq:
iq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
plot_y_indx = plot_y_indx + 2
plot_iq = decimate(complex_signal[:plot_length])
t = np.arange(len(plot_iq)) / sample_rate * (len(complex_signal[:plot_length]) / len(plot_iq))
iq_ax.plot(t, plot_iq.real, color=COLORS["purple"], linewidth=0.6, alpha=0.8, label="I")
iq_ax.plot(t, plot_iq.imag, color=COLORS["magenta"], linewidth=0.6, alpha=0.8, label="Q")
iq_ax.grid(False)
iq_ax.set_ylabel("Amplitude")
iq_ax.set_xlim([min(t), max(t)])
iq_ax.set_xlabel("Time (s)")
iq_ax.set_title("IQ Sample Plot", fontsize=subtitle_fontsize)
set_spines(iq_ax, spines)
if frequency:
freq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :])
plot_y_indx = plot_y_indx + 2
epsilon = 1e-10
spectrum = np.abs(fftshift(fft(complex_signal[0:plot_length])))
freqs = (
np.linspace(-1 * (sample_rate / 2), (sample_rate / 2), len(complex_signal[0:plot_length]))
+ center_frequency
)
# Use semi-log for the y-axis
freq_ax.semilogy(freqs, spectrum + epsilon, color=COLORS["accent"], linewidth=0.8)
freq_ax.set_xlabel("Frequency")
freq_ax.set_ylabel("Magnitude")
freq_ax.set_title("Frequency Spectrum", fontsize=subtitle_fontsize)
set_spines(freq_ax, spines)
if constellation:
const_ax = plt.subplot(gs[plot_y_indx:, plot_x_indx])
plot_x_indx = plot_x_indx + 1
plot_const = decimate(complex_signal[:plot_length], 50_000)
const_ax.scatter(plot_const.real, plot_const.imag, c=COLORS["purple"], s=1, linewidths=0.1)
dimension = max(abs(complex_signal)) * 1.1
const_ax.set_xlim([-1 * dimension, dimension])
const_ax.set_ylim([-1 * dimension, dimension])
const_ax.set_xlabel("In-phase (I)")
const_ax.set_ylabel("Quadrature (Q)")
const_ax.set_title("Constellation", fontsize=subtitle_fontsize)
const_ax.set_aspect("equal")
if not spines:
const_ax.spines["top"].set_visible(False)
const_ax.spines["right"].set_visible(False)
const_ax.spines["bottom"].set_visible(False)
const_ax.spines["left"].set_visible(False)
# metadata text box
if metadata:
meta_ax = plt.subplot(gs[plot_y_indx:, plot_x_indx])
plot_x_indx = plot_x_indx + 1
metadata_text = "\n".join(
[
f"{key}: {textwrap.shorten(str(value), width=80, placeholder='...')}"
for key, value in recording.metadata.items()
]
)
meta_ax.text(
0.05,
0.95,
metadata_text,
fontsize=10,
va="top",
ha="left",
bbox=dict(facecolor="none", alpha=0.5, edgecolor="none"),
)
meta_ax.set_title("Metadata", fontsize=subtitle_fontsize)
# Remove the tick labels
meta_ax.xaxis.set_ticklabels([]) # Remove x-axis tick labels
meta_ax.yaxis.set_ticklabels([]) # Remove y-axis tick labels
meta_ax.set_xticks([])
meta_ax.set_yticks([])
set_spines(meta_ax, spines)
if logo and os.path.isfile(logo_path):
logo_ax = plt.subplot(gs[plot_y_indx:, 2])
plot_x_indx = plot_x_indx + 1
logo_ax.axis("off")
try:
image = Image.open(logo_path) # Open the PNG image using PIL
logo_ax.imshow(image)
except FileNotFoundError:
print(f"Warning, {logo_path} not found.")
fig.subplots_adjust(
left=0.1, # Left margin
right=0.9, # Right margin
top=0.9, # Top margin
bottom=0.1, # Bottom margin
wspace=0.4, # Horizontal space between subplots
hspace=2.5, # Vertical space between subplots
)
# save path handling
output_path, _ = set_path(output_path=output_path)
plt.savefig(output_path, dpi=dpi)
print(f"Saved signal plot to {output_path}")

View File

@ -0,0 +1,304 @@
"""Shared plotting primitives for signal visualization."""
from __future__ import annotations
from typing import Optional
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy.fft import fft, fftshift
from scipy.signal.windows import hann
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.view.tools import (
COLORS,
decimate,
extract_metadata_fields,
set_path,
)
def _get_nfft_size(signal, fast_mode):
if len(signal) < 1000:
nfft = 128
elif len(signal) < 10_000:
nfft = 256
elif len(signal) < 100_000:
nfft = 512
elif len(signal) < 1_000_000:
nfft = 1024
else:
nfft = 2048
if fast_mode:
nfft = min(nfft, 512)
overlap = nfft // 8 if fast_mode else nfft // 4
return nfft, overlap
def _get_plot_samples(signal, fast_mode, slow_max, fast_max):
max_samples = fast_max if fast_mode else slow_max
if len(signal) > max_samples:
start_idx = len(signal) // 2 - max_samples // 2
return signal[start_idx : start_idx + max_samples]
else:
return signal
def _set_dpi(fast_mode, labels_mode, extension):
if fast_mode:
dpi = 75
elif labels_mode:
dpi = 200
else:
dpi = 150
return dpi if extension == "png" else None
def setup_style(*, labels_mode: bool = False, compact_mode: bool = False) -> None:
"""Configure matplotlib with the signal-testbed styling."""
plt.style.use("dark_background")
if compact_mode:
base_font = 8
title_font = 10
label_font = 8
elif labels_mode:
base_font = 12
title_font = 16
label_font = 14
else:
base_font = 10
title_font = 12
label_font = 10
matplotlib.rcParams.update(
{
"figure.facecolor": "#0f172a",
"axes.facecolor": "#1e293b",
"axes.edgecolor": COLORS["muted"],
"axes.labelcolor": COLORS["light"],
"text.color": COLORS["light"],
"xtick.color": COLORS["muted"],
"ytick.color": COLORS["muted"],
"grid.color": COLORS["muted"],
"grid.alpha": 0.3,
"font.size": base_font,
"axes.titlesize": title_font,
"axes.labelsize": label_font,
"figure.titlesize": title_font + 2,
"legend.frameon": False,
"legend.facecolor": "none",
"xtick.labelsize": base_font,
"ytick.labelsize": base_font,
}
)
def detect_constellation_symbols(signal: np.ndarray, method: str = "differential") -> np.ndarray:
"""Heuristic symbol detector used for constellation highlighting."""
if len(signal) < 100:
return np.ones(len(signal), dtype=bool)
if method == "differential":
di = np.diff(signal.imag)
dq = np.diff(signal.real)
derivative_magnitude = np.sqrt(di**2 + dq**2)
derivative_magnitude = np.append(derivative_magnitude, 0)
threshold = np.percentile(derivative_magnitude, 15)
return derivative_magnitude < threshold
if method == "amplitude":
amplitude = np.abs(signal)
amplitude_change = np.abs(np.diff(amplitude))
amplitude_change = np.append(amplitude_change, 0)
threshold = np.percentile(amplitude_change, 20)
return amplitude_change < threshold
if method == "phase":
phase = np.angle(signal)
phase_diff = np.diff(np.unwrap(phase))
phase_diff = np.append(phase_diff, 0)
threshold = np.percentile(np.abs(phase_diff), 20)
return np.abs(phase_diff) < threshold
if method == "combined":
diff_stable = detect_constellation_symbols(signal, "differential")
amp_stable = detect_constellation_symbols(signal, "amplitude")
phase_stable = detect_constellation_symbols(signal, "phase")
stability_count = diff_stable.astype(int) + amp_stable.astype(int) + phase_stable.astype(int)
return stability_count >= 2
raise ValueError(f"Unknown method: {method}")
def view_simple_sig(
recording: Recording,
output_path: Optional[str] = "images/signal.png",
saveplot: Optional[bool] = True,
fast_mode: Optional[bool] = False,
compact_mode: Optional[bool] = False,
horizontal_mode: Optional[bool] = False,
constellation_mode: Optional[bool] = False,
labels_mode: Optional[bool] = False,
slice: Optional[tuple] = None,
title: Optional[str] = "Signal",
):
"""Render the multi-panel viewer used by the CLI."""
signal = recording.data[0]
sample_rate_hz, center_freq_hz, sdr = extract_metadata_fields(recording.metadata)
setup_style(labels_mode=labels_mode, compact_mode=compact_mode)
if slice:
start_idx, end_idx = slice
signal = signal[start_idx:end_idx]
print(f"Using slice: samples {start_idx} to {end_idx} ({len(signal):,} samples)")
max_display_pixels = 100_000 if fast_mode else 250_000
display_signal = decimate(signal, max_display_pixels) if len(signal) > max_display_pixels else signal
spec_signal = signal
if compact_mode:
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), gridspec_kw={"height_ratios": [1, 5]})
show_title = False
show_labels = False
ax_constellation = ax_psd = None
elif horizontal_mode:
if constellation_mode:
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 6))
ax_constellation = ax3
else:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
ax_constellation = None
show_title = True
show_labels = labels_mode
ax_psd = None
else:
if constellation_mode:
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
ax_constellation, ax_psd = ax3, ax4
else:
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
ax_constellation = ax_psd = None
show_title = True
show_labels = labels_mode
if show_title:
fig.suptitle(title, fontsize=16, color=COLORS["light"], y=0.96)
fig.patch.set_facecolor("#0f172a")
total_duration_s = len(signal) / sample_rate_hz if sample_rate_hz else 0.0
t_s = np.linspace(0, total_duration_s, len(display_signal)) if len(display_signal) else np.array([])
ax1.plot(t_s, display_signal.real, color=COLORS["purple"], linewidth=0.8, alpha=0.8, label="I")
ax1.plot(t_s, display_signal.imag, color=COLORS["magenta"], linewidth=0.8, alpha=0.8, label="Q")
ax1.set_xlim(0, total_duration_s)
ax1.grid(True, alpha=0.3)
nfft, overlap = _get_nfft_size(signal=signal, fast_mode=fast_mode)
_, freqs, _, _ = ax2.specgram(
spec_signal,
NFFT=nfft,
Fc=center_freq_hz,
Fs=sample_rate_hz,
noverlap=overlap,
cmap="twilight",
)
ax2.set_ylim(center_freq_hz - sample_rate_hz / 2, center_freq_hz + sample_rate_hz / 2)
ax2.set_xlim(0, total_duration_s)
if show_labels:
if horizontal_mode:
ax1.set_xlabel("Time (s)")
else:
ax2.set_xlabel("Time (s)")
ax1.set_ylabel("Amplitude")
ax1.set_title(f"Time Series - {sdr} SDR")
ax1.legend(loc="upper right")
ax2.set_ylabel("Frequency (Hz)")
ax2.set_title(f"Spectrogram - {center_freq_hz / 1e6:.1f} MHz ± {sample_rate_hz / 2e6:.1f} MHz")
yticks = ax2.get_yticks()
ax2.set_yticklabels([f"{y / 1e6:.1f}" for y in yticks])
elif not compact_mode:
ax1.set_title("Time Series")
ax1.legend(loc="upper right", fontsize=8)
ax2.set_title("Spectrogram")
if ax_constellation is not None:
constellation_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=50_000, fast_max=20_000)
method = "differential" if fast_mode else "combined"
stable_points = detect_constellation_symbols(constellation_samples, method=method)
ax_constellation.scatter(
constellation_samples.real[~stable_points],
constellation_samples.imag[~stable_points],
c=COLORS["muted"],
s=0.5,
alpha=0.2,
)
ax_constellation.scatter(
constellation_samples.real[stable_points],
constellation_samples.imag[stable_points],
c=COLORS["purple"],
s=3,
alpha=0.8,
)
ax_constellation.set_xlabel("In-phase (I)")
ax_constellation.set_ylabel("Quadrature (Q)")
ax_constellation.set_title("Constellation")
ax_constellation.grid(True, alpha=0.3)
ax_constellation.set_aspect("equal")
if ax_psd is not None:
psd_samples = _get_plot_samples(signal=signal, fast_mode=fast_mode, slow_max=65_536, fast_max=16_384)
window = hann(len(psd_samples))
spectrum = np.abs(fftshift(fft(psd_samples * window))) ** 2
freqs = np.linspace(-sample_rate_hz / 2, sample_rate_hz / 2, len(psd_samples))
freqs = freqs + center_freq_hz
spectrum_db = 10 * np.log10(spectrum + 1e-12)
ax_psd.plot(freqs / 1e6, spectrum_db, color=COLORS["accent"], linewidth=1.0)
ax_psd.set_xlabel("Frequency (MHz)")
ax_psd.set_ylabel("Power (dB)")
ax_psd.set_title("Power Spectral Density")
ax_psd.grid(True, alpha=0.3)
if compact_mode:
ax1.set_xticks([])
ax1.set_yticks([])
ax2.set_xticks([])
ax2.set_yticks([])
plt.subplots_adjust(left=0, right=1, top=1, bottom=0, hspace=0)
else:
plt.tight_layout()
if show_title:
plt.subplots_adjust(top=0.92)
if saveplot:
output_path, extension = set_path(output_path=output_path)
dpi_value = _set_dpi(fast_mode=fast_mode, labels_mode=labels_mode, extension=extension)
plt.savefig(output_path, dpi=dpi_value, bbox_inches="tight", facecolor="#0f172a", edgecolor="none")
print(f"Saved signal plot to {output_path}")
return output_path
plt.show()
return None
__all__ = [
"setup_style",
"detect_constellation_symbols",
"create_plots",
]