Files
cds-ai/atvm/watcher-service/atvm_run_watcher.py

1329 lines
52 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import ast
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Labels a run (or sub-run) can be reported as. `evaluate_subrun` returns
# values drawn from this set.
RUN_STATES = {
    "COMPLETED",
    "FAILED",
    "CANCELLED",
    "TERMINATED",
    "HUNG",
    "UNKNOWN",
    "RUNNING",
}
# Timezone of this machine, captured once at import time; falls back to UTC
# when no tzinfo can be determined. `parse_log_timestamp` uses it to interpret
# the naive timestamps found in the run log.
LOCAL_TZ = datetime.now().astimezone().tzinfo or timezone.utc
# Generic test-flow bullet list returned by `get_test_flow` when the template
# has no dedicated entry in TEMPLATE_TEST_FLOWS.
DEFAULT_TEST_FLOW = [
    "1. Test flow is template-specific for this run",
    "2. Verifying set up",
    "3. Execute the requested template workflow",
    "4. Validate expected run outcome",
    "5. Power off",
]
# Template name -> ordered test-flow steps rendered in the status message.
TEMPLATE_TEST_FLOWS = {
    "cmc-e2e": [
        "1. Verifying set up",
        "2. Power on and obtain ip address and host name",
        "3. Uninstall CMC if still exists",
        "4. Setting up disk on the host",
        "5. Copy CMC install command from GUI",
        "6. Install CMC",
        "7. Create migration session",
        "8. Tracking Changes",
        "9. Trigger cmotion and do I/O test before actual cutover",
        "10. Verify data for cmotion",
        "11. Trigger revert cmotion and do I/O test before and during cmotion",
        "12. Verify data for revert cmotion",
        "13. Trigger cmotion again",
        "14. Finalize cutover",
        "15. Create migration report",
        "16. Delete migration session",
        "17. Verify local destination disk",
        "18. Remove enabled FC integration",
        "19. Remove host and volumes",
        "20. Uninstall CMC",
        "21. Clean up iSCSI targets",
        "22. Power off",
    ],
}
@dataclass
class HostResult:
    """Outcome recorded for a single host (one `atvm*.ts` spec) within a run."""

    host: str  # host name, i.e. the spec file name without the `.ts` suffix
    kernel: str  # kernel string looked up in the inventory, or "unknown"
    status: str  # values set in this file: "PASS", "FAIL", "RUN"
    detail: str  # short human-readable text shown in the hosts table
    tests: int = 0  # number of tests reported for the host
    failures: int = 0  # number of failed tests reported for the host
    duration_seconds: Optional[float] = None  # run time, when it could be determined
    timestamp: Optional[datetime] = None  # report timestamp or artifact mtime, when known
@dataclass
class SubRun:
    """One segment of the run log, as produced by `split_log_segments`."""

    key: str  # `sanitize_key(display_name)`
    display_name: str  # build name shown to users for this segment
    started_at: datetime  # segment start (log timestamp, or the caller's default)
    expected_hosts: List[str]  # hosts extracted from the segment's log text
    completed: bool  # True when a later segment follows this one in the log
    currents_url: Optional[str]  # first `https://.../run/...` URL in the segment, if any
    notes: List[str]  # free-form notes carried into the status message
def now_utc() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    current = datetime.now(tz=timezone.utc)
    return current
def parse_args() -> argparse.Namespace:
    """Parse the watcher's command-line options from ``sys.argv``.

    Five path/name options are mandatory; the three timing options are
    integers (seconds) with built-in defaults.
    """
    cli = argparse.ArgumentParser()
    mandatory_flags = (
        "--build-name",
        "--run-log",
        "--reporter-root",
        "--inventory-file",
        "--state-dir",
    )
    for flag in mandatory_flags:
        cli.add_argument(flag, required=True)
    timing_flags = (
        ("--poll-interval", 30),
        ("--max-watch-seconds", 6 * 60 * 60),
        ("--process-exit-grace-seconds", 120),
    )
    for flag, fallback in timing_flags:
        cli.add_argument(flag, type=int, default=fallback)
    return cli.parse_args()
def ensure_dir(path: Path) -> None:
    """Create ``path`` together with any missing parents; existing directories are accepted."""
    path.mkdir(exist_ok=True, parents=True)
def load_inventory(path: Path) -> Dict[str, str]:
    """Read a pipe-delimited table file and map host -> kernel.

    Only lines starting with ``|`` are considered. Rows need at least three
    cells; the second cell is the host and the third the kernel. Rows whose
    first cell is ``OS`` or ``---`` (header / rule) are ignored. An empty
    kernel cell is stored as ``"unknown"``. A missing file yields ``{}``.
    """
    if not path.exists():
        return {}
    host_to_kernel: Dict[str, str] = {}
    for row in path.read_text(encoding="utf-8").splitlines():
        if row[:1] != "|":
            continue
        cells = [cell.strip() for cell in row.strip().strip("|").split("|")]
        if len(cells) >= 3 and cells[0] not in ("OS", "---"):
            host_to_kernel[cells[1]] = cells[2] if cells[2] else "unknown"
    return host_to_kernel
def run_ps() -> str:
    """Return the output of ``ps -eo pid=,args=`` (PID and command line per process).

    Raises ``subprocess.CalledProcessError`` when ``ps`` exits non-zero.
    """
    command = ["ps", "-eo", "pid=,args="]
    completed = subprocess.run(command, check=True, text=True, capture_output=True)
    return completed.stdout
def process_active(build_name: str) -> bool:
    """Report whether a ``run-sorry-cypress.py`` process for exactly this build is alive.

    The ``--build_name`` argument must match ``build_name`` as a whole token
    (followed by whitespace or end of line), so a watcher for ``nightly`` is
    not kept alive by an unrelated ``nightly-2`` runner. Both
    ``--build_name NAME`` and ``--build_name=NAME`` spellings are accepted.
    """
    build_arg = re.compile(
        r"--build_name(?:=|\s+)" + re.escape(build_name) + r"(?=\s|$)"
    )
    return any(
        "run-sorry-cypress.py" in line and build_arg.search(line) is not None
        for line in run_ps().splitlines()
    )
def related_process_active(build_name: str) -> bool:
    """Report whether any runner-related process mentions ``build_name``.

    A process counts when its command line contains ``build_name`` and one of
    the markers ``run-sorry-cypress.py``, ``cypress-cloud`` or ``node ``.
    """
    markers = ("run-sorry-cypress.py", "cypress-cloud", "node ")
    return any(
        build_name in entry and any(marker in entry for marker in markers)
        for entry in run_ps().splitlines()
    )
def extract_active_subrun_build(build_name: str) -> Optional[str]:
    """Return the ``--ci-build-id`` value of the last matching live process.

    Only command lines containing both ``build_name`` and ``--ci-build-id``
    are inspected; ``None`` is returned when none carries a value.
    """
    last_seen: Optional[str] = None
    for entry in run_ps().splitlines():
        if build_name in entry and "--ci-build-id" in entry:
            found = re.search(r"--ci-build-id\s+(\S+)", entry)
            if found is not None:
                last_seen = found.group(1)
    return last_seen
def read_text(path: Path) -> str:
    """Return the file's UTF-8 text (invalid bytes replaced), or ``""`` if it does not exist."""
    try:
        content = path.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        content = ""
    return content
def extract_expected_hosts(log_text: str) -> List[str]:
    """Collect host names mentioned in the run log, in first-seen order.

    Hosts come first from the first ``Extracted specPattern: [...]`` line
    (string entries ending in ``atvm*.ts``) and then from every
    ``Running: ... atvm*.ts`` line. Duplicates are dropped.
    """
    # A dict is used as an insertion-ordered set.
    ordered: Dict[str, None] = {}

    declared = re.search(r'Extracted specPattern:\s*(\[[^\n]+\])', log_text)
    if declared:
        try:
            entries = ast.literal_eval(declared.group(1))
        except (SyntaxError, ValueError):
            entries = []
        for entry in entries:
            if isinstance(entry, str):
                hit = re.search(r"(atvm[^/\s]+)\.ts$", entry)
                if hit:
                    ordered.setdefault(hit.group(1), None)

    for hit in re.finditer(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text):
        ordered.setdefault(hit.group(1), None)

    return list(ordered)
