- add the per-run ATVM watcher service package under atvm/watcher-service, including the Python watcher, systemd template unit, helper scripts, and deployment docs - document the watcher-service install and operating model, including one-run-per-instance behavior, Mattermost posting rules, and the best-practice /opt/atvm-watcher-service install path - clarify ATVM run approval semantics so `approve` means run without watcher and `approve with watcher` means run and start the watcher - update the ATVM automation guide and AGENTS rules so watcher usage and approval behavior are explicit and consistent
513 lines
18 KiB
Python
513 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
# Names of the run states this watcher works with. The set contains both the
# in-progress state ("RUNNING") and every state that ends the watch loop.
RUN_STATES = {
    "RUNNING",
    "COMPLETED",
    "FAILED",
    "CANCELLED",
    "TERMINATED",
    "HUNG",
    "UNKNOWN",
}
|
|
|
|
|
|
@dataclass
class HostResult:
    """Outcome of a single ATVM host spec, as shown in the status report."""

    # Host identifier; taken from the spec file stem (e.g. ``atvm...``).
    host: str
    # Kernel string looked up in the inventory, or "unknown".
    kernel: str
    # Short status code such as "PASS", "FAIL" or "RUN".
    status: str
    # Human-readable one-line detail rendered in the hosts table.
    detail: str
    # Number of tests reported for the host.
    tests: int = 0
    # Number of failed tests reported for the host.
    failures: int = 0
    # Reported suite duration in seconds, when known.
    duration_seconds: Optional[float] = None
    # Suite timestamp from the XML report, when known.
    timestamp: Optional[datetime] = None
|
|
|
|
|
|
def now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    current = datetime.now(tz=timezone.utc)
    return current
