From 86e89ab9f141fe3384c18c1324a9a2f55cd43b9b Mon Sep 17 00:00:00 2001 From: "anthony.wen" Date: Fri, 27 Mar 2026 16:08:13 -0400 Subject: [PATCH] Fix categorized watcher grouping and timing reconstruction --- atvm/docs/automation/run-learnings.md | 9 ++ atvm/watcher-service/atvm_run_watcher.py | 123 ++++++++++++++++++----- 2 files changed, 106 insertions(+), 26 deletions(-) diff --git a/atvm/docs/automation/run-learnings.md b/atvm/docs/automation/run-learnings.md index e09fea5..a6d6e1f 100644 --- a/atvm/docs/automation/run-learnings.md +++ b/atvm/docs/automation/run-learnings.md @@ -283,6 +283,15 @@ This file stores run-specific examples only when a run produced a new learning r - For categorized runs, when grouped XML only shows `check-xml-files.ts`, infer the subrun host from the categorized build id and recover the result from the latest matching per-host reporter artifact within the grouped completion window. - Do not keep a completed grouped subrun in `RUNNING` just because the grouped XML lacked a host testcase entry. +## Run Learning: 2026-03-27 (Categorized batch results must aggregate all hosts in the group and use the earliest grouped host timestamp) +- Observed failure mode: + - A categorized grouped batch can post with only one host even when the batch actually ran multiple hosts of the same distro group. + - This also causes the grouped `start` and `total` timing values to collapse to the last recovered host artifact instead of the full grouped batch duration. +- Action for future runs: + - For categorized grouped batches, recover all matching per-host reporter artifacts for the distro group within the grouped completion window, not only the latest host. + - Derive the grouped `start` time from the earliest recovered host run timestamp and the grouped `end` time from the grouped finalization timestamp. + - Prefer the reporter JSON metadata timestamp or artifact filename timestamp over file write time when reconstructing grouped host timing, because file mtime reflects artifact completion rather than run start. + ## Run Learning: 2026-03-27 (Default ATVM approval should include the watcher) - Observed requirement: - The operator wants `approve` to mean run with watcher by default. diff --git a/atvm/watcher-service/atvm_run_watcher.py b/atvm/watcher-service/atvm_run_watcher.py index 5172372..31010cd 100644 --- a/atvm/watcher-service/atvm_run_watcher.py +++ b/atvm/watcher-service/atvm_run_watcher.py @@ -264,6 +264,19 @@ def parse_log_timestamp(raw: str) -> Optional[datetime]: return None +def parse_reporter_metadata_timestamp(raw: Optional[str]) -> Optional[datetime]: + if not raw: + return None + normalized = raw.replace("Z", "+00:00") + try: + ts = datetime.fromisoformat(normalized) + except ValueError: + return None + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + return ts.astimezone(timezone.utc) + + def first_log_timestamp(log_text: str) -> Optional[datetime]: match = re.search(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - ", log_text, re.M) if not match: @@ -467,6 +480,27 @@ def collect_latest_host_result( return latest +def reporter_artifact_run_timestamp(artifact_path: Path) -> Optional[datetime]: + if artifact_path.suffix == ".json": + try: + payload = json.loads(artifact_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + payload = {} + metadata = payload.get("metadata") if isinstance(payload, dict) else None + if isinstance(metadata, dict): + ts = parse_reporter_metadata_timestamp(metadata.get("timestamp")) + if ts: + return ts + + match = re.search(r"_(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.(?:json|txt)$", artifact_path.name) + if not match: + return None + try: + return datetime.strptime(match.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=timezone.utc) + except ValueError: + return None + + def collect_latest_host_reporter_artifact( reporter_root: Path, expected_hosts: List[str], @@ -497,12 +531,13 @@ def collect_latest_host_reporter_artifact( if run_ended_at and artifact_mtime >= run_ended_at: continue + artifact_ts = reporter_artifact_run_timestamp(artifact_path) or artifact_mtime result = HostResult( host=host, kernel=kernels.get(host, "unknown"), status="PASS", detail="completed", - timestamp=artifact_mtime, + timestamp=artifact_ts, ) candidate = (host, result) if latest is None: @@ -514,21 +549,21 @@ def collect_latest_host_reporter_artifact( return latest -def collect_latest_group_host_reporter_artifact( +def collect_group_host_reporter_artifacts( reporter_root: Path, group_label: Optional[str], kernels: Dict[str, str], run_started_at: datetime, run_ended_at: Optional[datetime] = None, -) -> Optional[Tuple[str, HostResult]]: +) -> Dict[str, HostResult]: + results: Dict[str, HostResult] = {} if not group_label: - return None + return results logs_dir = reporter_root / "logs" if not logs_dir.exists(): - return None + return results - latest: Optional[Tuple[str, HostResult]] = None for host_dir in sorted(logs_dir.iterdir()): if not host_dir.is_dir(): continue @@ -538,6 +573,8 @@ def collect_latest_group_host_reporter_artifact( if infer_group_from_host(host) != group_label: continue + latest_artifact_mtime: Optional[datetime] = None + latest_run_ts: Optional[datetime] = None for artifact_path in sorted(host_dir.iterdir()): if artifact_path.suffix not in {".txt", ".json"}: continue @@ -546,22 +583,43 @@ def collect_latest_group_host_reporter_artifact( continue if run_ended_at and artifact_mtime >= run_ended_at: continue + if latest_artifact_mtime is None or artifact_mtime >= latest_artifact_mtime: + latest_artifact_mtime = artifact_mtime + latest_run_ts = reporter_artifact_run_timestamp(artifact_path) or artifact_mtime - result = HostResult( - host=host, - kernel=kernels.get(host, "unknown"), - status="PASS", - detail="completed", - timestamp=artifact_mtime, - ) - candidate = (host, result) - if latest is None: - latest = candidate - continue - latest_ts = latest[1].timestamp or datetime.fromtimestamp(0, tz=timezone.utc) - if artifact_mtime >= latest_ts: - latest = candidate - return latest + if latest_artifact_mtime is None: + continue + + results[host] = HostResult( + host=host, + kernel=kernels.get(host, "unknown"), + status="PASS", + detail="completed", + timestamp=latest_run_ts or latest_artifact_mtime, + ) + return results + + +def collect_latest_group_host_reporter_artifact( + reporter_root: Path, + group_label: Optional[str], + kernels: Dict[str, str], + run_started_at: datetime, + run_ended_at: Optional[datetime] = None, +) -> Optional[Tuple[str, HostResult]]: + results = collect_group_host_reporter_artifacts( + reporter_root=reporter_root, + group_label=group_label, + kernels=kernels, + run_started_at=run_started_at, + run_ended_at=run_ended_at, + ) + if not results: + return None + return max( + results.items(), + key=lambda item: item[1].timestamp or datetime.fromtimestamp(0, tz=timezone.utc), + ) def find_check_xml_end( @@ -1054,9 +1112,23 @@ def discover_categorized_subruns( check_ts = extract_check_xml_timestamp_from_file(xml_path) summary = completed_summaries[current_summary_index] if current_summary_index < len(completed_summaries) else None inferred_host = infer_host_from_subrun_build(raw_display_name, expected_hosts, completed_hosts) + display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", raw_display_name) + raw_display_group = display_group_match.group(1) if display_group_match else None if summary and (not host_results or all(result.host == "check-xml-files" for result in host_results.values())): host_results = summary["host_results"] completed_hosts.extend([host for host in host_results if host not in completed_hosts]) + if not host_results and check_ts: + group_host_results = collect_group_host_reporter_artifacts( + reporter_root=reporter_root, + group_label=raw_display_group, + kernels=inventory, + run_started_at=started_at, + run_ended_at=check_ts + timedelta(seconds=5), + ) + if group_host_results: + host_results = group_host_results + completed_hosts.extend([host for host in host_results if host not in completed_hosts]) + if not host_results and check_ts: scoped_expected_hosts = [inferred_host] if inferred_host else expected_hosts latest_host = collect_latest_host_reporter_artifact( @@ -1067,11 +1139,9 @@ def discover_categorized_subruns( run_ended_at=check_ts + timedelta(seconds=5), ) if not latest_host: - display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", raw_display_name) - display_group = display_group_match.group(1) if display_group_match else None latest_host = collect_latest_group_host_reporter_artifact( reporter_root=reporter_root, - group_label=display_group, + group_label=raw_display_group, kernels=inventory, run_started_at=started_at, run_ended_at=check_ts + timedelta(seconds=5), @@ -1103,7 +1173,7 @@ def discover_categorized_subruns( if summary and host_results: notes.append("Host result details were derived from the parent categorized run log summary.") elif host_results and check_ts: - notes.append("Host result details were derived from the latest matching host reporter artifact written before grouped finalization.") + notes.append("Host result details were derived from matching per-host reporter artifacts written before grouped finalization.") if inferred_host: notes.append(f"Grouped sub-run host scope was inferred as `{inferred_host}` from the categorized build id.") elif check_ts and not host_results and parent_active: @@ -1113,7 +1183,8 @@ def discover_categorized_subruns( if cancelled: notes.append("Cancellation marker detected.") end_ts = check_ts or next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime) - start_ts = next((result.timestamp for result in host_results.values() if result.timestamp), None) + host_timestamps = [result.timestamp for result in host_results.values() if result.timestamp] + start_ts = min(host_timestamps) if host_timestamps else None if not start_ts and summary and end_ts: duration_seconds = next((result.duration_seconds for result in host_results.values() if result.duration_seconds is not None), None) if duration_seconds is not None: