- Update the watcher to stop trusting misleading categorized child build labels when they do not match the host/spec actually being executed.
- Infer the reported categorized group name from the host actually being run, so mismatched labels (such as `ubuntu-batch` for a Red Hat host) are corrected in status reporting.
- Document the categorized watcher workaround in the ATVM guide, the watcher design doc, and the watcher README without changing the underlying ATVM runner scripts.
1065 lines
42 KiB
Python
1065 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
# Every lifecycle state a watched run (or categorized sub-run) may be reported in.
RUN_STATES = {"COMPLETED", "FAILED", "CANCELLED", "TERMINATED", "HUNG", "UNKNOWN", "RUNNING"}

# Zone used to interpret the naive timestamps written by the runner log.
# NOTE: this is the fixed UTC offset in effect when the module is imported, so
# it does not follow DST transitions during a long watch. Falls back to UTC.
LOCAL_TZ = datetime.now().astimezone().tzinfo or timezone.utc
|
|
|
|
|
|
@dataclass
class HostResult:
    """One row of the HOSTS table in a status report.

    ``status`` is a short code such as ``"PASS"``, ``"FAIL"``, ``"RUN"`` or
    ``"SKIP"``; ``detail`` is free text shown beside it (for example
    ``"5 tests, 1 failures"`` or ``"in progress"``).
    """

    host: str  # spec/host name (an ``atvm...`` spec stem)
    kernel: str  # kernel string from the inventory file, or "unknown"
    status: str
    detail: str
    tests: int = 0
    failures: int = 0
    duration_seconds: Optional[float] = None  # spec wall time, when known
    timestamp: Optional[datetime] = None  # UTC suite timestamp from JUnit XML, when known
|
|
|
|
|
|
@dataclass
class SubRun:
    """A slice of a (possibly categorized) run log that is reported separately.

    ``started_at`` is the lower bound used when selecting reporter XML files
    for this sub-run. ``completed`` is True once a later sub-run has started
    in the same log.
    """

    key: str  # filesystem/state-safe identifier (see sanitize_key)
    display_name: str  # human-readable build name shown in the report
    started_at: datetime
    expected_hosts: List[str]  # atvm hosts named in this slice of the log
    completed: bool
    currents_url: Optional[str]  # Currents dashboard link, if one was logged
    notes: List[str]  # free-text notes carried into the status report
|
|
|
|
|
|
def now_utc() -> datetime:
    """Return the current instant as a timezone-aware UTC datetime."""
    current = datetime.now(tz=timezone.utc)
    return current
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the watcher's command-line options from ``sys.argv``.

    Five path/name options are mandatory; the three timing knobs are integers
    in seconds with defaults of 30 s polling, a 6 h watch limit and a 120 s
    grace period after the runner process disappears.
    """
    parser = argparse.ArgumentParser()
    mandatory = ("--build-name", "--run-log", "--reporter-root", "--inventory-file", "--state-dir")
    for flag in mandatory:
        parser.add_argument(flag, required=True)
    six_hours = 6 * 60 * 60
    parser.add_argument("--poll-interval", type=int, default=30)
    parser.add_argument("--max-watch-seconds", type=int, default=six_hours)
    parser.add_argument("--process-exit-grace-seconds", type=int, default=120)
    return parser.parse_args()
|
|
|
|
|
|
def ensure_dir(path: Path) -> None:
    """Create *path* and any missing parents; do nothing if it already exists."""
    path.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def load_inventory(path: Path) -> Dict[str, str]:
    """Read a host -> kernel map from a Markdown inventory table.

    Only lines starting with ``|`` are considered. The first three cells are
    read as OS, host and kernel. The following rows are skipped:

    * the header row (first cell ``OS``);
    * Markdown separator rows (``---``, ``|----|``, ``|:---:|`` ...);
    * rows with fewer than three cells;
    * rows with an empty host cell.

    An empty kernel cell maps to ``"unknown"``.

    Returns an empty dict when *path* does not exist.
    """
    kernels: Dict[str, str] = {}
    if not path.exists():
        return kernels
    separator_chars = set("-:")
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line.startswith("|"):
            continue
        cells = [cell.strip() for cell in line.strip().strip("|").split("|")]
        if len(cells) < 3 or cells[0] in {"OS", "---"}:
            continue
        host, kernel = cells[1], cells[2]
        # A host cell made only of dashes/colons is a table separator
        # (any width or alignment), never a real host name.
        if not host or set(host) <= separator_chars:
            continue
        kernels[host] = kernel or "unknown"
    return kernels
|
|
|
|
|
|
def run_ps() -> str:
    """Return ``ps -eo pid=,args=`` output: one process per line, no header.

    Raises subprocess.CalledProcessError if ``ps`` exits non-zero.
    """
    command = ["ps", "-eo", "pid=,args="]
    completed = subprocess.run(command, check=True, text=True, capture_output=True)
    return completed.stdout
|
|
|
|
|
|
def process_active(build_name: str, ps_output: Optional[str] = None) -> bool:
    """Return True if a ``run-sorry-cypress.py`` runner for *build_name* is alive.

    The build name must appear as the complete value of ``--build_name``.
    A plain substring test would treat ``--build_name atvm-10`` as a match
    for ``atvm-1``.

    *ps_output* may supply pre-captured ``ps`` output. When it is None,
    ``run_ps()`` is called.
    """
    output = run_ps() if ps_output is None else ps_output
    # The value must be followed by whitespace or end-of-line to count as a match.
    build_arg = re.compile(r"--build_name\s+" + re.escape(build_name) + r"(?=\s|$)")
    return any(
        "run-sorry-cypress.py" in line and build_arg.search(line) is not None
        for line in output.splitlines()
    )
|
|
|
|
|
|
def related_process_active(build_name: str) -> bool:
    """Return True if any runner, cypress-cloud or node process mentions *build_name*.

    This is a plain substring match on each ``ps`` line, so child processes
    whose ids merely contain *build_name* (e.g. categorized child builds)
    also count.
    """
    markers = ("run-sorry-cypress.py", "cypress-cloud", "node ")
    process_lines = run_ps().splitlines()
    return any(
        build_name in entry and any(marker in entry for marker in markers)
        for entry in process_lines
    )
|
|
|
|
|
|
def extract_active_subrun_build(build_name: str) -> Optional[str]:
    """Return the ``--ci-build-id`` value of the live child process, if any.

    Only ``ps`` lines that mention *build_name* and contain ``--ci-build-id``
    are inspected. When several match, the one listed last by ``ps`` wins.
    """
    last_seen: Optional[str] = None
    for entry in run_ps().splitlines():
        if build_name in entry and "--ci-build-id" in entry:
            found = re.search(r"--ci-build-id\s+(\S+)", entry)
            if found:
                last_seen = found.group(1)
    return last_seen
|
|
|
|
|
|
def read_text(path: Path) -> str:
    """Return *path* decoded as UTF-8, or "" if the file does not exist.

    Undecodable bytes are replaced with U+FFFD instead of raising.
    """
    text = ""
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        pass
    return text
|
|
|
|
|
|
def extract_expected_hosts(log_text: str) -> List[str]:
    """List the ``atvm*`` hosts a run log says it will (or did) execute.

    Sources, in order:

    1. The first ``Extracted specPattern: [...]`` list literal. Every string
       entry ending in ``atvm<...>.ts`` contributes its stem.
    2. Every ``Running: [cypress/cmcRegressionTest/]atvm<...>.ts`` line.

    Duplicates are dropped and first-seen order is kept. A spec list that is
    not a valid literal is ignored.
    """
    ordered: Dict[str, None] = {}  # dict used as an insertion-ordered set

    spec_line = re.search(r'Extracted specPattern:\s*(\[[^\n]+\])', log_text)
    specs = []
    if spec_line:
        try:
            specs = ast.literal_eval(spec_line.group(1))
        except (SyntaxError, ValueError):
            specs = []
    for spec in specs:
        if not isinstance(spec, str):
            continue
        spec_host = re.search(r"(atvm[^/\s]+)\.ts$", spec)
        if spec_host:
            ordered.setdefault(spec_host.group(1), None)

    running = re.finditer(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    for hit in running:
        ordered.setdefault(hit.group(1), None)

    return list(ordered)
|
|
|
|
|
|
def extract_currents_url(log_text: str) -> Optional[str]:
    """Return the first ``https://.../run/...`` URL found in *log_text*, or None."""
    found = re.search(r"(https://\S+/run/\S+)", log_text)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
