Update ATVM watcher for categorized sub-run posting

- update the watcher design and automation guide to treat --categorize as sequential ATVM sub-runs rather than one parent run with internal phases
- document that categorized runs should send one Mattermost status per completed grouped sub-run instead of one parent-only final post
- add a --categorize option to the watcher start helper so categorized mode is explicit in watcher startup
- update the watcher implementation to track categorized sub-runs separately, write per-subrun state, and post each completed grouped run once
This commit is contained in:
2026-03-26 11:00:39 -04:00
parent 68cd428733
commit d60b8b9b18
6 changed files with 399 additions and 89 deletions

View File

@@ -40,6 +40,17 @@ class HostResult:
timestamp: Optional[datetime] = None
@dataclass
class SubRun:
    """One segment of a watched run, as built by ``split_log_segments``.

    A run that is not categorized (or has not logged a segment marker yet) is
    represented by a single ``SubRun``; otherwise there is one per
    ``Extracted specPattern:`` marker found in the run log.
    """

    # Identifier produced by ``sanitize_key``; the main loop uses it as the
    # per-sub-run directory name under ``<build_dir>/subruns``.
    key: str
    # Name shown in status output and used as the key in the parent state file.
    display_name: str
    # Start of the segment: the marker's log timestamp, or the watcher's
    # default start time when the timestamp could not be parsed.
    started_at: datetime
    # Hosts extracted from this segment's slice of the log.
    expected_hosts: List[str]
    # True when another segment follows this one in the log; the last (or
    # only) segment is always created with False.
    completed: bool
    # Currents URL extracted from this segment's slice of the log, if any.
    currents_url: Optional[str]
    # Seed notes that ``evaluate_subrun`` copies and extends.
    notes: List[str]
def now_utc() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
@@ -152,6 +163,13 @@ def parse_xml_timestamp(raw: Optional[str]) -> Optional[datetime]:
return None
def parse_log_timestamp(raw: str) -> Optional[datetime]:
    """Parse a log timestamp such as ``2026-03-26 11:00:39,123``.

    The parsed value is tagged as UTC without any conversion. Returns ``None``
    when *raw* does not match the ``YYYY-MM-DD HH:MM:SS,fff`` layout.
    """
    layout = "%Y-%m-%d %H:%M:%S,%f"
    try:
        naive = datetime.strptime(raw, layout)
    except ValueError:
        return None
    return naive.replace(tzinfo=timezone.utc)
