"""Flask front end for adjusting USRP/SCM gains and viewing a live IQ plot.

A background thread subscribes to a ZMQ stream, interprets each message as
interleaved float32 I/Q pairs, and renders a time-domain plot plus a
spectrogram to ``PLOT_PATH``.  HTTP routes serve that image and let a client
change the gain values and the plot parameters.
"""

import json
import math
import os
import subprocess
import threading
import time

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import serial
import zmq
from flask import Flask, jsonify, render_template, request, send_file
from matplotlib.figure import Figure

app = Flask(__name__)

# Path to save the plot image
PLOT_PATH = "/opt/gain_viz/plot.png"

# Where save_config() persists the plot parameters.
# NOTE(review): this directory is spelled "gain-viz" while PLOT_PATH uses
# "gain_viz" -- confirm which one is intended before relying on this file.
CONFIG_PATH = "/opt/gain-viz/config.json"

# Pause between two polls of the ZMQ socket, in seconds.
REFRESH_INTERVAL_S = 0.5

# Serial settings used when pushing gains to the SCM boards.
SCM_PORTS = ("/dev/ttyUSB0", "/dev/ttyUSB1")
SCM_BAUDRATE = 115200
SCM_MAX_ATTEMPTS = 5

# Global variables for gain values
usrp_tx_gain = 60
usrp_rx_gain = 30
scm_tx_gain = 30
scm_rx_gain = 30

# Global variables for plot settings
sample_rate = 23.04e6  # Hz
window_ms = 20
center_freq = 3.415e9
NFFT = 1024
tcp_port = 5556


# ----------------- Serial / SCM -----------------
def connect_serial(port, baudrate=115200, timeout=1):
    """Open *port* as 8 data bits, even parity, one stop bit.

    Returns the open ``serial.Serial`` object, or ``None`` (after printing
    the error) when the port cannot be opened.
    """
    try:
        return serial.Serial(
            port=port,
            baudrate=baudrate,
            timeout=timeout,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_EVEN,
            stopbits=serial.STOPBITS_ONE,
        )
    except serial.SerialException as e:
        print(f"Error connecting to {port}: {e}")
        return None


def send_command(ser, command):
    """Write *command* (UTF-8 encoded) to *ser* if the port is open."""
    if ser.is_open:
        ser.write(command.encode('utf-8'))


def receive_feedback(ser):
    """Return the last CR-terminated line currently readable from *ser*.

    Everything available before the read timeout is collected, and the text
    that sits immediately before the final carriage return is returned with
    surrounding whitespace removed.  An empty string is returned when the
    port is closed, a serial error occurs, or no carriage return was
    received.
    """
    if not ser.is_open:
        return ""
    try:
        ser.flush()
        raw_lines = ser.readlines()
    except serial.SerialException:
        return ""

    # Decode the bytes instead of parsing their repr(); this also copes with
    # replies that arrive split over several chunks or use CRLF endings.
    text = b"".join(raw_lines).decode("utf-8", errors="replace")
    complete, found_cr, _ = text.rpartition("\r")
    if not found_cr:
        return ""
    return complete.rpartition("\r")[2].strip()


def scm_conf(port, baudrate, rx_cmd, tx_cmd):
    """Send *rx_cmd* then *tx_cmd* to the device on *port*.

    Each command is terminated with a carriage return and re-sent up to
    ``SCM_MAX_ATTEMPTS`` times until the device answers "OK".  The port is
    always closed before returning.

    Returns:
        True when the port opened and every command was acknowledged,
        False otherwise.
    """
    ser = connect_serial(port, baudrate)
    if ser is None:
        return False

    all_acknowledged = True
    try:
        for cmd in (rx_cmd, tx_cmd):
            for _ in range(SCM_MAX_ATTEMPTS):
                send_command(ser, cmd + "\r")
                if receive_feedback(ser) == "OK":
                    break
            else:
                # Retries exhausted; keep going so the other command is
                # still attempted, as before.
                print(f"No OK from {port} for command: {cmd}")
                all_acknowledged = False
    except serial.SerialException as e:
        print(f"Serial error on {port}: {e}")
        return False
    finally:
        ser.close()
    return all_acknowledged


# ----------------- Gain Updates -----------------
def _send_ran_command(command):
    """Type *command* followed by Enter into the tmux target ``ran``."""
    try:
        # An argument list (no shell) keeps the value from being interpreted
        # by a shell.
        subprocess.run(
            ["tmux", "send-keys", "-t", "ran", command, "C-m"],
            check=False,
        )
    except OSError as e:
        print(f"Could not run tmux: {e}")


def gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx):
    """Apply any gain values that differ from the currently stored ones.

    Changed USRP gains are sent as ``tx_gain``/``rx_gain`` commands to the
    tmux target ``ran``.  When either SCM gain changed, both SCM gain
    commands are sent to every port in ``SCM_PORTS``.

    Returns:
        Always True.
    """
    global usrp_tx_gain, usrp_rx_gain, scm_tx_gain, scm_rx_gain

    if usrp_tx != usrp_tx_gain:
        usrp_tx_gain = usrp_tx
        _send_ran_command(f"tx_gain 0 {usrp_tx_gain} ")
    if usrp_rx != usrp_rx_gain:
        usrp_rx_gain = usrp_rx
        _send_ran_command(f"rx_gain 0 {usrp_rx_gain} ")

    scm_change = scm_tx != scm_tx_gain or scm_rx != scm_rx_gain
    scm_tx_gain = scm_tx
    scm_rx_gain = scm_rx

    if scm_change:
        t_cmd = f"HW:GAIN 0 TX 0 {scm_tx_gain}"
        r_cmd = f"HW:GAIN 1 RX 0 {scm_rx_gain}"
        for port in SCM_PORTS:
            scm_conf(port, SCM_BAUDRATE, r_cmd, t_cmd)
    return True