def extract_currents_url(log_text: str) -> Optional[str]:
    """Return the first ``https://.../run/...`` URL found in the log, if any."""
    found = re.search(r"(https://\S+/run/\S+)", log_text)
    if found is None:
        return None
    return found.group(1)
def load_state(state_file: Path) -> Dict[str, object]:
    """Load the persisted watcher state, falling back to ``{}``.

    An empty dict is returned when the file is missing, is not valid UTF-8,
    is not valid JSON, or holds JSON that is not an object (for example
    ``[]`` or ``null``), so callers can always treat the result as a dict.
    """
    try:
        loaded = json.loads(state_file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return {}
    return loaded if isinstance(loaded, dict) else {}
def write_state(state_file: Path, state: Dict[str, object]) -> None:
    """Persist ``state`` as indented, key-sorted JSON.

    The JSON is written to a sibling ``<name>.tmp`` file and then moved over
    ``state_file`` with ``os.replace``. A crash in the middle of the write
    therefore leaves the previous state file intact rather than a truncated
    one, which ``load_state`` would silently read back as empty.
    """
    payload = json.dumps(state, indent=2, sort_keys=True)
    scratch = state_file.with_name(state_file.name + ".tmp")
    scratch.write_text(payload, encoding="utf-8")
    os.replace(scratch, state_file)
def parse_xml_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 ``timestamp`` attribute into an aware UTC datetime.

    Timestamps without an offset are taken to be UTC already. Timestamps that
    carry an offset (including a trailing ``Z``) are converted to UTC instead
    of having the offset overwritten. Returns ``None`` for missing or
    unparseable input.
    """
    if not raw:
        return None
    text = raw.strip()
    # ``fromisoformat`` only accepts the "Z" suffix from Python 3.11 onwards.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def parse_log_timestamp(raw: str) -> Optional[datetime]:
    """Parse a ``YYYY-MM-DD HH:MM:SS,mmm`` log timestamp into UTC.

    The value is interpreted as wall-clock time in ``LOCAL_TZ`` and converted
    to UTC. Returns ``None`` when ``raw`` does not match the format.
    """
    try:
        naive = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S,%f")
        localized = naive.replace(tzinfo=LOCAL_TZ)
        return localized.astimezone(timezone.utc)
    except ValueError:
        return None
def first_log_timestamp(log_text: str) -> Optional[datetime]:
    """Return the timestamp of the first ``<timestamp> - INFO - `` log line, or ``None``."""
    info_line = r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - "
    found = re.search(info_line, log_text, re.MULTILINE)
    return parse_log_timestamp(found.group(1)) if found else None
def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
    """Extract one host's result from a reporter XML file.

    Every direct ``testsuite`` child of the root is inspected. The host name
    comes from the suite's ``file`` attribute when it points at
    ``cypress/cmcRegressionTest/atvm*.ts``, otherwise from an ``atvm...``
    token in the suite ``name``. Among suites that name a host, the one with
    the most tests wins; ties go to the suite with the larger (or equal, i.e.
    later) ``time``. Numeric attributes missing on the suite fall back to the
    root element and then to ``"0"``.

    Returns ``(host, HostResult)``, or ``None`` when the XML cannot be parsed
    or no suite names a host. The result's kernel is left as ``"unknown"``.
    """
    try:
        document = ET.parse(xml_path)
    except ET.ParseError:
        return None
    root = document.getroot()

    def attr_with_root_fallback(suite: ET.Element, name: str) -> str:
        return suite.attrib.get(name, root.attrib.get(name, "0"))

    winner: Optional[Tuple[str, int, int, float, Optional[datetime]]] = None
    for suite in root.findall("testsuite"):
        spec_file = suite.attrib.get("file", "")
        if spec_file.startswith("cypress/cmcRegressionTest/atvm") and spec_file.endswith(".ts"):
            host_name = Path(spec_file).stem
        else:
            in_name = re.search(r"(atvm[^)\s]+)", suite.attrib.get("name", ""))
            host_name = in_name.group(1) if in_name else None
        if not host_name:
            continue

        tests = int(float(attr_with_root_fallback(suite, "tests")))
        failures = int(float(attr_with_root_fallback(suite, "failures")))
        total_time = float(attr_with_root_fallback(suite, "time"))
        timestamp = parse_xml_timestamp(suite.attrib.get("timestamp"))

        replaces_winner = (
            winner is None
            or tests > winner[1]
            or (tests == winner[1] and total_time >= winner[3])
        )
        if replaces_winner:
            winner = (host_name, tests, failures, total_time, timestamp)

    if winner is None:
        return None
    host, tests, failures, total_time, timestamp = winner
    return host, HostResult(
        host=host,
        kernel="unknown",
        status="FAIL" if failures else "PASS",
        detail=f"{tests} tests, {failures} failures",
        tests=tests,
        failures=failures,
        duration_seconds=total_time,
        timestamp=timestamp,
    )
def extract_check_xml_timestamp_from_file(xml_path: Path) -> Optional[datetime]:
    """Return the timestamp of the first ``check-xml-files.ts`` suite in the file.

    ``None`` is returned when the XML cannot be parsed, when no direct
    ``testsuite`` child has a ``file`` attribute ending in
    ``check-xml-files.ts``, or when that suite's timestamp is unparseable.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except ET.ParseError:
        return None
    check_suite = next(
        (
            suite
            for suite in root.findall("testsuite")
            if suite.attrib.get("file", "").endswith("check-xml-files.ts")
        ),
        None,
    )
    if check_suite is None:
        return None
    return parse_xml_timestamp(check_suite.attrib.get("timestamp"))
def parse_duration_seconds(raw: str) -> Optional[float]:
    """Convert a duration string from the run summary into seconds.

    Two shapes are understood:

    * clock style ``MM:SS`` or ``HH:MM:SS``;
    * unit style made of ``<number><unit>`` parts such as ``1h 2m 3s``,
      ``2m``, ``3.5s`` or ``873ms``. A bare number counts as seconds.

    Units are honoured individually, so ``873ms`` is 0.873 seconds and ``2m``
    is 120 seconds (neither is mistaken for a plain seconds value). Returns
    ``None`` when the text contains no number.
    """
    text = " ".join(raw.split())
    if re.fullmatch(r"\d+:\d+(?::\d+)?", text):
        total = 0
        for field in text.split(":"):
            total = total * 60 + int(field)
        return total

    unit_seconds = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001, "": 1.0}
    # "ms" must be tried before "m" so milliseconds are not read as minutes.
    components = re.findall(r"(\d+(?:\.\d+)?)\s*(ms|h|m|s)?", text)
    if not components:
        return None
    return sum(float(value) * unit_seconds[unit] for value, unit in components)
def extract_host_results_from_run_finished_segment(segment_text: str, inventory: Dict[str, str]) -> Dict[str, HostResult]:
    """Parse per-spec rows of a run-summary table into ``HostResult`` objects.

    Each row is ``atvm*.ts`` followed by a duration and the tests / passing /
    failing columns plus two further columns that are matched but not used.
    A ``-`` in the failing column counts as zero. Before matching, an ``s``
    that wrapped onto its own table line is glued back onto the row. Kernels
    come from ``inventory``, defaulting to ``"unknown"``.
    """
    row_pattern = re.compile(
        r"\s+(atvm[^\s]+)\.ts\s+([0-9:hms.\s]+?)\s+(\d+)\s+(\d+)\s+([-\d]+)\s+([-\d]+)\s+([-\d]+)",
        re.S,
    )
    rejoined = re.sub(r"\n\s*│\s*s\s*│", "s", segment_text)

    parsed: Dict[str, HostResult] = {}
    for row in row_pattern.finditer(rejoined):
        host, duration_text, tests_text, failing_text = row.group(1, 2, 3, 5)
        elapsed = parse_duration_seconds(duration_text)
        tests = int(tests_text)
        failing = int(failing_text) if failing_text != "-" else 0
        parsed[host] = HostResult(
            host=host,
            kernel=inventory.get(host, "unknown"),
            status="FAIL" if failing else "PASS",
            detail=f"{tests} tests, {failing} failures",
            tests=tests,
            failures=failing,
            duration_seconds=elapsed,
        )
    return parsed
def extract_completed_subrun_summaries(log_text: str, inventory: Dict[str, str]) -> List[Dict[str, object]]:
    """Return one summary per finished cloud run found in the log, in log order.

    A finished run is the text from ``Cloud Run Finished`` up to either a
    ``Recorded Run: <url>`` line or the "results uploaded successfully" log
    line. Host rows are looked for inside that text first and, failing that,
    in the log text between the previous finished run and this one. Runs with
    no host rows in either place are dropped. Each summary is a dict with
    ``host_results`` and ``currents_url`` (``None`` when the block ended on
    the log line instead of a recorded-run URL).
    """
    finished_pattern = re.compile(
        r"Cloud Run Finished(.*?)(?:🏁 Recorded Run:\s*(https://\S+)|\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - INFO - Detected 'Recorded Run' after 'Cloud Run Finished' - results uploaded successfully\.)",
        re.S,
    )
    collected: List[Dict[str, object]] = []
    resume_from = 0
    for finished in finished_pattern.finditer(log_text):
        results = extract_host_results_from_run_finished_segment(finished.group(1), inventory)
        if not results:
            preceding_text = log_text[resume_from:finished.start()]
            results = extract_host_results_from_run_finished_segment(preceding_text, inventory)
        resume_from = finished.end()
        if results:
            collected.append(
                {
                    "host_results": results,
                    "currents_url": finished.group(2),
                }
            )
    return collected
def extract_latest_run_summary(log_text: str, inventory: Dict[str, str]) -> Optional[Dict[str, object]]:
    """Return the last finished-run summary found in the log, or ``None`` if there is none."""
    completed = extract_completed_subrun_summaries(log_text, inventory)
    return completed[-1] if completed else None