def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
try:
tree = ET.parse(xml_path)
@@ -194,6 +212,7 @@ def collect_host_results(
expected_hosts: List[str],
kernels: Dict[str, str],
run_started_at: datetime,
run_ended_at: Optional[datetime] = None,
) -> Dict[str, HostResult]:
xml_dir = reporter_root / "xml"
results: Dict[str, HostResult] = {}
@@ -203,6 +222,8 @@ def collect_host_results(
xml_mtime = datetime.fromtimestamp(xml_path.stat().st_mtime, tz=timezone.utc)
if xml_mtime < run_started_at:
continue
if run_ended_at and xml_mtime >= run_ended_at:
continue
parsed = parse_host_xml(xml_path)
if not parsed:
continue
@@ -214,21 +235,46 @@ def collect_host_results(
return results
def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently announced host spec that has not completed.

    Scans *log_text* for ``Running: ... atvm<name>.ts`` announcements and walks
    them newest-first, skipping any host listed in *completed_hosts*. Returns
    ``None`` when every announced host is already completed or none was found.
    """
    spec_pattern = r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts"
    announced = re.findall(spec_pattern, log_text)
    pending = (name for name in announced[::-1] if name not in completed_hosts)
    return next(pending, None)
def find_check_xml_end(
    reporter_root: Path,
    started_at: datetime,
    ended_at: Optional[datetime] = None,
) -> Optional[datetime]:
    """Find the ``check-xml-files.ts`` report timestamp inside a time window.

    Looks at ``<reporter_root>/xml/test-result-*.xml`` in ascending
    modification-time order and keeps files whose mtime is at or after
    *started_at* and, when *ended_at* is given, strictly before it. For each
    such file that mentions ``check-xml-files.ts``, the ``timestamp``
    attribute of its first ``testsuite`` child is parsed.

    Returns the timestamp taken from the last qualifying file in mtime order,
    or ``None`` when the directory is missing or nothing qualifies. Files that
    fail to parse as XML are ignored.
    """
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return None

    def modified_at(candidate: Path) -> float:
        return candidate.stat().st_mtime

    newest: Optional[datetime] = None
    for report in sorted(xml_dir.glob("test-result-*.xml"), key=modified_at):
        written = datetime.fromtimestamp(modified_at(report), tz=timezone.utc)
        if written < started_at:
            continue
        if ended_at and written >= ended_at:
            continue
        if "check-xml-files.ts" not in read_text(report):
            continue
        try:
            suite = ET.parse(report).getroot().find("testsuite")
            if suite is not None:
                stamp = parse_xml_timestamp(suite.attrib.get("timestamp"))
                if stamp:
                    # Files are visited oldest-first, so the last hit wins.
                    newest = stamp
        except ET.ParseError:
            continue
    return newest
def infer_metadata() -> Dict[str, object]:
    """Collect run metadata from ``ATVM_WATCHER_*`` environment variables.

    Every entry is a string with a fixed fallback, except ``categorized``,
    which is a bool: ``True`` only when ``ATVM_WATCHER_CATEGORIZED`` equals
    ``"true"`` (case-insensitive). The return annotation is therefore
    ``Dict[str, object]`` rather than ``Dict[str, str]``.
    """
    env = os.environ
    return {
        "template": env.get("ATVM_WATCHER_TEMPLATE", "unknown"),
        "config_family": env.get("ATVM_WATCHER_CONFIG_FAMILY", "unknown"),
        "migration_style": env.get("ATVM_WATCHER_MIGRATION_STYLE", "ATVM automation validation"),
        "integration_plugin": env.get("ATVM_WATCHER_INTEGRATION_PLUGIN", "unknown"),
        "scope_description": env.get("ATVM_WATCHER_SCOPE_DESCRIPTION", "requested ATVM run scope"),
        "categorized": env.get("ATVM_WATCHER_CATEGORIZED", "false").lower() == "true",
    }
@@ -253,7 +299,7 @@ def format_timestamp_local(ts: Optional[datetime]) -> str:
def build_status_markdown(
build_name: str,
metadata: Dict[str, str],
metadata: Dict[str, object],
host_results: Dict[str, HostResult],
run_state: str,
currents_url: Optional[str],
@@ -348,80 +394,225 @@ def post_to_mattermost(text: str) -> str:
return response.read().decode().strip()
def sanitize_key(raw: str) -> str:
    """Reduce *raw* to a key made of letters, digits, ``_``, ``.`` and ``-``.

    Each run of other characters collapses to a single ``-``; leading and
    trailing dashes are dropped. Returns ``"subrun"`` when nothing is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw)
    trimmed = collapsed.strip("-")
    if trimmed:
        return trimmed
    return "subrun"
def infer_group_label(hosts: List[str], index: int) -> str:
    """Derive a short label describing the platform families in *hosts*.

    For each host, the part after the first ``-`` is taken (the whole name if
    there is no dash). Names starting with ``w2k`` map to ``"windows"``;
    otherwise everything from the first digit onward is cut off (``rhel8`` ->
    ``rhel``), keeping the full suffix if that would leave nothing. Distinct
    labels are joined with ``-`` in first-seen order.

    Returns ``group{index}`` when *hosts* is empty or no host yields a
    non-empty label.
    """
    labels: List[str] = []
    for host in hosts:
        short = host.split("-", 1)[-1]
        if short.startswith("w2k"):
            label = "windows"
        else:
            label = re.sub(r"\d.*$", "", short) or short
        # A host such as "atvm-" has an empty suffix; skip it so the result
        # never contains an empty component or a dangling dash.
        if label and label not in labels:
            labels.append(label)
    return "-".join(labels) if labels else f"group{index}"
def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]:
    """Find a ``...batch<N>_<M>`` build name inside *segment_text*.

    A name prefixed with ``<parent_build_name>-`` is preferred; failing that,
    any ``<something>-batch<N>_<M>`` token is accepted. Returns ``None`` when
    neither form occurs.
    """
    scoped = rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)"
    generic = r"([A-Za-z0-9_.-]+-batch\d+_\d+)"
    for candidate in (scoped, generic):
        found = re.search(candidate, segment_text)
        if found is not None:
            return found.group(1)
    return None
