import re
import subprocess
import time

from functions.helper_functions import calculate_distance, normalize, save_data_to_json
from functions.location import read_gps_data

# Matches one iperf3 result row, e.g.
#   [  5]   0.00-10.00  sec  1.10 MBytes   943 Kbits/sec  ...
# and captures (bitrate value, bitrate unit).
_IPERF_ROW_RE = re.compile(
    r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+(Kbits/sec|Mbits/sec)"
)


def get_location(base_location):
    """Read the current GPS position and its distance from *base_location*.

    Args:
        base_location: Reference position passed to ``calculate_distance``.
            When falsy, no distance is computed.

    Returns:
        A ``(location, start_distance)`` tuple. ``location`` is whatever
        ``read_gps_data()`` returned, or ``None`` if it raised.
        ``start_distance`` is ``None`` when there is no base location or
        when reading/computing failed.
    """
    # Bind both names up front so a failing GPS read cannot leave
    # ``location`` undefined at the return statement.
    location = None
    start_distance = None
    try:
        location = read_gps_data()
        if base_location:
            start_distance = calculate_distance(base_location, location)
    except Exception as e:
        # Best effort: a missing position must not abort the measurement.
        print(f"Could not collect location: {e}")
    return location, start_distance


def parse_iperf_output(output):
    """Extract the sender and receiver bitrates from iperf3 text output.

    Args:
        output: Captured stdout of an iperf3 run.

    Returns:
        A ``(sender_bitrate, receiver_bitrate)`` tuple, or ``(None, None)``
        when *output* is empty or contains no recognisable bitrate.
    """
    if not output:
        return None, None

    matches = _IPERF_ROW_RE.findall(output)
    if matches:
        # Normalize the last entry
        last_value, last_unit = matches[-1]
        last_bitrate = normalize(float(last_value), last_unit)

        # Try to differentiate sender vs receiver: when both words appear
        # and there are at least two rows, the last row is taken as the
        # receiver and the one before it as the sender.
        mentions_receiver = re.search(r"receiver", output, re.IGNORECASE)
        mentions_sender = re.search(r"sender", output, re.IGNORECASE)
        if mentions_receiver and mentions_sender and len(matches) >= 2:
            send_value, send_unit = matches[-2]
            return normalize(float(send_value), send_unit), last_bitrate
        return last_bitrate, last_bitrate

    # Fallback for output the row pattern does not match.
    # NOTE(review): these values are returned as parsed (Mbits/sec figures)
    # without going through normalize() - confirm the units agree.
    bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", output)
    if not bitrates:
        return None, None
    if len(bitrates) == 1:
        # Mirror the single-row case above: one figure serves for both.
        only = float(bitrates[0])
        return only, only
    return float(bitrates[-2]), float(bitrates[-1])


def _run_iperf(command, timeout):
    """Run one iperf3 command and return ``(output, error)``.

    ``output`` is the captured stdout, or ``""`` on any failure.
    ``error`` is ``None`` on success, otherwise a description of the failure.
    """
    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        print(f"iPerf test timed out after {timeout} seconds.")
        return "", f"iPerf test timed out after {timeout} seconds."

    # Check for errors
    if result.returncode != 0:
        print(f"Error running iperf3: {result.stderr}")
        return "", result.stderr

    if result.stdout == "":
        # Successful exit but nothing captured: still report a reason so the
        # saved record always carries an "error" value when output is empty.
        return "", result.stderr or "iperf3 produced no output"

    return result.stdout, None


# Run iperf test
def collect_iperf(
    filename,
    is_client=True,
    server_ip="10.45.0.1",
    stations="",
    duration=10,
    timeout=20,
    base_location=None,
):
    """Run iperf3 and hand one result record per run to ``save_data_to_json``.

    In client mode two runs are made against *server_ip*: a normal one
    (recorded as ``"uplink"``) and one with ``-R`` (recorded as
    ``"downlink"``). Otherwise a single ``iperf3 -s`` run is made.

    Args:
        filename: Passed through to ``save_data_to_json``.
        is_client: Select client mode (``True``) or server mode (``False``).
        server_ip: Address given to ``iperf3 -c``.
        stations: Stored unchanged in each record under ``"stations"``.
        duration: Value given to ``iperf3 -t`` in client mode.
        timeout: Seconds before a run is abandoned.
        base_location: Reference position forwarded to ``get_location``.

    Returns:
        None. Any exception raised while handling a run is printed and the
        loop moves on to the next command.
    """
    if is_client:
        commands = [
            ["iperf3", "-c", server_ip, "-t", str(duration)],
            ["iperf3", "-c", server_ip, "-t", str(duration), "-R"],
        ]
    else:
        commands = [["iperf3", "-s"]]

    for command in commands:
        try:
            location, start_distance = get_location(base_location=base_location)

            # Run iperf with timeout protection. ``error`` is freshly bound on
            # every iteration, so a failure in one run cannot leak into the next.
            output, error = _run_iperf(command, timeout)

            sender_bitrate, receiver_bitrate = parse_iperf_output(output)

            test_type = "downlink" if "-R" in command else "uplink"

            data = {
                "iperf_full": output,
                "sender_bitrate": sender_bitrate,
                "receiver_bitrate": receiver_bitrate,
                "start_distance": start_distance,
                "location": location,
                "type": test_type,
                "stations": stations,
            }

            if output == "":
                data["error"] = error
            else:
                print(f"\n{server_ip} {test_type} IPERF complete")
                print(f"IPERF sender bitrate: {sender_bitrate}")
                print(f"IPERF receiver bitrate: {receiver_bitrate}")

            save_data_to_json(data=data, filename=filename)
            time.sleep(0.25)

        except Exception as e:
            # Top-level boundary for one run: report and continue with the next.
            print(f"iPerf Error: {e}")