"""Campaign executor: runs a capture campaign end-to-end.""" from __future__ import annotations import json import logging import subprocess import time from dataclasses import dataclass, field from pathlib import Path from typing import Callable, Optional from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.io.recording import to_sigmf from .campaign import CampaignConfig, CaptureStep, TransmitterConfig from .labeler import build_output_filename, label_recording from .qa import QAResult, check_recording logger = logging.getLogger(__name__) # Device name aliases: campaign YAML names → get_sdr_device() names _DEVICE_ALIASES = { "usrp_b210": "usrp", "usrp_b200": "usrp", "usrp": "usrp", "plutosdr": "pluto", "pluto": "pluto", "hackrf": "hackrf", "hackrf_one": "hackrf", "bladerf": "bladerf", "rtlsdr": "rtlsdr", "rtl_sdr": "rtlsdr", "thinkrf": "thinkrf", # Simulated device — no hardware required "mock": "mock", "sim": "mock", } @dataclass class StepResult: """Outcome of a single capture step.""" transmitter_id: str step_label: str output_path: Optional[str] qa: QAResult capture_timestamp: float error: Optional[str] = None @property def ok(self) -> bool: return self.error is None and self.qa.passed def to_dict(self) -> dict: return { "transmitter_id": self.transmitter_id, "step_label": self.step_label, "output_path": self.output_path, "capture_timestamp": self.capture_timestamp, "qa": self.qa.to_dict(), "error": self.error, } @dataclass class CampaignResult: """Aggregate outcome of a full campaign.""" campaign_name: str steps: list[StepResult] = field(default_factory=list) start_time: float = field(default_factory=time.time) end_time: Optional[float] = None @property def total_steps(self) -> int: return len(self.steps) @property def passed(self) -> int: return sum(1 for s in self.steps if s.ok) @property def flagged(self) -> int: return sum(1 for s in self.steps if not s.error and s.qa.flagged) @property def failed(self) -> int: return sum(1 for s in self.steps if s.error or not s.qa.passed) @property def duration_s(self) -> float: if self.end_time: return self.end_time - self.start_time return time.time() - self.start_time def to_dict(self) -> dict: return { "campaign_name": self.campaign_name, "total_steps": self.total_steps, "passed": self.passed, "flagged": self.flagged, "failed": self.failed, "duration_s": round(self.duration_s, 1), "steps": [s.to_dict() for s in self.steps], } def write_report(self, path: str | Path) -> None: """Write a JSON QA report to disk.""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: json.dump(self.to_dict(), f, indent=2) logger.info(f"QA report written to {path}") # --------------------------------------------------------------------------- # External script interface # --------------------------------------------------------------------------- def _run_script(script: str, *args: str, timeout: float = 15.0) -> str: """Run an external control script and return stdout. The script is called as::