diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..18ed618 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Environments +.env +.venv +env/ +venv/ +ENV/ + +# PyCharm +.idea/ + +# Visual Studio Code +.vscode/ + +# Generated files +*.dot +*.hdf5 +*.npy +*.png +*.json +images/ +data/ diff --git a/at_commands.py b/at_commands.py new file mode 100644 index 0000000..bc05da2 --- /dev/null +++ b/at_commands.py @@ -0,0 +1,368 @@ +import serial + +from helper_functions import comma_split, extract_numbers + +# For more info on the commands: +# https://files.waveshare.com/upload/8/8a/Quectel_RG520N%26RG52xF%26RG530F%26RM520N%26RM530N_Series_AT_Commands_Manual_V1.0.0_Preliminary_20220812.pdf + + +# Configure connection/5G parameters +def set_configs(port="/dev/ttyUSB2", baudrate=115200): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b'AT+QNWPREFCFG="nr5g_band",78\r') + response = ser.readlines() + print(response) + ser.write(b'AT+QNWPREFCFG="mode_pref",NR5G\r') + response = ser.readlines() + print(response) + ser.write(b'AT+QNWPREFCFG="roam_pref",1\r') + response = ser.readlines() + print(response) + ser.close() + print("Finished setting configs.") + except Exception as e: + return print(f"Configuration error: {e}") + + +# Fetch operator status (5.1) +def get_modem_cops(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+COPS?\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "COPS:" in decoded_item: + nums = extract_numbers(decoded_item) + dictionary["mode"] = nums[0] + return dictionary + elif "CME" in decoded_item: + dictionary["cops"] = decoded_item + except Exception as e: + dictionary["cops encoded"] = str(item) + print(f"Could not decode COPS item:\t{str(item)}\n{e}") + + return dictionary + + except Exception as e: + return {"COPS error": f"{e}"} + + +# Fetch network registration status (5.2) +def get_modem_creg(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+CREG?\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "CREG:" in decoded_item: + nums = extract_numbers(decoded_item) + dictionary["network registration result code"] = nums[0] + dictionary["creg status"] = nums[1] + return dictionary + elif "CME" in decoded_item: + dictionary["creg error"] = decoded_item + except Exception as e: + decoded_item = str(item) + dictionary["creg encoded"] = decoded_item + print(f"Could not decode CREG item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"CREG error": f"{e}"} + + +# Fetch modem csq stats (5.9) +def get_modem_csq(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+CSQ\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "CSQ:" in decoded_item: + nums = extract_numbers(decoded_item) + dictionary["RSSI"] = nums[0] + dictionary["BER"] = nums[1] + return dictionary + + elif "CME" in decoded_item: + dictionary["csq error"] = decoded_item + except Exception as e: + decoded_item = str(item) + dictionary["csq encoded"] = decoded_item + print(f"Could not decode CSQ item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"CSQ error": f"{e}"} + + +# Fetch RSRP (5.10) +def get_modem_rsrp(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+QRSRP\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "QRSRP:" in decoded_item: + _, data = decoded_item.split(":") + decoded_list = comma_split(data) + + if type(decoded_list) == list: + dictionary["RSRP PRX"] = decoded_list[0] + dictionary["RSRP DRX"] = decoded_list[1] + dictionary["RSRP RX2"] = decoded_list[2] + dictionary["RSRP RX3"] = decoded_list[3] + dictionary["RSRP sysmode"] = decoded_list[4] + else: + dictionary["RSRP"] = decoded_list + + return dictionary + + elif "CME" in decoded_item: + dictionary["qrsrp error"] = decoded_item + except Exception as e: + decoded_item = str(item) + dictionary["qrsrp encoded"] = decoded_item + print(f"Could not decode QRSRP item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"QRSRP error": f"{e}"} + + +# Fetch RSRQ (5.11) +def get_modem_rsrq(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+QRSRQ\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "QRSRQ:" in decoded_item: + _, data = decoded_item.split(":") + decoded_list = comma_split(data) + + if type(decoded_list) == list: + dictionary["RSRQ PRX"] = decoded_list[0] + dictionary["RSRQ DRX"] = decoded_list[1] + dictionary["RSRQ RX2"] = decoded_list[2] + dictionary["RSRQ RX3"] = decoded_list[3] + dictionary["RSRQ sysmode"] = decoded_list[4] + else: + dictionary["RSRQ"] = decoded_list + return dictionary + + elif "CME" in decoded_item: + dictionary["qrsrq error"] = decoded_item + + except Exception as e: + decoded_item = str(item) + dictionary["qrsrq encoded"] = decoded_item + print(f"Could not decode QRSRQ item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"QRSRQ error": f"{e}"} + + +# Fetch SINR of the current service network (5.12) +def get_modem_sinr(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+QSINR?\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "QSINR:" in decoded_item: + _, data = decoded_item.split(":") + decoded_list = comma_split(data) + + if type(decoded_list) == list: + dictionary["SINR PRX"] = decoded_list[0] + dictionary["SINR DRX"] = decoded_list[1] + dictionary["SINR RX2"] = decoded_list[2] + dictionary["SINR RX3"] = decoded_list[3] + dictionary["SINR sysmode"] = decoded_list[4] + else: + decoded_item = decoded_list + return dictionary + + elif "CME" in decoded_item: + dictionary["qsinr error"] = decoded_item + except Exception as e: + decoded_item = str(item) + dictionary["qsinr encoded"] = decoded_item + print(f"Could not decode QSINR item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"QSINR error": f"{e}"} + + +# Fetch the list of preferred operators (5.13) +def get_modem_cpol(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+CPOL?\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "CPOL:" in decoded_item: + _, data = decoded_item.split(":") + decoded_list = comma_split(data) + + if type(decoded_list) == list: + dictionary["index"] = decoded_list[0] + dictionary["format"] = decoded_list[1] + dictionary["oper"] = decoded_list[2] + dictionary["GSM"] = decoded_list[3] + dictionary["GSM compact"] = decoded_list[4] + dictionary["UTRAN"] = decoded_list[5] + dictionary["E-UTRAN"] = decoded_list[6] + dictionary["NG-RAN"] = decoded_list[7] + else: + dictionary["cpol"] = decoded_list + return dictionary + + elif "CME" in decoded_item: + dictionary["cpol error"] = decoded_item + except Exception as e: + decoded_item = str(item) + dictionary["cpol encoded"] = decoded_item + print(f"Could not decode CPOL item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"CPOL error": f"{e}"} + + +# Fetch network parameters (5.21.4) +def get_modem_qnwcfg(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b'AT+QNWCFG="up/down"\r') + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "QNWCFG:" in decoded_item: + _, data = decoded_item.split(":") + decoded_list = comma_split(data) + + if type(decoded_list) == list: + dictionary["up/down"] = decoded_list[0] + dictionary["uplink (bytes/s)"] = decoded_list[1] + dictionary["downlink (bytes/s)"] = decoded_list[2] + dictionary["time interval"] = decoded_list[3] + else: + dictionary["qnwcfg"] = decoded_list + + return dictionary + + elif "CME" in decoded_item: + dictionary["qnwcfg"] = decoded_item + except Exception as e: + decoded_item = str(item) + dictionary["qnwcfg encoded"] = decoded_item + print(f"Could not decode QNWCFG item:\t{decoded_item}\n{e}") + + return dictionary + + except Exception as e: + return {"QNWCFG error": f"{e}"} + + +# Fetch network information (5.18) +def get_modem_qnwinfo(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+QNWINFO\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "QNWINFO:" in decoded_item: + _, data = decoded_item.split(":") + dictionary["network information"] = data + return dictionary + elif "CME" in decoded_item: + dictionary["network information"] = decoded_item + except Exception as e: + dictionary["qnwinfo encoded"] = str(item) + print(f"Could not decode QNWINFO item:\t{str(item)}\n{e}") + return dictionary + + except Exception as e: + return {"QNWINFO error": f"{e}"} + + +# Fetch servie provider name (5.19) +def get_modem_qspn(port="/dev/ttyUSB2", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT+QSPN\r") + response = ser.readlines() + ser.close() + + for item in response: + try: + decoded_item = item.decode("utf-8") + + if "QSPN:" in decoded_item: + _, data = decoded_item.split(":") + dictionary["service provider"] = data + return dictionary + elif "CME" in decoded_item: + dictionary["service provider"] = decoded_item + except Exception as e: + dictionary["qspn encoded"] = str(item) + print(f"Could not decode QSPN item:\t{str(item)}\n{e}") + return dictionary + + except Exception as e: + return {"QSPN error": f"{e}"} diff --git a/communication.py b/communication.py new file mode 100644 index 0000000..919847c --- /dev/null +++ b/communication.py @@ -0,0 +1,338 @@ +import pprint +import re +import subprocess +import threading +import time + +import serial + +from at_commands import (get_modem_cops, get_modem_cpol, get_modem_creg, + get_modem_csq, get_modem_qnwcfg, get_modem_qnwinfo, + get_modem_qspn, get_modem_rsrp, get_modem_rsrq, + get_modem_sinr, set_configs) +from helper_functions import (calculate_distance, parse_lat_lon, + save_data_to_json) +from sierra_commands import get_modem_nr_info, get_modem_status + +# Globals +running = False # To control start/stop +base_location = None # The basestation location + + +def read_gps_data( + port: str = "/dev/ttyACM0", baudrate: int = 9600, timeout: int = 5 +) -> dict: + """ + Reads GPS data from a u-blox 7 GPS/GLONASS device and returns it as a dictionary. + + Args: + port (str): The serial port the GPS device is connected to. Default is '/dev/ttyACM0'. + baudrate (int): Baud rate for serial communication. Default is 9600. + timeout (int): Timeout in seconds for the serial port. Default is 5 seconds. + + Returns: + dict: A dictionary containing parsed GPS data. + """ + gps_data = {} + attempts = 0 + + try: + with serial.Serial(port, baudrate=baudrate, timeout=timeout) as ser: + while True: + # Read a line from the GPS device + line = ser.readline().decode("ascii", errors="ignore").strip() + + # Filter for GGA or RMC sentences for relevant data + if line.startswith("$GPGGA"): # Global Positioning System Fix Data + parts = line.split(",") + gps_data["utc_time"] = parts[1] + gps_data["latitude"] = parse_lat_lon(parts[2], parts[3]) + gps_data["longitude"] = parse_lat_lon(parts[4], parts[5]) + gps_data["altitude"] = float(parts[9]) if parts[9] else None + + # Count number of times GPGGA has been queried + attempts = attempts + 1 + + # Stop after collecting sufficient data + if ( + gps_data.get("utc_time") + and gps_data.get("latitude") + and gps_data.get("longitude") + ): + break + elif attempts >= 3: + gps_data = {} + break + except serial.SerialException as e: + print(f"Serial communication error: {e}") + except Exception as e: + print(f"Error: {e}") + + return gps_data + + +def set_base_location(): + global base_location + base_location = {} + + try: + gps_data = read_gps_data() + base_location["latitude"] = gps_data["latitude"] + base_location["longitude"] = gps_data["longitude"] + if gps_data.get("altitude"): + base_location["altitude"] = gps_data["altitude"] + + print("Base location found") + except KeyError: + print("Base location could not be found") + except Exception as e: + print(f"Error finding location: {e}") + + return base_location + + +def get_current_location(dictionary={}): + global base_location + + try: + gps_data = read_gps_data() + dictionary["latitude"] = gps_data["latitude"] + dictionary["longitude"] = gps_data["longitude"] + if gps_data.get("altitude"): + dictionary["altitude"] = gps_data["altitude"] + + if "latitude" in base_location: + dictionary["baseLatitude"] = base_location["latitude"] + dictionary["baseLongitude"] = base_location["longitude"] + if base_location.get("altitude"): + dictionary["baseAltitude"] = base_location["altitude"] + + dictionary["distance"] = calculate_distance(base_location, gps_data) + except KeyError: + print("Location could not be found") + dictionary["distance"] = "Unknown" + except Exception as e: + print(f"Error finding location: {e}") + dictionary["distance"] = "Error" + + return dictionary + + +# Ping base station +def ping_basestation( + address="10.45.0.1", dictionary={} +): # raspberry pi address 192.168.0.29, host1 is 0.30 + try: + result = subprocess.run( + ["ping", "-c", "1", address], capture_output=True, text=True + ) + match = re.search(r"time=([\d.]+)", result.stdout) + + if match: + time_value = match.group(1) + dictionary["ping_time"] = time_value + dictionary["ping_stats"] = result.stdout + return dictionary + else: + dictionary["ping_time"] = "Not found" + dictionary["ping_stats"] = result.stdout + return dictionary + except Exception as e: + return {"error": str(e)} + + +# Run iperf test +def collect_iperf( + filename, + server_ip="10.45.0.1", + duration=10, + is_client=True, +): + if is_client: + commands = [["iperf3", "-s"]] + else: + commands = [ + ["iperf3", "-c", server_ip, "-t", str(duration)], + ["iperf3", "-c", server_ip, "-t", str(duration), "-R"], + ] + + for command in commands: + try: + # try: + # gps_data = read_gps_data() + # start_distance = calculate_distance(base_location, gps_data) + # except: + # print("Could not collect location") + + # Run the command + result = subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + output = result.stdout + + # Check for errors + if result.returncode != 0: + print(f"Error running iperf3: {result.stderr}") + return result.stderr + + # try: + # gps_data = read_gps_data() + # end_distance = calculate_distance(base_location, gps_data) + # except: + # pass + + # Look for final sender and receiver bitrates (usually in summary lines) + matches = re.findall( + r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+Mbits/sec\s+(\S+)?", + output, + ) + + if len(matches) >= 1: + # Take the last throughput entry + last_entry = matches[-1] + sender_bitrate = float(last_entry[0]) + + # Sometimes iperf omits receiver or sender lines depending on direction + # Try to find both separately: + receiver_match = re.search(r"receiver", output, re.IGNORECASE) + sender_match = re.search(r"sender", output, re.IGNORECASE) + + if receiver_match and sender_match and len(matches) >= 2: + receiver_bitrate = float(matches[-1][0]) + sender_bitrate = float(matches[-2][0]) + else: + receiver_bitrate = sender_bitrate # fallback: assume same + + else: + bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", output) + sender_bitrate = float(bitrates[-2]) + receiver_bitrate = float(bitrates[-1]) + + data = { + "iperf_full": output, + "sender_bitrate": sender_bitrate, + "receiver_bitrate": receiver_bitrate, + "note": ( + "avgs are calculated with the assumption " + "that all value are in Mbits/sec" + ), + # "start_distance": start_distance, + # "end_distance": end_distance, + } + + print("\nIPERF complete") + print(f"IPERF sender bitrate: {sender_bitrate}") + print(f"IPERF receiver bitrate: {receiver_bitrate}") + + save_data_to_json(data=data, filename=filename) + + except Exception as e: + print(f"iPerf Error: {e}") + + +# Collect and send data continuously +def collect_data(filename): + global base_location + global running + + while running: + data = {} + # data = get_current_location(dictionary=data) + data = get_modem_cops(dictionary=data) + data = get_modem_creg(dictionary=data) + data = get_modem_csq(dictionary=data) + data = get_modem_rsrp(dictionary=data) + data = get_modem_rsrq(dictionary=data) + data = get_modem_sinr(dictionary=data) + data = get_modem_cpol(dictionary=data) + data = get_modem_qnwcfg(dictionary=data) + data = get_modem_qnwinfo(dictionary=data) + data = get_modem_qspn(dictionary=data) + data = ping_basestation(dictionary=data) + data["timestamp"] = time.time() + + # Send to server + # print(f"\nDistance: {data.get('distance')}") + print(f"\nService: {data.get('network information')}") + print(f"Ping Time: {data.get('ping_time')}") + print( + f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} {data.get('RSRP RX2')} {data.get('RSRP RX3')}" + ) + + save_data_to_json(data=data, filename=filename) + + time.sleep(5) + + +# Collect and send data continuously +def collect_sierra_data(filename): + global base_location + global running + + while running: + data = {} + data = get_current_location(dictionary=data) + data = get_modem_nr_info(dictionary=data) + data = get_modem_status(dictionary=data) + data = ping_basestation(dictionary=data) + data["timestamp"] = time.time() + + # Send to server + print(f"\nDistance: {data.get('distance')}") + print(f"Ping Time: {data.get('ping_time')}") + print(f"RSRP: {data.get('RSRP')}") + + save_data_to_json(data=data, filename=filename) + + time.sleep(5) + + +# Main function +def main(): + global running + filename = "data/boat_relay_sept_11/collection_" + str(int(time.time())) + ".json" + + print("Setting configs...") + set_configs() + + print( + "Type 'l' to set basestation location, 'b' to begin, " + "'s' to stop, 'i' to run an iperf test, or 'x' to exit:" + ) + while True: + command = input("> ").strip().lower() + if command == "b" and not running: + print("Starting data collection...") + running = True + threading.Thread(target=collect_data, args=(filename,)).start() + elif command == "l": + base_location_data = set_base_location() + save_data_to_json(data=base_location_data, filename=filename) + elif command == "i": + threading.Thread(target=collect_iperf, args=(filename, "10.45.0.1")).start() + elif command == "s" and running: + print("Stopping data collection...") + running = False + elif command == "x": + running = False + break + elif command == "m": + base_location_data = set_base_location() + + start_time = time.time() + data = get_current_location() + data["timestamp"] = time.time() + end_time = time.time() + + print("Collected Data:") + pprint.pprint(data) + print(" ") + save_data_to_json(data=data, filename=filename) + print(f"Run time: {end_time - start_time}") + + elif command not in ["l", "b", "s", "i", "x"]: + print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.") + + +if __name__ == "__main__": + main() diff --git a/helper_functions.py b/helper_functions.py new file mode 100644 index 0000000..4228392 --- /dev/null +++ b/helper_functions.py @@ -0,0 +1,121 @@ +import json +import math +import re + + +def calculate_distance(point1: dict, point2: dict) -> float: + """ + Calculate the distance in meters between two geographic points with altitude. + + :param point1: A dictionary with 'latitude', 'longitude', and optional 'altitude'. + :param point2: A dictionary with 'latitude', 'longitude', and optional 'altitude'. + :return: Distance in meters as a float. + """ + try: + # Extract latitude, longitude, and altitude + lat1, lon1, alt1 = ( + point1["latitude"], + point1["longitude"], + point1.get("altitude", 0), + ) + lat2, lon2, alt2 = ( + point2["latitude"], + point2["longitude"], + point2.get("altitude", 0), + ) + except KeyError: + return "Points were not properly set" + + # Convert latitude and longitude from degrees to radians + lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) + + # Radius of the Earth in meters + R = 6369500 + + # Compute deltas + delta_lat = lat2 - lat1 + delta_lon = lon2 - lon1 + + # Haversine formula for horizontal distance + a = ( + math.sin(delta_lat / 2) ** 2 + + math.cos(lat1) * math.cos(lat2) * math.sin(delta_lon / 2) ** 2 + ) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + horizontal_distance = R * c + + # Altitude difference + delta_alt = alt2 - alt1 + + # Total 3D distance + distance = math.sqrt(horizontal_distance**2 + delta_alt**2) + + return distance + + +# Split string by commas, if there are commas +def comma_split(string: str): + if "," in string: + return string.split(",") + else: + return string + + +# Extract stats from strings +def extract_numbers(string): + pattern = r"-?\d+\.\d+|-?\d+" + numbers = re.findall(pattern, string) + numbers = [float(num) if "." in num else int(num) for num in numbers] + return numbers + + +# Parse the latitude or longitude from the NMEA format +def parse_lat_lon(value: str, direction: str) -> float: + """ + Parses latitude or longitude from NMEA format. + + Args: + value (str): The raw NMEA latitude or longitude value. + direction (str): Direction indicator ('N', 'S', 'E', 'W'). + + Returns: + float: The parsed latitude or longitude as a decimal degree. + """ + if not value or not direction: + return None + degrees = int(value[:2]) + minutes = float(value[2:]) + decimal = degrees + minutes / 60 + if direction in ["S", "W"]: + decimal = -decimal + return decimal + + +# Convert RSSI to DBM +def rssi_to_dbm(rssi): + if rssi == 99: + return "Unknown" + else: + # Convert RSSI to dBm + return -113 + 2 * rssi + + +# Save data to JSON file +def save_data_to_json(data, filename): + try: + # Load existing data + try: + with open(filename, "r") as file: + existing_data = json.load(file) + except FileNotFoundError: + # If file doesn't exist, start with an empty list + existing_data = [] + + # Append new data + existing_data.append(data) + + # Save updated data back to the file + with open(filename, "w") as file: + json.dump(existing_data, file, indent=4) + except Exception as e: + print(f"Error saving data to JSON: {e}") diff --git a/plots.py b/plots.py new file mode 100644 index 0000000..653539b --- /dev/null +++ b/plots.py @@ -0,0 +1,319 @@ +import json +import re + +import matplotlib.pyplot as plt + + +def plot_rsrp(filename): + # Load the JSON file + with open(filename, "r") as file: + data = json.load(file) + + # Extract distance and RSRP values (convert RSRP values to integers) + distances = [] + rsrp_prx = [] + rsrp_drx = [] + rsrp_rx2 = [] + rsrp_rx3 = [] + + for entry in data: + try: + rsrp_prx.append( + -169 + if int(entry["RSRP PRX"].strip()) == -32768 + else int(entry.get("RSRP PRX", -169)) + ) + rsrp_drx.append( + -169 + if int(entry["RSRP DRX"].strip()) == -32768 + else int(entry.get("RSRP DRX", -169)) + ) + rsrp_rx2.append( + -169 + if int(entry["RSRP RX2"].strip()) == -32768 + else int(entry.get("RSRP RX2", -169)) + ) + rsrp_rx3.append( + -169 + if int(entry["RSRP RX3"].strip()) == -32768 + else int(entry.get("RSRP RX3", -169)) + ) + distances.append(entry["distance"]) + except (ValueError, KeyError): + continue + + # Plot the data + plt.figure(figsize=(10, 6)) + plt.plot(distances, rsrp_prx, label="RSRP PRX", marker="o") + plt.plot(distances, rsrp_drx, label="RSRP DRX", marker="s") + plt.plot(distances, rsrp_rx2, label="RSRP RX2", marker="^") + plt.plot(distances, rsrp_rx3, label="RSRP RX3", marker="d") + + plt.title("RSRP vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("RSRP (dBm)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + +def plot_rsrq(filename): + # Load the JSON file + with open(filename, "r") as file: + data = json.load(file) + + # Extract distance and RSRQ values (convert RSRQ values to integers) + distances = [] + rsrq_prx = [] + rsrq_drx = [] + rsrq_rx2 = [] + rsrq_rx3 = [] + + for entry in data: + try: + rsrq_prx.append( + -20 + if int(entry["RSRQ PRX"].strip()) == -32768 + else int(entry.get("RSRQ PRX", -20)) + ) + rsrq_drx.append( + -20 + if int(entry["RSRQ DRX"].strip()) == -32768 + else int(entry.get("RSRQ DRX", -20)) + ) + rsrq_rx2.append( + -20 + if int(entry["RSRQ RX2"].strip()) == -32768 + else int(entry.get("RSRQ RX2", -20)) + ) + rsrq_rx3.append( + -20 + if int(entry["RSRQ RX3"].strip()) == -32768 + else int(entry.get("RSRQ RX3", -20)) + ) + distances.append(entry["distance"]) + except (ValueError, KeyError): + continue + + # Plot the data + plt.figure(figsize=(10, 6)) + plt.plot(distances, rsrq_prx, label="RSRQ PRX", marker="o") + plt.plot(distances, rsrq_drx, label="RSRQ DRX", marker="s") + plt.plot(distances, rsrq_rx2, label="RSRQ RX2", marker="^") + plt.plot(distances, rsrq_rx3, label="RSRQ RX3", marker="d") + + plt.title("RSRQ vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("RSRQ (dBm)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + +def plot_iperf(filename): + # Load the JSON file + with open(filename, "r") as file: + data = json.load(file) + + distances = [] + sender = [] + receiver = [] + + for entry in data: + try: + message = entry["iperf_full"] + bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message) + + sender.append(float(bitrates[-2])) + receiver.append(float(bitrates[-1])) + distances.append(entry["start_distance"]) + except (ValueError, KeyError): + continue + + # Plot the data + plt.figure(figsize=(10, 6)) + plt.plot(distances, sender, label="Avg Sender Bitrate", marker="o") + plt.plot(distances, receiver, label="Avg Receiver Bitrate", marker="s") + + plt.title("IPERF vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("Bitrate (Mbits/s)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + +def plot_bytes(filename): + # Load the JSON file + with open(filename, "r") as file: + data = json.load(file) + + distances = [] + uplink = [] + downlink = [] + + for entry in data: + try: + if ( + int(entry["uplink (bytes/s)"].strip()) < 1000000 + and int(entry["downlink (bytes/s)"].strip()) < 1000000 + ): + distances.append(entry["distance"]) + uplink.append(int(entry["uplink (bytes/s)"].strip())) + downlink.append(int(entry["downlink (bytes/s)"].strip())) + except (ValueError, KeyError): + continue + + # Plot the data + plt.figure(figsize=(10, 6)) + plt.plot(distances, downlink, label="Downlink Bitrate", marker="o") + plt.plot(distances, uplink, label="Uplink Bitrate", marker="s") + + plt.title("Bitrate vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("Bitrate (bytes/s)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + +def plot_manual(): + distances = [0, 100, 200, 300, 400, 500, 600, 700, 800] + rsrps = [-56, -82, -90, -93, -100, -105, -105, -116, -150] + sender = [0, 6.06, 6.95, 6.37, 6.96, 8.04, 7.30, 0, 0] + receiver = [0, 5.20, 6.10, 5.24, 6.08, 6.84, 6.37, 0, 0] + + # Plot the RSRP data + plt.figure(figsize=(10, 6)) + plt.plot(distances, rsrps, label="RSRP", marker="o") + + plt.title("RSRP vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("RSRP (dBm)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + # Plot the iperf data + plt.figure(figsize=(10, 6)) + plt.plot(distances, sender, label="Avg Sender Bitrate", marker="o") + plt.plot(distances, receiver, label="Avg Receiver Bitrate", marker="s") + + plt.title("IPERF vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("Bitrate (Mbits/s)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + +def plot_double_iperf(filename): + # Load the JSON file + with open(filename, "r") as file: + data = json.load(file) + + distances = [] + sender = [] + receiver = [] + reverse_distances = [] + reverse_sender = [] + reverse_receiver = [] + + for entry in data: + if "iperf_full" in entry: + if "Reverse mode" in entry["iperf_full"]: + try: + reverse_sender.append(float(entry["sender_bitrate"])) + reverse_receiver.append(float(entry["receiver_bitrate"])) + reverse_distances.append(entry["start_distance"]) + except: + message = entry["iperf_full"] + bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message) + + reverse_sender.append(float(bitrates[-2])) + reverse_receiver.append(float(bitrates[-1])) + reverse_distances.append(entry["start_distance"]) + else: + try: + sender.append(float(entry["sender_bitrate"])) + receiver.append(float(entry["receiver_bitrate"])) + distances.append(entry["start_distance"]) + except: + message = entry["iperf_full"] + bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message) + + sender.append(float(bitrates[-2])) + receiver.append(float(bitrates[-1])) + distances.append(entry["start_distance"]) + + # Plot the data + plt.figure(figsize=(10, 6)) + plt.plot(distances, sender, label="Avg Sender Bitrate", marker="o", color="red") + plt.plot( + distances, + receiver, + label="Avg Receiver Bitrate", + marker="s", + color="darkorange", + ) + plt.plot( + reverse_distances, + reverse_sender, + label="Avg Reverse Sender Bitrate", + marker="^", + color="blue", + ) + plt.plot( + reverse_distances, + reverse_receiver, + label="Avg Reverse Receiver Bitrate", + marker="d", + color="blueviolet", + ) + + plt.title("IPERF vs Distance") + plt.xlabel("Distance (m)") + plt.ylabel("Bitrate (Mbits/s)") + plt.legend() + plt.grid(True) + plt.tight_layout() + + # Show the plot + plt.show() + + +if __name__ == "__main__": + # print("Connecting to host 10.46.0.1, port 5201\n[ 5] local 192.168.225.83 port 60164 connected to 10.46.0.1 port 5201\n[ ID] Interval Transfer Bitrate Retr Cwnd\n[ 5] 0.00-1.00 sec 361 KBytes 2.95 Mbits/sec 0 43.4 KBytes \n[ 5] 1.00-2.00 sec 329 KBytes 2.70 Mbits/sec 0 56.6 KBytes \n[ 5] 2.00-3.00 sec 782 KBytes 6.41 Mbits/sec 0 89.5 KBytes \n[ 5] 3.00-4.00 sec 379 KBytes 3.11 Mbits/sec 0 107 KBytes \n[ 5] 4.00-5.00 sec 569 KBytes 4.66 Mbits/sec 0 133 KBytes \n[ 5] 5.00-6.00 sec 379 KBytes 3.11 Mbits/sec 0 151 KBytes \n[ 5] 6.00-7.00 sec 632 KBytes 5.18 Mbits/sec 0 182 KBytes \n[ 5] 7.00-8.00 sec 569 KBytes 4.66 Mbits/sec 0 247 KBytes \n[ 5] 8.00-9.00 sec 379 KBytes 3.11 Mbits/sec 0 309 KBytes \n[ 5] 9.00-10.00 sec 442 KBytes 3.62 Mbits/sec 0 432 KBytes \n- - - - - - - - - - - - - - - - - - - - - - - - -\n[ ID] Interval Transfer Bitrate Retr\n[ 5] 0.00-10.00 sec 4.71 MBytes 3.95 Mbits/sec 0 sender\n[ 5] 0.00-10.82 sec 4.31 MBytes 3.34 Mbits/sec receiver\n\niperf Done.\n") + # print("Connecting to host 10.46.0.1, port 5201\n[ 5] local 192.168.225.83 port 44064 connected to 10.46.0.1 port 5201\n[ ID] Interval Transfer Bitrate Retr Cwnd\n[ 5] 0.00-1.00 sec 405 KBytes 3.32 Mbits/sec 0 44.8 KBytes \n[ 5] 1.00-2.00 sec 320 KBytes 2.62 Mbits/sec 0 57.9 KBytes \n[ 5] 2.00-3.00 sec 207 KBytes 1.69 Mbits/sec 0 65.8 KBytes \n[ 5] 3.00-4.00 sec 253 KBytes 2.07 Mbits/sec 0 79.0 KBytes \n[ 5] 4.00-5.00 sec 379 KBytes 3.11 Mbits/sec 0 93.5 KBytes \n[ 5] 5.00-6.00 sec 442 KBytes 3.62 Mbits/sec 0 124 KBytes \n[ 5] 6.00-7.00 sec 442 KBytes 3.62 Mbits/sec 0 176 KBytes \n[ 5] 7.00-8.00 sec 569 KBytes 4.66 Mbits/sec 0 249 KBytes \n[ 5] 8.00-9.00 sec 695 KBytes 5.69 Mbits/sec 0 333 KBytes \n[ 5] 9.00-10.00 sec 442 KBytes 3.62 Mbits/sec 0 433 KBytes \n- - - - - - - - - - - - - - - - - - - - - - - - -\n[ ID] Interval Transfer Bitrate Retr\n[ 5] 0.00-10.00 sec 4.06 MBytes 3.40 Mbits/sec 0 sender\n[ 5] 0.00-11.14 sec 3.48 MBytes 2.62 Mbits/sec receiver\n\niperf Done.\n") + + plot_double_iperf( + filename="/home/madrigal/Documents/code/beach_apr4/collection_1743777162.json" + ) + plot_rsrp( + filename="/home/madrigal/Documents/code/beach_apr4/collection_1743777162.json" + ) + plot_rsrq( + filename="/home/madrigal/Documents/code/beach_apr4/collection_1743777162.json" + ) + # plot_double_iperf(filename="/home/madrigal/Documents/code/beach_mar_7/collection_whip_antennas.json") + # plot_iperf(filename='/home/madrigal/Documents/code/collections_beach_jan_29/collection_1738178064.json') + # plot_bytes(filename="/home/madrigal/Documents/code/collections_beach_jan_29/collection_1738178064.json") + # plot_manual() diff --git a/sierra_commands.py b/sierra_commands.py new file mode 100644 index 0000000..e4a88a3 --- /dev/null +++ b/sierra_commands.py @@ -0,0 +1,63 @@ +import serial + +from helper_functions import extract_numbers + + +# Fetch operational status +def get_modem_status(port="/dev/ttyUSB0", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT!GSTATUS?\r") + response = ser.readlines() + ser.close() + full_decoded = "" + + for item in response: + try: + decoded_item = item.decode("utf-8") + full_decoded = full_decoded + "\n" + decoded_item + + except Exception as e: + dictionary["status encoded"] = str(item) + print(f"Could not decode GSTATUS item:\t{str(item)}\n{e}") + + dictionary["Status"] = full_decoded + return dictionary + + except Exception as e: + return {"GSTATUS error": f"{e}"} + + +# Fetch NR (5G) information of the device +def get_modem_nr_info(port="/dev/ttyUSB0", baudrate=115200, dictionary={}): + try: + ser = serial.Serial(port, baudrate, timeout=1) + ser.write(b"AT!NRINFO?\r") + response = ser.readlines() + ser.close() + full_decoded = "" + + for item in response: + try: + decoded_item = item.decode("utf-8") + full_decoded = full_decoded + "\n" + decoded_item + + if "NR5G RSRP (dBm):" in decoded_item: + numbers = extract_numbers(decoded_item) + if len(numbers) >= 4: + dictionary["RSRP"] = numbers[1] + dictionary["RSRQ"] = numbers[3] + elif "NR5G SINR (dB):" in decoded_item: + numbers = extract_numbers(decoded_item) + if len(numbers) >= 2: + dictionary["SINR"] = numbers[1] + + except Exception as e: + dictionary["NR info encoded"] = str(item) + print(f"Could not decode NRINFO item:\t{str(item)}\n{e}") + + dictionary["NR Info"] = full_decoded + return dictionary + + except Exception as e: + return {"NRINFO error": f"{e}"}