Fix categorized watcher grouping and timing reconstruction

This commit is contained in:
2026-03-27 16:08:13 -04:00
parent 20c9ba7178
commit 86e89ab9f1
2 changed files with 106 additions and 26 deletions

View File

@@ -283,6 +283,15 @@ This file stores run-specific examples only when a run produced a new learning r
- For categorized runs, when grouped XML only shows `check-xml-files.ts`, infer the subrun host from the categorized build id and recover the result from the latest matching per-host reporter artifact within the grouped completion window.
- Do not keep a completed grouped subrun in `RUNNING` just because the grouped XML lacked a host testcase entry.
## Run Learning: 2026-03-27 (Categorized batch results must aggregate all hosts in the group and use the earliest grouped host timestamp)
- Observed failure mode:
- A categorized grouped batch can post with only one host even when the batch actually ran multiple hosts of the same distro group.
- This also causes the grouped `start` and `total` timing values to collapse to the last recovered host artifact instead of the full grouped batch duration.
- Action for future runs:
- For categorized grouped batches, recover all matching per-host reporter artifacts for the distro group within the grouped completion window, not only the latest host.
- Derive the grouped `start` time from the earliest recovered host run timestamp and the grouped `end` time from the grouped finalization timestamp.
- Prefer the reporter JSON metadata timestamp or artifact filename timestamp over file write time when reconstructing grouped host timing, because file mtime reflects artifact completion rather than run start.
## Run Learning: 2026-03-27 (Default ATVM approval should include the watcher)
- Observed requirement:
- The operator wants `approve` to mean run with watcher by default.

View File

