diff --git a/atvm/watcher-service/atvm_run_watcher.py b/atvm/watcher-service/atvm_run_watcher.py index f9a646a..91a0073 100644 --- a/atvm/watcher-service/atvm_run_watcher.py +++ b/atvm/watcher-service/atvm_run_watcher.py @@ -106,6 +106,18 @@ def process_active(build_name: str) -> bool: return False +def extract_active_subrun_build(build_name: str) -> Optional[str]: + output = run_ps() + matches: List[str] = [] + for line in output.splitlines(): + if build_name not in line or "--ci-build-id" not in line: + continue + match = re.search(r"--ci-build-id\s+(\S+)", line) + if match: + matches.append(match.group(1)) + return matches[-1] if matches else None + + def read_text(path: Path) -> str: try: return path.read_text(encoding="utf-8", errors="replace") @@ -170,6 +182,13 @@ def parse_log_timestamp(raw: str) -> Optional[datetime]: return None +def first_log_timestamp(log_text: str) -> Optional[datetime]: + match = re.search(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - ", log_text, re.M) + if not match: + return None + return parse_log_timestamp(match.group(1)) + + def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]: try: tree = ET.parse(xml_path) @@ -177,22 +196,34 @@ def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]: return None root = tree.getroot() suites = root.findall("testsuite") - file_name = None - tests = int(float(root.attrib.get("tests", "0"))) - failures = int(float(root.attrib.get("failures", "0"))) - total_time = float(root.attrib.get("time", "0")) - timestamp = None + best: Optional[Tuple[str, int, int, float, Optional[datetime]]] = None for suite in suites: file_attr = suite.attrib.get("file", "") + suite_name = suite.attrib.get("name", "") + host_from_file = None + host_from_name = None if file_attr.startswith("cypress/cmcRegressionTest/atvm") and file_attr.endswith(".ts"): - file_name = Path(file_attr).stem - timestamp = parse_xml_timestamp(suite.attrib.get("timestamp")) - tests = int(float(suite.attrib.get("tests", root.attrib.get("tests", "0")))) - failures = int(float(suite.attrib.get("failures", root.attrib.get("failures", "0")))) - total_time = float(suite.attrib.get("time", root.attrib.get("time", "0"))) - break - if not file_name: + host_from_file = Path(file_attr).stem + name_match = re.search(r"(atvm[^)\s]+)", suite_name) + if name_match: + host_from_name = name_match.group(1) + host_name = host_from_file or host_from_name + if not host_name: + continue + tests = int(float(suite.attrib.get("tests", root.attrib.get("tests", "0")))) + failures = int(float(suite.attrib.get("failures", root.attrib.get("failures", "0")))) + total_time = float(suite.attrib.get("time", root.attrib.get("time", "0"))) + timestamp = parse_xml_timestamp(suite.attrib.get("timestamp")) + candidate = (host_name, tests, failures, total_time, timestamp) + if best is None: + best = candidate + continue + _, best_tests, _, best_total_time, _ = best + if tests > best_tests or (tests == best_tests and total_time >= best_total_time): + best = candidate + if not best: return None + file_name, tests, failures, total_time, timestamp = best detail = f"{tests} tests, {failures} failures" status = "FAIL" if failures else "PASS" return file_name, HostResult( @@ -207,6 +238,19 @@ def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]: ) +def extract_check_xml_timestamp_from_file(xml_path: Path) -> Optional[datetime]: + try: + tree = ET.parse(xml_path) + except ET.ParseError: + return None + root = tree.getroot() + for suite in root.findall("testsuite"): + file_attr = suite.attrib.get("file", "") + if file_attr.endswith("check-xml-files.ts"): + return parse_xml_timestamp(suite.attrib.get("timestamp")) + return None + + def collect_host_results( reporter_root: Path, expected_hosts: List[str], @@ -267,6 +311,39 @@ def find_check_xml_end( return latest +def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]: + matches = re.findall(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text) + for host in reversed(matches): + if host not in completed_hosts: + return host + return None + + +def infer_host_from_subrun_build( + subrun_build: str, + expected_hosts: List[str], + completed_hosts: List[str], +) -> Optional[str]: + remaining_hosts = [host for host in expected_hosts if host not in completed_hosts] + lowered_build = subrun_build.lower() + for host in remaining_hosts: + short = host.split("-", 1)[-1].lower() + if short.startswith("w2k"): + if "windows" in lowered_build or "w2k" in lowered_build: + return host + elif short.startswith("redhat") and "redhat" in lowered_build: + return host + elif short.startswith("ubuntu") and "ubuntu" in lowered_build: + return host + elif short.startswith("oracle") and "oracle" in lowered_build: + return host + elif short.startswith("rocky") and "rocky" in lowered_build: + return host + elif short.startswith("debian") and "debian" in lowered_build: + return host + return remaining_hosts[0] if remaining_hosts else None + + def infer_metadata() -> Dict[str, object]: return { "template": os.environ.get("ATVM_WATCHER_TEMPLATE", "unknown"), @@ -538,6 +615,89 @@ def evaluate_subrun( return "TERMINATED", host_results, start_ts, end_ts, subrun.currents_url, notes +def discover_categorized_subruns( + build_name: str, + reporter_root: Path, + inventory: Dict[str, str], + log_text: str, + started_at: datetime, + parent_active: bool, + cancelled: bool, +) -> List[Dict[str, object]]: + xml_dir = reporter_root / "xml" + current_subrun_build = extract_active_subrun_build(build_name) + expected_hosts = extract_expected_hosts(log_text) + subrun_states: List[Dict[str, object]] = [] + completed_hosts: List[str] = [] + discovered_builds: List[str] = [] + + if xml_dir.exists(): + prefix = f"test-result-{build_name}-" + for xml_path in sorted(xml_dir.glob(f"{prefix}*.xml"), key=lambda p: p.stat().st_mtime): + xml_mtime = datetime.fromtimestamp(xml_path.stat().st_mtime, tz=timezone.utc) + if xml_mtime < started_at: + continue + display_name = xml_path.stem[len("test-result-"):] + discovered_builds.append(display_name) + parsed = parse_host_xml(xml_path) + host_results: Dict[str, HostResult] = {} + if parsed: + host, result = parsed + result.kernel = inventory.get(host, "unknown") + host_results[host] = result + completed_hosts.append(host) + check_ts = extract_check_xml_timestamp_from_file(xml_path) + state = "RUNNING" + if cancelled: + state = "CANCELLED" + elif check_ts or display_name != current_subrun_build or not parent_active: + state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED" + notes = [f"Categorized sub-run discovered from reporter file `{xml_path.name}`."] + if check_ts: + notes.append("Final `check-xml-files.ts` validation passed.") + if cancelled: + notes.append("Cancellation marker detected.") + subrun_states.append( + { + "key": sanitize_key(display_name), + "display_name": display_name, + "state": state, + "host_results": host_results, + "start_ts": next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime), + "end_ts": check_ts or next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime), + "currents_url": None, + "notes": notes, + } + ) + + if current_subrun_build and current_subrun_build not in discovered_builds: + current_host = find_current_running_host(log_text, completed_hosts) + if not current_host or current_host in completed_hosts: + current_host = infer_host_from_subrun_build(current_subrun_build, expected_hosts, completed_hosts) + host_results: Dict[str, HostResult] = {} + if current_host: + host_results[current_host] = HostResult( + host=current_host, + kernel=inventory.get(current_host, "unknown"), + status="RUN", + detail="in progress", + ) + subrun_states.append( + { + "key": sanitize_key(current_subrun_build), + "display_name": current_subrun_build, + "state": "CANCELLED" if cancelled else "RUNNING", + "host_results": host_results, + "start_ts": started_at, + "end_ts": None, + "currents_url": None, + "notes": ["Active categorized sub-run inferred from live `--ci-build-id` process state."] + (["Cancellation marker detected."] if cancelled else []), + } + ) + + return subrun_states + + def determine_state( build_name: str, build_dir: Path, @@ -557,31 +717,45 @@ def determine_state( subrun_states: List[Dict[str, object]] = [] parent_host_results: Dict[str, HostResult] = {} - subruns = split_log_segments(log_text, build_name, bool(metadata.get("categorized")), started_at) - for index, subrun in enumerate(subruns): - next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None - state, host_results, start_ts, end_ts, currents_url, subrun_notes = evaluate_subrun( - subrun=subrun, + if metadata.get("categorized"): + subrun_states = discover_categorized_subruns( + build_name=build_name, reporter_root=reporter_root, inventory=inventory, - end_boundary=next_started_at, + log_text=log_text, + started_at=started_at, parent_active=active, cancelled=cancelled, ) - for host, result in host_results.items(): - parent_host_results[host] = result - subrun_states.append( - { - "key": subrun.key, - "display_name": subrun.display_name, - "state": state, - "host_results": host_results, - "start_ts": start_ts, - "end_ts": end_ts, - "currents_url": currents_url, - "notes": subrun_notes, - } - ) + for subrun in subrun_states: + for host, result in subrun["host_results"].items(): + parent_host_results[host] = result + else: + subruns = split_log_segments(log_text, build_name, bool(metadata.get("categorized")), started_at) + for index, subrun in enumerate(subruns): + next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None + state, host_results, start_ts, end_ts, currents_url, subrun_notes = evaluate_subrun( + subrun=subrun, + reporter_root=reporter_root, + inventory=inventory, + end_boundary=next_started_at, + parent_active=active, + cancelled=cancelled, + ) + for host, result in host_results.items(): + parent_host_results[host] = result + subrun_states.append( + { + "key": subrun.key, + "display_name": subrun.display_name, + "state": state, + "host_results": host_results, + "start_ts": start_ts, + "end_ts": end_ts, + "currents_url": currents_url, + "notes": subrun_notes, + } + ) parent_start_candidates = [subrun["start_ts"] for subrun in subrun_states if subrun["start_ts"]] parent_end_candidates = [subrun["end_ts"] for subrun in subrun_states if subrun["end_ts"]] @@ -631,7 +805,8 @@ if __name__ == "__main__": metadata = infer_metadata() state = load_state(state_file) - default_started_at = datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc) if run_log.exists() else now_utc() + log_text_for_start = read_text(run_log) + default_started_at = first_log_timestamp(log_text_for_start) or (datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc) if run_log.exists() else now_utc()) started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at state.setdefault("build_name", build_name) state.setdefault("started_at", started_at.isoformat())