def split_log_segments(log_text: str, parent_build_name: str, categorized: bool, default_started_at: datetime) -> List[SubRun]:
    """Split the run log into the sub-runs the watcher should track.

    When *categorized* is false, or no ``Extracted specPattern:`` marker has
    been logged yet, the whole log is returned as a single, not-yet-completed
    ``SubRun`` named after the parent build. Otherwise one ``SubRun`` is
    produced per marker, covering the log text from that marker up to the next
    marker (or the end of the log). Every segment except the last is flagged
    ``completed`` because a following segment has already started.
    """

    def whole_run(seed_notes: List[str]) -> List[SubRun]:
        # Single-entry result that treats the entire log as one run.
        return [
            SubRun(
                key=sanitize_key(parent_build_name),
                display_name=parent_build_name,
                started_at=default_started_at,
                expected_hosts=extract_expected_hosts(log_text),
                completed=False,
                currents_url=extract_currents_url(log_text),
                notes=seed_notes,
            )
        ]

    if not categorized:
        return whole_run([])

    marker = re.compile(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - Extracted specPattern:",
        re.M,
    )
    boundaries: List[Tuple[int, Optional[datetime]]] = [
        (hit.start(), parse_log_timestamp(hit.group(1))) for hit in marker.finditer(log_text)
    ]
    if not boundaries:
        return whole_run(["Categorized mode was requested but no sub-run segment has appeared in the log yet."])

    total = len(boundaries)
    # Each segment ends where the next one begins; the last runs to end of log.
    cut_points = [offset for offset, _ in boundaries[1:]] + [len(log_text)]

    segments: List[SubRun] = []
    for position, ((begin, stamp), finish) in enumerate(zip(boundaries, cut_points), start=1):
        chunk = log_text[begin:finish]
        hosts = extract_expected_hosts(chunk)
        label = extract_segment_build_name(chunk, parent_build_name)
        if not label:
            # No batch-style build name in the segment: synthesize one.
            label = f"{parent_build_name}-{infer_group_label(hosts, position)}"
        segments.append(
            SubRun(
                key=sanitize_key(label),
                display_name=label,
                started_at=stamp or default_started_at,
                expected_hosts=hosts,
                completed=position < total,
                currents_url=extract_currents_url(chunk),
                notes=[f"Categorized sub-run {position} of {total}."],
            )
        )
    return segments
