Improve categorized ATVM watcher sub-run detection

- update the watcher to detect the active categorized sub-run from the live `--ci-build-id` process state instead of treating the parent run as one synthetic grouped run
- fix host XML parsing so the watcher prefers the real host suite over the `Root Suite` entry, avoiding `0 tests, 0 failures` summaries
- use the first timestamp inside the run log as the watcher start time so restarted watchers do not miss current-run categorized artifacts because of log file mtime drift
- improve active-host inference for categorized runs so the watcher maps the current categorized build to the correct host family while the sub-run is still in progress
This commit is contained in:
2026-03-26 12:01:07 -04:00
parent f5849dde0c
commit 3ea732d63c

View File

@@ -106,6 +106,18 @@ def process_active(build_name: str) -> bool:
return False
def extract_active_subrun_build(build_name: str) -> Optional[str]:
    """Return the ``--ci-build-id`` value of the currently running sub-run.

    Scans every line of the text returned by ``run_ps()`` and keeps the lines
    that mention *build_name* and carry a ``--ci-build-id`` option. Both the
    space-separated form (``--ci-build-id ID``) and the ``=``-joined form
    (``--ci-build-id=ID``) are recognised.

    Args:
        build_name: Text that must appear in a line for it to be considered.

    Returns:
        The id taken from the last matching line, or ``None`` when no line
        matches.
    """
    build_ids: List[str] = []
    for line in run_ps().splitlines():
        if build_name not in line:
            continue
        # The option name is part of the pattern, so a line without the flag
        # simply fails to match.
        found = re.search(r"--ci-build-id(?:=|\s+)(\S+)", line)
        if found:
            build_ids.append(found.group(1))
    # When several lines carry the flag, the last one listed wins.
    return build_ids[-1] if build_ids else None
def read_text(path: Path) -> str:
try:
return path.read_text(encoding="utf-8", errors="replace")
@@ -170,6 +182,13 @@ def parse_log_timestamp(raw: str) -> Optional[datetime]:
return None
def first_log_timestamp(log_text: str) -> Optional[datetime]:
    """Return the timestamp of the first INFO record found in *log_text*.

    A record is any line that begins with
    ``YYYY-MM-DD HH:MM:SS,mmm - INFO - ``. The matched timestamp text is
    handed to ``parse_log_timestamp`` and its result is returned unchanged.

    Returns:
        ``None`` when no line has that prefix; otherwise whatever
        ``parse_log_timestamp`` returns for the first match.
    """
    info_prefix = re.compile(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - ",
        re.MULTILINE,  # let ^ anchor at the start of every line
    )
    hit = info_prefix.search(log_text)
    return parse_log_timestamp(hit.group(1)) if hit else None
