diff --git a/atvm/watcher-service/atvm_run_watcher.py b/atvm/watcher-service/atvm_run_watcher.py index cc75130..1ff6c45 100644 --- a/atvm/watcher-service/atvm_run_watcher.py +++ b/atvm/watcher-service/atvm_run_watcher.py @@ -12,7 +12,7 @@ import time import urllib.request import xml.etree.ElementTree as ET from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Dict, List, Optional, Tuple @@ -608,6 +608,67 @@ def corrected_categorized_display_name(raw_display_name: str, hosts: List[str]) return re.sub(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k)-batch", f"-{group}-batch", raw_display_name) +def logical_categorized_key(display_name: str) -> str: + match = re.match( + r"^(.*?-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k))-batch\d+_\d+.*$", + display_name, + ) + if match: + return sanitize_key(match.group(1)) + return sanitize_key(display_name) + + +def merge_categorized_state( + merged: Dict[str, Dict[str, object]], + display_name: str, + state: str, + host_results: Dict[str, HostResult], + start_ts: Optional[datetime], + end_ts: Optional[datetime], + currents_url: Optional[str], + notes: List[str], +) -> None: + key = logical_categorized_key(display_name) + existing = merged.get(key) + if existing is None: + merged[key] = { + "key": key, + "display_name": display_name, + "state": state, + "host_results": dict(host_results), + "start_ts": start_ts, + "end_ts": end_ts, + "currents_url": currents_url, + "notes": list(notes), + } + return + + existing["host_results"].update(host_results) + + if start_ts and (existing["start_ts"] is None or start_ts < existing["start_ts"]): + existing["start_ts"] = start_ts + if end_ts and (existing["end_ts"] is None or end_ts > existing["end_ts"]): + existing["end_ts"] = end_ts + if currents_url and not existing["currents_url"]: + existing["currents_url"] = currents_url + + for note in notes: + if note not in existing["notes"]: + existing["notes"].append(note) + + existing_state = existing["state"] + if state == "CANCELLED" or existing_state == "CANCELLED": + existing["state"] = "CANCELLED" + elif state == "RUNNING" or existing_state == "RUNNING": + existing["state"] = "RUNNING" + elif state == "FAILED" or existing_state == "FAILED": + existing["state"] = "FAILED" + elif state == "COMPLETED" or existing_state == "COMPLETED": + existing["state"] = "COMPLETED" + else: + existing["state"] = state + + def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]: patterns = [ rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)", @@ -746,7 +807,7 @@ def discover_categorized_subruns( current_subrun_build = extract_active_subrun_build(build_name) expected_hosts = extract_expected_hosts(log_text) completed_summaries = extract_completed_subrun_summaries(log_text, inventory) - subrun_states: List[Dict[str, object]] = [] + merged_subrun_states: Dict[str, Dict[str, object]] = {} completed_hosts: List[str] = [] discovered_builds: List[str] = [] current_summary_index = 0 @@ -818,17 +879,15 @@ def discover_categorized_subruns( start_ts = min(xml_mtime, end_ts) if end_ts else xml_mtime if end_ts and start_ts > end_ts: start_ts = end_ts - subrun_states.append( - { - "key": sanitize_key(display_name), - "display_name": display_name, - "state": state, - "host_results": host_results, - "start_ts": start_ts, - "end_ts": end_ts, - "currents_url": summary["currents_url"] if summary else None, - "notes": notes, - } + merge_categorized_state( + merged_subrun_states, + display_name=display_name, + state=state, + host_results=host_results, + start_ts=start_ts, + end_ts=end_ts, + currents_url=summary["currents_url"] if summary else None, + notes=notes, ) if current_subrun_build and current_subrun_build not in discovered_builds: @@ -849,20 +908,18 @@ def discover_categorized_subruns( notes.append(f"Child build id is `{current_subrun_build}`, but the actual grouped run was inferred from host execution as `{display_name}`.") if cancelled: notes.append("Cancellation marker detected.") - subrun_states.append( - { - "key": sanitize_key(current_subrun_build), - "display_name": display_name, - "state": "CANCELLED" if cancelled else "RUNNING", - "host_results": host_results, - "start_ts": started_at, - "end_ts": None, - "currents_url": None, - "notes": notes, - } + merge_categorized_state( + merged_subrun_states, + display_name=display_name, + state="CANCELLED" if cancelled else "RUNNING", + host_results=host_results, + start_ts=started_at, + end_ts=None, + currents_url=None, + notes=notes, ) - return subrun_states + return list(merged_subrun_states.values()) def determine_state(