#!/usr/bin/env python3
"""Watch an ATVM Cypress run and report its outcome.

The watcher polls the process table, the run log and the reporter's XML
output until the run leaves the ``RUNNING`` state.  It then prints a Markdown
status report and, for ``COMPLETED``/``FAILED`` runs, posts that report to a
Mattermost webhook exactly once (guarded by a ``posted.marker`` file).
"""
from __future__ import annotations

import argparse
import ast
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

RUN_STATES = {
    "COMPLETED",
    "FAILED",
    "CANCELLED",
    "TERMINATED",
    "HUNG",
    "UNKNOWN",
    "RUNNING",
}

# Default for --max-watch-seconds and for determine_state(max_watch_seconds=...).
DEFAULT_MAX_WATCH_SECONDS = 6 * 60 * 60

# "Running: <spec>" lines in the run log; group 1 is the host (spec file stem).
_RUNNING_SPEC_RE = re.compile(
    r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts"
)
# The logged Python-literal list of spec files.
_SPEC_PATTERN_RE = re.compile(r'Extracted specPattern:\s*(\[[^\n]+\])')
# Host name taken from one entry of that list.
_SPEC_HOST_RE = re.compile(r"(atvm[^/\s]+)\.ts$")
# First URL in the log that contains a "/run/" path segment.
_CURRENTS_URL_RE = re.compile(r"(https://\S+/run/\S+)")
# Header/body separator cell of a Markdown table ("---", ":---", "-----:", ...).
_TABLE_RULE_RE = re.compile(r":?-{3,}:?")


@dataclass
class HostResult:
    """Outcome of one host's spec file as shown in the status report."""

    host: str
    kernel: str
    status: str
    detail: str
    tests: int = 0
    failures: int = 0
    duration_seconds: Optional[float] = None
    timestamp: Optional[datetime] = None


def now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def parse_args() -> argparse.Namespace:
    """Parse the watcher's command-line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--build-name", required=True)
    parser.add_argument("--run-log", required=True)
    parser.add_argument("--reporter-root", required=True)
    parser.add_argument("--inventory-file", required=True)
    parser.add_argument("--state-dir", required=True)
    parser.add_argument("--poll-interval", type=int, default=30)
    parser.add_argument(
        "--max-watch-seconds", type=int, default=DEFAULT_MAX_WATCH_SECONDS
    )
    parser.add_argument("--process-exit-grace-seconds", type=int, default=120)
    return parser.parse_args()


def ensure_dir(path: Path) -> None:
    """Create *path* (and any missing parents) if it does not exist yet."""
    path.mkdir(parents=True, exist_ok=True)


def load_inventory(path: Path) -> Dict[str, str]:
    """Read a Markdown table and return a ``host -> kernel`` mapping.

    Only lines starting with ``|`` are considered.  The second cell is used as
    the host and the third as the kernel; the header row (first cell ``OS``)
    and the separator row are skipped.  An empty kernel cell becomes
    ``"unknown"``.  A missing file yields an empty mapping.
    """
    kernels: Dict[str, str] = {}
    if not path.exists():
        return kernels
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line.startswith("|"):
            continue
        cells = [cell.strip() for cell in line.strip().strip("|").split("|")]
        if len(cells) < 3:
            continue
        # Skip the header and any separator style (not just a literal "---").
        if cells[0] == "OS" or _TABLE_RULE_RE.fullmatch(cells[0]):
            continue
        kernels[cells[1]] = cells[2] or "unknown"
    return kernels


def run_ps() -> str:
    """Return the output of ``ps -eo pid=,args=``.

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    proc = subprocess.run(
        ["ps", "-eo", "pid=,args="],
        capture_output=True,
        text=True,
        check=True,
    )
    return proc.stdout


def process_active(build_name: str) -> bool:
    """Return True if a ``run-sorry-cypress.py`` process for *build_name* exists.

    The build name must be followed by whitespace or the end of the line so
    that ``nightly-1`` is not reported as active while only ``nightly-10`` is
    running.
    """
    build_arg = re.compile(rf"--build_name {re.escape(build_name)}(?=\s|$)")
    return any(
        "run-sorry-cypress.py" in line and build_arg.search(line)
        for line in run_ps().splitlines()
    )


def read_text(path: Path) -> str:
    """Return the file's text (undecodable bytes replaced), or "" if missing."""
    try:
        return path.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        return ""