@@ -264,6 +264,19 @@ def parse_log_timestamp(raw: str) -> Optional[datetime]:
return None
def parse_reporter_metadata_timestamp(raw: Optional[str]) -> Optional[datetime]:
if not raw:
return None
normalized = raw.replace("Z", "+00:00")
try:
ts = datetime.fromisoformat(normalized)
except ValueError:
return None
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return ts.astimezone(timezone.utc)
def first_log_timestamp(log_text: str) -> Optional[datetime]:
match = re.search(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - ", log_text, re.M)
if not match:
@@ -467,6 +480,27 @@ def collect_latest_host_result(
return latest
def reporter_artifact_run_timestamp(artifact_path: Path) -> Optional[datetime]:
if artifact_path.suffix == ".json":
try:
payload = json.loads(artifact_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
payload = {}
metadata = payload.get("metadata") if isinstance(payload, dict) else None
if isinstance(metadata, dict):
ts = parse_reporter_metadata_timestamp(metadata.get("timestamp"))
if ts:
return ts
match = re.search(r"_(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.(?:json|txt)$", artifact_path.name)
if not match:
return None
try:
return datetime.strptime(match.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=timezone.utc)
except ValueError:
return None
def collect_latest_host_reporter_artifact(
reporter_root: Path,
expected_hosts: List[str],
@@ -497,12 +531,13 @@ def collect_latest_host_reporter_artifact(
if run_ended_at and artifact_mtime >= run_ended_at:
continue
artifact_ts = reporter_artifact_run_timestamp(artifact_path) or artifact_mtime
result = HostResult(
host=host,
kernel=kernels.get(host, "unknown"),
status="PASS",
detail="completed",
timestamp=artifact_mtime,
timestamp=artifact_ts,
)
candidate = (host, result)
if latest is None:
@@ -514,21 +549,21 @@ def collect_latest_host_reporter_artifact(
return latest
def collect_latest_group_host_reporter_artifact(
def collect_group_host_reporter_artifacts(
reporter_root: Path,
group_label: Optional[str],
kernels: Dict[str, str],
run_started_at: datetime,
run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
) -> Dict[str, HostResult]:
results: Dict[str, HostResult] = {}
if not group_label:
return None
return results
logs_dir = reporter_root / "logs"
if not logs_dir.exists():
return None
return results
latest: Optional[Tuple[str, HostResult]] = None
for host_dir in sorted(logs_dir.iterdir()):
if not host_dir.is_dir():
continue
@@ -538,6 +573,8 @@ def collect_latest_group_host_reporter_artifact(
if infer_group_from_host(host) != group_label:
continue
latest_artifact_mtime: Optional[datetime] = None
latest_run_ts: Optional[datetime] = None
for artifact_path in sorted(host_dir.iterdir()):
if artifact_path.suffix not in {".txt", ".json"}:
continue
@@ -546,22 +583,43 @@ def collect_latest_group_host_reporter_artifact(
continue
if run_ended_at and artifact_mtime >= run_ended_at:
continue
if latest_artifact_mtime is None or artifact_mtime >= latest_artifact_mtime:
latest_artifact_mtime = artifact_mtime
latest_run_ts = reporter_artifact_run_timestamp(artifact_path) or artifact_mtime
result = HostResult(
host=host,
kernel=kernels.get(host, "unknown"),
status="PASS",
detail="completed",
timestamp=artifact_mtime,
)
candidate = (host, result)
if latest is None:
latest = candidate
continue
latest_ts = latest[1].timestamp or datetime.fromtimestamp(0, tz=timezone.utc)
if artifact_mtime >= latest_ts:
latest = candidate
return latest
if latest_artifact_mtime is None:
continue
results[host] = HostResult(
host=host,
kernel=kernels.get(host, "unknown"),
status="PASS",
detail="completed",
timestamp=latest_run_ts or latest_artifact_mtime,
)
return results
def collect_latest_group_host_reporter_artifact(
reporter_root: Path,
group_label: Optional[str],
kernels: Dict[str, str],
run_started_at: datetime,
run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
results = collect_group_host_reporter_artifacts(
reporter_root=reporter_root,
group_label=group_label,
kernels=kernels,
run_started_at=run_started_at,
run_ended_at=run_ended_at,
)
if not results:
return None
return max(
results.items(),
key=lambda item: item[1].timestamp or datetime.fromtimestamp(0, tz=timezone.utc),
)
def find_check_xml_end(
@@ -1054,9 +1112,23 @@ def discover_categorized_subruns(
check_ts = extract_check_xml_timestamp_from_file(xml_path)
summary = completed_summaries[current_summary_index] if current_summary_index < len(completed_summaries) else None
inferred_host = infer_host_from_subrun_build(raw_display_name, expected_hosts, completed_hosts)
display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", raw_display_name)
raw_display_group = display_group_match.group(1) if display_group_match else None
if summary and (not host_results or all(result.host == "check-xml-files" for result in host_results.values())):
host_results = summary["host_results"]
completed_hosts.extend([host for host in host_results if host not in completed_hosts])
if not host_results and check_ts:
group_host_results = collect_group_host_reporter_artifacts(
reporter_root=reporter_root,
group_label=raw_display_group,
kernels=inventory,
run_started_at=started_at,
run_ended_at=check_ts + timedelta(seconds=5),
)
if group_host_results:
host_results = group_host_results
completed_hosts.extend([host for host in host_results if host not in completed_hosts])
if not host_results and check_ts:
scoped_expected_hosts = [inferred_host] if inferred_host else expected_hosts
latest_host = collect_latest_host_reporter_artifact(
@@ -1067,11 +1139,9 @@ def discover_categorized_subruns(
run_ended_at=check_ts + timedelta(seconds=5),
)
if not latest_host:
display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", raw_display_name)
display_group = display_group_match.group(1) if display_group_match else None
latest_host = collect_latest_group_host_reporter_artifact(
reporter_root=reporter_root,
group_label=display_group,
group_label=raw_display_group,
kernels=inventory,
run_started_at=started_at,
run_ended_at=check_ts + timedelta(seconds=5),
@@ -1103,7 +1173,7 @@ def discover_categorized_subruns(
if summary and host_results:
notes.append("Host result details were derived from the parent categorized run log summary.")
elif host_results and check_ts:
notes.append("Host result details were derived from the latest matching host reporter artifact written before grouped finalization.")
notes.append("Host result details were derived from matching per-host reporter artifacts written before grouped finalization.")
if inferred_host:
notes.append(f"Grouped sub-run host scope was inferred as `{inferred_host}` from the categorized build id.")
elif check_ts and not host_results and parent_active:
@@ -1113,7 +1183,8 @@ def discover_categorized_subruns(
if cancelled:
notes.append("Cancellation marker detected.")
end_ts = check_ts or next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime)
start_ts = next((result.timestamp for result in host_results.values() if result.timestamp), None)
host_timestamps = [result.timestamp for result in host_results.values() if result.timestamp]
start_ts = min(host_timestamps) if host_timestamps else None
if not start_ts and summary and end_ts:
duration_seconds = next((result.duration_seconds for result in host_results.values() if result.duration_seconds is not None), None)
if duration_seconds is not None: