ria-toolkit-oss/src/ria_toolkit_oss/view/view_signal.py
M madrigal 8a66860d33
All checks were successful
Build Sphinx Docs Set / Build Docs (pull_request) Successful in 15m51s
Build Project / Build Project (3.10) (pull_request) Successful in 16m14s
Build Project / Build Project (3.11) (pull_request) Successful in 17m9s
Build Project / Build Project (3.12) (pull_request) Successful in 2m29s
Test with tox / Test with tox (3.12) (pull_request) Successful in 21m28s
Test with tox / Test with tox (3.10) (pull_request) Successful in 22m50s
Test with tox / Test with tox (3.11) (pull_request) Successful in 23m18s
Moved all contents of to , refactored accordingly
2026-04-21 14:38:06 -04:00

397 lines
14 KiB
Python

import gc
import os
import textwrap
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import gridspec
from matplotlib.patches import Patch
from PIL import Image
from scipy.fft import fft, fftshift
from scipy.signal import spectrogram
from scipy.signal.windows import hann
from ria_toolkit_oss.data.recording import Recording
from ria_toolkit_oss.view.tools import (
COLORS,
decimate,
extract_metadata_fields,
set_path,
)
def get_fft_size(plot_length):
    """Choose the FFT segment length for a spectrogram covering ``plot_length`` samples.

    Longer plots get larger FFT sizes.

    :param plot_length: The number of samples that will be plotted.
    :type plot_length: int

    :return: 64 below 2,000 samples, 256 below 10,000, 1024 below 1,000,000, otherwise 2048.
    :rtype: int
    """
    # (exclusive upper bound on plot_length, FFT size), checked in ascending order.
    size_tiers = ((2000, 64), (10000, 256), (1000000, 1024))
    for upper_bound, fft_size in size_tiers:
        if plot_length < upper_bound:
            return fft_size
    return 2048
def set_spines(ax, spines):
    """Hide the four spines (bounding lines) of ``ax`` unless ``spines`` is truthy.

    :param ax: The axes whose ``spines`` mapping is modified in place.
    :param spines: If truthy, the spines are left untouched; otherwise all four are hidden.
    :type spines: bool

    :return: None
    """
    if spines:
        return
    for side in ("top", "right", "bottom", "left"):
        ax.spines[side].set_visible(False)
def view_annotations(
    recording: Recording,
    channel: Optional[int] = 0,
    output_path: Optional[str] = "images/annotations.png",
    title: Optional[str] = "Annotated Spectrogram",
    dpi: Optional[int] = 300,
    title_fontsize: Optional[int] = 15,
    dark: Optional[bool] = True,
) -> None:
    """
    Save a spectrogram of one channel with the recording's annotations drawn on top.

    Each annotation is drawn as an unfilled rectangle spanning its sample range (converted to seconds
    with the sample rate) and its lower/upper frequency edges. Annotations sharing a label share a
    colour, and every non-empty label is listed in a legend.

    .. note:: All open matplotlib figures are closed before plotting, and the selected style remains
       active after the call.

    :param recording: The recording object to plot.
    :type recording: Recording
    :param channel: Index of the channel in ``recording.data`` to plot. Defaults to 0
    :type channel: int, optional
    :param output_path: The output image path. Defaults to "images/annotations.png"
    :type output_path: str, optional
    :param title: The display title. Defaults to "Annotated Spectrogram"
    :type title: str, optional
    :param dpi: The dots per inch resolution. Defaults to 300
    :type dpi: int, optional
    :param title_fontsize: The font size of the title text. Defaults to 15
    :type title_fontsize: int, optional
    :param dark: Use dark mode. Defaults to True
    :type dark: bool, optional

    :return: None
    """

    def _threshold_sort_key(ann):
        # Labels such as "90%" sort by their numeric value; anything else (including a missing
        # label) sorts as 0.
        try:
            return int(ann.label.rstrip("%"))
        except (ValueError, AttributeError):
            return 0

    # 1. Set up the plotting environment.
    plt.close("all")
    plt.style.use("dark_background" if dark else "default")
    fig, ax = plt.subplots(figsize=(12, 8))

    try:
        complex_signal = recording.data[channel]
        sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
        annotations = recording.annotations

        # 2. Map each distinct non-empty label to a colour, cycling through the palette if needed.
        palette = ["#2196F3", "#9C27B0", "#64B5F6", "#7B1FA2", "#5C6BC0", "#CE93D8", "#1565C0", "#7C4DFF"]
        unique_labels = sorted({ann.label for ann in annotations if ann.label})
        label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)}

        # 3. Draw the spectrogram; only the rendered image is needed, not the returned arrays.
        ax.specgram(complex_signal, NFFT=256, Fs=sample_rate, Fc=center_frequency, noverlap=128, cmap="twilight")

        # 4. Draw annotations (highest threshold % first so lower % renders on top).
        for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
            t_start = annotation.sample_start / sample_rate
            t_width = annotation.sample_count / sample_rate
            f_start = annotation.freq_lower_edge
            f_height = annotation.freq_upper_edge - annotation.freq_lower_edge
            # Annotations without a (known) label fall back to gray.
            ann_color = label_to_color.get(annotation.label, "gray")
            rect = plt.Rectangle(
                (t_start, f_start), t_width, f_height, linewidth=1.5, edgecolor=ann_color, facecolor="none", alpha=0.8
            )
            ax.add_patch(rect)

        if unique_labels:
            legend_elements = [
                Patch(facecolor=label_to_color[label], alpha=0.3, edgecolor=label_to_color[label], label=label)
                for label in unique_labels
            ]
            ax.legend(handles=legend_elements, loc="upper right", framealpha=0.2)

        ax.set_title(title, fontsize=title_fontsize, pad=20)
        ax.set_xlabel("Time (s)", fontsize=12)
        # The frequency axis is in the units of sample_rate / center_frequency. The time axis divides
        # sample counts by sample_rate and is labelled in seconds, so those units are Hz, not MHz.
        ax.set_ylabel("Frequency (Hz)", fontsize=12)
        ax.grid(alpha=0.1)

        output_path, _ = set_path(output_path=output_path)
        fig.savefig(output_path, dpi=dpi, bbox_inches="tight")
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(fig)

    print(f"Professional annotation plot saved to {output_path}")
def view_channels(
    recording: Recording,
    output_path: Optional[str] = "images/signal.png",
    title: Optional[str] = "Multichannel Signal Plot",
    dpi: Optional[int] = 1000,
) -> None:
    """Create an image with one row per channel: the IQ samples next to their spectrogram.

    Plot is automatically saved to file at output_path.

    :param recording: The recording object to plot
    :type recording: Recording
    :param output_path: The path to save the image. Defaults to "images/signal.png".
    :type output_path: str, optional
    :param title: The plot title. Defaults to "Multichannel Signal Plot".
    :type title: str, optional
    :param dpi: The dots per inch resolution. Defaults to 1000.
    :type dpi: int, optional

    :return: None

    **Examples:**

    .. todo:: Usage examples coming soon.
    """
    num_channels = recording.data.shape[0]

    # Apply the style before the figure is created so the first call renders the same way as
    # later calls (style settings only affect artists created after they are set).
    plt.style.use("dark_background")

    # squeeze=False keeps ``axes`` two-dimensional even for a single channel; otherwise
    # ``axes[0, 0]`` raises IndexError when num_channels == 1.
    fig, axes = plt.subplots(nrows=num_channels, ncols=2, squeeze=False)

    try:
        fig.subplots_adjust(wspace=0.5, hspace=0.5)
        fig.suptitle(title, fontsize=16)

        light = COLORS["light"]
        axes[0, 0].set_title("IQ Signal", color=light)
        axes[0, 1].set_title("Spectrogram", color=light)

        linewidth = 0.5
        tick_fontsize = 4
        label_fontsize = 6

        center_frequency = recording.metadata.get("center_frequency", 0)
        sample_rate = recording.metadata.get("sample_rate", 1)

        # Shared time axis; its length is taken from the first channel.
        t = np.arange(len(recording.data[0])) / sample_rate

        for i, (iq_ax, spec_ax) in enumerate(axes):
            samples = recording.data[i]

            iq_ax.plot(t, np.real(samples), linewidth=linewidth)
            iq_ax.plot(t, np.imag(samples), linewidth=linewidth)
            spec_ax.specgram(samples, Fs=sample_rate, Fc=center_frequency)

            iq_ax.tick_params(labelsize=tick_fontsize, colors=light)
            spec_ax.tick_params(labelsize=tick_fontsize, colors=light)
            iq_ax.set_ylabel("Amplitude", fontsize=label_fontsize, color=light)
            spec_ax.set_ylabel("Freq (Hz)", fontsize=label_fontsize, color=light)

            # Only the bottom row carries x ticks and the time label.
            if i != num_channels - 1:
                iq_ax.set_xticks([])
                spec_ax.set_xticks([])
            else:
                iq_ax.set_xlabel("Time (s)", fontsize=label_fontsize, color=light)
                spec_ax.set_xlabel("Time (s)", fontsize=label_fontsize, color=light)

        output_path, _ = set_path(output_path=output_path)
        fig.savefig(output_path, dpi=dpi)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    print(f"Saved signal plot to {output_path}")
def view_sig(
    recording: Recording,
    output_path: Optional[str] = "images/signal.png",
    title: Optional[str] = "Signal Plot",
    dpi: Optional[int] = 250,
    plot_length: Optional[int] = None,
    plot_spectrogram: Optional[bool] = True,
    iq: Optional[bool] = True,
    frequency: Optional[bool] = True,
    constellation: Optional[bool] = True,
    metadata: Optional[bool] = True,
    logo: Optional[bool] = True,
    dark: Optional[bool] = True,
    spines: Optional[bool] = False,
    title_fontsize: Optional[int] = 35,
    subtitle_fontsize: Optional[int] = 15,
) -> None:
    """
    Create a plot of various signal visualizations as a png or svg image.

    Only the first channel of the recording is plotted. All open matplotlib figures are closed when
    the function finishes, whether or not it succeeds.

    :param recording: The recording object to plot.
    :type recording: Recording
    :param output_path: The output image path. Defaults to "images/signal.png"
    :type output_path: str, optional
    :param title: The display title. Defaults to "Signal Plot"
    :type title: str, optional
    :param dpi: The dots per inch resolution. Defaults to 250
    :type dpi: int, optional
    :param plot_length: The number of samples to plot, default is the whole recording. Defaults to None
    :type plot_length: int, optional
    :param plot_spectrogram: Display the spectrogram. Defaults to True
    :type plot_spectrogram: bool, optional
    :param iq: Display the iq sample plot. Defaults to True
    :type iq: bool, optional
    :param frequency: Display the fft of the recording. Defaults to True
    :type frequency: bool, optional
    :param constellation: Display the constellation plot. Defaults to True
    :type constellation: bool, optional
    :param metadata: Display the metadata text. Defaults to True
    :type metadata: bool, optional
    :param logo: Display the Qoherent logo. Defaults to True
    :type logo: bool, optional
    :param dark: Use dark mode. Defaults to True
    :type dark: bool, optional
    :param spines: Display spines (bounding lines) around plots. Defaults to False
    :type spines: bool, optional
    :param title_fontsize: The font size of the main title text. Defaults to 35
    :type title_fontsize: int, optional
    :param subtitle_fontsize: The fontsize of the subplot titles. Defaults to 15
    :type subtitle_fontsize: int, optional

    :return: None

    **Examples:**

    .. todo:: Usage examples coming soon.
    """
    complex_signal = recording.data[0]
    sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)

    # Each full-width panel takes two grid rows; the bottom strip takes three rows when any of
    # its elements is requested. The grid is one column per bottom panel, or three when the logo
    # is requested.
    subplot_height = 2 * (plot_spectrogram + iq + frequency) + 3 * (constellation or metadata or logo)
    subplot_width = max((constellation + metadata) or 1, logo * 3)

    graphics_dir = os.path.join(os.path.dirname(__file__), "graphics")
    if dark:
        plt.style.use("dark_background")
        logo_path = os.path.join(graphics_dir, "Qoherent-logo-white-transparent.png")
    else:
        plt.style.use("default")
        logo_path = os.path.join(graphics_dir, "Qoherent-logo-black-transparent.png")

    if plot_length is None:
        plot_length = len(complex_signal)
    samples = complex_signal[:plot_length]

    fig = plt.figure(figsize=(16, 14))
    try:
        fig.suptitle(title, fontsize=title_fontsize)
        gs = gridspec.GridSpec(subplot_height, subplot_width)

        # Full-width panels, stacked top to bottom.
        row = 0
        if plot_spectrogram:
            spec_ax = fig.add_subplot(gs[row : row + 2, :])
            row += 2
            _draw_spectrogram(spec_ax, samples, plot_length, sample_rate, center_frequency, subtitle_fontsize, spines)
        if iq:
            iq_ax = fig.add_subplot(gs[row : row + 2, :])
            row += 2
            _draw_iq(iq_ax, samples, sample_rate, subtitle_fontsize, spines)
        if frequency:
            freq_ax = fig.add_subplot(gs[row : row + 2, :])
            row += 2
            _draw_frequency(freq_ax, samples, sample_rate, center_frequency, subtitle_fontsize, spines)

        # Bottom strip: constellation and metadata side by side in the remaining rows.
        col = 0
        if constellation:
            const_ax = fig.add_subplot(gs[row:, col])
            col += 1
            _draw_constellation(const_ax, samples, complex_signal, subtitle_fontsize, spines)
        if metadata:
            meta_ax = fig.add_subplot(gs[row:, col])
            _draw_metadata(meta_ax, recording.metadata, subtitle_fontsize, spines)

        # The logo is skipped silently when the image file is not present.
        if logo and os.path.isfile(logo_path):
            _draw_logo(fig, logo_path)

        fig.subplots_adjust(
            left=0.1,  # Left margin
            right=0.9,  # Right margin
            top=0.9,  # Top margin
            bottom=0.1,  # Bottom margin
            wspace=0.4,  # Horizontal space between subplots
            hspace=2.5,  # Vertical space between subplots
        )

        output_path, _ = set_path(output_path=output_path)
        fig.savefig(output_path, dpi=dpi)
        print(f"Saved signal plot to {output_path}")
    finally:
        # Garbage collection and clean up to prevent memory overloading, also on failure.
        plt.close("all")
        gc.collect()


def _draw_spectrogram(ax, samples, plot_length, sample_rate, center_frequency, subtitle_fontsize, spines):
    """Render a log-scaled, frequency-centred magnitude spectrogram of ``samples`` on ``ax``."""
    fft_size = get_fft_size(plot_length=plot_length)
    _, t_spec, sxx = spectrogram(
        samples,
        fs=sample_rate,
        nperseg=fft_size,
        noverlap=fft_size // 8,
        mode="magnitude",
        return_onesided=False,
    )

    # Shift frequencies so zero is centered, then offset by the center frequency.
    f_bins = np.fft.fftshift(np.fft.fftfreq(fft_size, d=1.0 / sample_rate)) + center_frequency
    sxx = np.fft.fftshift(sxx, axes=0)

    ax.imshow(
        10 * np.log10(sxx + 1e-12),  # small offset avoids log10(0)
        aspect="auto",
        origin="lower",
        extent=[t_spec[0], t_spec[-1], f_bins[0], f_bins[-1]],
        cmap="twilight",
    )
    set_spines(ax, spines)
    ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize)


def _draw_iq(ax, samples, sample_rate, subtitle_fontsize, spines):
    """Plot the real (I) and imaginary (Q) parts of ``samples`` against time on ``ax``."""
    plot_iq = decimate(samples)
    # The time step is scaled by the ratio of original to decimated length so the axis still
    # spans the duration of the undecimated slice.
    t = np.arange(len(plot_iq)) / sample_rate * (len(samples) / len(plot_iq))

    ax.plot(t, plot_iq.real, color=COLORS["purple"], linewidth=0.6, alpha=0.8, label="I")
    ax.plot(t, plot_iq.imag, color=COLORS["magenta"], linewidth=0.6, alpha=0.8, label="Q")
    ax.grid(False)
    ax.set_ylabel("Amplitude")
    ax.set_xlim([t.min(), t.max()])
    ax.set_xlabel("Time (s)")
    ax.set_title("IQ Sample Plot", fontsize=subtitle_fontsize)
    set_spines(ax, spines)


def _draw_frequency(ax, samples, sample_rate, center_frequency, subtitle_fontsize, spines):
    """Plot the Hann-windowed FFT magnitude of ``samples`` in dB on ``ax``."""
    num_samples = len(samples)

    # Apply window to reduce spectral leakage
    window = hann(num_samples)
    spectrum = np.abs(fftshift(fft(samples * window)))

    # Convert to dB
    spectrum_db = 20 * np.log10(spectrum + 1e-12)  # 20*log for magnitude

    # Use the actual FFT bin centres, shifted the same way as the spectrum, so every plotted
    # point sits at the frequency of its bin.
    freqs = np.fft.fftshift(np.fft.fftfreq(num_samples, d=1.0 / sample_rate)) + center_frequency

    ax.plot(freqs, spectrum_db, color=COLORS["accent"], linewidth=0.8)
    ax.set_ylabel("Magnitude (dB)")
    ax.set_title("Frequency Spectrum (Windowed FFT)", fontsize=subtitle_fontsize)
    set_spines(ax, spines)


def _draw_constellation(ax, samples, full_signal, subtitle_fontsize, spines):
    """Scatter the I/Q points of ``samples`` on ``ax`` with equal, symmetric axis limits."""
    plot_const = decimate(samples, 50_000)
    ax.scatter(plot_const.real, plot_const.imag, c=COLORS["purple"], s=1, linewidths=0.1)

    # Limits come from the peak magnitude of the whole channel (not only the plotted slice),
    # with a 10% margin.
    dimension = np.max(np.abs(full_signal)) * 1.1
    ax.set_xlim([-1 * dimension, dimension])
    ax.set_ylim([-1 * dimension, dimension])

    ax.set_xlabel("In-phase (I)")
    ax.set_ylabel("Quadrature (Q)")
    ax.set_title("Constellation", fontsize=subtitle_fontsize)
    ax.set_aspect("equal")
    set_spines(ax, spines)


def _draw_metadata(ax, metadata_fields, subtitle_fontsize, spines):
    """Write the ``key: value`` pairs of ``metadata_fields`` as a text box on ``ax``."""
    # Long values are shortened to at most 80 characters so one entry stays on one line.
    metadata_text = "\n".join(
        f"{key}: {textwrap.shorten(str(value), width=80, placeholder='...')}"
        for key, value in metadata_fields.items()
    )
    ax.text(
        0.05,
        0.95,
        metadata_text,
        fontsize=10,
        va="top",
        ha="left",
        bbox=dict(facecolor="none", alpha=0.5, edgecolor="none"),
    )
    ax.set_title("Metadata", fontsize=subtitle_fontsize)

    # Remove the ticks and tick labels; this panel only holds text.
    ax.xaxis.set_ticklabels([])
    ax.yaxis.set_ticklabels([])
    ax.set_xticks([])
    ax.set_yticks([])
    set_spines(ax, spines)


def _draw_logo(fig, logo_path):
    """Place the logo image from ``logo_path`` in the bottom-right corner of ``fig``."""
    # Position is [left, bottom, width, height] in figure coordinates.
    logo_ax = fig.add_axes([0.75, 0.05, 0.2, 0.08], anchor="SE", zorder=10)
    logo_ax.axis("off")
    try:
        # The context manager closes the underlying file once the image has been drawn.
        with Image.open(logo_path) as image:
            logo_ax.imshow(image)
    except FileNotFoundError:
        print(f"Warning, {logo_path} not found.")