|
|
|
|
|
|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the watcher's command-line options.

    Args:
        argv: Argument list to parse. When ``None`` (the default) argparse
            falls back to ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        Namespace with ``build_name``, ``run_log``, ``reporter_root``,
        ``inventory_file``, ``state_dir``, ``poll_interval``,
        ``max_watch_seconds`` and ``process_exit_grace_seconds``.
    """
    parser = argparse.ArgumentParser(
        description="Watch one ATVM run and report its final status.",
    )
    parser.add_argument(
        "--build-name",
        required=True,
        help="build name of the run; also names the per-build state directory",
    )
    parser.add_argument(
        "--run-log",
        required=True,
        help="path of the run log file that is scanned on every poll",
    )
    parser.add_argument(
        "--reporter-root",
        required=True,
        help="directory whose xml/ sub-directory holds test-result-*.xml reports",
    )
    parser.add_argument(
        "--inventory-file",
        required=True,
        help="pipe-delimited table used to look up the kernel of each host",
    )
    parser.add_argument(
        "--state-dir",
        required=True,
        help="root directory for watcher state (one sub-directory per build)",
    )
    parser.add_argument(
        "--poll-interval",
        type=int,
        default=30,
        help="seconds to sleep between polls (default: %(default)s)",
    )
    parser.add_argument(
        "--max-watch-seconds",
        type=int,
        default=6 * 60 * 60,
        help="report HUNG when a still-active run exceeds this age (default: %(default)s)",
    )
    parser.add_argument(
        "--process-exit-grace-seconds",
        type=int,
        default=120,
        help="seconds to wait after the run process disappears before "
        "reporting TERMINATED (default: %(default)s)",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def ensure_dir(path: Path) -> None:
    """Create ``path`` and any missing parents; do nothing if it already exists."""
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def load_inventory(path: Path) -> Dict[str, str]:
    """Read a pipe-delimited inventory table and map each host to its kernel.

    Only lines starting with ``|`` are considered. A row needs at least three
    cells; rows whose first cell is ``OS`` or ``---`` (header / separator) are
    ignored. The second cell is the host and the third the kernel, with an
    empty kernel cell recorded as ``"unknown"``.

    Returns an empty mapping when the file does not exist.
    """
    if not path.exists():
        return {}

    non_data_first_cells = {"OS", "---"}
    host_to_kernel: Dict[str, str] = {}
    table_rows = (
        [cell.strip() for cell in raw.strip().strip("|").split("|")]
        for raw in path.read_text(encoding="utf-8").splitlines()
        if raw.startswith("|")
    )
    for cells in table_rows:
        if len(cells) >= 3 and cells[0] not in non_data_first_cells:
            host_to_kernel[cells[1]] = cells[2] or "unknown"
    return host_to_kernel
|
|
|
|
|
|
def run_ps() -> str:
    """Return the output of ``ps -eo pid=,args=`` as text.

    Raises:
        subprocess.CalledProcessError: if ``ps`` exits with a non-zero status.
    """
    command = ["ps", "-eo", "pid=,args="]
    completed = subprocess.run(command, check=True, text=True, capture_output=True)
    return completed.stdout
|
|
|
|
|
|
def process_active(build_name: str) -> bool:
    """Report whether a ``run-sorry-cypress.py`` process for this build is running.

    A process matches when its command line contains ``run-sorry-cypress.py``
    and a ``--build_name`` option whose value is exactly ``build_name``.

    Args:
        build_name: Build name to look for in the process list.

    Returns:
        True if at least one matching process is listed by ``run_ps()``.
    """
    # The value must end at whitespace or end-of-line. A plain substring test
    # would treat build "run1" as active whenever "run10" is running.
    build_flag = re.compile(rf"--build_name[ =]{re.escape(build_name)}(?=\s|$)")
    return any(
        "run-sorry-cypress.py" in line and build_flag.search(line) is not None
        for line in run_ps().splitlines()
    )
|
|
|
|
|
|
def read_text(path: Path) -> str:
    """Return the file's contents decoded as UTF-8, or ``""`` if it is missing.

    Undecodable bytes are replaced rather than raising.
    """
    try:
        content = path.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        content = ""
    return content
|
|
|
|
|
|
def extract_expected_hosts(log_text: str) -> List[str]:
    """List the ATVM hosts named in the run log, in first-seen order.

    Hosts come from two places: the ``Extracted specPattern: [...]`` list (if
    present and parseable as a Python literal) and every ``Running: ...`` line.
    Spec-pattern hosts are listed first; duplicates are dropped.
    """
    candidates: List[str] = []

    announced = re.search(r'Extracted specPattern:\s*(\[[^\n]+\])', log_text)
    if announced:
        try:
            spec_entries = ast.literal_eval(announced.group(1))
        except (SyntaxError, ValueError):
            spec_entries = []
        for spec in spec_entries:
            if isinstance(spec, str):
                found = re.search(r"(atvm[^/\s]+)\.ts$", spec)
                if found:
                    candidates.append(found.group(1))

    candidates.extend(
        re.findall(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    )

    # dict.fromkeys keeps the first occurrence of each host and preserves order.
    return list(dict.fromkeys(candidates))
|
|
|
|
|
|
def extract_currents_url(log_text: str) -> Optional[str]:
    """Return the first ``https://.../run/...`` URL in the log, or ``None``."""
    found = re.search(r"(https://\S+/run/\S+)", log_text)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