def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
try:
tree = ET.parse(xml_path)
@@ -177,22 +196,34 @@ def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
return None
root = tree.getroot()
suites = root.findall("testsuite")
file_name = None
tests = int(float(root.attrib.get("tests", "0")))
failures = int(float(root.attrib.get("failures", "0")))
total_time = float(root.attrib.get("time", "0"))
timestamp = None
best: Optional[Tuple[str, int, int, float, Optional[datetime]]] = None
for suite in suites:
file_attr = suite.attrib.get("file", "")
suite_name = suite.attrib.get("name", "")
host_from_file = None
host_from_name = None
if file_attr.startswith("cypress/cmcRegressionTest/atvm") and file_attr.endswith(".ts"):
file_name = Path(file_attr).stem
timestamp = parse_xml_timestamp(suite.attrib.get("timestamp"))
tests = int(float(suite.attrib.get("tests", root.attrib.get("tests", "0"))))
failures = int(float(suite.attrib.get("failures", root.attrib.get("failures", "0"))))
total_time = float(suite.attrib.get("time", root.attrib.get("time", "0")))
break
if not file_name:
host_from_file = Path(file_attr).stem
name_match = re.search(r"(atvm[^)\s]+)", suite_name)
if name_match:
host_from_name = name_match.group(1)
host_name = host_from_file or host_from_name
if not host_name:
continue
tests = int(float(suite.attrib.get("tests", root.attrib.get("tests", "0"))))
failures = int(float(suite.attrib.get("failures", root.attrib.get("failures", "0"))))
total_time = float(suite.attrib.get("time", root.attrib.get("time", "0")))
timestamp = parse_xml_timestamp(suite.attrib.get("timestamp"))
candidate = (host_name, tests, failures, total_time, timestamp)
if best is None:
best = candidate
continue
_, best_tests, _, best_total_time, _ = best
if tests > best_tests or (tests == best_tests and total_time >= best_total_time):
best = candidate
if not best:
return None
file_name, tests, failures, total_time, timestamp = best
detail = f"{tests} tests, {failures} failures"
status = "FAIL" if failures else "PASS"
return file_name, HostResult(
@@ -207,6 +238,19 @@ def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
)
def extract_check_xml_timestamp_from_file(xml_path: Path) -> Optional[datetime]:
    """Return the timestamp of the ``check-xml-files.ts`` suite in a report.

    Parses *xml_path* and looks through the ``testsuite`` elements directly
    under the document root for the first one whose ``file`` attribute ends
    with ``check-xml-files.ts``.

    Args:
        xml_path: Path of the XML report to inspect.

    Returns:
        The result of ``parse_xml_timestamp`` on that suite's ``timestamp``
        attribute, or ``None`` when the file cannot be read, is not
        well-formed XML, or contains no such suite.
    """
    try:
        tree = ET.parse(xml_path)
    except (ET.ParseError, OSError):
        # A report can be half-written (ParseError) or can vanish / become
        # unreadable between directory listing and parsing (OSError); both
        # mean "no timestamp available" rather than a fatal error.
        return None
    for suite in tree.getroot().findall("testsuite"):
        if suite.attrib.get("file", "").endswith("check-xml-files.ts"):
            return parse_xml_timestamp(suite.attrib.get("timestamp"))
    return None
def collect_host_results(
reporter_root: Path,
expected_hosts: List[str],
@@ -267,6 +311,39 @@ def find_check_xml_end(
return latest
def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently announced host spec that has not completed.

    Collects every ``Running: ...atvm<name>.ts`` announcement in *log_text*
    (the ``cypress/cmcRegressionTest/`` prefix is optional) and walks them
    from newest to oldest.

    Args:
        log_text: Full text of the run log.
        completed_hosts: Host names to exclude from the result.

    Returns:
        The latest announced host name (without the ``.ts`` suffix) that is
        not in *completed_hosts*, or ``None`` when there is none.
    """
    announced = re.findall(
        r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts",
        log_text,
    )
    pending = (name for name in reversed(announced) if name not in completed_hosts)
    return next(pending, None)
def infer_host_from_subrun_build(
    subrun_build: str,
    expected_hosts: List[str],
    completed_hosts: List[str],
) -> Optional[str]:
    """Guess which not-yet-completed host a sub-run build name refers to.

    Each remaining host is reduced to the part after its first ``-`` and
    lower-cased; that short name is matched by prefix against a known OS
    family, and the family's keywords are searched for in the lower-cased
    build name. Hosts are tried in *expected_hosts* order.

    Args:
        subrun_build: Build id of the sub-run.
        expected_hosts: All hosts the run is expected to cover, in order.
        completed_hosts: Hosts that already have results.

    Returns:
        The first remaining host whose family keyword occurs in the build
        name; failing that, the first remaining host; ``None`` when every
        expected host has completed.
    """
    # Host-name prefix -> substrings of the build name that identify the family.
    family_keywords = (
        ("w2k", ("windows", "w2k")),
        ("redhat", ("redhat",)),
        ("ubuntu", ("ubuntu",)),
        ("oracle", ("oracle",)),
        ("rocky", ("rocky",)),
        ("debian", ("debian",)),
    )
    done = completed_hosts
    candidates = [name for name in expected_hosts if name not in done]
    build_lc = subrun_build.lower()

    for candidate in candidates:
        family = candidate.split("-", 1)[-1].lower()
        for prefix, keywords in family_keywords:
            if family.startswith(prefix) and any(word in build_lc for word in keywords):
                return candidate

    # Nothing matched by name: fall back to the next host still outstanding.
    if candidates:
        return candidates[0]
    return None
def infer_metadata() -> Dict[str, object]:
return {
"template": os.environ.get("ATVM_WATCHER_TEMPLATE", "unknown"),
@@ -538,6 +615,89 @@ def evaluate_subrun(
return "TERMINATED", host_results, start_ts, end_ts, subrun.currents_url, notes
def discover_categorized_subruns(
    build_name: str,
    reporter_root: Path,
    inventory: Dict[str, str],
    log_text: str,
    started_at: datetime,
    parent_active: bool,
    cancelled: bool,
) -> List[Dict[str, object]]:
    """Build one state record per categorized sub-run of *build_name*.

    Two sources are combined:

    1. Reporter files ``<reporter_root>/xml/test-result-<build_name>-*.xml``
       whose modification time is not earlier than *started_at*, visited in
       modification-time order. Each yields a record whose state is
       ``CANCELLED``, ``RUNNING``, ``FAILED`` or ``COMPLETED``.
    2. The build id reported by ``extract_active_subrun_build``. When no
       reporter file carries that id yet, one extra ``RUNNING`` (or
       ``CANCELLED``) record is appended for it.

    Args:
        build_name: Parent build name used in the reporter file prefix.
        reporter_root: Directory that contains the ``xml`` sub-directory.
        inventory: Mapping from host name to kernel; ``"unknown"`` is used
            for hosts that are missing.
        log_text: Run log text, used to find expected and running hosts.
        started_at: Files modified before this instant are ignored.
        parent_active: Whether the parent run is still active.
        cancelled: Whether a cancellation marker was detected.

    Returns:
        A list of dicts with the keys ``key``, ``display_name``, ``state``,
        ``host_results``, ``start_ts``, ``end_ts``, ``currents_url`` and
        ``notes``.
    """
    xml_dir = reporter_root / "xml"
    active_build = extract_active_subrun_build(build_name)
    expected_hosts = extract_expected_hosts(log_text)

    records: List[Dict[str, object]] = []
    finished_hosts: List[str] = []
    seen_builds: List[str] = []

    if xml_dir.exists():
        report_files = sorted(
            xml_dir.glob(f"test-result-{build_name}-*.xml"),
            key=lambda p: p.stat().st_mtime,
        )
        for report in report_files:
            modified = datetime.fromtimestamp(report.stat().st_mtime, tz=timezone.utc)
            if modified < started_at:
                # Older than the watcher start time: skipped.
                continue

            subrun_name = report.stem[len("test-result-"):]
            seen_builds.append(subrun_name)

            per_host: Dict[str, HostResult] = {}
            parsed = parse_host_xml(report)
            if parsed:
                host_name, host_result = parsed
                host_result.kernel = inventory.get(host_name, "unknown")
                per_host[host_name] = host_result
                finished_hosts.append(host_name)

            check_ts = extract_check_xml_timestamp_from_file(report)

            # A sub-run is still running only if it has no final check
            # timestamp, is the live build, and the parent is active.
            if cancelled:
                state = "CANCELLED"
            elif not check_ts and subrun_name == active_build and parent_active:
                state = "RUNNING"
            elif any(entry.failures for entry in per_host.values()):
                state = "FAILED"
            else:
                state = "COMPLETED"

            notes = [f"Categorized sub-run discovered from reporter file `{report.name}`."]
            if check_ts:
                notes.append("Final `check-xml-files.ts` validation passed.")
            if cancelled:
                notes.append("Cancellation marker detected.")

            # First host timestamp if any, otherwise the file's mtime.
            reported_ts = next(
                (entry.timestamp for entry in per_host.values() if entry.timestamp),
                modified,
            )
            records.append(
                {
                    "key": sanitize_key(subrun_name),
                    "display_name": subrun_name,
                    "state": state,
                    "host_results": per_host,
                    "start_ts": reported_ts,
                    "end_ts": check_ts or reported_ts,
                    "currents_url": None,
                    "notes": notes,
                }
            )

    if active_build and active_build not in seen_builds:
        # The live build has produced no reporter file yet; synthesise a
        # record for it from the log and the build name.
        live_host = find_current_running_host(log_text, finished_hosts)
        if not live_host or live_host in finished_hosts:
            live_host = infer_host_from_subrun_build(active_build, expected_hosts, finished_hosts)

        live_results: Dict[str, HostResult] = {}
        if live_host:
            live_results[live_host] = HostResult(
                host=live_host,
                kernel=inventory.get(live_host, "unknown"),
                status="RUN",
                detail="in progress",
            )

        live_notes = ["Active categorized sub-run inferred from live `--ci-build-id` process state."]
        if cancelled:
            live_notes.append("Cancellation marker detected.")

        records.append(
            {
                "key": sanitize_key(active_build),
                "display_name": active_build,
                "state": "CANCELLED" if cancelled else "RUNNING",
                "host_results": live_results,
                "start_ts": started_at,
                "end_ts": None,
                "currents_url": None,
                "notes": live_notes,
            }
        )

    return records
def determine_state(
build_name: str,
build_dir: Path,
@@ -557,31 +717,45 @@ def determine_state(
subrun_states: List[Dict[str, object]] = []
parent_host_results: Dict[str, HostResult] = {}
subruns = split_log_segments(log_text, build_name, bool(metadata.get("categorized")), started_at)
for index, subrun in enumerate(subruns):
next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None
state, host_results, start_ts, end_ts, currents_url, subrun_notes = evaluate_subrun(
subrun=subrun,
if metadata.get("categorized"):
subrun_states = discover_categorized_subruns(
build_name=build_name,
reporter_root=reporter_root,
inventory=inventory,
end_boundary=next_started_at,
log_text=log_text,
started_at=started_at,
parent_active=active,
cancelled=cancelled,
)
for host, result in host_results.items():
parent_host_results[host] = result
subrun_states.append(
{
"key": subrun.key,
"display_name": subrun.display_name,
"state": state,
"host_results": host_results,
"start_ts": start_ts,
"end_ts": end_ts,
"currents_url": currents_url,
"notes": subrun_notes,
}
)
for subrun in subrun_states:
for host, result in subrun["host_results"].items():
parent_host_results[host] = result
else:
subruns = split_log_segments(log_text, build_name, bool(metadata.get("categorized")), started_at)
for index, subrun in enumerate(subruns):
next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None
state, host_results, start_ts, end_ts, currents_url, subrun_notes = evaluate_subrun(
subrun=subrun,
reporter_root=reporter_root,
inventory=inventory,
end_boundary=next_started_at,
parent_active=active,
cancelled=cancelled,
)
for host, result in host_results.items():
parent_host_results[host] = result
subrun_states.append(
{
"key": subrun.key,
"display_name": subrun.display_name,
"state": state,
"host_results": host_results,
"start_ts": start_ts,
"end_ts": end_ts,
"currents_url": currents_url,
"notes": subrun_notes,
}
)
parent_start_candidates = [subrun["start_ts"] for subrun in subrun_states if subrun["start_ts"]]
parent_end_candidates = [subrun["end_ts"] for subrun in subrun_states if subrun["end_ts"]]
@@ -631,7 +805,8 @@ if __name__ == "__main__":
metadata = infer_metadata()
state = load_state(state_file)
default_started_at = datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc) if run_log.exists() else now_utc()
log_text_for_start = read_text(run_log)
default_started_at = first_log_timestamp(log_text_for_start) or (datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc) if run_log.exists() else now_utc())
started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at
state.setdefault("build_name", build_name)
state.setdefault("started_at", started_at.isoformat())