def collect_host_results(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Dict[str, HostResult]:
    """Gather host results from ``<reporter_root>/xml/test-result-*.xml``.

    Files are visited oldest-first by modification time, so a newer file for
    the same host replaces an older one. Only files modified at or after
    ``run_started_at`` and (when given) strictly before ``run_ended_at`` are
    used. When ``expected_hosts`` is non-empty, other hosts are ignored.
    Kernels are filled in from ``kernels``, defaulting to ``"unknown"``.
    """
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return {}

    by_host: Dict[str, HostResult] = {}
    reports = sorted(xml_dir.glob("test-result-*.xml"), key=lambda p: p.stat().st_mtime)
    for report in reports:
        written_at = datetime.fromtimestamp(report.stat().st_mtime, tz=timezone.utc)
        if written_at < run_started_at:
            continue
        if run_ended_at and written_at >= run_ended_at:
            continue
        parsed = parse_host_xml(report)
        if not parsed:
            continue
        host, outcome = parsed
        if expected_hosts and host not in expected_hosts:
            continue
        outcome.kernel = kernels.get(host, "unknown")
        by_host[host] = outcome
    return by_host
def collect_latest_host_result(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
    """Return the ``(host, result)`` pair with the most recent result timestamp.

    Candidates come from ``collect_host_results`` with the same arguments;
    results without a timestamp sort as the Unix epoch. ``None`` is returned
    when there are no candidates.
    """
    candidates = collect_host_results(
        reporter_root=reporter_root,
        expected_hosts=expected_hosts,
        kernels=kernels,
        run_started_at=run_started_at,
        run_ended_at=run_ended_at,
    )
    if not candidates:
        return None

    def recency(item: Tuple[str, HostResult]) -> datetime:
        return item[1].timestamp or datetime.fromtimestamp(0, tz=timezone.utc)

    return max(candidates.items(), key=recency)
def collect_latest_host_reporter_artifact(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
    """Find the host whose log artifact was written most recently in the window.

    Scans ``<reporter_root>/logs/<host>/`` for ``.txt`` / ``.json`` files of
    ``atvm*`` hosts (restricted to ``expected_hosts`` when non-empty) whose
    modification time is at or after ``run_started_at`` and, when given,
    strictly before ``run_ended_at``. On equal times the artifact visited
    later (sorted order) wins. The returned result is a synthetic ``PASS`` /
    ``"completed"`` entry stamped with the artifact's modification time.
    """
    logs_dir = reporter_root / "logs"
    if not logs_dir.exists():
        return None

    newest_host: Optional[str] = None
    newest_mtime: Optional[datetime] = None
    for host_dir in sorted(logs_dir.iterdir()):
        host = host_dir.name
        if not host_dir.is_dir() or not host.startswith("atvm"):
            continue
        if expected_hosts and host not in expected_hosts:
            continue
        for artifact in sorted(host_dir.iterdir()):
            if artifact.suffix not in {".txt", ".json"}:
                continue
            written_at = datetime.fromtimestamp(artifact.stat().st_mtime, tz=timezone.utc)
            if written_at < run_started_at:
                continue
            if run_ended_at and written_at >= run_ended_at:
                continue
            if newest_mtime is None or written_at >= newest_mtime:
                newest_host, newest_mtime = host, written_at

    if newest_host is None:
        return None
    return newest_host, HostResult(
        host=newest_host,
        kernel=kernels.get(newest_host, "unknown"),
        status="PASS",
        detail="completed",
        timestamp=newest_mtime,
    )
def find_check_xml_end(
    reporter_root: Path,
    started_at: datetime,
    ended_at: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return the timestamp of the newest ``check-xml-files.ts`` report in the window.

    Reporter XML files are visited oldest-first by modification time and
    filtered to ``started_at <= mtime`` and (when given) ``mtime < ended_at``.
    Files that mention ``check-xml-files.ts`` contribute the timestamp of
    their first ``testsuite`` element; the last parseable one wins.
    """
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return None

    last_seen: Optional[datetime] = None
    reports = sorted(xml_dir.glob("test-result-*.xml"), key=lambda p: p.stat().st_mtime)
    for report in reports:
        written_at = datetime.fromtimestamp(report.stat().st_mtime, tz=timezone.utc)
        if written_at < started_at:
            continue
        if ended_at and written_at >= ended_at:
            continue
        if "check-xml-files.ts" not in read_text(report):
            continue
        try:
            root = ET.parse(report).getroot()
        except ET.ParseError:
            continue
        first_suite = root.find("testsuite")
        if first_suite is None:
            continue
        stamp = parse_xml_timestamp(first_suite.attrib.get("timestamp"))
        if stamp:
            last_seen = stamp
    return last_seen
def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently announced ``Running: ... atvm*.ts`` host that is not completed."""
    announced = re.findall(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    return next(
        (host for host in reversed(announced) if host not in completed_hosts),
        None,
    )
def infer_host_from_subrun_build(
    subrun_build: str,
    expected_hosts: List[str],
    completed_hosts: List[str],
) -> Optional[str]:
    """Guess which not-yet-completed host a child build id refers to.

    The first remaining host whose OS family appears in the (lower-cased)
    build id is returned. ``w2k*`` hosts match on ``windows`` or ``w2k``;
    ``redhat``, ``ubuntu``, ``oracle``, ``rocky`` and ``debian`` hosts match
    on their own name. With no family match the first remaining host is
    returned, or ``None`` when every expected host is completed.
    """
    pending = [host for host in expected_hosts if host not in completed_hosts]
    build = subrun_build.lower()
    named_families = ("redhat", "ubuntu", "oracle", "rocky", "debian")

    for host in pending:
        suffix = host.split("-", 1)[-1].lower()
        if suffix.startswith("w2k"):
            matched = "windows" in build or "w2k" in build
        else:
            matched = any(
                suffix.startswith(family) and family in build
                for family in named_families
            )
        if matched:
            return host

    return pending[0] if pending else None
def infer_metadata() -> Dict[str, object]:
    """Build the run metadata dict from ``ATVM_WATCHER_*`` environment variables.

    Missing variables fall back to the defaults shown below; ``categorized``
    is ``True`` only when ``ATVM_WATCHER_CATEGORIZED`` equals ``true``
    (case-insensitive).
    """
    env = os.environ.get
    categorized_flag = env("ATVM_WATCHER_CATEGORIZED", "false")
    metadata: Dict[str, object] = {
        "template": env("ATVM_WATCHER_TEMPLATE", "unknown"),
        "config_family": env("ATVM_WATCHER_CONFIG_FAMILY", "unknown"),
        "migration_style": env("ATVM_WATCHER_MIGRATION_STYLE", "ATVM automation validation"),
        "integration_plugin": env("ATVM_WATCHER_INTEGRATION_PLUGIN", "unknown"),
        "scope_description": env("ATVM_WATCHER_SCOPE_DESCRIPTION", "requested ATVM run scope"),
        "categorized": categorized_flag.lower() == "true",
    }
    return metadata
def format_duration(seconds: Optional[float]) -> str:
    """Render a duration in seconds as ``"Xh MMm SS.ss s"``-style text.

    * ``None`` -> ``"n/a"``
    * below one minute -> ``"S.sss s"`` with three decimals, e.g. ``"5.000s"``
    * below one hour -> ``"Mm SS.ss s"``, e.g. ``"1m 05.50s"``
    * otherwise -> ``"Hh MMm SS.ss s"``, e.g. ``"1h 01m 01.00s"``

    The value is rounded to the displayed precision *before* it is split into
    hours/minutes/seconds, so the seconds field can never print as ``60.00``
    (119.999 renders as ``"2m 00.00s"``).
    """
    if seconds is None:
        return "n/a"
    if round(seconds, 3) < 60:
        return f"{seconds:.3f}s"
    whole_minutes, secs = divmod(round(float(seconds), 2), 60)
    hours, minutes = divmod(int(whole_minutes), 60)
    if hours:
        return f"{hours}h {minutes:02d}m {secs:05.2f}s"
    return f"{minutes}m {secs:05.2f}s"
def format_timestamp_local(ts: Optional[datetime]) -> str:
    """Format ``ts`` in the machine's local timezone as ``YYYY-MM-DD HH:MM:SS TZ``; ``"n/a"`` if missing."""
    if ts:
        return ts.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
    return "n/a"
def get_test_flow(template_name: object) -> List[str]:
    """Return the test-flow steps for ``template_name``, or the default flow.

    The default is used when ``template_name`` is not a string or has no
    entry in ``TEMPLATE_TEST_FLOWS``.
    """
    if isinstance(template_name, str) and template_name in TEMPLATE_TEST_FLOWS:
        return TEMPLATE_TEST_FLOWS[template_name]
    return DEFAULT_TEST_FLOW
def build_status_markdown(
    build_name: str,
    metadata: Dict[str, object],
    host_results: Dict[str, HostResult],
    run_state: str,
    currents_url: Optional[str],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    notes: List[str],
) -> str:
    """Render the Markdown status message for one run.

    Sections, in order: coverage (from ``metadata``), test flow, summary
    counts, per-host table, timing table and notes. When ``currents_url`` is
    set it is appended as an extra note; the caller's ``notes`` list is not
    modified. ``run_state`` is accepted but not rendered by this function.
    """
    hosts = list(host_results.values())

    def count_status(*wanted: str) -> int:
        return sum(1 for entry in hosts if entry.status in wanted)

    def metric_table(rows: List[Tuple[str, object]]) -> List[str]:
        return ["| Metric | Value |", "| --- | --- |"] + [
            f"| {label} | {value} |" for label, value in rows
        ]

    def host_and_duration(entry: Optional[HostResult]) -> str:
        if entry:
            return f"{entry.host} - {format_duration(entry.duration_seconds)}"
        return "n/a"

    # Timing statistics only consider hosts with a known duration.
    timed_hosts = [entry for entry in hosts if entry.duration_seconds is not None]
    quickest = min(timed_hosts, key=lambda entry: entry.duration_seconds, default=None)
    longest = max(timed_hosts, key=lambda entry: entry.duration_seconds, default=None)
    if timed_hosts:
        average = sum(entry.duration_seconds for entry in timed_hosts) / len(timed_hosts)
    else:
        average = None

    status_labels = {
        "PASS": "✅ PASS",
        "FAIL": "⚠️ FAIL",
        "RUN": "⏳ RUN",
        "SKIP": "⏭️ SKIP",
        "NOT STARTED": "⏳ RUN",
    }
    host_table = ["| Host | Kernel | Status | Detail |", "| --- | --- | --- | --- |"]
    for entry in hosts:
        label = status_labels.get(entry.status, entry.status)
        host_table.append(f"| {entry.host} | {entry.kernel} | {label} | {entry.detail} |")

    all_notes = notes
    if currents_url:
        all_notes = notes + [f"Currents recorded run: `{currents_url}`"]
    if all_notes:
        notes_block = "\n".join(f"- {note}" for note in all_notes)
    else:
        notes_block = "- none"

    flow_lines = [f"- {step}" for step in get_test_flow(metadata.get("template"))]

    coverage = [
        f"- template: `{metadata['template']}`",
        f"- datastore/config family: `{metadata['config_family']}`",
        f"- migration style: {metadata['migration_style']}",
        f"- integration/plugin path: `{metadata['integration_plugin']}`",
        f"- scope of this run: {metadata['scope_description']}",
    ]
    summary_rows = [
        ("finished", count_status("PASS", "FAIL")),
        ("passed", count_status("PASS")),
        ("failed", count_status("FAIL")),
        ("skipped", count_status("SKIP")),
    ]
    if start_ts and end_ts:
        total = format_duration((end_ts - start_ts).total_seconds())
    else:
        total = "n/a"
    timing_rows = [
        ("start", format_timestamp_local(start_ts)),
        ("end", format_timestamp_local(end_ts)),
        ("total", total),
        ("quickest", host_and_duration(quickest)),
        ("longest", host_and_duration(longest)),
        ("average", format_duration(average) if average is not None else "n/a"),
    ]

    document = [
        "## ATVM Run Status",
        f"### {build_name}",
        "",
        "**COVERAGE:**",
        *coverage,
        "",
        "**TEST FLOW:**",
        *flow_lines,
        "",
        "**SUMMARY:**",
        "",
        *metric_table(summary_rows),
        "",
        "**HOSTS:**",
        "",
        *host_table,
        "",
        "**TIMING:**",
        "",
        *metric_table(timing_rows),
        "",
        "**NOTES:**",
        notes_block,
    ]
    return "\n".join(document)
def post_to_mattermost(text: str, timeout: float = 30.0) -> str:
    """POST ``text`` to the Mattermost incoming webhook and return the response body.

    The webhook URL is read from ``MATTERMOST_ATVM_WEBHOOK`` (required); an
    optional ``MATTERMOST_ATVM_CHANNEL`` is added to the payload as the
    target channel.

    Args:
        text: Message body (Markdown) to post.
        timeout: Seconds to wait on the network before giving up, so a
            stalled webhook cannot block the watcher loop indefinitely.

    Returns:
        The stripped response body. Undecodable bytes are replaced rather
        than raising after the message has already been delivered.

    Raises:
        KeyError: ``MATTERMOST_ATVM_WEBHOOK`` is not set.
        urllib.error.URLError: The request failed or timed out.
    """
    webhook = os.environ["MATTERMOST_ATVM_WEBHOOK"]
    payload: Dict[str, str] = {"text": text}
    channel = os.environ.get("MATTERMOST_ATVM_CHANNEL")
    if channel:
        payload["channel"] = channel
    data = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(webhook, data=data, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8", errors="replace").strip()
def sanitize_key(raw: str) -> str:
    """Turn ``raw`` into a key made of letters, digits, ``_``, ``.`` and ``-``.

    Runs of other characters collapse to a single ``-``; leading and trailing
    dashes are trimmed. An empty result becomes ``"subrun"``.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw)
    trimmed = collapsed.strip("-")
    return trimmed if trimmed else "subrun"
def infer_group_label(hosts: List[str], index: int) -> str:
    """Derive a group label such as ``ubuntu-windows`` from host names.

    Each host contributes the part after its first ``-`` with everything from
    the first digit onwards removed (``w2k*`` hosts contribute ``windows``).
    Labels are de-duplicated in first-seen order and joined with ``-``.
    Without hosts the label is ``group<index>``.
    """
    fallback = f"group{index}"
    if not hosts:
        return fallback

    # A dict is used as an insertion-ordered set.
    families: Dict[str, None] = {}
    for host in hosts:
        tail = host.split("-", 1)[-1]
        if tail.startswith("w2k"):
            family = "windows"
        else:
            family = re.sub(r"\d.*$", "", tail) or tail
        families.setdefault(family, None)

    return "-".join(families) if families else fallback
def infer_group_from_host(host: str) -> str:
    """Map a host name to its OS group.

    The part after the first ``-`` is lower-cased; ``w2k*`` maps to
    ``windows`` and the known Linux families map to their own name. Unknown
    hosts return that lower-cased suffix, or ``"group"`` when it is empty.
    """
    suffix = host.split("-", 1)[-1].lower()
    if suffix.startswith("w2k"):
        return "windows"
    known_families = (
        "amazonlinux",
        "centos",
        "ubuntu",
        "rocky",
        "redhat",
        "oracle",
        "fedora",
        "debian",
        "suse",
    )
    for family in known_families:
        if suffix.startswith(family):
            return family
    return suffix or "group"
def corrected_categorized_display_name(raw_display_name: str, hosts: List[str]) -> str:
    """Rewrite the ``-<group>-batch`` part of a child build name to match the first host.

    The group is taken from ``infer_group_from_host(hosts[0])``. Without
    hosts the name is returned unchanged.
    """
    if hosts:
        family = infer_group_from_host(hosts[0])
        group_in_name = r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k)-batch"
        return re.sub(group_in_name, f"-{family}-batch", raw_display_name)
    return raw_display_name
def logical_categorized_key(display_name: str) -> str:
    """Return the sanitized key shared by all batches of one OS group.

    For names shaped like ``<prefix>-<group>-batch<N>_<M>...`` the key is the
    sanitized ``<prefix>-<group>`` part; any other name is sanitized as is.
    """
    batch_name = re.match(
        r"^(.*?-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k))-batch\d+_\d+.*$",
        display_name,
    )
    base = batch_name.group(1) if batch_name else display_name
    return sanitize_key(base)
def merge_categorized_state(
    merged: Dict[str, Dict[str, object]],
    display_name: str,
    state: str,
    host_results: Dict[str, HostResult],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    currents_url: Optional[str],
    notes: List[str],
) -> None:
    """Fold one sub-run observation into ``merged`` (modified in place).

    Observations are grouped by ``logical_categorized_key(display_name)``.
    The first observation for a key is stored as a new entry; later ones are
    merged into it: host results are updated, the time window is widened,
    the first non-empty Currents URL is kept, new notes are appended, and the
    state is combined using the precedence documented below.
    """
    key = logical_categorized_key(display_name)
    existing = merged.get(key)
    if existing is None:
        # First observation: store copies so the caller's dict/list are not shared.
        merged[key] = {
            "key": key,
            "display_name": display_name,
            "state": state,
            "host_results": dict(host_results),
            "start_ts": start_ts,
            "end_ts": end_ts,
            "currents_url": currents_url,
            "notes": list(notes),
        }
        return
    # Later observations overwrite results for hosts they report again.
    existing["host_results"].update(host_results)
    # Keep the earliest start and the latest end seen so far.
    if start_ts and (existing["start_ts"] is None or start_ts < existing["start_ts"]):
        existing["start_ts"] = start_ts
    if end_ts and (existing["end_ts"] is None or end_ts > existing["end_ts"]):
        existing["end_ts"] = end_ts
    if currents_url and not existing["currents_url"]:
        existing["currents_url"] = currents_url
    for note in notes:
        if note not in existing["notes"]:
            existing["notes"].append(note)
    # State precedence, evaluated top to bottom:
    #   1. CANCELLED on either side wins.
    #   2. An incoming RUNNING replaces the stored state unless that state is
    #      already COMPLETED or FAILED (then the stored state is kept).
    #   3. A stored RUNNING is replaced by the incoming state.
    #   4. FAILED on either side, then COMPLETED on either side.
    #   5. Otherwise the incoming state is stored.
    existing_state = existing["state"]
    if state == "CANCELLED" or existing_state == "CANCELLED":
        existing["state"] = "CANCELLED"
    elif state == "RUNNING":
        if existing_state not in {"COMPLETED", "FAILED"}:
            existing["state"] = "RUNNING"
    elif existing_state == "RUNNING":
        existing["state"] = state
    elif state == "FAILED" or existing_state == "FAILED":
        existing["state"] = "FAILED"
    elif state == "COMPLETED" or existing_state == "COMPLETED":
        existing["state"] = "COMPLETED"
    else:
        existing["state"] = state
def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]:
    """Find a child build name (``...batch<N>_<M>``) inside a log segment.

    A name starting with ``<parent_build_name>-`` is preferred; otherwise any
    ``<name>-batch<N>_<M>`` token is accepted. Returns ``None`` if neither
    occurs.
    """
    candidate_patterns = (
        rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)",
        r"([A-Za-z0-9_.-]+-batch\d+_\d+)",
    )
    searches = (re.search(pattern, segment_text) for pattern in candidate_patterns)
    first_hit = next((hit for hit in searches if hit), None)
    return first_hit.group(1) if first_hit else None
def split_log_segments(log_text: str, parent_build_name: str, categorized: bool, default_started_at: datetime) -> List[SubRun]:
    """Split the run log into sub-runs.

    Non-categorized runs yield a single sub-run covering the whole log.
    Categorized runs are cut at every ``<timestamp> - INFO - Extracted
    specPattern:`` line; each segment becomes a sub-run named after the child
    build id found in it (or ``<parent>-<group label>`` as a fallback). Every
    segment except the last is marked completed. If categorized mode is
    requested but no segment marker exists yet, a single whole-log sub-run
    with an explanatory note is returned.
    """

    def whole_log_subrun(notes: List[str]) -> SubRun:
        return SubRun(
            key=sanitize_key(parent_build_name),
            display_name=parent_build_name,
            started_at=default_started_at,
            expected_hosts=extract_expected_hosts(log_text),
            completed=False,
            currents_url=extract_currents_url(log_text),
            notes=notes,
        )

    if not categorized:
        return [whole_log_subrun([])]

    marker = re.compile(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - Extracted specPattern:",
        re.M,
    )
    boundaries: List[Tuple[int, Optional[datetime]]] = [
        (hit.start(), parse_log_timestamp(hit.group(1))) for hit in marker.finditer(log_text)
    ]
    if not boundaries:
        return [
            whole_log_subrun(
                ["Categorized mode was requested but no sub-run segment has appeared in the log yet."]
            )
        ]

    total = len(boundaries)
    # Each segment ends where the next begins; the last one runs to the end of the log.
    segment_ends = [offset for offset, _ in boundaries[1:]] + [len(log_text)]
    subruns: List[SubRun] = []
    for position, (segment_start, stamp) in enumerate(boundaries):
        ordinal = position + 1
        chunk = log_text[segment_start:segment_ends[position]]
        hosts = extract_expected_hosts(chunk)
        name = (
            extract_segment_build_name(chunk, parent_build_name)
            or f"{parent_build_name}-{infer_group_label(hosts, ordinal)}"
        )
        subruns.append(
            SubRun(
                key=sanitize_key(name),
                display_name=name,
                started_at=stamp or default_started_at,
                expected_hosts=hosts,
                completed=ordinal < total,
                currents_url=extract_currents_url(chunk),
                notes=[f"Categorized sub-run {ordinal} of {total}."],
            )
        )
    return subruns
def evaluate_subrun(
    subrun: SubRun,
    reporter_root: Path,
    inventory: Dict[str, str],
    end_boundary: Optional[datetime],
    parent_active: bool,
    cancelled: bool,
) -> Tuple[str, Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Work out the current state of one sub-run from reporter files.

    Returns ``(state, host_results, start_ts, end_ts, currents_url, notes)``.
    The state is decided by the first rule that applies:

    1. ``cancelled``                      -> ``CANCELLED``
    2. ``subrun.completed``               -> ``UNKNOWN`` without host results,
                                             else ``FAILED`` / ``COMPLETED``
    3. ``parent_active``                  -> ``RUNNING``
    4. host results exist (possibly from
       the log-artifact fallback)         -> ``FAILED`` / ``COMPLETED``
    5. otherwise                          -> ``TERMINATED``
    """
    notes = list(subrun.notes)
    host_results = collect_host_results(
        reporter_root=reporter_root,
        expected_hosts=subrun.expected_hosts,
        kernels=inventory,
        run_started_at=subrun.started_at,
        run_ended_at=end_boundary,
    )
    check_end = find_check_xml_end(reporter_root, subrun.started_at, end_boundary)
    # NOTE(review): both lists are built from the same host timestamps; only
    # end_candidates additionally receives the check-xml timestamp below.
    start_candidates = [result.timestamp for result in host_results.values() if result.timestamp]
    end_candidates = [result.timestamp for result in host_results.values() if result.timestamp]
    if check_end:
        end_candidates.append(check_end)
    start_ts = min(start_candidates) if start_candidates else subrun.started_at
    end_ts = max(end_candidates) if end_candidates else None
    if cancelled:
        notes.append("Cancellation marker detected.")
        return "CANCELLED", host_results, start_ts, end_ts, subrun.currents_url, notes
    if subrun.completed:
        if not host_results:
            notes.append("This categorized sub-run ended but no host results were detected.")
            return "UNKNOWN", host_results, start_ts, end_ts, subrun.currents_url, notes
        notes.append("Categorized sub-run completed and the next grouped run was launched.")
        if check_end:
            notes.append("Final `check-xml-files.ts` validation passed.")
        state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
        return state, host_results, start_ts, end_ts, subrun.currents_url, notes
    if parent_active:
        # The first expected host without a result is shown as in progress.
        current_host = next((host for host in subrun.expected_hosts if host not in host_results), None)
        # NOTE(review): the second half of this condition is already guaranteed by the line above.
        if current_host and current_host not in host_results:
            host_results[current_host] = HostResult(
                host=current_host,
                kernel=inventory.get(current_host, "unknown"),
                status="RUN",
                detail="in progress",
            )
        return "RUNNING", host_results, start_ts, end_ts, subrun.currents_url, notes
    if check_end and not host_results:
        # Final validation was written but no host XML was found: fall back to
        # the newest host log artifact written up to 5 seconds after it.
        latest_host = collect_latest_host_reporter_artifact(
            reporter_root=reporter_root,
            expected_hosts=subrun.expected_hosts,
            kernels=inventory,
            run_started_at=subrun.started_at,
            run_ended_at=check_end + timedelta(seconds=5),
        )
        if latest_host:
            host, result = latest_host
            host_results = {host: result}
    if host_results:
        notes.append("Run completed after the parent runner exited.")
        if check_end:
            notes.append("Final `check-xml-files.ts` validation passed.")
        latest_artifact_note = "Host result details were derived from the latest matching host reporter artifact written before final validation."
        # Results with zero tests are the synthetic entries produced by the artifact fallback.
        if latest_artifact_note not in notes and all(result.tests == 0 for result in host_results.values()):
            notes.append(latest_artifact_note)
        state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
        return state, host_results, start_ts, end_ts, subrun.currents_url, notes
    notes.append("Run process exited before host results were detected.")
    return "TERMINATED", host_results, start_ts, end_ts, subrun.currents_url, notes
def discover_categorized_subruns(
    build_name: str,
    reporter_root: Path,
    inventory: Dict[str, str],
    log_text: str,
    started_at: datetime,
    parent_active: bool,
    cancelled: bool,
) -> List[Dict[str, object]]:
    """Discover the sub-runs of a categorized run and their current states.

    Two sources are combined:

    * every reporter file ``xml/test-result-<build_name>-*.xml`` modified at
      or after ``started_at`` (one observation per file, oldest first), and
    * the live child process, if its ``--ci-build-id`` has not produced a
      reporter file yet.

    Observations are merged per OS group by ``merge_categorized_state``; the
    merged entries are returned as a list of dicts.
    """
    xml_dir = reporter_root / "xml"
    current_subrun_build = extract_active_subrun_build(build_name)
    # Most recently announced host in the log, regardless of completion.
    current_active_host = find_current_running_host(log_text, [])
    current_active_group = infer_group_from_host(current_active_host) if current_active_host else None
    expected_hosts = extract_expected_hosts(log_text)
    completed_summaries = extract_completed_subrun_summaries(log_text, inventory)
    merged_subrun_states: Dict[str, Dict[str, object]] = {}
    completed_hosts: List[str] = []
    discovered_builds: List[str] = []
    # Index of the next log summary to pair with a reporter file.
    current_summary_index = 0
    if xml_dir.exists():
        prefix = f"test-result-{build_name}-"
        for xml_path in sorted(xml_dir.glob(f"{prefix}*.xml"), key=lambda p: p.stat().st_mtime):
            xml_mtime = datetime.fromtimestamp(xml_path.stat().st_mtime, tz=timezone.utc)
            if xml_mtime < started_at:
                continue
            # File stem without the "test-result-" prefix is the child build id.
            raw_display_name = xml_path.stem[len("test-result-"):]
            discovered_builds.append(raw_display_name)
            parsed = parse_host_xml(xml_path)
            host_results: Dict[str, HostResult] = {}
            if parsed:
                host, result = parsed
                result.kernel = inventory.get(host, "unknown")
                host_results[host] = result
                completed_hosts.append(host)
            check_ts = extract_check_xml_timestamp_from_file(xml_path)
            summary = completed_summaries[current_summary_index] if current_summary_index < len(completed_summaries) else None
            # Prefer the log summary when the XML yielded no real host result.
            if summary and (not host_results or all(result.host == "check-xml-files" for result in host_results.values())):
                host_results = summary["host_results"]
                completed_hosts.extend([host for host in host_results if host not in completed_hosts])
            if not host_results and check_ts:
                # Last resort: newest host XML written up to 5 seconds after final validation.
                latest_host = collect_latest_host_result(
                    reporter_root=reporter_root,
                    expected_hosts=expected_hosts,
                    kernels=inventory,
                    run_started_at=started_at,
                    run_ended_at=check_ts + timedelta(seconds=5),
                )
                if latest_host:
                    host, result = latest_host
                    host_results = {host: result}
                    if host not in completed_hosts:
                        completed_hosts.append(host)
            # Each reporter file consumes at most one log summary.
            if summary:
                current_summary_index += 1
            state = "RUNNING"
            display_name = corrected_categorized_display_name(raw_display_name, list(host_results))
            display_group = None
            display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", display_name)
            if display_group_match:
                display_group = display_group_match.group(1)
            # State rules, first match wins; the default stays RUNNING.
            if cancelled:
                state = "CANCELLED"
            elif check_ts and not host_results and parent_active:
                state = "RUNNING"
            elif check_ts and display_group and current_active_group and display_group != current_active_group:
                state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
            elif check_ts or raw_display_name != current_subrun_build or not parent_active:
                state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
            notes = [f"Categorized sub-run discovered from reporter file `{xml_path.name}`."]
            if check_ts:
                notes.append("Final `check-xml-files.ts` validation passed.")
            if summary and host_results:
                notes.append("Host result details were derived from the parent categorized run log summary.")
            elif host_results and check_ts:
                notes.append("Host result details were derived from the latest matching host reporter artifact written before grouped finalization.")
            elif check_ts and not host_results and parent_active:
                notes.append("Grouped reporter XML arrived before the parent run log exposed the final host summary; waiting to post until host details are available.")
            if display_name != raw_display_name:
                notes.append(f"Child build id was reported as `{raw_display_name}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
            if cancelled:
                notes.append("Cancellation marker detected.")
            # End: check-xml timestamp, else first host timestamp, else file mtime.
            end_ts = check_ts or next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime)
            start_ts = next((result.timestamp for result in host_results.values() if result.timestamp), None)
            if not start_ts and summary and end_ts:
                # Log summaries carry durations but no timestamps: derive the start from the end.
                duration_seconds = next((result.duration_seconds for result in host_results.values() if result.duration_seconds is not None), None)
                if duration_seconds is not None:
                    candidate_start = end_ts.timestamp() - duration_seconds
                    start_ts = datetime.fromtimestamp(candidate_start, tz=timezone.utc)
            if not start_ts:
                start_ts = min(xml_mtime, end_ts) if end_ts else xml_mtime
            # Never report a start later than the end.
            if end_ts and start_ts > end_ts:
                start_ts = end_ts
            merge_categorized_state(
                merged_subrun_states,
                display_name=display_name,
                state=state,
                host_results=host_results,
                start_ts=start_ts,
                end_ts=end_ts,
                currents_url=summary["currents_url"] if summary else None,
                notes=notes,
            )
    # A live child build without a reporter file yet is reported as in progress.
    if current_subrun_build and current_subrun_build not in discovered_builds:
        current_host = find_current_running_host(log_text, completed_hosts)
        if not current_host or current_host in completed_hosts:
            current_host = infer_host_from_subrun_build(current_subrun_build, expected_hosts, completed_hosts)
        host_results: Dict[str, HostResult] = {}
        if current_host:
            host_results[current_host] = HostResult(
                host=current_host,
                kernel=inventory.get(current_host, "unknown"),
                status="RUN",
                detail="in progress",
            )
        display_name = corrected_categorized_display_name(current_subrun_build, [current_host] if current_host else [])
        notes = ["Active categorized sub-run inferred from live `--ci-build-id` process state."]
        if display_name != current_subrun_build:
            notes.append(f"Child build id is `{current_subrun_build}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
        if cancelled:
            notes.append("Cancellation marker detected.")
        merge_categorized_state(
            merged_subrun_states,
            display_name=display_name,
            state="CANCELLED" if cancelled else "RUNNING",
            host_results=host_results,
            start_ts=started_at,
            end_ts=None,
            currents_url=None,
            notes=notes,
        )
    return list(merged_subrun_states.values())
def determine_state(
    build_name: str,
    build_dir: Path,
    run_log: Path,
    reporter_root: Path,
    inventory: Dict[str, str],
    metadata: Dict[str, object],
    started_at: datetime,
    process_gone_since: Optional[datetime],
    process_exit_grace_seconds: int,
    max_watch_seconds: Optional[float] = None,
) -> Tuple[str, List[Dict[str, object]], Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Classify the current state of a watched run and gather its results.

    Args:
        build_name: Build identifier used for the process checks and for
            sub-run discovery.
        build_dir: Per-build state directory; a ``cancelled.marker`` file in
            it marks the run as cancelled.
        run_log: Path of the run log whose text drives sub-run discovery.
        reporter_root: Root directory handed to the reporter/XML helpers.
        inventory: Host inventory mapping passed through to the helpers.
        metadata: Run metadata; only the ``"categorized"`` key is read here.
        started_at: Start time of the run, used as the fallback start
            timestamp and for the elapsed-time (HUNG) check.
        process_gone_since: When the run process was first seen missing, or
            ``None`` while it is still present.
        process_exit_grace_seconds: How long the process must be gone before
            the run is treated as finished or terminated.
        max_watch_seconds: Maximum watch duration before an active run is
            reported as ``HUNG``.  When ``None`` the value is taken from the
            module-global ``args`` namespace set by the script entry point;
            if that is unavailable too, the HUNG check is skipped.

    Returns:
        A tuple ``(state, subrun_states, host_results, start_ts, end_ts,
        currents_url, notes)`` where ``state`` is one of ``CANCELLED``,
        ``HUNG``, ``RUNNING``, ``COMPLETED``, ``FAILED`` or ``TERMINATED``.
    """
    cancelled_marker = build_dir / "cancelled.marker"
    log_text = read_text(run_log)
    categorized = bool(metadata.get("categorized"))
    active = related_process_active(build_name) if categorized else process_active(build_name)
    cancelled = cancelled_marker.exists()
    notes: List[str] = []
    subrun_states: List[Dict[str, object]] = []
    parent_host_results: Dict[str, HostResult] = {}

    if categorized:
        subrun_states = discover_categorized_subruns(
            build_name=build_name,
            reporter_root=reporter_root,
            inventory=inventory,
            log_text=log_text,
            started_at=started_at,
            parent_active=active,
            cancelled=cancelled,
        )
        for subrun in subrun_states:
            parent_host_results.update(subrun["host_results"])
    else:
        subruns = split_log_segments(log_text, build_name, categorized, started_at)
        for index, subrun in enumerate(subruns):
            # A segment ends where the next one starts; the last one is open-ended.
            next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None
            state, host_results, start_ts, end_ts, currents_url, subrun_notes = evaluate_subrun(
                subrun=subrun,
                reporter_root=reporter_root,
                inventory=inventory,
                end_boundary=next_started_at,
                parent_active=active,
                cancelled=cancelled,
            )
            parent_host_results.update(host_results)
            subrun_states.append(
                {
                    "key": subrun.key,
                    "display_name": subrun.display_name,
                    "state": state,
                    "host_results": host_results,
                    "start_ts": start_ts,
                    "end_ts": end_ts,
                    "currents_url": currents_url,
                    "notes": subrun_notes,
                }
            )
        # Non-categorized runs often only write a final check-xml reporter XML.
        # Fall back to the parent "Cloud Run Finished" summary when host XML is absent.
        # NOTE(review): this fallback is scoped to the non-categorized branch, as
        # its comment describes -- confirm against the intended nesting.
        if not parent_host_results:
            latest_summary = extract_latest_run_summary(log_text, inventory)
            if latest_summary:
                summary_results = latest_summary["host_results"]
                parent_host_results.update(summary_results)
                if subrun_states:
                    # Attach the summary-derived results to the most recent sub-run.
                    subrun = subrun_states[-1]
                    subrun["host_results"] = summary_results
                    if not subrun.get("currents_url") and latest_summary.get("currents_url"):
                        subrun["currents_url"] = latest_summary["currents_url"]
                    notes_list = list(subrun.get("notes", []))
                    fallback_note = "Host result details were derived from the parent run log summary."
                    if fallback_note not in notes_list:
                        notes_list.append(fallback_note)
                    subrun["notes"] = notes_list
                    # The summary is a completion signal, so an inconclusive
                    # sub-run state is replaced by the summary's outcome.
                    if subrun["state"] in {"UNKNOWN", "TERMINATED"}:
                        subrun["state"] = "FAILED" if any(result.failures for result in summary_results.values()) else "COMPLETED"

    parent_start_candidates = [subrun["start_ts"] for subrun in subrun_states if subrun["start_ts"]]
    parent_end_candidates = [subrun["end_ts"] for subrun in subrun_states if subrun["end_ts"]]
    start_ts = min(parent_start_candidates) if parent_start_candidates else started_at
    end_ts = max(parent_end_candidates) if parent_end_candidates else find_check_xml_end(reporter_root, started_at)
    currents_url = extract_currents_url(log_text)

    if cancelled:
        notes.append("Cancellation marker detected.")
        return "CANCELLED", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    if active:
        # Resolve the watch limit: explicit argument first, then the CLI
        # namespace the script entry point stores in the module-global `args`.
        watch_limit = max_watch_seconds
        if watch_limit is None:
            watch_limit = getattr(globals().get("args"), "max_watch_seconds", None)
        elapsed = (now_utc() - started_at).total_seconds()
        if watch_limit is not None and elapsed > watch_limit:
            notes.append("Watcher exceeded max watch duration while the run still appears active.")
            return "HUNG", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
        return "RUNNING", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    if categorized and process_gone_since and (now_utc() - process_gone_since).total_seconds() < process_exit_grace_seconds:
        notes.append("Categorized parent runner has not been gone long enough to treat the request as finished.")
        return "RUNNING", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    terminal_subruns = [subrun for subrun in subrun_states if subrun["state"] in {"COMPLETED", "FAILED"}]
    if terminal_subruns:
        state = "FAILED" if any(result.failures for result in parent_host_results.values()) else "COMPLETED"
        notes.append("Run finished and one or more sub-run result artifacts were detected.")
        if end_ts:
            notes.append("Final reporting artifacts were detected.")
        return state, subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    if process_gone_since and (now_utc() - process_gone_since).total_seconds() >= process_exit_grace_seconds:
        notes.append("Run process exited without a clean completion signal.")
        return "TERMINATED", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    # Process is gone but the grace period has not elapsed yet: keep waiting.
    return "RUNNING", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
if __name__ == "__main__":

    def _host_results_payload(results):
        """Reduce HostResult objects to the plain fields persisted in state files."""
        payload = {}
        for host, result in results.items():
            payload[host] = {
                "status": result.status,
                "detail": result.detail,
                "kernel": result.kernel,
                "tests": result.tests,
                "failures": result.failures,
            }
        return payload

    def _iso_or_none(moment):
        """Return the ISO-8601 text of a timestamp, or None when it is unset."""
        if moment:
            return moment.isoformat()
        return None

    # --- One-time setup: CLI arguments, paths, persisted state -------------
    args = parse_args()
    build_name = args.build_name
    run_log = Path(args.run_log)
    reporter_root = Path(args.reporter_root)
    inventory_file = Path(args.inventory_file)
    state_root = Path(args.state_dir)
    build_dir = state_root / build_name
    ensure_dir(build_dir)
    state_file = build_dir / "state.json"
    posted_marker = build_dir / "posted.marker"
    inventory = load_inventory(inventory_file)
    metadata = infer_metadata()
    state = load_state(state_file)

    # Start time preference: persisted state, first log timestamp, log mtime, now.
    log_text_for_start = read_text(run_log)
    default_started_at = first_log_timestamp(log_text_for_start)
    if not default_started_at:
        if run_log.exists():
            default_started_at = datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc)
        else:
            default_started_at = now_utc()
    started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at
    state.setdefault("build_name", build_name)
    state.setdefault("started_at", started_at.isoformat())
    write_state(state_file, state)

    process_gone_since: Optional[datetime] = None

    # --- Poll loop: runs until the run leaves the RUNNING state ------------
    while True:
        # Track the first moment the run process was observed missing.
        active = process_active(build_name)
        if active:
            process_gone_since = None
        elif process_gone_since is None:
            process_gone_since = now_utc()

        run_state, subrun_states, host_results, start_ts, end_ts, currents_url, notes = determine_state(
            build_name=build_name,
            build_dir=build_dir,
            run_log=run_log,
            reporter_root=reporter_root,
            inventory=inventory,
            metadata=metadata,
            started_at=started_at,
            process_gone_since=process_gone_since,
            process_exit_grace_seconds=args.process_exit_grace_seconds,
        )

        # Persist the parent snapshot on every poll.
        state["last_state"] = run_state
        state["last_seen_at"] = now_utc().isoformat()
        state["host_results"] = _host_results_payload(host_results)
        subrun_index = {}
        for subrun in subrun_states:
            subrun_index[subrun["display_name"]] = {
                "state": subrun["state"],
                "hosts": sorted(subrun["host_results"].keys()),
                "start_ts": _iso_or_none(subrun["start_ts"]),
                "end_ts": _iso_or_none(subrun["end_ts"]),
                "currents_url": subrun["currents_url"],
                "notes": subrun["notes"],
            }
        state["subruns"] = subrun_index
        write_state(state_file, state)

        # Persist each sub-run and, for categorized runs, post finished ones once.
        for subrun in subrun_states:
            subrun_dir = build_dir / "subruns" / subrun["key"]
            ensure_dir(subrun_dir)
            subrun_state_file = subrun_dir / "state.json"
            subrun_posted_marker = subrun_dir / "posted.marker"
            subrun_state = {
                "display_name": subrun["display_name"],
                "last_state": subrun["state"],
                "last_seen_at": now_utc().isoformat(),
                "host_results": _host_results_payload(subrun["host_results"]),
                "notes": subrun["notes"],
                "currents_url": subrun["currents_url"],
                "started_at": _iso_or_none(subrun["start_ts"]),
                "ended_at": _iso_or_none(subrun["end_ts"]),
            }
            needs_post = (
                metadata.get("categorized")
                and subrun["state"] in {"COMPLETED", "FAILED"}
                and not subrun_posted_marker.exists()
            )
            if needs_post:
                status_text = build_status_markdown(
                    build_name=subrun["display_name"],
                    metadata=metadata,
                    host_results=dict(sorted(subrun["host_results"].items())),
                    run_state=subrun["state"],
                    currents_url=subrun["currents_url"],
                    start_ts=subrun["start_ts"],
                    end_ts=subrun["end_ts"],
                    notes=subrun["notes"],
                )
                print(status_text)
                response = post_to_mattermost(status_text)
                if response != "ok":
                    raise SystemExit(f"Mattermost webhook did not return ok for {subrun['display_name']}: {response!r}")
                # The marker file prevents the same sub-run being posted twice.
                subrun_posted_marker.write_text("ok\n", encoding="utf-8")
                subrun_state["mattermost_posted"] = True
                subrun_state["mattermost_response"] = response
                print(f"[watcher] Mattermost post confirmed for {subrun['display_name']}.")
            write_state(subrun_state_file, subrun_state)

        if run_state == "RUNNING":
            print(f"[watcher] {build_name}: RUNNING")
            time.sleep(args.poll_interval)
            continue

        # Any other state ends the watch: report, optionally post, then exit.
        status_text = build_status_markdown(
            build_name=build_name,
            metadata=metadata,
            host_results=dict(sorted(host_results.items())),
            run_state=run_state,
            currents_url=currents_url,
            start_ts=start_ts,
            end_ts=end_ts,
            notes=notes,
        )
        print(status_text)
        # Only non-categorized runs post a parent-level message.
        parent_needs_post = (
            not metadata.get("categorized")
            and run_state in {"COMPLETED", "FAILED"}
            and not posted_marker.exists()
        )
        if parent_needs_post:
            response = post_to_mattermost(status_text)
            if response != "ok":
                raise SystemExit(f"Mattermost webhook did not return ok: {response!r}")
            posted_marker.write_text("ok\n", encoding="utf-8")
            state["mattermost_posted"] = True
            state["mattermost_response"] = response
            write_state(state_file, state)
            print(f"[watcher] Mattermost post confirmed for {build_name}.")
        state["closed_at"] = now_utc().isoformat()
        write_state(state_file, state)
        sys.exit(0)