def extract_expected_hosts(log_text: str) -> List[str]:
    """Return the hosts the run is expected to cover, in first-seen order.

    Hosts come from the logged ``Extracted specPattern: [...]`` list first and
    then from every ``Running: ...atvm*.ts`` line; duplicates are dropped.
    """
    hosts: List[str] = []

    def remember(host: str) -> None:
        if host not in hosts:
            hosts.append(host)

    spec_match = _SPEC_PATTERN_RE.search(log_text)
    if spec_match:
        try:
            # literal_eval only accepts Python literals, so log content is
            # never executed.
            spec_list = ast.literal_eval(spec_match.group(1))
        except (SyntaxError, ValueError):
            spec_list = []
        for entry in spec_list:
            if not isinstance(entry, str):
                continue
            host_match = _SPEC_HOST_RE.search(entry)
            if host_match:
                remember(host_match.group(1))

    for running in _RUNNING_SPEC_RE.finditer(log_text):
        remember(running.group(1))
    return hosts


def extract_currents_url(log_text: str) -> Optional[str]:
    """Return the first ``https://.../run/...`` URL in the log, if any."""
    match = _CURRENTS_URL_RE.search(log_text)
    return match.group(1) if match else None


def load_state(state_file: Path) -> Dict[str, object]:
    """Load the persisted watcher state.

    Returns an empty dict when the file is missing, is not valid JSON, or
    holds JSON that is not an object (callers rely on dict methods).
    """
    try:
        loaded = json.loads(state_file.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def write_state(state_file: Path, state: Dict[str, object]) -> None:
    """Write *state* to *state_file* as indented, key-sorted JSON."""
    state_file.write_text(
        json.dumps(state, indent=2, sort_keys=True), encoding="utf-8"
    )


def parse_xml_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    Timestamps without an offset are taken to be UTC.  Timestamps that carry
    an offset (including a trailing ``Z``) are converted to UTC rather than
    relabelled.  Anything that is empty, not a string, or not parseable
    yields ``None``.
    """
    if not raw or not isinstance(raw, str):
        return None
    text = raw.strip()
    # datetime.fromisoformat() only accepts a "Z" suffix from Python 3.11 on.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
    """Parse one reporter XML file into ``(host, HostResult)``.

    The first ``<testsuite>`` child of the root whose ``file`` attribute is a
    ``cypress/cmcRegressionTest/atvm*.ts`` spec identifies the host.  Its
    ``tests``/``failures``/``time`` attributes are used, falling back to the
    root element's attributes.  Returns ``None`` if the file cannot be read
    or parsed, or if it contains no such suite.  The returned result's kernel
    is ``"unknown"``; callers fill it in.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except (ET.ParseError, OSError):
        # The file may be half-written or already removed.
        return None

    for suite in root.findall("testsuite"):
        file_attr = suite.attrib.get("file", "")
        if file_attr.startswith("cypress/cmcRegressionTest/atvm") and file_attr.endswith(".ts"):
            break
    else:
        return None

    def attr(name: str) -> str:
        return suite.attrib.get(name, root.attrib.get(name, "0"))

    host = Path(file_attr).stem
    tests = int(float(attr("tests")))
    failures = int(float(attr("failures")))
    return host, HostResult(
        host=host,
        kernel="unknown",
        status="FAIL" if failures else "PASS",
        detail=f"{tests} tests, {failures} failures",
        tests=tests,
        failures=failures,
        duration_seconds=float(attr("time")),
        timestamp=parse_xml_timestamp(suite.attrib.get("timestamp")),
    )


def _recent_xml_files(
    xml_dir: Path, since: datetime, newest_first: bool = False
) -> List[Path]:
    """Return ``test-result-*.xml`` files modified at or after *since*.

    Files are ordered by modification time.  Each file is stat'ed once, and
    files that disappear during the scan are skipped.
    """
    if not xml_dir.is_dir():
        return []
    stamped: List[Tuple[float, Path]] = []
    for xml_path in xml_dir.glob("test-result-*.xml"):
        try:
            mtime = xml_path.stat().st_mtime
        except OSError:
            continue
        if datetime.fromtimestamp(mtime, tz=timezone.utc) >= since:
            stamped.append((mtime, xml_path))
    stamped.sort(key=lambda item: item[0], reverse=newest_first)
    return [xml_path for _, xml_path in stamped]


def collect_host_results(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
) -> Dict[str, HostResult]:
    """Collect per-host results from ``<reporter_root>/xml``.

    Only XML files modified at or after *run_started_at* are read.  When
    *expected_hosts* is non-empty, other hosts are ignored.  Files are
    processed oldest first, so a newer report for the same host wins.
    """
    results: Dict[str, HostResult] = {}
    for xml_path in _recent_xml_files(reporter_root / "xml", run_started_at):
        parsed = parse_host_xml(xml_path)
        if not parsed:
            continue
        host, result = parsed
        if expected_hosts and host not in expected_hosts:
            continue
        result.kernel = kernels.get(host, "unknown")
        results[host] = result
    return results


def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently logged "Running:" host not in *completed_hosts*."""
    for host in reversed(_RUNNING_SPEC_RE.findall(log_text)):
        if host not in completed_hosts:
            return host
    return None


def infer_metadata() -> Dict[str, str]:
    """Read the report's coverage metadata from ``ATVM_WATCHER_*`` variables."""
    env = os.environ.get
    return {
        "template": env("ATVM_WATCHER_TEMPLATE", "unknown"),
        "config_family": env("ATVM_WATCHER_CONFIG_FAMILY", "unknown"),
        "migration_style": env("ATVM_WATCHER_MIGRATION_STYLE", "ATVM automation validation"),
        "integration_plugin": env("ATVM_WATCHER_INTEGRATION_PLUGIN", "unknown"),
        "scope_description": env("ATVM_WATCHER_SCOPE_DESCRIPTION", "requested ATVM run scope"),
    }


def format_duration(seconds: Optional[float]) -> str:
    """Format a duration as ``1h 02m 05.00s``, ``1m 05.50s`` or ``5.000s``.

    ``None`` is rendered as ``"n/a"``.
    """
    if seconds is None:
        return "n/a"
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(int(minutes), 60)
    if hours:
        return f"{hours}h {minutes:02d}m {secs:05.2f}s"
    if minutes:
        return f"{minutes}m {secs:05.2f}s"
    return f"{secs:.3f}s"


def format_timestamp_local(ts: Optional[datetime]) -> str:
    """Format *ts* in the local timezone, or ``"n/a"`` when it is missing."""
    if not ts:
        return "n/a"
    return ts.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")


def build_status_markdown(
    build_name: str,
    metadata: Dict[str, str],
    host_results: Dict[str, HostResult],
    run_state: str,
    currents_url: Optional[str],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    notes: List[str],
) -> str:
    """Render the Markdown status report.

    Hosts are listed in the iteration order of *host_results*.  *run_state*
    is accepted for interface compatibility but is not rendered.  *notes* is
    not modified; when *currents_url* is set, a note about it is appended to
    a copy.
    """
    hosts = list(host_results.values())
    timed = [h for h in hosts if h.duration_seconds is not None]

    def count(*statuses: str) -> int:
        return sum(1 for h in hosts if h.status in statuses)

    def host_duration(host: Optional[HostResult]) -> str:
        if host is None:
            return "n/a"
        return f"{host.host} - {format_duration(host.duration_seconds)}"

    quickest = min(timed, key=lambda h: h.duration_seconds, default=None)
    longest = max(timed, key=lambda h: h.duration_seconds, default=None)
    average = (sum(h.duration_seconds for h in timed) / len(timed)) if timed else None
    total = (
        format_duration((end_ts - start_ts).total_seconds())
        if start_ts and end_ts
        else "n/a"
    )

    icons = {
        "PASS": "✅ PASS",
        "FAIL": "⚠️ FAIL",
        "RUN": "⏳ RUN",
        "SKIP": "⏭️ SKIP",
        "NOT STARTED": "⏳ RUN",
    }
    host_lines = ["| Host | Kernel | Status | Detail |", "| --- | --- | --- | --- |"]
    host_lines.extend(
        f"| {h.host} | {h.kernel} | {icons.get(h.status, h.status)} | {h.detail} |"
        for h in hosts
    )

    if currents_url:
        notes = notes + [f"Currents recorded run: `{currents_url}`"]
    notes_block = "\n".join(f"- {note}" for note in notes) if notes else "- none"

    lines = [
        "## ATVM Run Status",
        f"### {build_name}",
        "",
        "**COVERAGE:**",
        f"- template: `{metadata['template']}`",
        f"- datastore/config family: `{metadata['config_family']}`",
        f"- migration style: {metadata['migration_style']}",
        f"- integration/plugin path: `{metadata['integration_plugin']}`",
        f"- scope of this run: {metadata['scope_description']}",
        "",
        "**FUNCTIONALLY:**",
        "- verify VM setup and power state",
        "- power on, obtain IP address, and verify hostname reachability",
        "- uninstall existing CMC if present",
        "- prepare source and destination disks and validate source-side data",
        "- install CMC and execute the requested ATVM migration workflow",
        "- finalize reporting, cleanup, and the final `check-xml-files.ts` validation step",
        "",
        "**SUMMARY:**",
        "",
        "| Metric | Value |",
        "| --- | --- |",
        f"| finished | {count('PASS', 'FAIL')} |",
        f"| passed | {count('PASS')} |",
        f"| failed | {count('FAIL')} |",
        f"| skipped | {count('SKIP')} |",
        "",
        "**HOSTS:**",
        "",
        *host_lines,
        "",
        "**TIMING:**",
        "",
        "| Metric | Value |",
        "| --- | --- |",
        f"| start | {format_timestamp_local(start_ts)} |",
        f"| end | {format_timestamp_local(end_ts)} |",
        f"| total | {total} |",
        f"| quickest | {host_duration(quickest)} |",
        f"| longest | {host_duration(longest)} |",
        f"| average | {format_duration(average)} |",
        "",
        "**NOTES:**",
        notes_block,
    ]
    return "\n".join(lines)


def post_to_mattermost(text: str, timeout: float = 30.0) -> str:
    """POST *text* to the Mattermost webhook and return the response body.

    The webhook URL comes from ``MATTERMOST_ATVM_WEBHOOK``; the optional
    ``MATTERMOST_ATVM_CHANNEL`` is sent as the payload's ``channel``.

    Args:
        text: Message body.
        timeout: Seconds to wait for the webhook, so that an unresponsive
            server cannot block the watcher indefinitely.

    Raises:
        KeyError: if ``MATTERMOST_ATVM_WEBHOOK`` is not set.
        urllib.error.URLError: on connection failure, timeout or HTTP error.
    """
    webhook = os.environ["MATTERMOST_ATVM_WEBHOOK"]
    payload = {"text": text}
    channel = os.environ.get("MATTERMOST_ATVM_CHANNEL")
    if channel:
        payload["channel"] = channel
    request = urllib.request.Request(
        webhook,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode().strip()


def _final_validation_timestamp(reporter_root: Path, started_at: datetime) -> Optional[datetime]:
    """Return the timestamp of the newest ``check-xml-files.ts`` report, if any.

    Only the most recently modified report mentioning ``check-xml-files.ts``
    is inspected; the timestamp is read from its first ``<testsuite>``.
    """
    for xml_path in _recent_xml_files(reporter_root / "xml", started_at, newest_first=True):
        if "check-xml-files.ts" not in read_text(xml_path):
            continue
        try:
            suite = ET.parse(xml_path).getroot().find("testsuite")
        except (ET.ParseError, OSError):
            return None
        if suite is None:
            return None
        return parse_xml_timestamp(suite.attrib.get("timestamp"))
    return None


def determine_state(
    build_name: str,
    build_dir: Path,
    run_log: Path,
    reporter_root: Path,
    inventory: Dict[str, str],
    started_at: datetime,
    process_gone_since: Optional[datetime],
    process_exit_grace_seconds: int,
    max_watch_seconds: int = DEFAULT_MAX_WATCH_SECONDS,
    active: Optional[bool] = None,
) -> Tuple[str, Dict[str, HostResult], str, Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Classify the run and gather everything the report needs.

    States are checked in this order:

    1. ``CANCELLED``  - ``<build_dir>/cancelled.marker`` exists.
    2. ``HUNG``       - the process is active but *max_watch_seconds* have
       elapsed since *started_at*; otherwise ``RUNNING``.
    3. ``FAILED``/``COMPLETED`` - the process is gone and the log contains
       ``Cloud Run Finished`` or a run URL; ``FAILED`` if any host reported
       failures.
    4. ``TERMINATED`` - the process has been gone for at least
       *process_exit_grace_seconds* without a completion signal.
    5. ``RUNNING``    - still inside the grace period.

    Args:
        max_watch_seconds: Watch budget used for the ``HUNG`` check.
        active: Result of an earlier ``process_active`` call.  When ``None``
            the process table is queried here.

    Returns:
        ``(state, host_results, log_text, start_ts, end_ts, currents_url,
        notes)``.
    """
    log_text = read_text(run_log)
    expected_hosts = extract_expected_hosts(log_text)
    host_results = collect_host_results(reporter_root, expected_hosts, inventory, started_at)
    if active is None:
        active = process_active(build_name)
    currents_url = extract_currents_url(log_text)
    notes: List[str] = []

    # Show the spec that is still executing as an in-progress row.
    current_host = find_current_running_host(log_text, list(host_results))
    if current_host:
        host_results[current_host] = HostResult(
            host=current_host,
            kernel=inventory.get(current_host, "unknown"),
            status="RUN",
            detail="in progress",
        )

    start_candidates = [r.timestamp for r in host_results.values() if r.timestamp]
    end_candidates = list(start_candidates)
    final_ts = _final_validation_timestamp(reporter_root, started_at)
    if final_ts:
        end_candidates.append(final_ts)

    start_ts = min(start_candidates) if start_candidates else started_at
    end_ts = max(end_candidates) if end_candidates else None

    def outcome(state: str):
        return state, host_results, log_text, start_ts, end_ts, currents_url, notes

    if (build_dir / "cancelled.marker").exists():
        notes.append("Cancellation marker detected.")
        return outcome("CANCELLED")

    if active:
        elapsed = (now_utc() - started_at).total_seconds()
        if elapsed > max_watch_seconds:
            notes.append("Watcher exceeded max watch duration while the run still appears active.")
            return outcome("HUNG")
        return outcome("RUNNING")

    if "Cloud Run Finished" in log_text or currents_url:
        failed = any(result.failures for result in host_results.values())
        notes.append("Run finished and final reporting artifacts were detected.")
        if "check-xml-files.ts" in log_text:
            notes.append("Final `check-xml-files.ts` validation passed.")
        return outcome("FAILED" if failed else "COMPLETED")

    if process_gone_since and (now_utc() - process_gone_since).total_seconds() >= process_exit_grace_seconds:
        notes.append("Run process exited without a clean completion signal.")
        return outcome("TERMINATED")

    return outcome("RUNNING")


def main() -> int:
    """Run the watch loop until the run leaves ``RUNNING``; return exit code 0.

    Raises:
        SystemExit: if the Mattermost webhook answers with anything but "ok".
    """
    args = parse_args()
    build_name = args.build_name
    run_log = Path(args.run_log)
    reporter_root = Path(args.reporter_root)
    build_dir = Path(args.state_dir) / build_name
    ensure_dir(build_dir)
    state_file = build_dir / "state.json"
    posted_marker = build_dir / "posted.marker"

    inventory = load_inventory(Path(args.inventory_file))
    metadata = infer_metadata()
    state = load_state(state_file)

    # A persisted start time wins so that a restarted watcher keeps the same
    # cut-off for which XML reports belong to this run.
    if run_log.exists():
        default_started_at = datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc)
    else:
        default_started_at = now_utc()
    started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at
    state.setdefault("build_name", build_name)
    state.setdefault("started_at", started_at.isoformat())
    write_state(state_file, state)

    process_gone_since: Optional[datetime] = None
    while True:
        active = process_active(build_name)
        if active:
            process_gone_since = None
        elif process_gone_since is None:
            process_gone_since = now_utc()

        run_state, host_results, _log_text, start_ts, end_ts, currents_url, notes = determine_state(
            build_name=build_name,
            build_dir=build_dir,
            run_log=run_log,
            reporter_root=reporter_root,
            inventory=inventory,
            started_at=started_at,
            process_gone_since=process_gone_since,
            process_exit_grace_seconds=args.process_exit_grace_seconds,
            max_watch_seconds=args.max_watch_seconds,
            active=active,
        )

        state["last_state"] = run_state
        state["last_seen_at"] = now_utc().isoformat()
        state["host_results"] = {
            host: {
                "status": result.status,
                "detail": result.detail,
                "kernel": result.kernel,
                "tests": result.tests,
                "failures": result.failures,
            }
            for host, result in host_results.items()
        }
        write_state(state_file, state)

        if run_state == "RUNNING":
            print(f"[watcher] {build_name}: RUNNING")
            time.sleep(args.poll_interval)
            continue

        status_text = build_status_markdown(
            build_name=build_name,
            metadata=metadata,
            host_results=dict(sorted(host_results.items())),
            run_state=run_state,
            currents_url=currents_url,
            start_ts=start_ts,
            end_ts=end_ts,
            notes=notes,
        )
        print(status_text)

        if run_state in {"COMPLETED", "FAILED"} and not posted_marker.exists():
            response = post_to_mattermost(status_text)
            if response != "ok":
                raise SystemExit(f"Mattermost webhook did not return ok: {response!r}")
            posted_marker.write_text("ok\n", encoding="utf-8")
            state["mattermost_posted"] = True
            state["mattermost_response"] = response
            write_state(state_file, state)
            print(f"[watcher] Mattermost post confirmed for {build_name}.")

        state["closed_at"] = now_utc().isoformat()
        write_state(state_file, state)
        return 0


if __name__ == "__main__":
    sys.exit(main())