def evaluate_subrun(
    subrun: SubRun,
    reporter_root: Path,
    inventory: Dict[str, str],
    end_boundary: Optional[datetime],
    parent_active: bool,
    cancelled: bool,
) -> Tuple[str, Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Work out the state of one sub-run from its result files.

    Host results and the ``check-xml-files.ts`` timestamp are collected for
    the window from ``subrun.started_at`` up to *end_boundary*.

    Returns ``(state, host_results, start_ts, end_ts, currents_url, notes)``
    where *state* is decided in this order:

    * ``CANCELLED`` when *cancelled* is set;
    * for a sub-run already flagged completed: ``UNKNOWN`` without host
      results, otherwise ``FAILED``/``COMPLETED``;
    * ``RUNNING`` while the parent is active (the first expected host without
      a result is added as an in-progress placeholder);
    * ``FAILED``/``COMPLETED`` when results exist after the parent exited;
    * ``TERMINATED`` otherwise.
    """
    notes = list(subrun.notes)
    host_results = collect_host_results(
        reporter_root=reporter_root,
        expected_hosts=subrun.expected_hosts,
        kernels=inventory,
        run_started_at=subrun.started_at,
        run_ended_at=end_boundary,
    )
    check_end = find_check_xml_end(reporter_root, subrun.started_at, end_boundary)

    stamps = [entry.timestamp for entry in host_results.values() if entry.timestamp]
    start_ts = min(stamps) if stamps else subrun.started_at
    closing = (stamps + [check_end]) if check_end else stamps
    end_ts = max(closing) if closing else None

    def outcome(state: str):
        # Timestamps are fixed above; host_results and notes may still grow.
        return state, host_results, start_ts, end_ts, subrun.currents_url, notes

    def settled() -> str:
        # Shared finish path: record the check-xml note and pick the verdict.
        if check_end:
            notes.append("Final `check-xml-files.ts` validation passed.")
        any_failed = any(entry.failures for entry in host_results.values())
        return "FAILED" if any_failed else "COMPLETED"

    if cancelled:
        notes.append("Cancellation marker detected.")
        return outcome("CANCELLED")

    if subrun.completed:
        if not host_results:
            notes.append("This categorized sub-run ended but no host results were detected.")
            return outcome("UNKNOWN")
        notes.append("Categorized sub-run completed and the next grouped run was launched.")
        return outcome(settled())

    if parent_active:
        for candidate in subrun.expected_hosts:
            if candidate in host_results:
                continue
            # Only the first host still lacking a result is shown as running.
            if candidate:
                host_results[candidate] = HostResult(
                    host=candidate,
                    kernel=inventory.get(candidate, "unknown"),
                    status="RUN",
                    detail="in progress",
                )
            break
        return outcome("RUNNING")

    if host_results:
        notes.append("Categorized sub-run completed after the parent runner exited.")
        return outcome(settled())

    notes.append("Parent run exited before this categorized sub-run produced host results.")
    return outcome("TERMINATED")
# Determine the overall run state and, per sub-run, its state and host results.
#
# NOTE(review): this span comes from a rendered diff hunk, so statements from the
# previous and the updated implementation appear together (two return annotations
# in a row, `return` statements in pairs that differ only in tuple layout, and an
# unclosed `HostResult(` call). It is not runnable exactly as shown; the comments
# below only describe what the visible statements do.
def determine_state(
build_name: str,
build_dir: Path,
run_log: Path,
reporter_root: Path,
inventory: Dict[str, str],
metadata: Dict[str, object],
started_at: datetime,
process_gone_since: Optional[datetime],
process_exit_grace_seconds: int,
# Two return annotations follow. The first matches the returns that carry
# (state, host_results, log_text, ...); the second matches the returns that carry
# (state, subrun_states, parent_host_results, ...).
) -> Tuple[str, Dict[str, HostResult], str, Optional[datetime], Optional[datetime], Optional[str], List[str]]:
) -> Tuple[str, List[Dict[str, object]], Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
cancelled_marker = build_dir / "cancelled.marker"
log_text = read_text(run_log)
expected_hosts = extract_expected_hosts(log_text)
host_results = collect_host_results(reporter_root, expected_hosts, inventory, started_at)
active = process_active(build_name)
currents_url = extract_currents_url(log_text)
cancelled = cancelled_marker.exists()
notes: List[str] = []
subrun_states: List[Dict[str, object]] = []
parent_host_results: Dict[str, HostResult] = {}
current_host = find_current_running_host(log_text, list(host_results.keys()))
if current_host and current_host not in host_results:
host_results[current_host] = HostResult(
host=current_host,
kernel=inventory.get(current_host, "unknown"),
status="RUN",
detail="in progress",
# Split the log into sub-runs (split_log_segments returns a single entry when
# metadata["categorized"] is falsy) and evaluate each one. A sub-run's window ends
# at the start time of the next sub-run; the last one has no end boundary.
subruns = split_log_segments(log_text, build_name, bool(metadata.get("categorized")), started_at)
for index, subrun in enumerate(subruns):
next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None
state, host_results, start_ts, end_ts, currents_url, subrun_notes = evaluate_subrun(
subrun=subrun,
reporter_root=reporter_root,
inventory=inventory,
end_boundary=next_started_at,
parent_active=active,
cancelled=cancelled,
)
# Merge into the parent-level map; a later sub-run's entry replaces an earlier
# entry for the same host.
for host, result in host_results.items():
parent_host_results[host] = result
subrun_states.append(
{
"key": subrun.key,
"display_name": subrun.display_name,
"state": state,
"host_results": host_results,
"start_ts": start_ts,
"end_ts": end_ts,
"currents_url": currents_url,
"notes": subrun_notes,
}
)
start_candidates = [result.timestamp for result in host_results.values() if result.timestamp]
end_candidates = [result.timestamp for result in host_results.values() if result.timestamp]
# Inline scan for the newest check-xml-files.ts report written since started_at;
# its testsuite timestamp is added to end_candidates.
check_xml = reporter_root / "xml"
for xml_path in sorted(check_xml.glob("test-result-*.xml"), key=lambda p: p.stat().st_mtime, reverse=True):
xml_mtime = datetime.fromtimestamp(xml_path.stat().st_mtime, tz=timezone.utc)
if xml_mtime < started_at:
continue
text = read_text(xml_path)
if "check-xml-files.ts" in text:
try:
tree = ET.parse(xml_path)
root = tree.getroot()
suite = root.find("testsuite")
if suite is not None:
ts = parse_xml_timestamp(suite.attrib.get("timestamp"))
if ts:
end_candidates.append(ts)
except ET.ParseError:
pass
break
# Parent window from the sub-run windows: earliest start and latest end, falling
# back to started_at and to find_check_xml_end when no sub-run supplied a value.
parent_start_candidates = [subrun["start_ts"] for subrun in subrun_states if subrun["start_ts"]]
parent_end_candidates = [subrun["end_ts"] for subrun in subrun_states if subrun["end_ts"]]
start_ts = min(parent_start_candidates) if parent_start_candidates else started_at
end_ts = max(parent_end_candidates) if parent_end_candidates else find_check_xml_end(reporter_root, started_at)
# The loop above rebinds currents_url per sub-run; re-extract it from the full log.
currents_url = extract_currents_url(log_text)
start_ts = min(start_candidates) if start_candidates else started_at
end_ts = max(end_candidates) if end_candidates else None
if cancelled_marker.exists():
if cancelled:
notes.append("Cancellation marker detected.")
return "CANCELLED", host_results, log_text, start_ts, end_ts, currents_url, notes
return "CANCELLED", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
if active:
elapsed = (now_utc() - started_at).total_seconds()
# NOTE(review): `args` is not a parameter of this function; presumably a
# module-level name assigned in the __main__ block. Verify.
if elapsed > args.max_watch_seconds:
notes.append("Watcher exceeded max watch duration while the run still appears active.")
return "HUNG", host_results, log_text, start_ts, end_ts, currents_url, notes
return "RUNNING", host_results, log_text, start_ts, end_ts, currents_url, notes
return "HUNG", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
return "RUNNING", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
if "Cloud Run Finished" in log_text or currents_url:
state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
notes.append("Run finished and final reporting artifacts were detected.")
if any("check-xml-files.ts" in line for line in log_text.splitlines()):
notes.append("Final `check-xml-files.ts` validation passed.")
return state, host_results, log_text, start_ts, end_ts, currents_url, notes
# Process is no longer active: the run counts as finished once at least one
# sub-run reached COMPLETED or FAILED; any host failure makes the parent FAILED.
terminal_subruns = [subrun for subrun in subrun_states if subrun["state"] in {"COMPLETED", "FAILED"}]
if terminal_subruns:
state = "FAILED" if any(result.failures for result in parent_host_results.values()) else "COMPLETED"
notes.append("Run finished and one or more sub-run result artifacts were detected.")
if end_ts:
notes.append("Final reporting artifacts were detected.")
return state, subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
# No terminal sub-run: report TERMINATED only after the process has been gone for
# at least process_exit_grace_seconds; until then keep reporting RUNNING.
if process_gone_since and (now_utc() - process_gone_since).total_seconds() >= process_exit_grace_seconds:
notes.append("Run process exited without a clean completion signal.")
return "TERMINATED", host_results, log_text, start_ts, end_ts, currents_url, notes
return "TERMINATED", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
return "RUNNING", host_results, log_text, start_ts, end_ts, currents_url, notes
return "RUNNING", subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes
if __name__ == "__main__":
@@ -455,12 +646,13 @@ if __name__ == "__main__":
if active:
process_gone_since = None
run_state, host_results, log_text, start_ts, end_ts, currents_url, notes = determine_state(
run_state, subrun_states, host_results, start_ts, end_ts, currents_url, notes = determine_state(
build_name=build_name,
build_dir=build_dir,
run_log=run_log,
reporter_root=reporter_root,
inventory=inventory,
metadata=metadata,
started_at=started_at,
process_gone_since=process_gone_since,
process_exit_grace_seconds=args.process_exit_grace_seconds,
@@ -478,8 +670,64 @@ if __name__ == "__main__":
}
for host, result in host_results.items()
}
state["subruns"] = {
subrun["display_name"]: {
"state": subrun["state"],
"hosts": sorted(subrun["host_results"].keys()),
"start_ts": subrun["start_ts"].isoformat() if subrun["start_ts"] else None,
"end_ts": subrun["end_ts"].isoformat() if subrun["end_ts"] else None,
"currents_url": subrun["currents_url"],
"notes": subrun["notes"],
}
for subrun in subrun_states
}
write_state(state_file, state)
for subrun in subrun_states:
subrun_dir = build_dir / "subruns" / subrun["key"]
ensure_dir(subrun_dir)
subrun_state_file = subrun_dir / "state.json"
subrun_posted_marker = subrun_dir / "posted.marker"
subrun_state = {
"display_name": subrun["display_name"],
"last_state": subrun["state"],
"last_seen_at": now_utc().isoformat(),
"host_results": {
host: {
"status": result.status,
"detail": result.detail,
"kernel": result.kernel,
"tests": result.tests,
"failures": result.failures,
}
for host, result in subrun["host_results"].items()
},
"notes": subrun["notes"],
"currents_url": subrun["currents_url"],
"started_at": subrun["start_ts"].isoformat() if subrun["start_ts"] else None,
"ended_at": subrun["end_ts"].isoformat() if subrun["end_ts"] else None,
}
if subrun["state"] in {"COMPLETED", "FAILED"} and not subrun_posted_marker.exists():
status_text = build_status_markdown(
build_name=subrun["display_name"],
metadata=metadata,
host_results=dict(sorted(subrun["host_results"].items())),
run_state=subrun["state"],
currents_url=subrun["currents_url"],
start_ts=subrun["start_ts"],
end_ts=subrun["end_ts"],
notes=subrun["notes"],
)
print(status_text)
response = post_to_mattermost(status_text)
if response != "ok":
raise SystemExit(f"Mattermost webhook did not return ok for {subrun['display_name']}: {response!r}")
subrun_posted_marker.write_text("ok\n", encoding="utf-8")
subrun_state["mattermost_posted"] = True
subrun_state["mattermost_response"] = response
print(f"[watcher] Mattermost post confirmed for {subrun['display_name']}.")
write_state(subrun_state_file, subrun_state)
if run_state == "RUNNING":
print(f"[watcher] {build_name}: RUNNING")
time.sleep(args.poll_interval)
@@ -497,7 +745,7 @@ if __name__ == "__main__":
)
print(status_text)
if run_state in {"COMPLETED", "FAILED"} and not posted_marker.exists():
if not metadata.get("categorized") and run_state in {"COMPLETED", "FAILED"} and not posted_marker.exists():
response = post_to_mattermost(status_text)
if response != "ok":
raise SystemExit(f"Mattermost webhook did not return ok: {response!r}")