# ----------------- ZMQ Subscriber -----------------
def zmq_subscriber(host, port):
    """Return a SUB socket connected to ``tcp://host:port``.

    The socket subscribes to every topic, has CONFLATE enabled and a
    1000 ms receive timeout.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    socket.setsockopt(zmq.RCVTIMEO, 1000)
    socket.connect(f"tcp://{host}:{port}")
    return socket


# ----------------- Plot Generation -----------------
def _decode_iq_window(msg, window_samples):
    """Turn a raw message into exactly *window_samples* complex samples.

    The payload is read as interleaved float32 (I, Q) pairs; trailing bytes
    that do not form a whole pair are ignored.  The newest samples are kept,
    and a short message is zero-padded at the front.  Returns ``None`` when
    the message does not contain a single complete pair.
    """
    pair_count = len(msg) // 8  # one I/Q pair = two float32 = 8 bytes
    if pair_count == 0:
        return None
    iq_all = np.frombuffer(msg, dtype=np.complex64, count=pair_count)
    if len(iq_all) >= window_samples:
        return iq_all[-window_samples:]
    return np.pad(iq_all, (window_samples - len(iq_all), 0))


def _render_spectrum_figure(iq_sample, fs, win_ms, fc, nfft):
    """Build the two-panel figure (time domain on top, spectrogram below)."""
    # Figure is used directly rather than pyplot: this runs in a background
    # thread and pyplot keeps global state and references to open figures.
    fig = Figure(figsize=(12, 6))
    ax1, ax2 = fig.subplots(2, 1)
    fig.subplots_adjust(hspace=0.4)

    # Time-domain plot (ms)
    times_ms = np.arange(len(iq_sample)) * 1000 / fs
    ax1.plot(times_ms, np.real(iq_sample), label="Real", color='b')
    ax1.plot(times_ms, np.imag(iq_sample), label="Imag", color='r')
    ax1.set_xlim(0, win_ms)
    ax1.set_xlabel("Time (ms)")
    ax1.set_ylabel("IQ Amplitude")
    ax1.grid(True, which='both', linestyle='--', linewidth=0.5)
    ax1.legend()

    # Spectrogram without grid.  The overlap is half the FFT size so that it
    # stays below NFFT whatever size the user selects.
    ax2.specgram(
        iq_sample,
        Fs=fs,
        Fc=fc,
        NFFT=nfft,
        noverlap=nfft // 2,
        cmap='twilight',
    )
    ax2.grid(False)
    ax2.set_ylim(fc - fs / 2, fc + fs / 2)
    # specgram's x axis is in seconds; show the tick labels in milliseconds.
    ax2.xaxis.set_major_formatter(
        ticker.FuncFormatter(lambda t, pos: '{0:g}'.format(t*1e3))
    )
    ax2.xaxis.set_minor_locator(ticker.AutoMinorLocator())
    ax2.set_xlabel("Time (ms)")
    ax2.set_ylabel("Frequency (Hz)")
    return fig


def _save_figure(fig, **savefig_kwargs):
    """Write *fig* to ``PLOT_PATH`` as PNG without exposing a partial file."""
    os.makedirs(os.path.dirname(PLOT_PATH), exist_ok=True)
    tmp_path = f"{PLOT_PATH}.tmp"
    fig.savefig(tmp_path, format="png", **savefig_kwargs)
    # Rename over the old image so the /plot route never reads a half
    # written file.
    os.replace(tmp_path, PLOT_PATH)


def generate_spectrum_plot():
    """Poll the ZMQ stream forever and refresh the plot image on new data.

    Intended to run in a daemon thread.  The plot settings are re-read from
    the module globals on every iteration so that changes made through
    ``/update_params`` take effect.  A failure while decoding or rendering
    one message is printed and the loop carries on.
    """
    socket = zmq_subscriber("localhost", tcp_port)

    # Initial placeholder for first plot (zeros)
    initial_len = max(1, int(sample_rate * window_ms / 1000))
    iq_sample = np.zeros(initial_len, dtype=np.complex64)

    while True:
        # Snapshot the settings so one frame uses a consistent set even if a
        # request changes them mid-iteration.
        fs, win_ms, fc, nfft = sample_rate, window_ms, center_freq, NFFT
        window_samples = max(1, int(fs * win_ms / 1000))

        try:
            msg = socket.recv(zmq.NOBLOCK)
        except zmq.Again:
            msg = None  # No new data, keep the last image

        if msg is not None:
            try:
                decoded = _decode_iq_window(msg, window_samples)
                if decoded is not None:
                    iq_sample = decoded
                fig = _render_spectrum_figure(iq_sample, fs, win_ms, fc, nfft)
                _save_figure(fig, bbox_inches='tight')
            except Exception as e:  # thread boundary: report and keep going
                print(f"Plot update failed: {e}")

        time.sleep(REFRESH_INTERVAL_S)


# ----------------- Flask Routes -----------------
@app.route('/')
def index():
    """Serve the main page."""
    return render_template('index.html')


@app.route('/update_gains', methods=['POST'])
def update_gains():
    """Apply the gains posted as form fields.

    A field that is missing or cannot be parsed as a float falls back to the
    currently stored value.
    """
    usrp_tx = request.form.get('usrp_tx_gain', usrp_tx_gain, type=float)
    usrp_rx = request.form.get('usrp_rx_gain', usrp_rx_gain, type=float)
    scm_tx = request.form.get('scm_tx_gain', scm_tx_gain, type=float)
    scm_rx = request.form.get('scm_rx_gain', scm_rx_gain, type=float)

    gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx)
    return jsonify({"status": "success", "message": "Gains updated successfully"})


@app.route('/plot')
def plot():
    """Serve the most recently saved plot image."""
    return send_file(PLOT_PATH, mimetype='image/png')


@app.route('/get_gains')
def get_gains():
    """Return the currently stored gain values as JSON."""
    return jsonify({
        "usrp_tx_gain": usrp_tx_gain,
        "usrp_rx_gain": usrp_rx_gain,
        "scm_tx_gain": scm_tx_gain,
        "scm_rx_gain": scm_rx_gain
    })


@app.route('/update_params', methods=['POST'])
def update_params():
    """Update the plot settings from form fields and persist them.

    A field that is missing or unparsable keeps its current value.  Values
    that would break plotting (non-finite numbers, or a non-positive sample
    rate, FFT size or window length) are rejected with HTTP 400 and nothing
    is changed.  A failure to write the config file yields HTTP 500.
    """
    global sample_rate, window_ms, center_freq, NFFT

    new_center_freq = request.form.get('center_freq', center_freq, type=float)
    new_sample_rate = request.form.get('sample_rate', sample_rate, type=float)
    new_fft_size = request.form.get('fft_size', NFFT, type=int)
    new_window_ms = request.form.get('window_ms', window_ms, type=float)

    must_be_positive = {
        'sample_rate': new_sample_rate,
        'fft_size': new_fft_size,
        'window_ms': new_window_ms,
    }
    invalid = [
        name for name, value in must_be_positive.items()
        if not (math.isfinite(value) and value > 0)
    ]
    if not math.isfinite(new_center_freq):
        invalid.append('center_freq')
    if invalid:
        return jsonify({
            'status': 'error',
            'message': f"Invalid value for: {', '.join(invalid)}"
        }), 400

    # Assign only after every value has been validated.
    center_freq = new_center_freq
    sample_rate = new_sample_rate
    NFFT = new_fft_size
    window_ms = new_window_ms

    try:
        save_config()
    except OSError as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

    return jsonify({
        'status': 'success',
        'message': 'Parameters updated successfully'
    })


def save_config():
    """Write the current plot settings to ``CONFIG_PATH`` as JSON.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    config = {
        'center_freq': center_freq,
        'sample_rate': sample_rate,
        'fft_size': NFFT,
        'window_ms': window_ms
    }
    os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
    with open(CONFIG_PATH, 'w', encoding='utf-8') as f:
        json.dump(config, f)


# ----------------- Main -----------------
def main():
    """Create a placeholder image if needed, start plotting, run the server."""
    # Ensure placeholder image exists
    if not os.path.exists(PLOT_PATH):
        fig = plt.figure()
        try:
            plt.text(0.5, 0.5, "Waiting for data...", ha='center', va='center')
            _save_figure(fig)
        finally:
            plt.close(fig)

    # Start plotting thread
    threading.Thread(target=generate_spectrum_plot, daemon=True).start()

    # The server listens on every interface, so the interactive debugger is
    # opt-in via GAIN_VIZ_DEBUG=1.  The reloader stays off because it would
    # run main() in a second process and start a second plotting thread.
    debug = os.environ.get("GAIN_VIZ_DEBUG") == "1"
    app.run(host="0.0.0.0", debug=debug, use_reloader=False)


# NOTE(review): no `if __name__ == "__main__":` guard is visible in this part
# of the file -- confirm how main() is meant to be invoked.