|
|
def load_state(state_file: Path) -> Dict[str, object]:
    """Load the watcher's persisted JSON state.

    A missing file, undecodable or invalid JSON, or a JSON document whose top
    level is not an object all yield an empty dict. In each case the watcher
    starts from fresh state instead of crashing or handing callers a
    non-dict.
    """
    try:
        raw = state_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    except UnicodeDecodeError:
        return {}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def write_state(state_file: Path, state: Dict[str, object]) -> None:
    """Persist *state* as indented, key-sorted JSON, replacing *state_file* atomically.

    The JSON is written to a sibling temp file, which then replaces the target
    with ``os.replace``. An interrupted write therefore never leaves a
    truncated state file behind. Serialisation happens first, so a
    non-serialisable *state* raises before anything touches the disk.
    """
    payload = json.dumps(state, indent=2, sort_keys=True)
    # Same directory => same filesystem, which os.replace needs to be atomic.
    tmp_path = state_file.with_name(f".{state_file.name}.tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        os.replace(tmp_path, state_file)
    except BaseException:
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
        raise
|
|
|
|
|
|
def parse_xml_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse a JUnit ``timestamp`` attribute into an aware UTC datetime.

    A naive value (the usual JUnit form) is taken to be UTC. A value with an
    explicit offset (or a trailing ``Z``) is converted to UTC. The previous
    ``replace(tzinfo=utc)`` silently discarded such an offset and returned
    the wrong instant.

    Returns None for an empty or missing value, or one ``fromisoformat``
    cannot parse.
    """
    if not raw:
        return None
    text = raw.strip()
    # datetime.fromisoformat only accepts "Z" from Python 3.11 onwards.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def parse_log_timestamp(raw: str) -> Optional[datetime]:
    """Convert a runner log stamp such as ``2024-01-31 13:45:02,123`` to aware UTC.

    The stamp carries no zone, so it is interpreted in ``LOCAL_TZ`` before
    conversion. Returns None when *raw* does not match the format.
    """
    try:
        naive = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S,%f")
    except ValueError:
        return None
    return naive.replace(tzinfo=LOCAL_TZ).astimezone(timezone.utc)
|
|
|
|
|
|
def first_log_timestamp(log_text: str) -> Optional[datetime]:
    """Return the UTC time of the first ``<stamp> - INFO - `` line in the log, or None."""
    stamp = re.search(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - ",
        log_text,
        flags=re.M,
    )
    return parse_log_timestamp(stamp.group(1)) if stamp else None
|
|
|
|
|
|
def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
|
|
try:
|
|
tree = ET.parse(xml_path)
|
|
except ET.ParseError:
|
|
return None
|
|
root = tree.getroot()
|
|
suites = root.findall("testsuite")
|
|
best: Optional[Tuple[str, int, int, float, Optional[datetime]]] = None
|
|
for suite in suites:
|
|
file_attr = suite.attrib.get("file", "")
|
|
suite_name = suite.attrib.get("name", "")
|
|
host_from_file = None
|
|
host_from_name = None
|
|
if file_attr.startswith("cypress/cmcRegressionTest/atvm") and file_attr.endswith(".ts"):
|
|
host_from_file = Path(file_attr).stem
|
|
name_match = re.search(r"(atvm[^)\s]+)", suite_name)
|
|
if name_match:
|
|
host_from_name = name_match.group(1)
|
|
host_name = host_from_file or host_from_name
|
|
if not host_name:
|
|
continue
|
|
tests = int(float(suite.attrib.get("tests", root.attrib.get("tests", "0"))))
|
|
failures = int(float(suite.attrib.get("failures", root.attrib.get("failures", "0"))))
|
|
total_time = float(suite.attrib.get("time", root.attrib.get("time", "0")))
|
|
timestamp = parse_xml_timestamp(suite.attrib.get("timestamp"))
|
|
candidate = (host_name, tests, failures, total_time, timestamp)
|
|
if best is None:
|
|
best = candidate
|
|
continue
|
|
_, best_tests, _, best_total_time, _ = best
|
|
if tests > best_tests or (tests == best_tests and total_time >= best_total_time):
|
|
best = candidate
|
|
if not best:
|
|
return None
|
|
file_name, tests, failures, total_time, timestamp = best
|
|
detail = f"{tests} tests, {failures} failures"
|
|
status = "FAIL" if failures else "PASS"
|
|
return file_name, HostResult(
|
|
host=file_name,
|
|
kernel="unknown",
|
|
status=status,
|
|
detail=detail,
|
|
tests=tests,
|
|
failures=failures,
|
|
duration_seconds=total_time,
|
|
timestamp=timestamp,
|
|
)
|
|
|
|
|
|
def extract_check_xml_timestamp_from_file(xml_path: Path) -> Optional[datetime]:
    """Return the timestamp of the ``check-xml-files.ts`` suite in *xml_path*.

    Only suites directly under the root element are searched, and the first
    one whose ``file`` ends with ``check-xml-files.ts`` is used. Returns None
    in any of these cases:

    * the file is missing or unreadable (it may be rotated away while the
      watcher polls);
    * the file is not valid XML;
    * no such suite exists;
    * the suite has no parseable timestamp.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except (ET.ParseError, OSError):
        return None
    for suite in root.findall("testsuite"):
        if suite.attrib.get("file", "").endswith("check-xml-files.ts"):
            return parse_xml_timestamp(suite.attrib.get("timestamp"))
    return None
|
|
|
|
|
|
def parse_duration_seconds(raw: str) -> Optional[float]:
    """Convert a Cypress-style duration such as ``"1h 2m 3.5s"`` to seconds.

    Components may be hours (``h``), minutes (``m``), seconds (``s``) or
    milliseconds (``ms``); a number without a unit counts as seconds. All
    components found are summed. Whitespace (including line breaks) is
    ignored.

    The previous single regex required whitespace after ``h``/``m``, so
    ``"2m"`` parsed as 2 s and ``"1h 2m"`` as 3602 s. It also read ``"250ms"``
    as 250 s.

    Returns None when *raw* contains no number.
    """
    text = " ".join(raw.split())
    # "ms" must be tried before "m" so milliseconds are not read as minutes.
    components = re.findall(r"(\d+(?:\.\d+)?)\s*(ms|h|m|s)?", text)
    if not components:
        return None
    multipliers = {"h": 3600.0, "m": 60.0, "s": 1.0, "": 1.0}
    total = 0.0
    for number, unit in components:
        value = float(number)
        total += value / 1000 if unit == "ms" else value * multipliers[unit]
    return total
|
|
|
|
|
|
def extract_completed_subrun_summaries(log_text: str, inventory: Dict[str, str]) -> List[Dict[str, object]]:
    """Parse per-child results from the "Cloud Run Finished" tables in the log.

    Each ``Cloud Run Finished ... 🏁 Recorded Run: <url>`` block yields at
    most one summary. It is built from the first spec row in the block whose
    spec is an ``atvm*.ts`` file.

    Rows marked ``✔`` (passed) and ``✖`` (failed) are both recognised; the
    old pattern only accepted ``✔``, so failing hosts were never summarised.
    After the spec name, the columns are read as duration, tests, passing,
    failing, pending and skipped. A ``-`` in any count column means zero.

    Returns a list, in log order, of dicts with two keys:

    * ``"host_results"``: a one-entry host -> HostResult map, with the kernel
      taken from *inventory*;
    * ``"currents_url"``.
    """

    def count(cell: str) -> int:
        # Cypress prints zero counts as "-" in its summary tables.
        return int(cell) if cell.isdigit() else 0

    summaries: List[Dict[str, object]] = []
    for block in re.finditer(r"Cloud Run Finished(.*?)(?:🏁 Recorded Run:\s*(https://\S+))", log_text, re.S):
        block_text, currents_url = block.group(1), block.group(2)
        # Re-join duration cells whose trailing "s" wrapped onto the next table line.
        normalized = re.sub(r"\n\s*│\s*s\s*│", "s", block_text)
        host_match = re.search(
            r"[✔✖]\s+(atvm[^\s]+)\.ts\s+([0-9hms.\s]+?)\s+([-\d]+)\s+([-\d]+)\s+([-\d]+)\s+([-\d]+)\s+([-\d]+)",
            normalized,
            re.S,
        )
        if not host_match:
            continue
        host = host_match.group(1)
        tests = count(host_match.group(3))
        failing = count(host_match.group(5))
        summaries.append(
            {
                "host_results": {
                    host: HostResult(
                        host=host,
                        kernel=inventory.get(host, "unknown"),
                        status="FAIL" if failing else "PASS",
                        detail=f"{tests} tests, {failing} failures",
                        tests=tests,
                        failures=failing,
                        duration_seconds=parse_duration_seconds(host_match.group(2)),
                    )
                },
                "currents_url": currents_url,
            }
        )
    return summaries
|
|
|
|
|
|
def collect_host_results(
|
|
reporter_root: Path,
|
|
expected_hosts: List[str],
|
|
kernels: Dict[str, str],
|
|
run_started_at: datetime,
|
|
run_ended_at: Optional[datetime] = None,
|
|
) -> Dict[str, HostResult]:
|
|
xml_dir = reporter_root / "xml"
|
|
results: Dict[str, HostResult] = {}
|
|
if not xml_dir.exists():
|
|
return results
|
|
for xml_path in sorted(xml_dir.glob("test-result-*.xml"), key=lambda p: p.stat().st_mtime):
|
|
xml_mtime = datetime.fromtimestamp(xml_path.stat().st_mtime, tz=timezone.utc)
|
|
if xml_mtime < run_started_at:
|
|
continue
|
|
if run_ended_at and xml_mtime >= run_ended_at:
|
|
continue
|
|
parsed = parse_host_xml(xml_path)
|
|
if not parsed:
|
|
continue
|
|
host, result = parsed
|
|
if expected_hosts and host not in expected_hosts:
|
|
continue
|
|
result.kernel = kernels.get(host, "unknown")
|
|
results[host] = result
|
|
return results
|
|
|
|
|
|
def find_check_xml_end(
    reporter_root: Path,
    started_at: datetime,
    ended_at: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return the latest ``check-xml-files.ts`` suite timestamp in the window.

    Considers ``<reporter_root>/xml/test-result-*.xml`` files whose mtime is
    in ``[started_at, ended_at)``; the window is open-ended when *ended_at*
    is None. Files that do not mention ``check-xml-files.ts`` are skipped
    without parsing.

    The timestamp is taken from the suite whose ``file`` ends with
    ``check-xml-files.ts``, matching extract_check_xml_timestamp_from_file.
    When no suite carries that attribute, the first suite is used. Files
    that vanish or fail to parse are skipped.

    Returns the maximum timestamp found, or None.
    """
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return None

    latest: Optional[datetime] = None
    for xml_path in xml_dir.glob("test-result-*.xml"):
        try:
            mtime = xml_path.stat().st_mtime
        except FileNotFoundError:
            continue
        xml_mtime = datetime.fromtimestamp(mtime, tz=timezone.utc)
        if xml_mtime < started_at:
            continue
        if ended_at and xml_mtime >= ended_at:
            continue
        # Cheap text pre-filter before paying for a full XML parse.
        if "check-xml-files.ts" not in read_text(xml_path):
            continue
        try:
            root = ET.parse(xml_path).getroot()
        except (ET.ParseError, OSError):
            continue
        suites = root.findall("testsuite")
        if not suites:
            continue
        check_suite = next(
            (suite for suite in suites if suite.attrib.get("file", "").endswith("check-xml-files.ts")),
            suites[0],
        )
        ts = parse_xml_timestamp(check_suite.attrib.get("timestamp"))
        if ts and (latest is None or ts > latest):
            latest = ts
    return latest
|
|
|
|
|
|
def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently logged ``Running: atvm*.ts`` host not yet completed.

    Scans the ``Running:`` lines from last to first. Returns None if every
    logged host is already in *completed_hosts* or none was logged.
    """
    running = re.findall(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    pending = (host for host in reversed(running) if host not in completed_hosts)
    return next(pending, None)
|
|
|
|
|
|
def infer_host_from_subrun_build(
    subrun_build: str,
    expected_hosts: List[str],
    completed_hosts: List[str],
) -> Optional[str]:
    """Guess which not-yet-completed host a categorized child build is running.

    Walks *expected_hosts* minus *completed_hosts* in order. It returns the
    first host whose OS family (from the part after the first ``-``) also
    appears in the lower-cased *subrun_build* id. Windows hosts (``w2k*``)
    match ``windows`` or ``w2k``.

    The family list mirrors infer_group_from_host. Previously it lacked
    amazonlinux, centos, fedora and suse, so those builds always fell through
    to the fallback.

    Falls back to the first remaining host. Returns None when nothing remains.
    """
    # (host short-name prefix, build-id keywords identifying that family)
    families = (
        ("w2k", ("windows", "w2k")),
        ("amazonlinux", ("amazonlinux",)),
        ("centos", ("centos",)),
        ("ubuntu", ("ubuntu",)),
        ("rocky", ("rocky",)),
        ("redhat", ("redhat",)),
        ("oracle", ("oracle",)),
        ("fedora", ("fedora",)),
        ("debian", ("debian",)),
        ("suse", ("suse",)),
    )
    remaining_hosts = [host for host in expected_hosts if host not in completed_hosts]
    lowered_build = subrun_build.lower()
    for host in remaining_hosts:
        short = host.split("-", 1)[-1].lower()
        for prefix, keywords in families:
            if short.startswith(prefix):
                if any(keyword in lowered_build for keyword in keywords):
                    return host
                break  # prefixes are mutually exclusive; no other family applies
    return remaining_hosts[0] if remaining_hosts else None
|
|
|
|
|
|
def infer_metadata() -> Dict[str, object]:
    """Collect report metadata from ``ATVM_WATCHER_*`` environment variables.

    Unset variables fall back to generic defaults. ``categorized`` is True
    only when ``ATVM_WATCHER_CATEGORIZED`` equals "true" case-insensitively.
    """
    env = os.environ.get
    metadata: Dict[str, object] = {}
    metadata["template"] = env("ATVM_WATCHER_TEMPLATE", "unknown")
    metadata["config_family"] = env("ATVM_WATCHER_CONFIG_FAMILY", "unknown")
    metadata["migration_style"] = env("ATVM_WATCHER_MIGRATION_STYLE", "ATVM automation validation")
    metadata["integration_plugin"] = env("ATVM_WATCHER_INTEGRATION_PLUGIN", "unknown")
    metadata["scope_description"] = env("ATVM_WATCHER_SCOPE_DESCRIPTION", "requested ATVM run scope")
    categorized_flag = env("ATVM_WATCHER_CATEGORIZED", "false")
    metadata["categorized"] = categorized_flag.lower() == "true"
    return metadata
|
|
|
|
|
|
def format_duration(seconds: Optional[float]) -> str:
    """Render a duration in seconds for the status report.

    * ``None`` -> ``"n/a"``
    * under a minute -> seconds with 3 decimals, e.g. ``"5.250s"``
    * under an hour -> ``"<m>m <ss.ss>s"``, e.g. ``"1m 30.00s"``
    * otherwise -> ``"<h>h <mm>m <ss.ss>s"``, e.g. ``"1h 02m 05.50s"``

    Seconds that round up to 60 carry into the minutes, so the output is
    never ``"1m 60.00s"``. A negative input (e.g. from skewed timestamps) is
    rendered as its magnitude with a leading ``-`` instead of a nonsensical
    ``"-1h 59m ..."`` split.
    """
    if seconds is None:
        return "n/a"
    sign = "-" if seconds < 0 else ""
    magnitude = abs(seconds)
    if round(magnitude, 3) < 60:
        return f"{sign}{magnitude:.3f}s"
    # Work in whole hundredths (the displayed precision) so rounding carries.
    hundredths = round(magnitude * 100)
    total_minutes, rem_hundredths = divmod(hundredths, 6000)
    hours, minutes = divmod(total_minutes, 60)
    secs = rem_hundredths / 100
    if hours:
        return f"{sign}{hours}h {minutes:02d}m {secs:05.2f}s"
    return f"{sign}{minutes}m {secs:05.2f}s"
|
|
|
|
|
|
def format_timestamp_local(ts: Optional[datetime]) -> str:
    """Format *ts* in the machine's local zone as ``YYYY-MM-DD HH:MM:SS ZZZ``, or "n/a"."""
    if ts:
        return ts.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
    return "n/a"
|
|
|
|
|
|
def build_status_markdown(
    build_name: str,
    metadata: Dict[str, object],
    host_results: Dict[str, HostResult],
    run_state: str,
    currents_url: Optional[str],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    notes: List[str],
) -> str:
    """Render the Markdown status report for a watched ATVM run.

    Sections, separated by blank lines:

    * a title;
    * COVERAGE (from *metadata*);
    * a fixed FUNCTIONALLY list;
    * SUMMARY counts;
    * HOSTS (one table row per *host_results* entry, in dict order);
    * TIMING statistics;
    * NOTES.

    *currents_url*, when given, is appended as an extra note. The caller's
    *notes* list is not mutated.

    NOTE(review): *run_state* is accepted but not rendered anywhere in the
    report.
    """
    hosts = list(host_results.values())

    def tally(status: str) -> int:
        return sum(1 for entry in hosts if entry.status == status)

    passed = tally("PASS")
    failed = tally("FAIL")
    skipped = tally("SKIP")
    finished = passed + failed

    timed = [entry for entry in hosts if entry.duration_seconds is not None]
    quickest = min(timed, key=lambda entry: entry.duration_seconds, default=None)
    longest = max(timed, key=lambda entry: entry.duration_seconds, default=None)
    average = sum(entry.duration_seconds for entry in timed) / len(timed) if timed else None

    status_badges = {
        "PASS": "✅ PASS",
        "FAIL": "⚠️ FAIL",
        "RUN": "⏳ RUN",
        "SKIP": "⏭️ SKIP",
        "NOT STARTED": "⏳ RUN",
    }
    host_table = ["| Host | Kernel | Status | Detail |", "| --- | --- | --- | --- |"]
    host_table.extend(
        f"| {entry.host} | {entry.kernel} | {status_badges.get(entry.status, entry.status)} | {entry.detail} |"
        for entry in hosts
    )

    report_notes = list(notes)
    if currents_url:
        report_notes.append(f"Currents recorded run: `{currents_url}`")
    notes_block = "\n".join(f"- {note}" for note in report_notes) if report_notes else "- none"

    total_text = format_duration((end_ts - start_ts).total_seconds()) if start_ts and end_ts else "n/a"
    quickest_text = f"{quickest.host} - {format_duration(quickest.duration_seconds)}" if quickest else "n/a"
    longest_text = f"{longest.host} - {format_duration(longest.duration_seconds)}" if longest else "n/a"
    average_text = format_duration(average) if average is not None else "n/a"

    sections = [
        ["## ATVM Run Status", f"### {build_name}"],
        [
            "**COVERAGE:**",
            f"- template: `{metadata['template']}`",
            f"- datastore/config family: `{metadata['config_family']}`",
            f"- migration style: {metadata['migration_style']}",
            f"- integration/plugin path: `{metadata['integration_plugin']}`",
            f"- scope of this run: {metadata['scope_description']}",
        ],
        [
            "**FUNCTIONALLY:**",
            "- verify VM setup and power state",
            "- power on, obtain IP address, and verify hostname reachability",
            "- uninstall existing CMC if present",
            "- prepare source and destination disks and validate source-side data",
            "- install CMC and execute the requested ATVM migration workflow",
            "- finalize reporting, cleanup, and the final `check-xml-files.ts` validation step",
        ],
        [
            "**SUMMARY:**",
            "",
            "| Metric | Value |",
            "| --- | --- |",
            f"| finished | {finished} |",
            f"| passed | {passed} |",
            f"| failed | {failed} |",
            f"| skipped | {skipped} |",
        ],
        ["**HOSTS:**", "", *host_table],
        [
            "**TIMING:**",
            "",
            "| Metric | Value |",
            "| --- | --- |",
            f"| start | {format_timestamp_local(start_ts)} |",
            f"| end | {format_timestamp_local(end_ts)} |",
            f"| total | {total_text} |",
            f"| quickest | {quickest_text} |",
            f"| longest | {longest_text} |",
            f"| average | {average_text} |",
        ],
        ["**NOTES:**", notes_block],
    ]
    # One blank line between sections, as in a flat line list joined by "\n".
    return "\n\n".join("\n".join(section) for section in sections)
|
|
|
|
|
|
def post_to_mattermost(text: str, timeout: float = 30.0) -> str:
    """POST *text* to the Mattermost incoming webhook and return the response body.

    The webhook URL comes from ``MATTERMOST_ATVM_WEBHOOK``; a missing variable
    raises KeyError. ``MATTERMOST_ATVM_CHANNEL``, when set, overrides the
    target channel. *timeout* (seconds) bounds the HTTP request.

    Raises urllib.error.URLError / HTTPError (or a timeout error) on delivery
    failure.
    """
    webhook = os.environ["MATTERMOST_ATVM_WEBHOOK"]
    payload: Dict[str, str] = {"text": text}
    channel = os.environ.get("MATTERMOST_ATVM_CHANNEL")
    if channel:
        payload["channel"] = channel
    request = urllib.request.Request(
        webhook,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    # Without a timeout, a stalled webhook endpoint would block the watcher's
    # polling loop indefinitely.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8").strip()
|
|
|
|
|
|
def sanitize_key(raw: str) -> str:
    """Make *raw* safe as a state/file key.

    Each run of characters outside ``[A-Za-z0-9_.-]`` becomes a single ``-``.
    Leading and trailing ``-`` are trimmed. An empty result falls back to
    ``"subrun"``.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw)
    trimmed = collapsed.strip("-")
    return trimmed if trimmed else "subrun"
|
|
|
|
|
|
def infer_group_label(hosts: List[str], index: int) -> str:
    """Build a ``-``-joined family label for a group of hosts.

    Each host contributes the part after its first ``-``, cut at the first
    digit. ``w2k*`` hosts contribute ``windows``. Labels are de-duplicated in
    first-seen order. With no hosts the label is ``group<index>``.
    """
    fallback = f"group{index}"
    seen: List[str] = []
    for host in hosts:
        short = host.split("-", 1)[-1]
        label = "windows" if short.startswith("w2k") else (re.sub(r"\d.*$", "", short) or short)
        if label not in seen:
            seen.append(label)
    return "-".join(seen) if seen else fallback
|
|
|
|
|
|
def infer_group_from_host(host: str) -> str:
    """Map a host name to its OS family label for categorized reporting.

    Uses the lower-cased part after the first ``-``. ``w2k*`` -> ``windows``;
    known Linux families map to their own name. Anything else returns that
    short name unchanged, or ``"group"`` when it is empty.
    """
    short = host.split("-", 1)[-1].lower()
    if short.startswith("w2k"):
        return "windows"
    known_families = (
        "amazonlinux",
        "centos",
        "ubuntu",
        "rocky",
        "redhat",
        "oracle",
        "fedora",
        "debian",
        "suse",
    )
    family = next((name for name in known_families if short.startswith(name)), None)
    if family is not None:
        return family
    return short or "group"
|
|
|
|
|
|
def corrected_categorized_display_name(raw_display_name: str, hosts: List[str]) -> str:
    """Replace a misleading family label in a child build name with the real one.

    Child build ids look like ``<parent>-<family>-batch...``, but the
    ``<family>`` label may not match the host actually executed. The family of
    ``hosts[0]`` (via infer_group_from_host) is substituted into the *last*
    ``-<family>-batch`` occurrence only. That is the label appended for the
    child, so a parent build name that happens to contain ``-ubuntu-batch``
    stays intact. The new name is assembled by slicing rather than via a
    ``re.sub`` replacement string, so no escape processing applies.

    Returns *raw_display_name* unchanged when *hosts* is empty or no family
    label is present.
    """
    if not hosts:
        return raw_display_name
    labels = list(
        re.finditer(
            r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k)-batch",
            raw_display_name,
        )
    )
    if not labels:
        return raw_display_name
    last = labels[-1]
    group = infer_group_from_host(hosts[0])
    return f"{raw_display_name[:last.start()]}-{group}-batch{raw_display_name[last.end():]}"
|
|
|
|
|
|
def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]:
    """Find a ``...batch<N>_<M>`` child build id in a log segment.

    An id prefixed by *parent_build_name* is preferred. Otherwise any
    ``<name>-batch<N>_<M>`` token is accepted. Returns None if neither
    appears.
    """
    scoped = rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)"
    generic = r"([A-Za-z0-9_.-]+-batch\d+_\d+)"
    for found in (re.search(pattern, segment_text) for pattern in (scoped, generic)):
        if found:
            return found.group(1)
    return None
|
|
|
|
|
|
def split_log_segments(log_text: str, parent_build_name: str, categorized: bool, default_started_at: datetime) -> List[SubRun]:
    """Split a run log into the sub-runs to report on.

    Non-categorized runs always produce one SubRun covering the whole log.
    Categorized runs get one SubRun per ``<stamp> - INFO - Extracted
    specPattern:`` line, each spanning up to the next such line. Every
    segment except the last is marked completed.

    Each segment's display name is the child build id found in it, or
    ``<parent>-<family label>``. Its start time is the marker's log
    timestamp, or *default_started_at* when that cannot be parsed. A
    categorized log with no marker yet yields a single whole-log SubRun
    carrying an explanatory note.
    """

    def whole_log_subrun(notes: List[str]) -> List[SubRun]:
        # Single entry covering the entire log under the parent build name.
        return [
            SubRun(
                key=sanitize_key(parent_build_name),
                display_name=parent_build_name,
                started_at=default_started_at,
                expected_hosts=extract_expected_hosts(log_text),
                completed=False,
                currents_url=extract_currents_url(log_text),
                notes=notes,
            )
        ]

    if not categorized:
        return whole_log_subrun([])

    marker = r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - Extracted specPattern:"
    starts = [(hit.start(), parse_log_timestamp(hit.group(1))) for hit in re.finditer(marker, log_text, re.M)]
    if not starts:
        return whole_log_subrun(["Categorized mode was requested but no sub-run segment has appeared in the log yet."])

    total = len(starts)
    # Each segment ends where the next begins; the last runs to end-of-log.
    boundaries = [offset for offset, _ in starts[1:]] + [len(log_text)]
    segments: List[SubRun] = []
    for position, ((begin, stamp), finish) in enumerate(zip(starts, boundaries), start=1):
        chunk = log_text[begin:finish]
        hosts = extract_expected_hosts(chunk)
        name = extract_segment_build_name(chunk, parent_build_name) or f"{parent_build_name}-{infer_group_label(hosts, position)}"
        segments.append(
            SubRun(
                key=sanitize_key(name),
                display_name=name,
                started_at=stamp or default_started_at,
                expected_hosts=hosts,
                completed=position < total,
                currents_url=extract_currents_url(chunk),
                notes=[f"Categorized sub-run {position} of {total}."],
            )
        )
    return segments
|
|
|
|
|
|
def evaluate_subrun(
    subrun: SubRun,
    reporter_root: Path,
    inventory: Dict[str, str],
    end_boundary: Optional[datetime],
    parent_active: bool,
    cancelled: bool,
) -> Tuple[str, Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Decide the state of one log-derived sub-run from its reporter XML files.

    Host results come from XML files in ``[subrun.started_at, end_boundary)``.
    Start time is the earliest host timestamp (else ``subrun.started_at``).
    End time is the latest host or ``check-xml-files.ts`` timestamp (else
    None).

    States, checked in order:

    * CANCELLED when *cancelled* is set;
    * for a completed sub-run: UNKNOWN without results, otherwise FAILED or
      COMPLETED;
    * RUNNING while the parent is active, adding the first expected host
      without a result as an in-progress row;
    * FAILED or COMPLETED if results exist after the parent exited;
    * TERMINATED otherwise.

    Returns ``(state, host_results, start_ts, end_ts, currents_url, notes)``.
    """
    notes = list(subrun.notes)
    host_results = collect_host_results(
        reporter_root=reporter_root,
        expected_hosts=subrun.expected_hosts,
        kernels=inventory,
        run_started_at=subrun.started_at,
        run_ended_at=end_boundary,
    )
    check_end = find_check_xml_end(reporter_root, subrun.started_at, end_boundary)

    stamps = [result.timestamp for result in host_results.values() if result.timestamp]
    start_ts = min(stamps) if stamps else subrun.started_at
    end_stamps = stamps + [check_end] if check_end else stamps
    end_ts = max(end_stamps) if end_stamps else None

    def finish(state: str):
        return state, host_results, start_ts, end_ts, subrun.currents_url, notes

    def settled_state() -> str:
        # Shared tail for a sub-run that ran to completion.
        if check_end:
            notes.append("Final `check-xml-files.ts` validation passed.")
        return "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"

    if cancelled:
        notes.append("Cancellation marker detected.")
        return finish("CANCELLED")

    if subrun.completed:
        if not host_results:
            notes.append("This categorized sub-run ended but no host results were detected.")
            return finish("UNKNOWN")
        notes.append("Categorized sub-run completed and the next grouped run was launched.")
        return finish(settled_state())

    if parent_active:
        pending = next((host for host in subrun.expected_hosts if host not in host_results), None)
        if pending:
            host_results[pending] = HostResult(
                host=pending,
                kernel=inventory.get(pending, "unknown"),
                status="RUN",
                detail="in progress",
            )
        return finish("RUNNING")

    if host_results:
        notes.append("Categorized sub-run completed after the parent runner exited.")
        return finish(settled_state())

    notes.append("Parent run exited before this categorized sub-run produced host results.")
    return finish("TERMINATED")
|
|
|
|
|
|
def discover_categorized_subruns(
    build_name: str,
    reporter_root: Path,
    inventory: Dict[str, str],
    log_text: str,
    started_at: datetime,
    parent_active: bool,
    cancelled: bool,
) -> List[Dict[str, object]]:
    """Build status entries for each child build of a categorized ATVM run.

    Children are discovered from reporter files named
    ``xml/test-result-<build_name>-*.xml`` modified at or after *started_at*,
    oldest first. Each file's host result comes from its JUnit XML. When the
    XML yields no host (or only ``check-xml-files``), the next
    "Cloud Run Finished" summary from *log_text* is used instead. One summary
    is consumed per reporter file, in order.

    A child whose ``--ci-build-id`` is visible in ``ps`` but which has no
    reporter file yet is appended as a RUNNING (or CANCELLED) entry. Its
    in-progress host is inferred from the log, falling back to the build id.

    Child build ids can carry a misleading family label, so ``display_name``
    is rewritten from the host actually executed. ``key`` is always derived
    from the raw child build id (the reporter file stem / ``--ci-build-id``).
    A child therefore keeps the same key while it is running and after its
    report appears, even when its display name is corrected.

    Returns dicts with ``key``, ``display_name``, ``state``, ``host_results``,
    ``start_ts``, ``end_ts``, ``currents_url`` and ``notes``.
    """
    xml_dir = reporter_root / "xml"
    current_subrun_build = extract_active_subrun_build(build_name)
    expected_hosts = extract_expected_hosts(log_text)
    completed_summaries = extract_completed_subrun_summaries(log_text, inventory)
    subrun_states: List[Dict[str, object]] = []
    completed_hosts: List[str] = []
    discovered_builds: List[str] = []
    current_summary_index = 0

    if xml_dir.exists():
        prefix = f"test-result-{build_name}-"
        reports = []
        for xml_path in xml_dir.glob(f"{prefix}*.xml"):
            try:
                reports.append((xml_path.stat().st_mtime, xml_path))
            except FileNotFoundError:
                continue  # rotated away between glob() and stat()
        reports.sort(key=lambda item: item[0])

        for mtime, xml_path in reports:
            xml_mtime = datetime.fromtimestamp(mtime, tz=timezone.utc)
            if xml_mtime < started_at:
                continue
            raw_display_name = xml_path.stem[len("test-result-"):]
            discovered_builds.append(raw_display_name)

            host_results: Dict[str, HostResult] = {}
            parsed = parse_host_xml(xml_path)
            if parsed:
                host, result = parsed
                result.kernel = inventory.get(host, "unknown")
                host_results[host] = result
                completed_hosts.append(host)
            check_ts = extract_check_xml_timestamp_from_file(xml_path)

            summary = completed_summaries[current_summary_index] if current_summary_index < len(completed_summaries) else None
            if summary:
                current_summary_index += 1
                if not host_results or all(result.host == "check-xml-files" for result in host_results.values()):
                    host_results = summary["host_results"]
                    completed_hosts.extend([host for host in host_results if host not in completed_hosts])

            if cancelled:
                state = "CANCELLED"
            elif check_ts or raw_display_name != current_subrun_build or not parent_active:
                state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
            else:
                state = "RUNNING"

            display_name = corrected_categorized_display_name(raw_display_name, list(host_results))
            notes = [f"Categorized sub-run discovered from reporter file `{xml_path.name}`."]
            if check_ts:
                notes.append("Final `check-xml-files.ts` validation passed.")
            if summary and host_results:
                notes.append("Host result details were derived from the parent categorized run log summary.")
            if display_name != raw_display_name:
                notes.append(f"Child build id was reported as `{raw_display_name}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
            if cancelled:
                notes.append("Cancellation marker detected.")

            end_ts = check_ts or next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime)
            start_ts = next((result.timestamp for result in host_results.values() if result.timestamp), None)
            if not start_ts and summary and end_ts:
                # Back-date the start from the summary's reported duration.
                duration_seconds = next((result.duration_seconds for result in host_results.values() if result.duration_seconds is not None), None)
                if duration_seconds is not None:
                    start_ts = datetime.fromtimestamp(end_ts.timestamp() - duration_seconds, tz=timezone.utc)
            if not start_ts:
                start_ts = min(xml_mtime, end_ts) if end_ts else xml_mtime
            if end_ts and start_ts > end_ts:
                start_ts = end_ts

            subrun_states.append(
                {
                    # Keyed by the raw child id so the key matches the RUNNING
                    # entry emitted for the same child before its report appeared.
                    "key": sanitize_key(raw_display_name),
                    "display_name": display_name,
                    "state": state,
                    "host_results": host_results,
                    "start_ts": start_ts,
                    "end_ts": end_ts,
                    "currents_url": summary["currents_url"] if summary else None,
                    "notes": notes,
                }
            )

    if current_subrun_build and current_subrun_build not in discovered_builds:
        current_host = find_current_running_host(log_text, completed_hosts)
        if not current_host or current_host in completed_hosts:
            current_host = infer_host_from_subrun_build(current_subrun_build, expected_hosts, completed_hosts)
        active_results: Dict[str, HostResult] = {}
        if current_host:
            active_results[current_host] = HostResult(
                host=current_host,
                kernel=inventory.get(current_host, "unknown"),
                status="RUN",
                detail="in progress",
            )
        display_name = corrected_categorized_display_name(current_subrun_build, [current_host] if current_host else [])
        notes = ["Active categorized sub-run inferred from live `--ci-build-id` process state."]
        if display_name != current_subrun_build:
            notes.append(f"Child build id is `{current_subrun_build}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
        if cancelled:
            notes.append("Cancellation marker detected.")
        subrun_states.append(
            {
                "key": sanitize_key(current_subrun_build),
                "display_name": display_name,
                "state": "CANCELLED" if cancelled else "RUNNING",
                "host_results": active_results,
                "start_ts": started_at,
                "end_ts": None,
                "currents_url": None,
                "notes": notes,
            }
        )

    return subrun_states
|
|
|
|
|
|
def determine_state(
    build_name: str,
    build_dir: Path,
    run_log: Path,
    reporter_root: Path,
    inventory: Dict[str, str],
    metadata: Dict[str, object],
    started_at: datetime,
    process_gone_since: Optional[datetime],
    process_exit_grace_seconds: int,
    max_watch_seconds: Optional[float] = None,
) -> Tuple[str, List[Dict[str, object]], Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Classify the overall state of a watched build and collect its sub-run details.

    Categorized builds get their sub-runs from ``discover_categorized_subruns``.
    Other builds have their run log split into segments with
    ``split_log_segments``. Each segment is evaluated with ``evaluate_subrun``,
    using the next segment's start as its end boundary.

    Args:
        build_name: Name of the build being watched.
        build_dir: Watcher state directory for this build; a ``cancelled.marker``
            file in it marks the build as cancelled.
        run_log: Path to the run log that is parsed for sub-runs and timestamps.
        reporter_root: Root directory of the reporter artifacts.
        inventory: Host -> kernel mapping, passed through to sub-run evaluation.
        metadata: Build metadata; only the ``categorized`` flag is read here.
        started_at: When the watched run started.
        process_gone_since: When the watcher first saw the runner process missing,
            or ``None`` while it is still present.
        process_exit_grace_seconds: How long the process must be gone before the
            run may be treated as finished or terminated.
        max_watch_seconds: Longest time since ``started_at`` that a still-active
            run may take before it is reported as ``HUNG``. When ``None``, the
            value is read from the module-level ``args`` parsed in ``__main__``.
            This keeps the original behavior while letting other callers avoid
            that global.

    Returns:
        ``(state, subrun_states, host_results, start_ts, end_ts, currents_url, notes)``.
        ``state`` is one of:

        - ``CANCELLED``: the cancellation marker exists.
        - ``HUNG``: the run is still active past ``max_watch_seconds``.
        - ``RUNNING``: the run is active, is still inside the exit grace period,
          or nothing conclusive was found yet.
        - ``COMPLETED`` / ``FAILED``: at least one sub-run reached a terminal
          result.
        - ``TERMINATED``: the process has been gone past the grace period without
          any terminal sub-run.
    """
    cancelled_marker = build_dir / "cancelled.marker"
    log_text = read_text(run_log)
    categorized = bool(metadata.get("categorized"))
    # Categorized builds use related_process_active, which presumably also
    # matches the child (`--ci-build-id`) runners -- confirm against its definition.
    active = related_process_active(build_name) if categorized else process_active(build_name)
    cancelled = cancelled_marker.exists()
    notes: List[str] = []
    subrun_states: List[Dict[str, object]] = []
    parent_host_results: Dict[str, HostResult] = {}

    if categorized:
        subrun_states = discover_categorized_subruns(
            build_name=build_name,
            reporter_root=reporter_root,
            inventory=inventory,
            log_text=log_text,
            started_at=started_at,
            parent_active=active,
            cancelled=cancelled,
        )
        for subrun in subrun_states:
            # Later sub-runs overwrite earlier results for the same host.
            parent_host_results.update(subrun["host_results"])
    else:
        # Only non-categorized builds reach this branch, so the flag is always False.
        segments = split_log_segments(log_text, build_name, False, started_at)
        # Each segment ends where the next one starts; the last one is open-ended.
        boundaries = [segment.started_at for segment in segments[1:]] + [None]
        for segment, next_started_at in zip(segments, boundaries):
            sub_state, sub_hosts, sub_start, sub_end, sub_url, sub_notes = evaluate_subrun(
                subrun=segment,
                reporter_root=reporter_root,
                inventory=inventory,
                end_boundary=next_started_at,
                parent_active=active,
                cancelled=cancelled,
            )
            parent_host_results.update(sub_hosts)
            subrun_states.append(
                {
                    "key": segment.key,
                    "display_name": segment.display_name,
                    "state": sub_state,
                    "host_results": sub_hosts,
                    "start_ts": sub_start,
                    "end_ts": sub_end,
                    "currents_url": sub_url,
                    "notes": sub_notes,
                }
            )

    parent_start_candidates = [subrun["start_ts"] for subrun in subrun_states if subrun["start_ts"]]
    parent_end_candidates = [subrun["end_ts"] for subrun in subrun_states if subrun["end_ts"]]
    start_ts = min(parent_start_candidates) if parent_start_candidates else started_at
    end_ts = max(parent_end_candidates) if parent_end_candidates else find_check_xml_end(reporter_root, started_at)
    currents_url = extract_currents_url(log_text)

    def finish(run_state: str):
        # Every exit path returns the same aggregate; only the state label differs.
        return run_state, subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    if cancelled:
        notes.append("Cancellation marker detected.")
        return finish("CANCELLED")

    if active:
        if max_watch_seconds is None:
            # Backward-compatible fallback to the CLI arguments parsed in __main__.
            max_watch_seconds = args.max_watch_seconds
        elapsed = (now_utc() - started_at).total_seconds()
        if elapsed > max_watch_seconds:
            notes.append("Watcher exceeded max watch duration while the run still appears active.")
            return finish("HUNG")
        return finish("RUNNING")

    gone_for = (now_utc() - process_gone_since).total_seconds() if process_gone_since is not None else None

    if categorized and gone_for is not None and gone_for < process_exit_grace_seconds:
        notes.append("Categorized parent runner has not been gone long enough to treat the request as finished.")
        return finish("RUNNING")

    terminal_subruns = [subrun for subrun in subrun_states if subrun["state"] in {"COMPLETED", "FAILED"}]
    if terminal_subruns:
        # A sub-run that classified itself as FAILED fails the parent, even when
        # no host reported a non-zero failure count.
        failed = any(subrun["state"] == "FAILED" for subrun in terminal_subruns) or any(
            result.failures for result in parent_host_results.values()
        )
        notes.append("Run finished and one or more sub-run result artifacts were detected.")
        if end_ts:
            notes.append("Final reporting artifacts were detected.")
        return finish("FAILED" if failed else "COMPLETED")

    if gone_for is not None and gone_for >= process_exit_grace_seconds:
        notes.append("Run process exited without a clean completion signal.")
        return finish("TERMINATED")

    return finish("RUNNING")
|
|
|
|
|
|
def _host_results_payload(host_results: Dict[str, HostResult]) -> Dict[str, Dict[str, object]]:
    """Serialize HostResult objects into the JSON-friendly shape stored in watcher state files."""
    return {
        host: {
            "status": result.status,
            "detail": result.detail,
            "kernel": result.kernel,
            "tests": result.tests,
            "failures": result.failures,
        }
        for host, result in host_results.items()
    }


def _isoformat_or_none(value: Optional[datetime]) -> Optional[str]:
    """Return ``value.isoformat()`` for a set timestamp, else ``None``."""
    return value.isoformat() if value else None


if __name__ == "__main__":
    # Poll the build until it leaves RUNNING, persisting state under
    # <state-dir>/<build-name>. Each finished sub-run is posted to Mattermost
    # once, and so is the overall result of a non-categorized build; the
    # posted.marker files record completed posts.
    # `args` must stay a module-level global: determine_state reads
    # args.max_watch_seconds from it.
    args = parse_args()
    build_name = args.build_name
    run_log = Path(args.run_log)
    reporter_root = Path(args.reporter_root)
    inventory_file = Path(args.inventory_file)
    state_root = Path(args.state_dir)
    build_dir = state_root / build_name
    ensure_dir(build_dir)
    state_file = build_dir / "state.json"
    posted_marker = build_dir / "posted.marker"

    inventory = load_inventory(inventory_file)
    metadata = infer_metadata()

    state = load_state(state_file)
    log_text_for_start = read_text(run_log)
    default_started_at = first_log_timestamp(log_text_for_start) or (
        datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc) if run_log.exists() else now_utc()
    )
    # A start time persisted by an earlier watcher invocation takes precedence.
    started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at
    state.setdefault("build_name", build_name)
    state.setdefault("started_at", started_at.isoformat())
    write_state(state_file, state)

    process_gone_since: Optional[datetime] = None

    while True:
        # Remember when the runner process first disappeared; reset if it reappears.
        active = process_active(build_name)
        if active:
            process_gone_since = None
        elif process_gone_since is None:
            process_gone_since = now_utc()

        run_state, subrun_states, host_results, start_ts, end_ts, currents_url, notes = determine_state(
            build_name=build_name,
            build_dir=build_dir,
            run_log=run_log,
            reporter_root=reporter_root,
            inventory=inventory,
            metadata=metadata,
            started_at=started_at,
            process_gone_since=process_gone_since,
            process_exit_grace_seconds=args.process_exit_grace_seconds,
        )

        state["last_state"] = run_state
        state["last_seen_at"] = now_utc().isoformat()
        state["host_results"] = _host_results_payload(host_results)
        state["subruns"] = {
            subrun["display_name"]: {
                "state": subrun["state"],
                "hosts": sorted(subrun["host_results"].keys()),
                "start_ts": _isoformat_or_none(subrun["start_ts"]),
                "end_ts": _isoformat_or_none(subrun["end_ts"]),
                "currents_url": subrun["currents_url"],
                "notes": subrun["notes"],
            }
            for subrun in subrun_states
        }
        write_state(state_file, state)

        for subrun in subrun_states:
            subrun_dir = build_dir / "subruns" / subrun["key"]
            ensure_dir(subrun_dir)
            subrun_state_file = subrun_dir / "state.json"
            subrun_posted_marker = subrun_dir / "posted.marker"
            subrun_state = {
                "display_name": subrun["display_name"],
                "last_state": subrun["state"],
                "last_seen_at": now_utc().isoformat(),
                "host_results": _host_results_payload(subrun["host_results"]),
                "notes": subrun["notes"],
                "currents_url": subrun["currents_url"],
                "started_at": _isoformat_or_none(subrun["start_ts"]),
                "ended_at": _isoformat_or_none(subrun["end_ts"]),
            }
            if subrun_posted_marker.exists():
                # subrun_state is rebuilt on every poll. Carry over the post status
                # written when this sub-run was announced so later polls do not
                # erase it from the sub-run's state.json.
                previous_state = load_state(subrun_state_file)
                subrun_state["mattermost_posted"] = True
                if "mattermost_response" in previous_state:
                    subrun_state["mattermost_response"] = previous_state["mattermost_response"]
            elif subrun["state"] in {"COMPLETED", "FAILED"}:
                status_text = build_status_markdown(
                    build_name=subrun["display_name"],
                    metadata=metadata,
                    host_results=dict(sorted(subrun["host_results"].items())),
                    run_state=subrun["state"],
                    currents_url=subrun["currents_url"],
                    start_ts=subrun["start_ts"],
                    end_ts=subrun["end_ts"],
                    notes=subrun["notes"],
                )
                print(status_text)
                response = post_to_mattermost(status_text)
                if response != "ok":
                    raise SystemExit(f"Mattermost webhook did not return ok for {subrun['display_name']}: {response!r}")
                subrun_posted_marker.write_text("ok\n", encoding="utf-8")
                subrun_state["mattermost_posted"] = True
                subrun_state["mattermost_response"] = response
                print(f"[watcher] Mattermost post confirmed for {subrun['display_name']}.")
            write_state(subrun_state_file, subrun_state)

        if run_state == "RUNNING":
            print(f"[watcher] {build_name}: RUNNING")
            time.sleep(args.poll_interval)
            continue

        status_text = build_status_markdown(
            build_name=build_name,
            metadata=metadata,
            host_results=dict(sorted(host_results.items())),
            run_state=run_state,
            currents_url=currents_url,
            start_ts=start_ts,
            end_ts=end_ts,
            notes=notes,
        )
        print(status_text)

        # Categorized builds are reported per sub-run above; only plain builds
        # get a single overall post.
        if not metadata.get("categorized") and run_state in {"COMPLETED", "FAILED"} and not posted_marker.exists():
            response = post_to_mattermost(status_text)
            if response != "ok":
                raise SystemExit(f"Mattermost webhook did not return ok: {response!r}")
            posted_marker.write_text("ok\n", encoding="utf-8")
            state["mattermost_posted"] = True
            state["mattermost_response"] = response
            write_state(state_file, state)
            print(f"[watcher] Mattermost post confirmed for {build_name}.")

        state["closed_at"] = now_utc().isoformat()
        write_state(state_file, state)
        sys.exit(0)
|