Merge categorized watcher subruns by inferred group identity

- update the watcher to merge consecutive categorized child build ids into one logical grouped subrun when they resolve to the same real host-based group
- prevent one real grouped run from being fragmented across multiple watcher subruns just because the raw ci-build-id label changes between hosts
- keep grouped Mattermost posting aligned with the inferred group identity instead of the unstable raw child build id
This commit is contained in:
2026-03-26 16:51:10 -04:00
parent 2eed645a85
commit 3b9b9eef0f

View File

@@ -12,7 +12,7 @@ import time
import urllib.request import urllib.request
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
@@ -608,6 +608,67 @@ def corrected_categorized_display_name(raw_display_name: str, hosts: List[str])
return re.sub(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k)-batch", f"-{group}-batch", raw_display_name) return re.sub(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k)-batch", f"-{group}-batch", raw_display_name)
def logical_categorized_key(display_name: str) -> str:
match = re.match(
r"^(.*?-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k))-batch\d+_\d+.*$",
display_name,
)
if match:
return sanitize_key(match.group(1))
return sanitize_key(display_name)
def merge_categorized_state(
merged: Dict[str, Dict[str, object]],
display_name: str,
state: str,
host_results: Dict[str, HostResult],
start_ts: Optional[datetime],
end_ts: Optional[datetime],
currents_url: Optional[str],
notes: List[str],
) -> None:
key = logical_categorized_key(display_name)
existing = merged.get(key)
if existing is None:
merged[key] = {
"key": key,
"display_name": display_name,
"state": state,
"host_results": dict(host_results),
"start_ts": start_ts,
"end_ts": end_ts,
"currents_url": currents_url,
"notes": list(notes),
}
return
existing["host_results"].update(host_results)
if start_ts and (existing["start_ts"] is None or start_ts < existing["start_ts"]):
existing["start_ts"] = start_ts
if end_ts and (existing["end_ts"] is None or end_ts > existing["end_ts"]):
existing["end_ts"] = end_ts
if currents_url and not existing["currents_url"]:
existing["currents_url"] = currents_url
for note in notes:
if note not in existing["notes"]:
existing["notes"].append(note)
existing_state = existing["state"]
if state == "CANCELLED" or existing_state == "CANCELLED":
existing["state"] = "CANCELLED"
elif state == "RUNNING" or existing_state == "RUNNING":
existing["state"] = "RUNNING"
elif state == "FAILED" or existing_state == "FAILED":
existing["state"] = "FAILED"
elif state == "COMPLETED" or existing_state == "COMPLETED":
existing["state"] = "COMPLETED"
else:
existing["state"] = state
def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]: def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]:
patterns = [ patterns = [
rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)", rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)",
@@ -746,7 +807,7 @@ def discover_categorized_subruns(
current_subrun_build = extract_active_subrun_build(build_name) current_subrun_build = extract_active_subrun_build(build_name)
expected_hosts = extract_expected_hosts(log_text) expected_hosts = extract_expected_hosts(log_text)
completed_summaries = extract_completed_subrun_summaries(log_text, inventory) completed_summaries = extract_completed_subrun_summaries(log_text, inventory)
subrun_states: List[Dict[str, object]] = [] merged_subrun_states: Dict[str, Dict[str, object]] = {}
completed_hosts: List[str] = [] completed_hosts: List[str] = []
discovered_builds: List[str] = [] discovered_builds: List[str] = []
current_summary_index = 0 current_summary_index = 0
@@ -818,17 +879,15 @@ def discover_categorized_subruns(
start_ts = min(xml_mtime, end_ts) if end_ts else xml_mtime start_ts = min(xml_mtime, end_ts) if end_ts else xml_mtime
if end_ts and start_ts > end_ts: if end_ts and start_ts > end_ts:
start_ts = end_ts start_ts = end_ts
subrun_states.append( merge_categorized_state(
{ merged_subrun_states,
"key": sanitize_key(display_name), display_name=display_name,
"display_name": display_name, state=state,
"state": state, host_results=host_results,
"host_results": host_results, start_ts=start_ts,
"start_ts": start_ts, end_ts=end_ts,
"end_ts": end_ts, currents_url=summary["currents_url"] if summary else None,
"currents_url": summary["currents_url"] if summary else None, notes=notes,
"notes": notes,
}
) )
if current_subrun_build and current_subrun_build not in discovered_builds: if current_subrun_build and current_subrun_build not in discovered_builds:
@@ -849,20 +908,18 @@ def discover_categorized_subruns(
notes.append(f"Child build id is `{current_subrun_build}`, but the actual grouped run was inferred from host execution as `{display_name}`.") notes.append(f"Child build id is `{current_subrun_build}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
if cancelled: if cancelled:
notes.append("Cancellation marker detected.") notes.append("Cancellation marker detected.")
subrun_states.append( merge_categorized_state(
{ merged_subrun_states,
"key": sanitize_key(current_subrun_build), display_name=display_name,
"display_name": display_name, state="CANCELLED" if cancelled else "RUNNING",
"state": "CANCELLED" if cancelled else "RUNNING", host_results=host_results,
"host_results": host_results, start_ts=started_at,
"start_ts": started_at, end_ts=None,
"end_ts": None, currents_url=None,
"currents_url": None, notes=notes,
"notes": notes,
}
) )
return subrun_states return list(merged_subrun_states.values())
def determine_state( def determine_state(