|
|
def load_state(state_file: Path) -> Dict[str, object]:
    """Load the persisted watcher state, falling back to an empty mapping.

    Args:
        state_file: Path of the JSON state file.

    Returns:
        The decoded JSON object. An empty dict is returned when the file is
        missing, is not valid UTF-8 / JSON, or holds JSON that is not an
        object (callers use ``.get`` / ``.setdefault`` on the result).
    """
    try:
        loaded = json.loads(state_file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError, both of which
        # are ValueError subclasses.
        return {}
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def write_state(state_file: Path, state: Dict[str, object]) -> None:
    """Serialize ``state`` as indented, key-sorted JSON and write it to ``state_file``."""
    serialized = json.dumps(state, sort_keys=True, indent=2)
    with state_file.open("w", encoding="utf-8") as handle:
        handle.write(serialized)
|
|
|
|
|
|
def parse_xml_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

    Args:
        raw: Timestamp text, e.g. a ``timestamp`` XML attribute or a value
            previously written with ``datetime.isoformat()``.

    Returns:
        The parsed instant in UTC, or ``None`` when ``raw`` is empty, is not
        a string, or is not a valid ISO-8601 timestamp. A value without an
        offset is interpreted as UTC; a value with an offset is converted to
        UTC.
    """
    if not raw or not isinstance(raw, str):
        return None
    text = raw.strip()
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
    # on; spell it as an explicit offset so older interpreters parse it too.
    if text.endswith(("Z", "z")):
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    # Convert rather than relabel: replacing tzinfo on an aware value would
    # silently shift the instant by its original offset.
    return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
    """Extract the host result from one XML test report.

    The first ``testsuite`` child whose ``file`` attribute starts with
    ``cypress/cmcRegressionTest/atvm`` and ends with ``.ts`` identifies the
    host (the file stem). Its ``tests`` / ``failures`` / ``time`` attributes
    are used, falling back to the same attributes on the root element.

    Returns:
        ``(host, HostResult)`` for a host report, or ``None`` when the file is
        not well-formed XML or contains no matching suite. The result's kernel
        is left as ``"unknown"``.

    Raises:
        ValueError: if a numeric attribute cannot be converted.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except ET.ParseError:
        return None

    def as_count(raw: str) -> int:
        # Go through float so values such as "3.0" are accepted.
        return int(float(raw))

    # Root-level totals serve as fallbacks for attributes a suite lacks. They
    # are converted up front, so a malformed root value raises even when the
    # suite carries its own numbers.
    root_tests = as_count(root.attrib.get("tests", "0"))
    root_failures = as_count(root.attrib.get("failures", "0"))
    root_time = float(root.attrib.get("time", "0"))

    for suite in root.findall("testsuite"):
        attrs = suite.attrib
        spec_file = attrs.get("file", "")
        is_host_spec = spec_file.startswith("cypress/cmcRegressionTest/atvm") and spec_file.endswith(".ts")
        if not is_host_spec:
            continue

        host = Path(spec_file).stem
        recorded_at = parse_xml_timestamp(attrs.get("timestamp"))
        tests = as_count(attrs["tests"]) if "tests" in attrs else root_tests
        failures = as_count(attrs["failures"]) if "failures" in attrs else root_failures
        elapsed = float(attrs["time"]) if "time" in attrs else root_time

        return host, HostResult(
            host=host,
            kernel="unknown",
            status="FAIL" if failures else "PASS",
            detail=f"{tests} tests, {failures} failures",
            tests=tests,
            failures=failures,
            duration_seconds=elapsed,
            timestamp=recorded_at,
        )

    return None
|
|
|
|
|
|
def collect_host_results(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
) -> Dict[str, HostResult]:
    """Gather per-host results from ``<reporter_root>/xml/test-result-*.xml``.

    Reports are visited oldest-first by modification time, so a newer report
    for the same host replaces an older one. Reports modified before
    ``run_started_at`` are ignored, as are hosts not in ``expected_hosts``
    (when that list is non-empty). Each result's kernel is filled in from
    ``kernels``, defaulting to ``"unknown"``.

    Returns an empty mapping when the ``xml`` directory does not exist.
    """
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return {}

    collected: Dict[str, HostResult] = {}
    oldest_first = sorted(
        xml_dir.glob("test-result-*.xml"),
        key=lambda candidate: candidate.stat().st_mtime,
    )
    for report in oldest_first:
        modified_at = datetime.fromtimestamp(report.stat().st_mtime, tz=timezone.utc)
        if modified_at < run_started_at:
            continue

        parsed = parse_host_xml(report)
        if parsed is None:
            continue

        host, outcome = parsed
        wanted = not expected_hosts or host in expected_hosts
        if wanted:
            outcome.kernel = kernels.get(host, "unknown")
            collected[host] = outcome

    return collected
|
|
|
|
|
|
def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently started host that has no result yet.

    ``Running: ...`` lines are scanned from the end of the log backwards; the
    first host not listed in ``completed_hosts`` wins. Returns ``None`` when
    every started host is already completed or no host was started.
    """
    started = re.findall(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    pending = (host for host in reversed(started) if host not in completed_hosts)
    return next(pending, None)
|
|
|
|
|
|
def infer_metadata() -> Dict[str, str]:
    """Collect the report's coverage metadata from ``ATVM_WATCHER_*`` variables.

    Each entry falls back to a fixed default when its environment variable is
    not set.
    """
    # (report key, environment variable, default value)
    sources = (
        ("template", "ATVM_WATCHER_TEMPLATE", "unknown"),
        ("config_family", "ATVM_WATCHER_CONFIG_FAMILY", "unknown"),
        ("migration_style", "ATVM_WATCHER_MIGRATION_STYLE", "ATVM automation validation"),
        ("integration_plugin", "ATVM_WATCHER_INTEGRATION_PLUGIN", "unknown"),
        ("scope_description", "ATVM_WATCHER_SCOPE_DESCRIPTION", "requested ATVM run scope"),
    )
    return {key: os.environ.get(variable, fallback) for key, variable, fallback in sources}
|
|
|
|
|
|
def format_duration(seconds: Optional[float]) -> str:
    """Render a duration in seconds as ``Hh MMm SS.SSs``, ``Mm SS.SSs`` or ``S.SSSs``.

    Returns ``"n/a"`` when ``seconds`` is ``None``.
    """
    if seconds is None:
        return "n/a"

    whole_minutes, remainder = divmod(seconds, 60)
    hours, minutes = divmod(int(whole_minutes), 60)

    if hours:
        rendered = f"{hours}h {minutes:02d}m {remainder:05.2f}s"
    elif minutes:
        rendered = f"{minutes}m {remainder:05.2f}s"
    else:
        rendered = f"{remainder:.3f}s"
    return rendered
|
|
|
|
|
|
def format_timestamp_local(ts: Optional[datetime]) -> str:
    """Format ``ts`` in the system's local timezone, or ``"n/a"`` when absent."""
    return ts.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z") if ts else "n/a"
|
|
|
|
|
|
def build_status_markdown(
    build_name: str,
    metadata: Dict[str, str],
    host_results: Dict[str, HostResult],
    run_state: str,
    currents_url: Optional[str],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    notes: List[str],
) -> str:
    """Render the Markdown status report for one run.

    The report has a heading followed by COVERAGE, FUNCTIONALLY, SUMMARY,
    HOSTS, TIMING and NOTES sections separated by blank lines.

    Args:
        build_name: Shown in the sub-heading.
        metadata: Must provide ``template``, ``config_family``,
            ``migration_style``, ``integration_plugin`` and
            ``scope_description``.
        host_results: Per-host results, rendered in mapping order.
        run_state: Accepted for interface compatibility; not rendered.
        currents_url: When set, appended to the notes as the recorded run.
        start_ts: Run start, or ``None``.
        end_ts: Run end, or ``None``.
        notes: Free-form note lines; the caller's list is not modified.

    Returns:
        The report as a single newline-joined string.
    """
    hosts = list(host_results.values())

    # Tally the statuses that appear in the summary table.
    tally = {"PASS": 0, "FAIL": 0, "SKIP": 0}
    for entry in hosts:
        if entry.status in tally:
            tally[entry.status] += 1
    finished = tally["PASS"] + tally["FAIL"]

    timed = [entry for entry in hosts if entry.duration_seconds is not None]
    quickest = min(timed, key=lambda entry: entry.duration_seconds, default=None)
    longest = max(timed, key=lambda entry: entry.duration_seconds, default=None)
    average = sum(entry.duration_seconds for entry in timed) / len(timed) if timed else None

    badges = {
        "PASS": "✅ PASS",
        "FAIL": "⚠️ FAIL",
        "RUN": "⏳ RUN",
        "SKIP": "⏭️ SKIP",
        "NOT STARTED": "⏳ RUN",
    }
    host_table = ["| Host | Kernel | Status | Detail |", "| --- | --- | --- | --- |"]
    host_table.extend(
        f"| {entry.host} | {entry.kernel} | {badges.get(entry.status, entry.status)} | {entry.detail} |"
        for entry in hosts
    )

    # Work on a copy so the caller's notes list stays untouched.
    all_notes = list(notes)
    if currents_url:
        all_notes.append(f"Currents recorded run: `{currents_url}`")
    notes_block = "\n".join(f"- {note}" for note in all_notes) if all_notes else "- none"

    if start_ts and end_ts:
        total_text = format_duration((end_ts - start_ts).total_seconds())
    else:
        total_text = "n/a"
    if quickest:
        quickest_text = f"{quickest.host} - {format_duration(quickest.duration_seconds)}"
    else:
        quickest_text = "n/a"
    if longest:
        longest_text = f"{longest.host} - {format_duration(longest.duration_seconds)}"
    else:
        longest_text = "n/a"
    average_text = format_duration(average) if average is not None else "n/a"

    heading = [
        "## ATVM Run Status",
        f"### {build_name}",
    ]
    coverage = [
        "**COVERAGE:**",
        f"- template: `{metadata['template']}`",
        f"- datastore/config family: `{metadata['config_family']}`",
        f"- migration style: {metadata['migration_style']}",
        f"- integration/plugin path: `{metadata['integration_plugin']}`",
        f"- scope of this run: {metadata['scope_description']}",
    ]
    functional = [
        "**FUNCTIONALLY:**",
        "- verify VM setup and power state",
        "- power on, obtain IP address, and verify hostname reachability",
        "- uninstall existing CMC if present",
        "- prepare source and destination disks and validate source-side data",
        "- install CMC and execute the requested ATVM migration workflow",
        "- finalize reporting, cleanup, and the final `check-xml-files.ts` validation step",
    ]
    summary = [
        "**SUMMARY:**",
        "",
        "| Metric | Value |",
        "| --- | --- |",
        f"| finished | {finished} |",
        f"| passed | {tally['PASS']} |",
        f"| failed | {tally['FAIL']} |",
        f"| skipped | {tally['SKIP']} |",
    ]
    hosts_section = ["**HOSTS:**", "", *host_table]
    timing = [
        "**TIMING:**",
        "",
        "| Metric | Value |",
        "| --- | --- |",
        f"| start | {format_timestamp_local(start_ts)} |",
        f"| end | {format_timestamp_local(end_ts)} |",
        f"| total | {total_text} |",
        f"| quickest | {quickest_text} |",
        f"| longest | {longest_text} |",
        f"| average | {average_text} |",
    ]
    notes_section = ["**NOTES:**", notes_block]

    sections = [heading, coverage, functional, summary, hosts_section, timing, notes_section]
    # A blank line separates consecutive sections.
    return "\n\n".join("\n".join(section) for section in sections)
|
|
|
|
|
|
def post_to_mattermost(text: str, timeout: float = 30.0) -> str:
    """POST ``text`` to the Mattermost incoming webhook and return the reply body.

    The webhook URL is read from ``MATTERMOST_ATVM_WEBHOOK``. When
    ``MATTERMOST_ATVM_CHANNEL`` is set and non-empty it is sent as the
    ``channel`` field of the payload.

    Args:
        text: Message body.
        timeout: Seconds to wait on the HTTP request before giving up. Without
            a timeout a stalled webhook would block the watcher indefinitely.

    Returns:
        The response body, decoded and stripped of surrounding whitespace.

    Raises:
        KeyError: if ``MATTERMOST_ATVM_WEBHOOK`` is not set.
        urllib.error.URLError: on connection failures, HTTP errors (as the
            ``HTTPError`` subclass) or timeouts during connection setup.
    """
    webhook = os.environ["MATTERMOST_ATVM_WEBHOOK"]

    payload = {"text": text}
    channel = os.environ.get("MATTERMOST_ATVM_CHANNEL")
    if channel:
        payload["channel"] = channel

    request = urllib.request.Request(
        webhook,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8").strip()
|
|
|
|
|
|
def _final_check_timestamp(xml_dir: Path, started_at: datetime) -> Optional[datetime]:
    """Return the suite timestamp of the newest report mentioning ``check-xml-files.ts``.

    Reports under ``xml_dir`` are scanned newest-first; those modified before
    ``started_at`` are skipped. Only the first report that mentions the check
    spec is inspected. ``None`` is returned when there is no such report, it
    is not well-formed XML, it has no ``testsuite`` child, or its timestamp
    cannot be parsed.
    """
    newest_first = sorted(
        xml_dir.glob("test-result-*.xml"),
        key=lambda candidate: candidate.stat().st_mtime,
        reverse=True,
    )
    for xml_path in newest_first:
        modified_at = datetime.fromtimestamp(xml_path.stat().st_mtime, tz=timezone.utc)
        if modified_at < started_at:
            continue
        if "check-xml-files.ts" not in read_text(xml_path):
            continue
        try:
            suite = ET.parse(xml_path).getroot().find("testsuite")
        except ET.ParseError:
            return None
        if suite is None:
            return None
        return parse_xml_timestamp(suite.attrib.get("timestamp"))
    return None


def determine_state(
    build_name: str,
    build_dir: Path,
    run_log: Path,
    reporter_root: Path,
    inventory: Dict[str, str],
    started_at: datetime,
    process_gone_since: Optional[datetime],
    process_exit_grace_seconds: int,
    max_watch_seconds: int = 6 * 60 * 60,
) -> Tuple[str, Dict[str, HostResult], str, Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Inspect the run's artifacts and classify its current state.

    Checks are applied in this order:

    1. ``cancelled.marker`` exists in ``build_dir``: ``CANCELLED``.
    2. The run process is active: ``HUNG`` once the run is older than
       ``max_watch_seconds``, otherwise ``RUNNING``.
    3. The log contains ``Cloud Run Finished`` or a recorded-run URL:
       ``FAILED`` if any host reported failures, otherwise ``COMPLETED``.
    4. The process has been gone for at least ``process_exit_grace_seconds``:
       ``TERMINATED``.
    5. Otherwise ``RUNNING`` (still inside the grace period).

    Args:
        build_name: Build whose run process is looked up.
        build_dir: Per-build state directory (holds the cancellation marker).
        run_log: Run log file to scan.
        reporter_root: Directory containing the ``xml`` report folder.
        inventory: Host-to-kernel mapping.
        started_at: Run start; older XML reports are ignored.
        process_gone_since: When the process was first seen missing, if it is.
        process_exit_grace_seconds: Grace period before reporting TERMINATED.
        max_watch_seconds: Age limit for an active run. Previously read from
            a module-level ``args`` object that only exists when the file is
            run as a script; the default matches the CLI default.

    Returns:
        ``(state, host_results, log_text, start_ts, end_ts, currents_url, notes)``.
    """
    log_text = read_text(run_log)
    expected_hosts = extract_expected_hosts(log_text)
    host_results = collect_host_results(reporter_root, expected_hosts, inventory, started_at)
    active = process_active(build_name)
    currents_url = extract_currents_url(log_text)
    notes: List[str] = []

    # Show the host that has started but has no report yet as in progress.
    current_host = find_current_running_host(log_text, list(host_results.keys()))
    if current_host and current_host not in host_results:
        host_results[current_host] = HostResult(
            host=current_host,
            kernel=inventory.get(current_host, "unknown"),
            status="RUN",
            detail="in progress",
        )

    # Start is the earliest host timestamp (or the watcher's own start);
    # end is the latest of the host timestamps and the final check report.
    host_timestamps = [result.timestamp for result in host_results.values() if result.timestamp]
    end_candidates = list(host_timestamps)
    final_check_ts = _final_check_timestamp(reporter_root / "xml", started_at)
    if final_check_ts:
        end_candidates.append(final_check_ts)
    start_ts = min(host_timestamps) if host_timestamps else started_at
    end_ts = max(end_candidates) if end_candidates else None

    def outcome(
        state: str,
    ) -> Tuple[str, Dict[str, HostResult], str, Optional[datetime], Optional[datetime], Optional[str], List[str]]:
        return state, host_results, log_text, start_ts, end_ts, currents_url, notes

    if (build_dir / "cancelled.marker").exists():
        notes.append("Cancellation marker detected.")
        return outcome("CANCELLED")

    if active:
        elapsed = (now_utc() - started_at).total_seconds()
        if elapsed > max_watch_seconds:
            notes.append("Watcher exceeded max watch duration while the run still appears active.")
            return outcome("HUNG")
        return outcome("RUNNING")

    if "Cloud Run Finished" in log_text or currents_url:
        any_failures = any(result.failures for result in host_results.values())
        notes.append("Run finished and final reporting artifacts were detected.")
        # NOTE(review): this only checks that the log mentions the spec, not
        # that the check itself passed - confirm that is the intended signal.
        if "check-xml-files.ts" in log_text:
            notes.append("Final `check-xml-files.ts` validation passed.")
        return outcome("FAILED" if any_failures else "COMPLETED")

    if process_gone_since and (now_utc() - process_gone_since).total_seconds() >= process_exit_grace_seconds:
        notes.append("Run process exited without a clean completion signal.")
        return outcome("TERMINATED")

    return outcome("RUNNING")


def main() -> int:
    """Watch one run until it leaves the RUNNING state, then report and exit.

    State is persisted to ``<state-dir>/<build-name>/state.json`` on every
    poll. When the run ends as COMPLETED or FAILED the status report is
    posted to Mattermost once, guarded by ``posted.marker``.

    Returns:
        0 once the final state has been recorded.

    Raises:
        SystemExit: if the Mattermost webhook replies with anything but ``ok``.
    """
    args = parse_args()
    build_name = args.build_name
    run_log = Path(args.run_log)
    reporter_root = Path(args.reporter_root)
    inventory_file = Path(args.inventory_file)
    build_dir = Path(args.state_dir) / build_name
    ensure_dir(build_dir)
    state_file = build_dir / "state.json"
    posted_marker = build_dir / "posted.marker"

    inventory = load_inventory(inventory_file)
    metadata = infer_metadata()

    # Reuse a previously persisted start time so a restarted watcher keeps
    # the same cut-off; otherwise fall back to the log's mtime or "now".
    state = load_state(state_file)
    if run_log.exists():
        default_started_at = datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc)
    else:
        default_started_at = now_utc()
    started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at
    state.setdefault("build_name", build_name)
    state.setdefault("started_at", started_at.isoformat())
    write_state(state_file, state)

    process_gone_since: Optional[datetime] = None

    while True:
        # Track when the run process was first seen missing; reset the
        # clock whenever it is seen again.
        if process_active(build_name):
            process_gone_since = None
        elif process_gone_since is None:
            process_gone_since = now_utc()

        run_state, host_results, log_text, start_ts, end_ts, currents_url, notes = determine_state(
            build_name=build_name,
            build_dir=build_dir,
            run_log=run_log,
            reporter_root=reporter_root,
            inventory=inventory,
            started_at=started_at,
            process_gone_since=process_gone_since,
            process_exit_grace_seconds=args.process_exit_grace_seconds,
            max_watch_seconds=args.max_watch_seconds,
        )

        state["last_state"] = run_state
        state["last_seen_at"] = now_utc().isoformat()
        state["host_results"] = {
            host: {
                "status": result.status,
                "detail": result.detail,
                "kernel": result.kernel,
                "tests": result.tests,
                "failures": result.failures,
            }
            for host, result in host_results.items()
        }
        write_state(state_file, state)

        if run_state == "RUNNING":
            # Flush so the heartbeat shows up promptly when stdout is a pipe.
            print(f"[watcher] {build_name}: RUNNING", flush=True)
            time.sleep(args.poll_interval)
            continue

        status_text = build_status_markdown(
            build_name=build_name,
            metadata=metadata,
            host_results=dict(sorted(host_results.items())),
            run_state=run_state,
            currents_url=currents_url,
            start_ts=start_ts,
            end_ts=end_ts,
            notes=notes,
        )
        print(status_text)

        if run_state in {"COMPLETED", "FAILED"} and not posted_marker.exists():
            response = post_to_mattermost(status_text)
            if response != "ok":
                raise SystemExit(f"Mattermost webhook did not return ok: {response!r}")
            posted_marker.write_text("ok\n", encoding="utf-8")
            state["mattermost_posted"] = True
            state["mattermost_response"] = response
            write_state(state_file, state)
            print(f"[watcher] Mattermost post confirmed for {build_name}.")

        state["closed_at"] = now_utc().isoformat()
        write_state(state_file, state)
        return 0


if __name__ == "__main__":
    sys.exit(main())
|