Handle Currents "Cloud Run Finished" table rows where the trailing "s" in long duration values wraps onto its own continuation line. Instead of appending that standalone continuation to the end of the host row, drop the wrapped row and rely on the existing duration parser to accept values without the trailing "s". This preserves all host rows in parent summary parsing for completed non-categorized runs. Also record the failure mode and recovery guidance in the ATVM run learnings doc.
2226 lines
88 KiB
Python
2226 lines
88 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
# Run-state labels known to this script (where they are consumed is outside
# the section of the file shown here).
RUN_STATES = {
    "COMPLETED",
    "FAILED",
    "CANCELLED",
    "TERMINATED",
    "HUNG",
    "UNKNOWN",
    "RUNNING",
}
|
|
|
|
# Timezone of the machine running this script, captured once at import time;
# falls back to UTC if the local tzinfo comes back empty.
LOCAL_TZ = datetime.now().astimezone().tzinfo or timezone.utc
|
|
|
|
|
|
# Generic numbered step list.
# NOTE(review): presumably the fallback when a template has no entry in
# TEMPLATE_TEST_FLOWS — confirm against the caller.
DEFAULT_TEST_FLOW = [
    "1. Test flow is template-specific for this run",
    "2. Verifying set up",
    "3. Execute the requested template workflow",
    "4. Validate expected run outcome",
    "5. Power off",
]
|
|
|
|
|
|
# Numbered step descriptions keyed by template name.  Each list is ordered
# and every entry starts with its own "<n>. " prefix.
TEMPLATE_TEST_FLOWS = {
    "cmc-e2e": [
        "1. Verifying set up",
        "2. Power on and obtain ip address and host name",
        "3. Uninstall CMC if still exists",
        "4. Setting up disk on the host",
        "5. Copy CMC install command from GUI",
        "6. Install CMC",
        "7. Create migration session",
        "8. Tracking Changes",
        "9. Trigger cmotion and do I/O test before actual cutover",
        "10. Verify data for cmotion",
        "11. Trigger revert cmotion and do I/O test before and during cmotion",
        "12. Verify data for revert cmotion",
        "13. Trigger cmotion again",
        "14. Finalize cutover",
        "15. Create migration report",
        "16. Delete migration session",
        "17. Verify local destination disk",
        "18. Remove enabled FC integration",
        "19. Remove host and volumes",
        "20. Uninstall CMC",
        "21. Clean up iSCSI targets",
        "22. Power off",
    ],
    "cmc-systemOS": [
        "1. Verifying set up",
        "2. Power on and obtain ip address and host name",
        "3. Uninstall CMC if still exists",
        "4. Attach destination disk on the host",
        "5. Copy CMC install command from GUI",
        "6. Install CMC on the host",
        "7. Create migration session (Simple Migration)",
        "8. Tracking Changes (Simple Migration)",
        "9. Finalize cutover (Simple Migration)",
        "10. Create migration report (Simple Migration)",
        "11. Delete migration session (Simple Migration)",
        "12. Power off the host",
        "13. Detach original source OS disk",
        "14. Reassign destination OS disk",
        "15. Power on to verify destination disk",
        "16. Power off the host",
        "17. Detach destination OS disk",
        "18. Attach original source OS disk back",
        "19. Power on and obtain ip address and host name",
        "20. Uninstall CMC on the host",
        "21. Power off the host",
    ],
    "cmc-reboot": [
        "1. Verifying set up",
        "2. Power on and obtain ip address and host name",
        "3. Uninstall CMC if still exists",
        "4. Setting up disk on the host",
        "5. Copy CMC install command from GUI",
        "6. Install CMC on the host",
        "7. Create migration session",
        "8. Tracking Changes",
        "9. Create reboot validation file on the source disk",
        "10. Trigger cmotion",
        "11. Reboot the host",
        "12. Update disk info after power on",
        "13. Mount source disk",
        "14. Verify reboot validation file after reboot",
        "15. Create second reboot validation file on the source disk",
        "16. Revert cmotion",
        "17. Reboot the host",
        "18. Update disk info after power on",
        "19. Mount source disk",
        "20. Verify second reboot validation file after reboot",
        "21. Trigger cmotion and do I/O test during cmotion",
        "22. Revert cmotion",
        "23. Verify data with md5 checksum for the I/O test",
        "24. Trigger cmotion again",
        "25. Finalize cutover",
        "26. Create migration report",
        "27. Delete migration session",
        "28. Verify local destination disk",
        "29. Remove host and disks",
        "30. Remove enabled integration",
        "31. Uninstall CMC on the host",
        "32. Clean up iSCSI targets",
        "33. Power off the host",
    ],
}
|
|
|
|
|
|
@dataclass
class HostResult:
    """Outcome of one host's test spec, as produced by the parsers in this file."""

    # Host name; the parsers here derive it from "atvm..." spec/suite names.
    host: str
    # Kernel string looked up from the inventory, or "unknown".
    kernel: str
    # "PASS" or "FAIL" in the parsers visible in this file.
    status: str
    # Human-readable summary, e.g. "<n> tests, <m> failures[ - <failure detail>]".
    detail: str
    tests: int = 0
    failures: int = 0
    # Spec duration in seconds when the source reported one.
    duration_seconds: Optional[float] = None
    # When the result was recorded, if known.
    timestamp: Optional[datetime] = None
|
|
|
|
|
|
@dataclass
class SubRun:
    """Record describing one sub-run of a build.

    NOTE(review): nothing in the visible part of the file constructs or reads
    this class, so field meanings below are inferred from names only —
    confirm against the code that populates it.
    """

    key: str
    display_name: str
    started_at: datetime
    # presumably "atvm..." host names expected to report for this sub-run
    expected_hosts: List[str]
    completed: bool
    # presumably the Currents dashboard URL for the sub-run, when known
    currents_url: Optional[str]
    notes: List[str]
|
|
|
|
|
|
def now_utc() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse this script's command-line options from ``sys.argv``.

    Five path/name options are mandatory; the three timing options are
    integers (seconds) with built-in defaults.
    """
    cli = argparse.ArgumentParser()
    required_flags = (
        "--build-name",
        "--run-log",
        "--reporter-root",
        "--inventory-file",
        "--state-dir",
    )
    for flag in required_flags:
        cli.add_argument(flag, required=True)
    timing_flags = (
        ("--poll-interval", 30),
        ("--max-watch-seconds", 6 * 60 * 60),
        ("--process-exit-grace-seconds", 120),
    )
    for flag, fallback in timing_flags:
        cli.add_argument(flag, type=int, default=fallback)
    return cli.parse_args()
|
|
|
|
|
|
def ensure_dir(path: Path) -> None:
    """Create *path* and any missing parents; an existing directory is fine."""
    path.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def load_inventory(path: Path) -> Dict[str, str]:
    """Read a pipe-delimited inventory table and map host name -> kernel.

    Only lines starting with ``|`` are considered.  The second cell is the
    host and the third the kernel; rows whose first cell is ``OS`` (header)
    or ``---`` (separator), or that have fewer than three cells, are skipped.
    An empty kernel cell becomes ``"unknown"``.  A missing file yields ``{}``.
    """
    host_kernels: Dict[str, str] = {}
    if path.exists():
        for row in path.read_text(encoding="utf-8").splitlines():
            if row.startswith("|"):
                cells = [cell.strip() for cell in row.strip().strip("|").split("|")]
                if len(cells) >= 3 and cells[0] not in ("OS", "---"):
                    host_kernels[cells[1]] = cells[2] if cells[2] else "unknown"
    return host_kernels
|
|
|
|
|
|
def run_ps() -> str:
    """Run ``ps -eo pid=,args=`` and return its stdout as text.

    Raises ``subprocess.CalledProcessError`` if ``ps`` exits non-zero.
    """
    command = ["ps", "-eo", "pid=,args="]
    completed = subprocess.run(command, check=True, text=True, capture_output=True)
    return completed.stdout
|
|
|
|
|
|
def normalize_logged_command(raw: str, command_name: str) -> Optional[str]:
    """Pull a known command invocation out of *raw* with whitespace collapsed.

    Supports ``cmc-templates.py`` and ``run-sorry-cypress.py``, optionally
    preceded by ``python``/``python3`` and/or ``./``.  Everything from the
    command to the end of the line is kept.  Returns ``None`` for an
    unsupported *command_name* or when the command does not appear in *raw*.
    """
    if command_name == "cmc-templates.py":
        regex = r"((?:python3?\s+)?(?:\./)?cmc-templates\.py\b.*)"
    elif command_name == "run-sorry-cypress.py":
        regex = r"((?:python3?\s+)?(?:\./)?run-sorry-cypress\.py\b.*)"
    else:
        return None
    found = re.search(regex, raw)
    if found is None:
        return None
    collapsed = " ".join(found.group(1).split())
    return collapsed if collapsed else None
|
|
|
|
|
|
def extract_command_from_ps(build_name: str, command_name: str) -> Optional[str]:
    """Return the normalized command line of the last matching live process.

    Scans the ``ps`` listing for lines mentioning *command_name*.  For
    ``run-sorry-cypress.py`` the line must also carry
    ``--build_name <build_name>``.  Returns ``None`` when nothing matches.
    """
    build_flag = f"--build_name {build_name}"
    needs_build_flag = command_name == "run-sorry-cypress.py"
    latest: Optional[str] = None
    for ps_line in run_ps().splitlines():
        if command_name not in ps_line:
            continue
        if needs_build_flag and build_flag not in ps_line:
            continue
        candidate = normalize_logged_command(ps_line, command_name)
        if candidate:
            latest = candidate
    return latest
|
|
|
|
|
|
def process_active(build_name: str) -> bool:
    """Report whether a ``run-sorry-cypress.py`` process for this build is running."""
    build_flag = f"--build_name {build_name}"
    return any(
        "run-sorry-cypress.py" in ps_line and build_flag in ps_line
        for ps_line in run_ps().splitlines()
    )
|
|
|
|
|
|
def related_process_active(build_name: str) -> bool:
    """Report whether any runner-related process mentioning the build is alive.

    A process counts when its ``ps`` line contains *build_name* and one of
    ``run-sorry-cypress.py``, ``cypress-cloud`` or ``node ``.
    """
    markers = ("run-sorry-cypress.py", "cypress-cloud", "node ")
    return any(
        build_name in ps_line and any(marker in ps_line for marker in markers)
        for ps_line in run_ps().splitlines()
    )
|
|
|
|
|
|
def extract_active_subrun_build(build_name: str) -> Optional[str]:
    """Return the ``--ci-build-id`` value of the last live process for the build.

    Only ``ps`` lines containing both *build_name* and ``--ci-build-id`` are
    inspected.  Returns ``None`` when no such process exists.
    """
    latest_id: Optional[str] = None
    for ps_line in run_ps().splitlines():
        if build_name in ps_line and "--ci-build-id" in ps_line:
            hit = re.search(r"--ci-build-id\s+(\S+)", ps_line)
            if hit:
                latest_id = hit.group(1)
    return latest_id
|
|
|
|
|
|
def read_text(path: Path) -> str:
    """Read *path* as UTF-8, replacing undecodable bytes; missing file -> ``""``."""
    try:
        content = path.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        content = ""
    return content
|
|
|
|
|
|
def extract_expected_hosts(log_text: str) -> List[str]:
    """List the ``atvm*`` hosts a run log refers to, in first-seen order.

    Hosts come from two places: the first ``Extracted specPattern: [...]``
    list literal (entries ending in ``atvm<...>.ts``) and every
    ``Running: ... atvm<...>.ts`` line.  Duplicates are dropped.
    """
    ordered: List[str] = []

    def remember(name: str) -> None:
        if name not in ordered:
            ordered.append(name)

    entries = []
    declared = re.search(r'Extracted specPattern:\s*(\[[^\n]+\])', log_text)
    if declared is not None:
        try:
            entries = ast.literal_eval(declared.group(1))
        except (SyntaxError, ValueError):
            # Unparseable list literal: fall back to "Running:" lines only.
            entries = []
    for entry in entries:
        if isinstance(entry, str):
            hit = re.search(r"(atvm[^/\s]+)\.ts$", entry)
            if hit:
                remember(hit.group(1))

    running = re.finditer(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    for hit in running:
        remember(hit.group(1))
    return ordered
|
|
|
|
|
|
def extract_currents_url(log_text: str) -> Optional[str]:
    """Return the first ``https://.../run/...`` URL in *log_text*, if any."""
    found = re.search(r"(https://\S+/run/\S+)", log_text)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
|
|
def extract_command_from_log(log_text: str, command_name: str, build_name: Optional[str] = None) -> Optional[str]:
    """Return the last normalized invocation of *command_name* found in a log.

    When *build_name* is given and the command is ``run-sorry-cypress.py``,
    only invocations carrying ``--build_name <build_name>`` qualify.
    """
    must_match_build = command_name == "run-sorry-cypress.py" and bool(build_name)
    build_flag = f"--build_name {build_name}"
    latest: Optional[str] = None
    for log_line in log_text.splitlines():
        if command_name not in log_line:
            continue
        candidate = normalize_logged_command(log_line, command_name)
        if not candidate:
            continue
        if must_match_build and build_flag not in candidate:
            continue
        latest = candidate
    return latest
|
|
|
|
|
|
def load_state(state_file: Path) -> Dict[str, object]:
    """Load the persisted state mapping from *state_file*.

    Returns ``{}`` when the file is missing, is not valid JSON, or holds
    valid JSON whose top level is not an object.  The last case keeps the
    declared ``Dict`` contract, so callers can use ``.get`` safely.
    """
    try:
        # Read directly instead of checking exists() first, so a file removed
        # between the check and the read cannot raise.
        loaded = json.loads(state_file.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def write_state(state_file: Path, state: Dict[str, object]) -> None:
    """Serialize *state* as indented, key-sorted JSON and write it to *state_file*."""
    serialized = json.dumps(state, sort_keys=True, indent=2)
    state_file.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def load_currents_store(build_dir: Path) -> Dict[str, object]:
    """Load ``currents_urls.json`` from *build_dir*.

    Returns ``{}`` if the file is absent, is not valid JSON, or its top
    level is not a JSON object.
    """
    store_file = build_dir / "currents_urls.json"
    if not store_file.exists():
        return {}
    try:
        payload = json.loads(store_file.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    if isinstance(payload, dict):
        return payload
    return {}
|
|
|
|
|
|
def latest_currents_url(build_dir: Path) -> Optional[str]:
    """Return the store's ``latest_url`` when it is a non-empty string, else ``None``."""
    candidate = load_currents_store(build_dir).get("latest_url")
    if isinstance(candidate, str) and candidate:
        return candidate
    return None
|
|
|
|
|
|
def persisted_currents_url_for_build(build_dir: Path, build_id: Optional[str]) -> Optional[str]:
    """Look up the stored Currents URL for *build_id*, else the latest URL.

    The store's ``by_build_id`` entry may be either ``{"url": ...}`` or a
    bare URL string.  Any missing or malformed piece falls back to
    ``latest_currents_url``.
    """
    mapping = load_currents_store(build_dir).get("by_build_id")
    if isinstance(mapping, dict) and build_id:
        entry = mapping.get(build_id)
        candidate = entry.get("url") if isinstance(entry, dict) else entry
        if isinstance(candidate, str) and candidate:
            return candidate
    return latest_currents_url(build_dir)
|
|
|
|
|
|
def parse_xml_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp from a result XML into an aware UTC datetime.

    A timestamp without an offset is taken to be UTC.  A timestamp that
    carries an explicit offset (or a trailing ``Z``) is converted to UTC.
    Previously the offset was overwritten, which shifted the instant.
    Returns ``None`` for empty or unparseable input.
    """
    if not raw:
        return None
    # datetime.fromisoformat only accepts a trailing "Z" on newer Pythons.
    text = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def parse_log_timestamp(raw: str) -> Optional[datetime]:
    """Parse a ``YYYY-MM-DD HH:MM:SS,fff`` log timestamp into UTC.

    The value is interpreted in ``LOCAL_TZ`` and then converted.  Returns
    ``None`` when *raw* does not match the format.
    """
    try:
        naive = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S,%f")
    except ValueError:
        return None
    return naive.replace(tzinfo=LOCAL_TZ).astimezone(timezone.utc)
|
|
|
|
|
|
def parse_reporter_metadata_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse a reporter metadata timestamp into an aware UTC datetime.

    Accepts ISO-8601 text, including a ``Z`` suffix; a value without an
    offset is taken to be UTC.  The value comes straight from decoded JSON,
    so anything that is not a non-empty string (``None``, a number, a dict)
    returns ``None`` instead of raising.  Unparseable text also returns
    ``None``.
    """
    if not isinstance(raw, str) or not raw:
        return None
    normalized = raw.replace("Z", "+00:00")
    try:
        ts = datetime.fromisoformat(normalized)
    except ValueError:
        return None
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)
|
|
|
|
|
|
def first_log_timestamp(log_text: str) -> Optional[datetime]:
    """Return the timestamp of the first ``<ts> - INFO - `` line in the log, if any."""
    first_info = re.search(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - ", log_text, re.M)
    return parse_log_timestamp(first_info.group(1)) if first_info else None
|
|
|
|
|
|
def parse_host_xml(xml_path: Path) -> Optional[Tuple[str, HostResult]]:
    """Summarise the most significant host suite in a result XML file.

    Each ``<testsuite>`` directly under the root is considered if a host can
    be derived from it: from its ``file`` attribute
    (``cypress/cmcRegressionTest/atvm*.ts``) or, failing that, from an
    ``atvm...`` token in its ``name``.  The suite with the most tests wins;
    ties go to the longer (or later, if equal) suite.

    Returns ``(host, HostResult)`` with ``kernel`` left as ``"unknown"``.
    Returns ``None`` if the file cannot be read or parsed, or if no suite
    names a host.
    """
    try:
        tree = ET.parse(xml_path)
    except (ET.ParseError, OSError):
        # OSError covers a file that vanished or became unreadable between
        # directory listing and parsing, matching safe_mtime/existing_paths.
        return None
    root = tree.getroot()
    best: Optional[Tuple[str, int, int, float, Optional[datetime], Optional[str]]] = None
    for suite in root.findall("testsuite"):
        file_attr = suite.attrib.get("file", "")
        host_name = None
        if file_attr.startswith("cypress/cmcRegressionTest/atvm") and file_attr.endswith(".ts"):
            host_name = Path(file_attr).stem
        if not host_name:
            name_match = re.search(r"(atvm[^)\s]+)", suite.attrib.get("name", ""))
            if name_match:
                host_name = name_match.group(1)
        if not host_name:
            continue
        # Suite-level counters fall back to the root element's attributes.
        tests = int(float(suite.attrib.get("tests", root.attrib.get("tests", "0"))))
        failures = int(float(suite.attrib.get("failures", root.attrib.get("failures", "0"))))
        total_time = float(suite.attrib.get("time", root.attrib.get("time", "0")))
        timestamp = parse_xml_timestamp(suite.attrib.get("timestamp"))
        failure_detail = extract_failure_detail_from_xml_suite(suite)
        candidate = (host_name, tests, failures, total_time, timestamp, failure_detail)
        if best is None:
            best = candidate
            continue
        best_tests, best_total_time = best[1], best[3]
        if tests > best_tests or (tests == best_tests and total_time >= best_total_time):
            best = candidate
    if not best:
        return None
    file_name, tests, failures, total_time, timestamp, failure_detail = best
    detail = f"{tests} tests, {failures} failures"
    if failures:
        detail = append_failure_detail(detail, failure_detail)
    status = "FAIL" if failures else "PASS"
    return file_name, HostResult(
        host=file_name,
        kernel="unknown",
        status=status,
        detail=detail,
        tests=tests,
        failures=failures,
        duration_seconds=total_time,
        timestamp=timestamp,
    )
|
|
|
|
|
|
def extract_check_xml_timestamp_from_file(xml_path: Path) -> Optional[datetime]:
    """Return the timestamp of the ``check-xml-files.ts`` suite in a result XML.

    The first ``<testsuite>`` directly under the root whose ``file``
    attribute ends with ``check-xml-files.ts`` decides the result.  Returns
    ``None`` if the file cannot be read or parsed, or has no such suite.
    """
    try:
        tree = ET.parse(xml_path)
    except (ET.ParseError, OSError):
        # OSError: the file disappeared or is unreadable; treat as "no data".
        return None
    for suite in tree.getroot().findall("testsuite"):
        if suite.attrib.get("file", "").endswith("check-xml-files.ts"):
            return parse_xml_timestamp(suite.attrib.get("timestamp"))
    return None
|
|
|
|
|
|
def parse_duration_seconds(raw: str) -> Optional[float]:
    """Convert a duration cell into seconds.

    Recognised forms, after collapsing whitespace:

    * clock style: ``MM:SS`` or ``HH:MM:SS``
    * milliseconds: ``500ms``
    * unit style: any of ``<h>h``, ``<m>m``, ``<s>[s]`` in that order.  The
      trailing ``s`` is optional, so a value whose ``s`` wrapped onto another
      table row (``"1h 23m 45"``) still parses.

    Anything else falls back to the first number found, optionally preceded
    by ``<h>h `` / ``<m>m ``.  Returns ``None`` when no number is present.

    The unit form is matched against the whole string.  The earlier
    unanchored search read ``"5m"`` as 5 seconds, ``"1h 5m"`` as 3605 and
    ``"500ms"`` as 500.
    """
    text = " ".join(raw.split())
    if re.fullmatch(r"\d+:\d+(?::\d+)?", text):
        total = 0
        for part in text.split(":"):
            total = total * 60 + int(part)
        return total

    millis = re.fullmatch(r"(\d+(?:\.\d+)?)\s*ms", text)
    if millis:
        return float(millis.group(1)) / 1000

    units = re.fullmatch(
        r"(?:(\d+)\s*h)?\s*(?:(\d+)\s*m)?\s*(?:(\d+(?:\.\d+)?)\s*s?)?",
        text,
    )
    # Every component is optional, so require at least one to be present
    # (otherwise the empty string would "match").
    if units and any(group is not None for group in units.groups()):
        hours, minutes, seconds = units.groups()
        return int(hours or 0) * 3600 + int(minutes or 0) * 60 + float(seconds or 0)

    # Lenient fallback for text with surrounding noise.
    loose = re.search(r"(?:(\d+)h\s+)?(?:(\d+)m\s+)?(\d+(?:\.\d+)?)(?:s)?", text)
    if not loose:
        return None
    return int(loose.group(1) or 0) * 3600 + int(loose.group(2) or 0) * 60 + float(loose.group(3))
|
|
|
|
|
|
def compact_failure_detail(raw: str, limit: int = 220) -> str:
    """Collapse whitespace in *raw* and cap it at *limit* characters.

    Text over the limit is cut to ``limit - 3`` characters, right-stripped,
    and suffixed with ``...``.
    """
    flattened = " ".join(raw.split())
    if len(flattened) > limit:
        return flattened[: limit - 3].rstrip() + "..."
    return flattened
|
|
|
|
|
|
def append_failure_detail(detail: str, failure_detail: Optional[str]) -> str:
    """Append ``" - <failure_detail>"`` to *detail* unless it is empty or already present."""
    if failure_detail and failure_detail not in detail:
        return f"{detail} - {failure_detail}"
    return detail
|
|
|
|
|
|
def concise_testcase_name(raw: str) -> str:
    """Drop everything up to and including the first ``->`` and trim whitespace."""
    _, arrow, remainder = raw.partition("->")
    chosen = remainder if arrow else raw
    return chosen.strip()
|
|
|
|
|
|
def extract_failure_detail_from_xml_suite(suite: ET.Element) -> Optional[str]:
    """Describe the first failing ``<testcase>`` of *suite* in one compact line.

    The label is the testcase's ``classname``, else ``name``, else
    ``"failed testcase"``.  The failure's ``message`` attribute and text are
    joined and appended after ``": "`` when present.  Returns ``None`` if no
    testcase has a ``<failure>`` child.
    """
    for case in suite.findall("testcase"):
        failure_node = case.find("failure")
        if failure_node is not None:
            break
    else:
        return None

    label = case.attrib.get("classname") or case.attrib.get("name") or "failed testcase"
    fragments = [chunk for chunk in (failure_node.attrib.get("message"), failure_node.text) if chunk]
    summary = compact_failure_detail(" ".join(fragments)) if fragments else ""
    return compact_failure_detail(f"{label}: {summary}" if summary else label)
|
|
|
|
|
|
def extract_failure_detail_from_text_blob(text: str) -> Optional[str]:
    """Find the first error-looking paragraph in *text* and compact it.

    Patterns are tried in priority order (``Error:``, ``AssertionError:``,
    ``Timed out!``), case-insensitively; each capture runs to the next blank
    line or the end of the text.  Returns ``None`` when none matches.
    """
    signatures = (
        r"(Error:\s.*?)(?:\n\s*\n|$)",
        r"(AssertionError:\s.*?)(?:\n\s*\n|$)",
        r"(Timed out!.*?)(?:\n\s*\n|$)",
    )
    # Lazy generator: later patterns are only searched if earlier ones miss.
    attempts = (re.search(signature, text, re.I | re.S) for signature in signatures)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is None:
        return None
    return compact_failure_detail(first_hit.group(1))
|
|
|
|
|
|
def extract_failure_from_reporter_events(testcase_name: str, testcase_events: object) -> Optional[str]:
    """Return a compact description of the first failed command/task event.

    *testcase_events* is expected to be a list of ``[type, message, status]``
    lists; anything else, and any entry that is not a list of at least three
    items, is ignored.  An event counts as failed when its type is
    ``cy:command`` or ``cy:task`` and its status is ``failed``, ``fail`` or
    ``error`` (case-insensitive).
    """
    if not isinstance(testcase_events, list):
        return None
    failing_types = {"cy:command", "cy:task"}
    failing_states = {"failed", "fail", "error"}
    for entry in testcase_events:
        if not isinstance(entry, list) or len(entry) < 3:
            continue
        kind = str(entry[0]).lower()
        message = str(entry[1])
        state = str(entry[2]).lower()
        if kind in failing_types and state in failing_states:
            return compact_failure_detail(f"{concise_testcase_name(testcase_name)} - {message}")
    return None
|
|
|
|
|
|
def extract_failure_from_reporter_text(text: str) -> Optional[str]:
    """Return a compact description of the first failed event in a text report.

    The report is split on ``=== <testcase> ===`` header lines.  Within each
    section, tab-separated rows are read as ``type<TAB>status<TAB>message``.
    The first row whose type is ``cy:command``/``cy:task`` and whose status
    is ``failed``/``fail``/``error`` is reported.
    """
    chunks = re.split(r"^=== (.+?) ===\s*$", text, flags=re.M)
    if len(chunks) < 3:
        return None
    failing_types = {"cy:command", "cy:task"}
    failing_states = {"failed", "fail", "error"}
    # re.split with one capture group yields [preamble, title, body, title, body, ...].
    for title, body in zip(chunks[1::2], chunks[2::2]):
        for row in body.splitlines():
            fields = row.split("\t")
            if len(fields) < 3:
                continue
            kind = fields[0].strip().lower()
            state = fields[1].strip().lower()
            if kind in failing_types and state in failing_states:
                message = "\t".join(fields[2:]).strip()
                return compact_failure_detail(f"{concise_testcase_name(title)} - {message}")
    return None
|
|
|
|
|
|
def extract_failure_detail_from_log_text(log_text: str, host: str) -> Optional[str]:
    """Pull the failing testcase and error body for *host* out of a run log.

    Looks for a numbered ``N) Testing ... <host> ...`` line, followed by an
    indented ``<testcase>:`` line and an indented body.  The body runs until
    a blank line, an XML prolog, ``Screenshots:``, ``Video output:`` or the
    end of the text.  Returns ``"<testcase> - <body>"`` compacted, just the
    testcase if the body is empty, or ``None`` if the layout is not found.
    """
    layout = (
        rf"\d+\)\s+Testing .*?{re.escape(host)}.*?\n"
        rf"\s+([^\n]+):\n"
        rf"\s+(.*?)(?:\n\s*\n|\n\s*<?xml|\n\s*Screenshots:|\n\s*Video output:|$)"
    )
    found = re.search(layout, log_text, re.S)
    if found is None:
        return None
    case_title = found.group(1).strip()
    error_body = compact_failure_detail(found.group(2))
    combined = f"{case_title} - {error_body}" if error_body else case_title
    return compact_failure_detail(combined)
|
|
|
|
|
|
def decode_json_string_fragment(raw: str) -> str:
    """Decode JSON string escapes in *raw* (the text between the quotes).

    If wrapping *raw* in quotes does not produce valid JSON, it is returned
    unchanged.
    """
    quoted = f'"{raw}"'
    try:
        decoded = json.loads(quoted)
    except json.JSONDecodeError:
        return raw
    return decoded
|
|
|
|
|
|
def extract_first_json_string(block: str, key: str) -> Optional[str]:
    """Return the decoded value of the first ``"key":"value"`` pair in *block*.

    Matching is textual and needs no whitespace around the colon; the value
    may contain backslash escapes.  Returns ``None`` when the key is absent.
    """
    pair = re.search(rf'"{re.escape(key)}":"((?:\\.|[^"])*)"', block, re.S)
    return decode_json_string_fragment(pair.group(1)) if pair else None
|
|
|
|
|
|
def extract_step_number(text: str) -> Optional[int]:
    """Return the first ``<digits>. `` step number in *text*, or ``None``."""
    numbered = re.search(r"\b(\d+)\.\s", text)
    return int(numbered.group(1)) if numbered else None
|
|
|
|
|
|
def extract_testcase_from_host_detail(detail: str) -> Optional[str]:
    """Extract the testcase part of a ``"N tests, M failures - <testcase> - <msg>"`` detail.

    Returns ``None`` when *detail* has no ``" - "`` section after the counts.
    """
    parsed = re.match(r"^\d+ tests, \d+ failures(?:, \d+ pending)? - (.+?)(?: - .+)?$", detail)
    if parsed is None:
        return None
    return parsed.group(1).strip()
|
|
|
|
|
|
def extract_failure_from_mochawesome(
    reporter_root: Path,
    build_name: str,
    host: str,
) -> Optional[Tuple[str, str, str]]:
    """Find the first failed test for *host* in the build's mochawesome HTML reports.

    Reports under ``<reporter_root>/mochawesome`` whose name contains
    *build_name* are scanned newest first.  The embedded JSON is not parsed
    as a whole: each ``"fullTitle"`` that mentions *host* is located
    textually and the surrounding test object is sliced out with string
    searches.

    Returns ``(title, message, estack)`` for the first such object whose
    state is ``failed`` and that has a message or stack, else ``None``.
    """
    report_dir = reporter_root / "mochawesome"
    if not report_dir.exists():
        return None

    def sort_key(path: Path):
        # Newest first after reverse=True; on equal mtimes, files NOT ending
        # in "_001.html" sort ahead of those that do.
        return (safe_mtime(path), 0 if path.name.endswith("_001.html") else 1)

    reports = sorted(
        existing_paths(report_dir.glob(f"*{build_name}*.html")),
        key=sort_key,
        reverse=True,
    )

    for report_path in reports:
        try:
            text = html.unescape(report_path.read_text(encoding="utf-8", errors="replace"))
        except OSError:
            continue
        for title_hit in re.finditer(r'"fullTitle":"((?:\\.|[^"])*)"', text, re.S):
            if host not in decode_json_string_fragment(title_hit.group(1)):
                continue

            # Start of the enclosing test object: the closest preceding
            # '{"title":"' or ',"title":"' (skipping the comma for the latter).
            start = text.rfind('{"title":"', 0, title_hit.start())
            comma_start = text.rfind(',"title":"', 0, title_hit.start())
            if comma_start != -1 and (start == -1 or comma_start > start):
                start = comma_start + 1
            if start == -1:
                continue

            # End of the object: the nearest following terminator, or a
            # 16000-character window when none is found.
            terminators = ('},{"title":"', '}]}', '}]}</script>')
            positions = [text.find(token, title_hit.end()) for token in terminators]
            positions = [position for position in positions if position != -1]
            if positions:
                end = min(positions)
            else:
                end = min(len(text), title_hit.end() + 16000)

            fragment = text[start:end]
            if '"state":"failed"' not in fragment:
                continue
            testcase = extract_first_json_string(fragment, "title") or "failed testcase"
            message = extract_first_json_string(fragment, "message") or ""
            estack = extract_first_json_string(fragment, "estack") or ""
            if testcase and (message or estack):
                return testcase, message, estack
    return None
|
|
|
|
|
|
def summarize_host_detail_with_mochawesome(detail: str, testcase: str, message: str) -> str:
    """Build ``"<counts> - <testcase> - <message summary>"`` for a failed host.

    ``<counts>`` is the leading ``N tests, M failures[, K pending]`` of
    *detail*, or all of *detail* if it does not start that way.  The message
    summary is the first recognised error signature in the whitespace-
    collapsed *message*; failing that the whole message; failing that the
    testcase name.  The summary is capped at 120 characters and the
    testcase at 140.
    """
    counts = re.match(r"^(\d+ tests, \d+ failures(?:, \d+ pending)?)", detail)
    prefix = counts.group(1) if counts else detail
    flat_message = " ".join((message or "").split())

    signatures = (
        r"(md5sum:\s.*?No such file or directory)",
        r"(sshpass does not contain OK(?:\.\s*Output:)?)(?:$)",
        r"(AssertionError:\s.*?)(?:$)",
        r"(Timed out!? .*?)(?:$)",
        r"(Error:\s.*?)(?:$)",
    )
    # Lazy generator keeps the original priority order and early exit.
    attempts = (re.search(signature, flat_message, re.I) for signature in signatures)
    first_hit = next((hit for hit in attempts if hit), None)
    summary = first_hit.group(1) if first_hit else ""
    if not summary:
        summary = flat_message or testcase

    short_summary = compact_failure_detail(summary, limit=120)
    short_testcase = compact_failure_detail(testcase, limit=140)
    return f"{prefix} - {short_testcase} - {short_summary}"
|
|
|
|
|
|
def extract_host_results_from_run_finished_segment(segment_text: str, inventory: Dict[str, str]) -> Dict[str, HostResult]:
    """Parse the per-spec rows of a "Run Finished" summary table.

    Each ``│ ✔/✖ atvm<host>.ts <duration> <tests> <passing> <failing> ... │``
    row becomes a ``HostResult`` keyed by host; a later row for the same
    host replaces an earlier one.  ``-`` in the failing column means zero.
    """
    # Currents can wrap the trailing "s" of a long duration onto a table row
    # of its own.  Drop that continuation row rather than gluing it to the
    # end of the host row; the duration parser accepts values without the "s".
    table = re.sub(r"\n\s*│\s*s\s*│\s*", "\n", segment_text)
    row_pattern = r"(?m)^\s*│\s*([✔✖])\s+(atvm[^\s]+)\.ts\s+([0-9:hms. ]+?)\s+(\d+)\s+(\d+)\s+([-\d]+)\s+([-\d]+)\s+([-\d]+)\s*│\s*$"

    parsed: Dict[str, HostResult] = {}
    for row in re.finditer(row_pattern, table):
        host = row.group(2)
        seconds = parse_duration_seconds(row.group(3))
        test_count = int(row.group(4))
        failing_cell = row.group(6)
        failure_count = int(failing_cell) if failing_cell != "-" else 0
        parsed[host] = HostResult(
            host=host,
            kernel=inventory.get(host, "unknown"),
            status="FAIL" if failure_count else "PASS",
            detail=f"{test_count} tests, {failure_count} failures",
            tests=test_count,
            failures=failure_count,
            duration_seconds=seconds,
        )
    return parsed
|
|
|
|
|
|
def extract_completed_subrun_summaries(log_text: str, inventory: Dict[str, str]) -> List[Dict[str, object]]:
    """Split a run log into finished sub-runs and summarise each one.

    Every ``Cloud Run Finished`` line starts a block.  A block ends at the
    earliest of: the next ``Cloud Run Finished`` line, the end of its
    ``🏁 Recorded Run: <url>`` line, or the next timestamped
    ``Extracted specPattern:`` log line.  Host rows are read from the block,
    or from the text between the previous block and this one if the block
    has none.  Blocks with no host rows are skipped.

    Each summary is ``{"host_results": {host: HostResult}, "currents_url":
    url-or-None}``; failing hosts get failure detail appended from the log.
    """
    recorded_run = r"🏁 Recorded Run:\s*(https://\S+)"
    next_spec = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - INFO - Extracted specPattern:"

    starts = [hit.start() for hit in re.finditer(r"(?m)^\s*Cloud Run Finished\s*$", log_text)]
    upper_bounds = starts[1:] + [len(log_text)]

    collected: List[Dict[str, object]] = []
    cursor = 0  # end of the previously processed block
    for begin, upper in zip(starts, upper_bounds):
        end_candidates = [upper]
        recorded_hit = re.search(recorded_run, log_text[begin:upper])
        if recorded_hit:
            end_candidates.append(begin + recorded_hit.end())
        spec_hit = re.search(next_spec, log_text[begin + 1 :], re.M)
        if spec_hit:
            end_candidates.append(begin + 1 + spec_hit.start())
        finish = min(end_candidates)

        block = log_text[begin:finish]
        url_hit = re.search(recorded_run, block)
        url = url_hit.group(1) if url_hit else None
        preceding = log_text[cursor:begin]
        cursor = finish

        per_host = extract_host_results_from_run_finished_segment(block, inventory)
        if not per_host:
            per_host = extract_host_results_from_run_finished_segment(preceding, inventory)
        if not per_host:
            continue

        failure_source = preceding + "\n" + block
        for host, outcome in per_host.items():
            if outcome.failures:
                outcome.detail = append_failure_detail(
                    outcome.detail,
                    extract_failure_detail_from_log_text(failure_source, host),
                )
        collected.append({"host_results": per_host, "currents_url": url})
    return collected
|
|
|
|
|
|
def extract_latest_run_summary(log_text: str, inventory: Dict[str, str]) -> Optional[Dict[str, object]]:
    """Return the summary of the last completed sub-run in the log, or ``None``."""
    completed = extract_completed_subrun_summaries(log_text, inventory)
    return completed[-1] if completed else None
|
|
|
|
|
|
def extract_merged_run_summary(log_text: str, inventory: Dict[str, str]) -> Optional[Dict[str, object]]:
    """Merge all completed sub-run summaries in the log into one.

    Later sub-runs override earlier results for the same host, and the last
    non-empty Currents URL wins.  Returns ``None`` when the log holds no
    completed sub-run.
    """
    completed = extract_completed_subrun_summaries(log_text, inventory)
    if not completed:
        return None

    combined: Dict[str, HostResult] = {}
    newest_url: Optional[str] = None
    for summary in completed:
        combined.update(summary["host_results"])
        url = summary.get("currents_url")
        if isinstance(url, str) and url:
            newest_url = url

    return {"host_results": combined, "currents_url": newest_url}
|
|
|
|
|
|
def collect_host_results(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Dict[str, HostResult]:
    """Gather per-host results from ``<reporter_root>/xml/test-result-*.xml``.

    Files are processed oldest first, so a newer file for the same host
    replaces an older one.  Only files modified at or after *run_started_at*
    and, if given, strictly before *run_ended_at* are used.  When
    *expected_hosts* is non-empty, other hosts are ignored.  Each result's
    kernel is filled in from *kernels*.
    """
    collected: Dict[str, HostResult] = {}
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return collected

    candidates = existing_paths(xml_dir.glob("test-result-*.xml"))
    for xml_file in sorted(candidates, key=safe_mtime):
        modified = datetime.fromtimestamp(safe_mtime(xml_file), tz=timezone.utc)
        too_old = modified < run_started_at
        too_new = bool(run_ended_at) and modified >= run_ended_at
        if too_old or too_new:
            continue
        outcome = parse_host_xml(xml_file)
        if not outcome:
            continue
        host, host_result = outcome
        if expected_hosts and host not in expected_hosts:
            continue
        host_result.kernel = kernels.get(host, "unknown")
        collected[host] = host_result
    return collected
|
|
|
|
|
|
def collect_latest_host_result(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
    """Return the ``(host, result)`` pair with the newest XML timestamp.

    Results without a timestamp rank as the Unix epoch.  Returns ``None``
    when ``collect_host_results`` finds nothing for the given window.
    """
    per_host = collect_host_results(
        reporter_root=reporter_root,
        expected_hosts=expected_hosts,
        kernels=kernels,
        run_started_at=run_started_at,
        run_ended_at=run_ended_at,
    )
    if not per_host:
        return None
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    return max(per_host.items(), key=lambda pair: pair[1].timestamp or epoch)
|
|
|
|
|
|
def reporter_artifact_run_timestamp(artifact_path: Path) -> Optional[datetime]:
    """Work out the run timestamp (UTC) of a reporter artifact.

    For ``.json`` artifacts, ``metadata.timestamp`` inside the file is
    preferred.  Otherwise, or if that is unavailable, the timestamp is read
    from a ``_YYYY-MM-DDTHH-MM-SS.(json|txt)`` suffix in the file name and
    taken as UTC.  Returns ``None`` if neither source yields a timestamp.
    """
    if artifact_path.suffix == ".json":
        try:
            document = json.loads(artifact_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            document = {}
        meta = document.get("metadata") if isinstance(document, dict) else None
        if isinstance(meta, dict):
            embedded = parse_reporter_metadata_timestamp(meta.get("timestamp"))
            if embedded:
                return embedded

    stamped = re.search(r"_(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.(?:json|txt)$", artifact_path.name)
    if stamped is None:
        return None
    try:
        from_name = datetime.strptime(stamped.group(1), "%Y-%m-%dT%H-%M-%S")
    except ValueError:
        return None
    return from_name.replace(tzinfo=timezone.utc)
|
|
|
|
|
|
def safe_mtime(path: Path) -> float:
    """Return *path*'s modification time, or ``0.0`` if the file no longer exists."""
    try:
        info = path.stat()
    except FileNotFoundError:
        return 0.0
    return info.st_mtime
|
|
|
|
|
|
def existing_paths(paths) -> List[Path]:
    """Return, in order, the entries of *paths* that currently exist on disk."""
    present: List[Path] = []
    for candidate in paths:
        if candidate.exists():
            present.append(candidate)
    return present
|
|
|
|
|
|
def collect_latest_host_reporter_artifact(
    reporter_root: Path,
    expected_hosts: List[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
    """Return the newest per-host reporter artifact result under ``logs/``.

    Scans ``<reporter_root>/logs/<atvm-host>/*.txt|*.json``.  Only artifacts
    modified at or after *run_started_at* and, if given, strictly before
    *run_ended_at* are considered.  When *expected_hosts* is non-empty,
    other hosts are ignored.  Each parsed result's timestamp is set to the
    first available of: the timestamp the parser found, the artifact's run
    timestamp, the file's mtime.

    The newest timestamp wins; on a tie a ``.json`` artifact is preferred
    over a ``.txt`` one.  Returns ``None`` when nothing qualifies.
    """
    logs_dir = reporter_root / "logs"
    if not logs_dir.exists():
        return None

    latest: Optional[Tuple[str, HostResult, datetime, str]] = None
    for host_dir in sorted(logs_dir.iterdir()):
        if not host_dir.is_dir():
            continue
        host = host_dir.name
        if not host.startswith("atvm"):
            continue
        if expected_hosts and host not in expected_hosts:
            continue

        for artifact_path in sorted(host_dir.iterdir()):
            if artifact_path.suffix not in {".txt", ".json"}:
                continue
            # safe_mtime returns 0.0 for an artifact deleted mid-scan, which
            # falls before any real run start and is skipped just below.  A
            # bare stat() here would raise FileNotFoundError and abort the scan.
            artifact_mtime = datetime.fromtimestamp(safe_mtime(artifact_path), tz=timezone.utc)
            if artifact_mtime < run_started_at:
                continue
            if run_ended_at and artifact_mtime >= run_ended_at:
                continue

            result = parse_host_reporter_artifact(artifact_path, host, kernels)
            if result is None:
                continue
            artifact_ts = result.timestamp or reporter_artifact_run_timestamp(artifact_path) or artifact_mtime
            result.timestamp = artifact_ts
            candidate = (host, result, artifact_ts, artifact_path.suffix)
            if latest is None:
                latest = candidate
                continue

            latest_result_ts = latest[2]
            latest_suffix = latest[3]
            # Prefer the newest logical run artifact, and prefer JSON over TXT
            # when both artifacts represent the same run timestamp.
            if artifact_ts > latest_result_ts:
                latest = candidate
                continue
            if artifact_ts == latest_result_ts and artifact_path.suffix == ".json" and latest_suffix != ".json":
                latest = candidate
    if latest is None:
        return None
    return latest[0], latest[1]
|
|
|
|
|
|
def parse_host_reporter_json(artifact_path: Path, host: str, kernels: Dict[str, str]) -> Optional[HostResult]:
    """Parse a JSON reporter artifact into a ``HostResult``.

    Two payload shapes are handled:

    * a ``stats`` dict (``tests`` / ``failures`` / ``pending`` / ``duration``),
      optionally with a ``tests`` dict used to look up the first failure detail;
    * no ``stats`` but a ``tests`` dict, in which case counts are derived from
      the per-test event lists.

    Args:
        artifact_path: JSON file to read (decoded as UTF-8).
        host: Host name stored on the result and used for the kernel lookup.
        kernels: Host -> kernel mapping; missing hosts get ``"unknown"``.

    Returns:
        The parsed result, or ``None`` when the file cannot be read or decoded,
        is not a JSON object, or has neither a ``stats`` nor a ``tests`` dict.
    """
    try:
        payload = json.loads(artifact_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # half-written or binary-corrupted artifact is skipped, not fatal.
        return None
    if not isinstance(payload, dict):
        return None

    stats = payload.get("stats")
    metadata = payload.get("metadata")
    tests_payload = payload.get("tests")
    has_stats = isinstance(stats, dict)
    has_tests = isinstance(tests_payload, dict)
    if not has_stats and not has_tests:
        return None

    # Timestamp resolution is identical for both payload shapes.
    timestamp = None
    if isinstance(metadata, dict):
        timestamp = parse_reporter_metadata_timestamp(metadata.get("timestamp"))
    if timestamp is None:
        timestamp = reporter_artifact_run_timestamp(artifact_path)

    pending = 0
    duration_seconds: Optional[float] = None
    failure_detail = None
    if has_stats:
        tests = int(stats.get("tests", 0) or 0)
        failures = int(stats.get("failures", 0) or 0)
        pending = int(stats.get("pending", 0) or 0)
        duration_ms = stats.get("duration", 0) or 0
        # The reported duration is divided by 1000 to obtain seconds.
        duration_seconds = float(duration_ms) / 1000 if duration_ms else None
        if failures and has_tests:
            # Only the first extractable failure is surfaced in the detail.
            for testcase_name, testcase_events in tests_payload.items():
                failure_detail = extract_failure_from_reporter_events(testcase_name, testcase_events)
                if failure_detail is not None:
                    break
    else:
        # No stats: count well-formed test entries and scan every entry for failures.
        tests = len(
            [name for name, events in tests_payload.items() if isinstance(name, str) and isinstance(events, list)]
        )
        failures = 0
        for testcase_name, testcase_events in tests_payload.items():
            current_failure = extract_failure_from_reporter_events(testcase_name, testcase_events)
            if current_failure:
                failures += 1
                if failure_detail is None:
                    failure_detail = current_failure

    detail_parts = [f"{tests} tests", f"{failures} failures"]
    if pending:
        detail_parts.append(f"{pending} pending")
    detail = ", ".join(detail_parts)
    if failures:
        detail = append_failure_detail(detail, failure_detail)

    if failures:
        status = "FAIL"
    elif pending:
        status = "RUN"
    else:
        status = "PASS"

    return HostResult(
        host=host,
        kernel=kernels.get(host, "unknown"),
        status=status,
        detail=detail,
        tests=tests,
        failures=failures,
        duration_seconds=duration_seconds,
        timestamp=timestamp,
    )
|
|
|
|
|
|
def parse_host_reporter_artifact(artifact_path: Path, host: str, kernels: Dict[str, str]) -> Optional[HostResult]:
    """Parse one reporter artifact (``.json`` or plain text) into a ``HostResult``.

    ``.json`` files are delegated to ``parse_host_reporter_json``. Any other
    file is treated as a text log: sections are counted from ``=== name ===``
    header lines, and the log is a failure when a failure detail can be
    extracted or a ``cy:command`` / ``cy:task`` line carries an ``error`` column.

    Returns:
        The parsed result, or ``None`` when the text artifact cannot be read
        or stat'ed. An unreadable log must not be reported as a passing host;
        this matches how the JSON path treats read errors.
    """
    if artifact_path.suffix == ".json":
        return parse_host_reporter_json(artifact_path, host, kernels)

    try:
        text = artifact_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return None

    artifact_ts = reporter_artifact_run_timestamp(artifact_path)
    if artifact_ts is None:
        # Fall back to the file mtime; the file may have vanished meanwhile.
        try:
            mtime = artifact_path.stat().st_mtime
        except OSError:
            return None
        artifact_ts = datetime.fromtimestamp(mtime, tz=timezone.utc)

    sectioned_failure_detail = extract_failure_from_reporter_text(text)
    failure_detail = extract_failure_detail_from_text_blob(text) or sectioned_failure_detail
    structured_failure = re.search(r"^(?:cy:command|cy:task)\terror\t", text, re.I | re.M)
    # A text log contributes at most one failure, however many errors it holds.
    failures = 1 if failure_detail or structured_failure else 0
    tests = len(re.findall(r"^=== .+? ===\s*$", text, re.M))
    status = "FAIL" if failures else "PASS"

    if not failures:
        detail = "completed"
    elif tests:
        detail = f"{tests} tests, 1 failures"
    else:
        detail = "1 failures"
    if failures:
        detail = append_failure_detail(detail, failure_detail)

    return HostResult(
        host=host,
        kernel=kernels.get(host, "unknown"),
        status=status,
        detail=detail,
        tests=tests,
        failures=failures,
        timestamp=artifact_ts,
    )
|
|
|
|
|
|
def collect_group_host_reporter_artifacts(
    reporter_root: Path,
    group_label: Optional[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Dict[str, HostResult]:
    """Collect the newest parsed artifact for every host belonging to *group_label*.

    Host directories under ``<reporter_root>/logs`` are considered when their
    name starts with ``atvm`` and ``infer_group_from_host`` maps them to
    *group_label*. Only ``.txt`` / ``.json`` files with an mtime inside
    ``[run_started_at, run_ended_at)`` are parsed.

    Returns:
        Host name -> newest result (ranked by file mtime). Empty when
        *group_label* is falsy or the logs directory does not exist.
    """
    results: Dict[str, HostResult] = {}
    if not group_label:
        return results

    logs_dir = reporter_root / "logs"
    if not logs_dir.exists():
        return results

    for host_dir in sorted(logs_dir.iterdir()):
        if not host_dir.is_dir():
            continue
        host = host_dir.name
        if not host.startswith("atvm"):
            continue
        if infer_group_from_host(host) != group_label:
            continue

        newest_mtime: Optional[datetime] = None
        newest_result: Optional[HostResult] = None
        for artifact_path in sorted(host_dir.iterdir()):
            suffix = artifact_path.suffix
            if suffix not in {".txt", ".json"}:
                continue
            artifact_mtime = datetime.fromtimestamp(artifact_path.stat().st_mtime, tz=timezone.utc)
            if artifact_mtime < run_started_at:
                continue
            if run_ended_at and artifact_mtime >= run_ended_at:
                continue

            parsed = parse_host_reporter_artifact(artifact_path, host, kernels)
            if parsed is None:
                continue

            # Newest mtime wins; on an exact tie a JSON artifact takes over.
            if newest_mtime is None or artifact_mtime > newest_mtime:
                take = True
            else:
                take = artifact_mtime == newest_mtime and suffix == ".json"
            if take:
                newest_mtime = artifact_mtime
                newest_result = parsed

        if newest_mtime is None or newest_result is None:
            continue

        # Results parsed without a timestamp inherit the file mtime.
        if newest_result.timestamp is None:
            newest_result.timestamp = newest_mtime
        results[host] = newest_result
    return results
|
|
|
|
|
|
def collect_latest_group_host_reporter_artifact(
    reporter_root: Path,
    group_label: Optional[str],
    kernels: Dict[str, str],
    run_started_at: datetime,
    run_ended_at: Optional[datetime] = None,
) -> Optional[Tuple[str, HostResult]]:
    """Return the ``(host, result)`` pair with the latest timestamp in a group.

    Delegates to ``collect_group_host_reporter_artifacts`` and picks the entry
    whose result timestamp is greatest; results without a timestamp rank as the
    Unix epoch. Returns ``None`` when the group yields no results.
    """
    per_host = collect_group_host_reporter_artifacts(
        reporter_root=reporter_root,
        group_label=group_label,
        kernels=kernels,
        run_started_at=run_started_at,
        run_ended_at=run_ended_at,
    )
    if not per_host:
        return None

    def ranking_key(item: Tuple[str, HostResult]) -> datetime:
        return item[1].timestamp or datetime.fromtimestamp(0, tz=timezone.utc)

    return max(per_host.items(), key=ranking_key)
|
|
|
|
|
|
def find_check_xml_end(
    reporter_root: Path,
    started_at: datetime,
    ended_at: Optional[datetime] = None,
) -> Optional[datetime]:
    """Find the suite timestamp of the ``check-xml-files.ts`` result for a run window.

    Scans ``<reporter_root>/xml/test-result-*.xml`` in ascending mtime order,
    keeps files whose mtime lies in ``[started_at, ended_at)`` and whose text
    mentions ``check-xml-files.ts``, and reads the ``timestamp`` attribute of
    the first ``testsuite`` child of the XML root.

    Returns:
        The timestamp from the last qualifying file in mtime order, or ``None``
        when the directory is missing or no file qualifies.
    """
    xml_dir = reporter_root / "xml"
    if not xml_dir.exists():
        return None

    found: Optional[datetime] = None
    candidates = sorted(existing_paths(xml_dir.glob("test-result-*.xml")), key=safe_mtime)
    for xml_path in candidates:
        xml_mtime = datetime.fromtimestamp(safe_mtime(xml_path), tz=timezone.utc)
        if xml_mtime < started_at:
            continue
        if ended_at and xml_mtime >= ended_at:
            continue
        if "check-xml-files.ts" not in read_text(xml_path):
            continue
        try:
            suite = ET.parse(xml_path).getroot().find("testsuite")
            if suite is None:
                continue
            suite_ts = parse_xml_timestamp(suite.attrib.get("timestamp"))
            if suite_ts:
                found = suite_ts
        except ET.ParseError:
            # Malformed XML files are skipped.
            continue
    return found
|
|
|
|
|
|
def find_current_running_host(log_text: str, completed_hosts: List[str]) -> Optional[str]:
    """Return the most recently started ``atvm*`` spec host that has not completed.

    Hosts are taken from ``Running: [cypress/cmcRegressionTest/]<host>.ts``
    occurrences in *log_text*; the latest one not in *completed_hosts* wins.
    Returns ``None`` when every started host is already completed.
    """
    started = re.findall(r"Running:\s+(?:cypress/cmcRegressionTest/)?(atvm[^/\s]+)\.ts", log_text)
    return next((host for host in reversed(started) if host not in completed_hosts), None)
|
|
|
|
|
|
def infer_host_from_subrun_build(
    subrun_build: str,
    expected_hosts: List[str],
    completed_hosts: List[str],
) -> Optional[str]:
    """Guess which not-yet-completed host a sub-run build name refers to.

    Each remaining host's name after its first ``-`` is compared by OS-family
    prefix against the lower-cased build name: ``w2k`` hosts match builds
    containing ``windows`` or ``w2k``; ``redhat`` / ``ubuntu`` / ``oracle`` /
    ``rocky`` / ``debian`` hosts match builds containing the same word.

    Returns:
        The first matching remaining host, otherwise the first remaining host,
        or ``None`` when no host remains.
    """
    pending = [host for host in expected_hosts if host not in completed_hosts]
    build = subrun_build.lower()
    same_word_families = ("redhat", "ubuntu", "oracle", "rocky", "debian")

    for host in pending:
        short = host.split("-", 1)[-1].lower()
        if short.startswith("w2k"):
            keywords: Tuple[str, ...] = ("windows", "w2k")
        else:
            keywords = tuple(family for family in same_word_families if short.startswith(family))
        if any(keyword in build for keyword in keywords):
            return host

    return pending[0] if pending else None
|
|
|
|
|
|
def infer_metadata(build_name: str, log_text: str) -> Dict[str, object]:
    """Assemble run metadata from ``ATVM_WATCHER_*`` environment variables.

    ``ATVM_WATCHER_EXTRA_OPTIONS`` is read as a JSON list; anything that is not
    a list, or any element that is not a non-empty string, is dropped. When the
    template or runner command variable is empty, the command is looked up in
    *log_text* and then via ``extract_command_from_ps`` for *build_name*,
    falling back to an empty string.

    Returns:
        A dict with the keys ``template``, ``template_command``,
        ``runner_command``, ``config_family``, ``config_file``,
        ``migration_style``, ``integration_plugin``, ``extra_options``,
        ``scope_description`` and ``categorized`` (bool).
    """
    env = os.environ

    try:
        decoded_options = json.loads(env.get("ATVM_WATCHER_EXTRA_OPTIONS", "[]"))
    except json.JSONDecodeError:
        decoded_options = []
    if isinstance(decoded_options, list):
        extra_options = [value for value in decoded_options if isinstance(value, str) and value]
    else:
        extra_options = []

    template_command = env.get("ATVM_WATCHER_TEMPLATE_COMMAND", "")
    if not template_command:
        template_command = (
            extract_command_from_log(log_text, "cmc-templates.py")
            or extract_command_from_ps(build_name, "cmc-templates.py")
            or ""
        )

    runner_command = env.get("ATVM_WATCHER_RUNNER_COMMAND", "")
    if not runner_command:
        runner_command = (
            extract_command_from_log(log_text, "run-sorry-cypress.py", build_name)
            or extract_command_from_ps(build_name, "run-sorry-cypress.py")
            or ""
        )

    categorized = env.get("ATVM_WATCHER_CATEGORIZED", "false").lower() == "true"
    return {
        "template": env.get("ATVM_WATCHER_TEMPLATE", "unknown"),
        "template_command": template_command,
        "runner_command": runner_command,
        "config_family": env.get("ATVM_WATCHER_CONFIG_FAMILY", "unknown"),
        "config_file": env.get("ATVM_WATCHER_CONFIG_FILE", "unknown"),
        "migration_style": env.get("ATVM_WATCHER_MIGRATION_STYLE", "ATVM automation validation"),
        "integration_plugin": env.get("ATVM_WATCHER_INTEGRATION_PLUGIN", "unknown"),
        "extra_options": extra_options,
        "scope_description": env.get("ATVM_WATCHER_SCOPE_DESCRIPTION", "requested ATVM run scope"),
        "categorized": categorized,
    }
|
|
|
|
|
|
def format_duration(seconds: Optional[float]) -> str:
    """Format a duration in seconds as ``Hh MMm SS.ss s``, ``Mm SS.ss s`` or ``S.sss s``.

    Args:
        seconds: Duration in seconds, or ``None`` when unknown.

    Returns:
        ``"n/a"`` for ``None``; otherwise a string such as ``"1h 02m 05.00s"``,
        ``"1m 05.50s"`` or ``"5.000s"``.
    """
    if seconds is None:
        return "n/a"

    whole_minutes, secs = divmod(seconds, 60)
    whole_minutes = int(whole_minutes)

    # The seconds field is printed with 2 decimals next to minutes/hours and
    # with 3 decimals on its own. If rounding at that precision reaches 60,
    # carry into the minutes so the output never reads "1m 60.00s" or "60.000s".
    digits = 2 if whole_minutes else 3
    if round(secs, digits) >= 60:
        whole_minutes += 1
        secs = 0.0

    hours, minutes = divmod(whole_minutes, 60)
    if hours:
        return f"{hours}h {minutes:02d}m {secs:05.2f}s"
    if minutes:
        return f"{minutes}m {secs:05.2f}s"
    return f"{secs:.3f}s"
|
|
|
|
|
|
def format_timestamp_local(ts: Optional[datetime]) -> str:
    """Render *ts* in the system local timezone as ``YYYY-MM-DD HH:MM:SS TZ``.

    Returns ``"n/a"`` when *ts* is falsy (e.g. ``None``).
    """
    return ts.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z") if ts else "n/a"
|
|
|
|
|
|
def get_test_flow(template_name: object) -> List[str]:
    """Return the canned test-flow steps for *template_name*.

    Falls back to ``DEFAULT_TEST_FLOW`` when *template_name* is not a string or
    has no entry in ``TEMPLATE_TEST_FLOWS``. The shared module-level list is
    returned as-is (not copied).
    """
    if isinstance(template_name, str):
        return TEMPLATE_TEST_FLOWS.get(template_name, DEFAULT_TEST_FLOW)
    return DEFAULT_TEST_FLOW
|
|
|
|
|
|
def read_spec_pattern_from_config(project_root: Path, config_file: object) -> List[str]:
    """Read the single-line ``"specPattern": [...]`` list from a config file.

    Args:
        project_root: Directory that *config_file* is relative to.
        config_file: Relative config path; non-strings, empty strings and the
            placeholder ``"unknown"`` yield an empty list.

    Returns:
        The list literal found after ``"specPattern":`` (it must sit on one
        line), or ``[]`` when the file is missing, the key is absent, or the
        value is not a parseable list literal.
    """
    usable_name = isinstance(config_file, str) and bool(config_file) and config_file != "unknown"
    if not usable_name:
        return []

    config_path = project_root / config_file
    if not config_path.exists():
        return []

    config_text = config_path.read_text(encoding="utf-8", errors="replace")
    found = re.search(r'"specPattern"\s*:\s*(\[[^\n]+\])', config_text)
    if found is None:
        return []

    try:
        parsed = ast.literal_eval(found.group(1))
    except (SyntaxError, ValueError):
        return []
    if isinstance(parsed, list):
        return parsed
    return []
|
|
|
|
|
|
def extract_test_flow_from_generated_spec(
    reporter_root: Path,
    log_text: str,
    metadata: Dict[str, object],
) -> List[str]:
    """Derive the numbered test flow from the first usable generated spec file.

    The spec list comes from the ``Extracted specPattern: [...]`` log line, or
    from the config file named in ``metadata["config_file"]`` when the log has
    none. Each spec is scanned line by line for
    ``it(`${numStep++}. <title>`, ...)`` calls. Steps nested inside an ``if``
    gate that evaluates to False for this run (plugin selection or config
    flags) are left out; gates that cannot be evaluated do not exclude steps.

    Args:
        reporter_root: Reporter directory; its parent is used as the Cypress
            root and its grandparent as the project root.
        log_text: Runner log text.
        metadata: Run metadata; ``config_file`` and ``integration_plugin`` are read.

    Returns:
        Steps renumbered from 1 (``"1. <title>"``) for the first spec that
        yields any, or ``[]`` when nothing usable is found.
    """
    spec_match = re.search(r'Extracted specPattern:\s*(\[[^\n]+\])', log_text)
    cypress_root = reporter_root.parent
    project_root = cypress_root.parent
    spec_list: List[str] = []
    if spec_match:
        try:
            parsed = ast.literal_eval(spec_match.group(1))
        except (SyntaxError, ValueError):
            parsed = []
        if isinstance(parsed, list):
            spec_list = parsed
    if not spec_list:
        spec_list = read_spec_pattern_from_config(project_root, metadata.get("config_file"))
    if not spec_list:
        return []

    # Config values the gate evaluation depends on, scraped with regexes
    # rather than a JSON parse.
    runtime_settings: Dict[str, object] = {}
    config_file = metadata.get("config_file")
    if isinstance(config_file, str) and config_file:
        config_path = project_root / config_file
        if config_path.exists():
            config_text = config_path.read_text(encoding="utf-8", errors="replace")
            for key in ("pure_plugin_type", "debug-type"):
                match = re.search(rf'"{re.escape(key)}"\s*:\s*"([^"]*)"', config_text)
                if match:
                    runtime_settings[key] = match.group(1)
            for key in ("test-unaligned-fio", "isRegularCutover", "test-install-only"):
                match = re.search(rf'"{re.escape(key)}"\s*:\s*(true|false)', config_text)
                if match:
                    runtime_settings[key] = match.group(1) == "true"

    def selected_plugin_gates() -> Optional[set[str]]:
        """Return the enabled plugin kinds (``fc`` / ``iscsi``), or None if unknown."""
        install_only = runtime_settings.get("test-install-only")
        if install_only is True:
            return set()
        pure_plugin_type = runtime_settings.get("pure_plugin_type")
        if isinstance(pure_plugin_type, str):
            lowered = pure_plugin_type.lower()
            if lowered == "both":
                return {"fc", "iscsi"}
            if lowered in {"fc", "iscsi"}:
                return {lowered}
        integration_plugin = metadata.get("integration_plugin")
        if not isinstance(integration_plugin, str):
            return None
        lowered = integration_plugin.lower()
        if not lowered or lowered == "unknown":
            return None
        if "both" in lowered:
            return {"fc", "iscsi"}
        selected: set[str] = set()
        if re.search(r"\bfc\b", lowered):
            selected.add("fc")
        if "iscsi" in lowered:
            selected.add("iscsi")
        return selected or None

    def evaluate_gate_line(line: str, allowed_plugins: Optional[set[str]]) -> Tuple[bool, Optional[bool]]:
        """Return ``(is_gate, outcome)``; outcome is None when it cannot be decided."""
        normalized = re.sub(r"\s+", "", line)
        plugin_gate_match = re.match(r"if\((use[A-Za-z]+Plugin)\)", normalized)
        if plugin_gate_match:
            variable_name = plugin_gate_match.group(1).lower()
            if "iscsi" in variable_name:
                return True, allowed_plugins is None or "iscsi" in allowed_plugins
            if "fc" in variable_name:
                return True, allowed_plugins is None or "fc" in allowed_plugins
        if normalized.startswith('if(Cypress.env("test-unaligned-fio")==true)'):
            value = runtime_settings.get("test-unaligned-fio")
            return True, value if isinstance(value, bool) else None
        install_only_match = re.match(
            r"""if\(Cypress\.env\((["'])test-install-only\1\)==(true|false)\)""",
            normalized,
        )
        if install_only_match:
            value = runtime_settings.get("test-install-only")
            expected = install_only_match.group(2) == "true"
            if isinstance(value, bool):
                return True, value is expected
            return True, None
        if normalized.startswith('if(Cypress.env("isRegularCutover")==false)'):
            value = runtime_settings.get("isRegularCutover")
            return True, (value is False) if isinstance(value, bool) else None
        if normalized.startswith('if(Cypress.env("isRegularCutover")===true)'):
            value = runtime_settings.get("isRegularCutover")
            return True, (value is True) if isinstance(value, bool) else None
        if normalized.startswith("if(!enabled_percpu_debug)"):
            debug_type = runtime_settings.get("debug-type")
            return True, debug_type != "percpu" if isinstance(debug_type, str) else None
        if normalized.startswith("if(enabled_percpu_debug)"):
            debug_type = runtime_settings.get("debug-type")
            return True, debug_type == "percpu" if isinstance(debug_type, str) else None
        return False, None

    allowed_plugins = selected_plugin_gates()

    for entry in spec_list:
        if not isinstance(entry, str) or "check-xml-files.ts" in entry:
            continue
        entry_path = Path(entry)
        try:
            spec_path = cypress_root / entry_path.relative_to("cypress")
        except ValueError:
            # The entry is not under "cypress/" (absolute path or another
            # prefix). Resolve it against the project root rather than letting
            # relative_to() abort the whole status report; an absolute entry
            # stays absolute when joined.
            spec_path = project_root / entry_path
        # Directories and missing paths cannot be read as a spec file.
        if not spec_path.is_file():
            continue

        steps: List[str] = []
        # Stack of (gate outcome, brace depth at which the gated block is open).
        active_gate_blocks: List[Tuple[Optional[bool], int]] = []
        # Outcome of a gate whose opening brace has not been seen yet.
        pending_gate_block: Optional[Optional[bool]] = None
        current_depth = 0
        for line in spec_path.read_text(encoding="utf-8", errors="replace").splitlines():
            matched_gate, gate_result = evaluate_gate_line(line, allowed_plugins)
            if matched_gate:
                if "{" in line:
                    active_gate_blocks.append((gate_result, current_depth + line.count("{")))
                else:
                    pending_gate_block = gate_result
            elif pending_gate_block is not None and "{" in line:
                active_gate_blocks.append((pending_gate_block, current_depth + line.count("{")))
                pending_gate_block = None

            match = re.search(r'it\(\s*`?\$\{numStep\+\+\}\.\s*(.*?)`\s*,', line)
            if match:
                step_text = match.group(1).strip()
                # Only a definite False gate excludes a step.
                excluded = any(outcome is False for outcome, _ in active_gate_blocks)
                if step_text and not excluded:
                    steps.append(f"{len(steps) + 1}. {step_text}")

            current_depth += line.count("{") - line.count("}")
            # Drop gates whose block has been closed by this line.
            while active_gate_blocks and current_depth < active_gate_blocks[-1][1]:
                active_gate_blocks.pop()
        if steps:
            return steps
    return []
|
|
|
|
|
|
def coverage_lines(metadata: Dict[str, object]) -> List[str]:
    """Build the Markdown bullet lines for the report's coverage section.

    ``template``, ``config_family`` and ``migration_style`` must be present in
    *metadata*. The integration/plugin bullet is only emitted for a non-empty
    string other than ``"unknown"``. In categorized mode ``--categorize`` is
    dropped from the listed run options.
    """
    categorized = metadata.get("categorized")
    mode = "enabled" if categorized else "disabled"
    lines = [
        f"- template: `{metadata['template']}`",
        f"- categorize mode: `{mode}`",
        f"- datastore/config family: `{metadata['config_family']}`",
        f"- config file: `{metadata.get('config_file', 'unknown')}`",
        f"- migration style: {metadata['migration_style']}",
    ]

    plugin = metadata.get("integration_plugin")
    if isinstance(plugin, str) and plugin and plugin != "unknown":
        lines.append(f"- integration/plugin path: `{plugin}`")

    raw_options = metadata.get("extra_options")
    options = list(raw_options) if isinstance(raw_options, list) else []
    if categorized:
        options = [option for option in options if option != "--categorize"]
    rendered = ", ".join(f"`{option}`" for option in options)
    lines.append(f"- run options: {rendered or 'none'}")
    return lines
|
|
|
|
|
|
def infer_missing_host_durations(ordered_hosts: List[HostResult], end_ts: Optional[datetime]) -> None:
    """Fill in ``duration_seconds`` for hosts that have a timestamp but no duration.

    Hosts with a timestamp are ordered chronologically; a missing duration is
    set to the gap until the next host's timestamp, or until *end_ts* for the
    last host. Hosts are updated in place. Nothing is written when the next
    timestamp is missing or not strictly later.
    """
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    timed = sorted((host for host in ordered_hosts if host.timestamp), key=lambda host: host.timestamp or epoch)
    if not timed:
        return

    # Pair each host with the timestamp that bounds it on the right.
    right_bounds = [host.timestamp for host in timed[1:]] + [end_ts]
    for host, next_ts in zip(timed, right_bounds):
        if host.duration_seconds is not None:
            continue
        current_ts = host.timestamp
        if current_ts is None or next_ts is None:
            continue
        if next_ts > current_ts:
            host.duration_seconds = (next_ts - current_ts).total_seconds()
|
|
|
|
|
|
def build_status_markdown(
    build_name: str,
    metadata: Dict[str, object],
    host_results: Dict[str, HostResult],
    run_state: str,
    currents_url: Optional[str],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    notes: List[str],
    reporter_root: Path,
    log_text: str,
) -> str:
    """Render the Markdown status report for one ATVM run.

    The report has SUMMARY, HOSTS, TIMING, COVERAGE, TEST FLOW, FAILURE NOTES
    and NOTES sections. Missing host durations are inferred first, and the
    ``detail`` of failed hosts is rewritten in place when a mochawesome failure
    with a message or stack is available.

    Args:
        build_name: Build name used in the heading and for failure lookup.
        metadata: Run metadata (template, commands, plugin, options, ...).
        host_results: Per-host results in display order; mutated in place.
        run_state: Accepted for the caller's interface; not referenced here.
        currents_url: Recorded-run URL appended to the notes when set.
        start_ts: Run start time, if known.
        end_ts: Run end time, if known.
        notes: Caller notes; the list itself is not modified.
        reporter_root: Reporter directory used for failure and spec lookup.
        log_text: Runner log text used to locate the generated spec.

    Returns:
        The full report as one newline-joined string.
    """
    hosts = list(host_results.values())
    infer_missing_host_durations(hosts, end_ts)

    finished = sum(1 for h in hosts if h.status in {"PASS", "FAIL"})
    passed = sum(1 for h in hosts if h.status == "PASS")
    failed = sum(1 for h in hosts if h.status == "FAIL")
    skipped = sum(1 for h in hosts if h.status == "SKIP")

    timed_hosts = [h for h in hosts if h.duration_seconds is not None]
    durations = [h.duration_seconds for h in timed_hosts]
    quickest = min(timed_hosts, key=lambda h: h.duration_seconds, default=None)
    longest = max(timed_hosts, key=lambda h: h.duration_seconds, default=None)
    average = (sum(durations) / len(durations)) if durations else None

    failure_notes: List[str] = []
    for host in hosts:
        if host.status != "FAIL":
            continue
        mochawesome_failure = extract_failure_from_mochawesome(reporter_root, build_name, host.host)
        if not mochawesome_failure:
            continue
        testcase, message, estack = mochawesome_failure
        if not (message or estack):
            continue
        # When both sources name a step number and they disagree, keep the
        # testcase already recorded in the host detail.
        detail_step = extract_step_number(host.detail)
        report_step = extract_step_number(testcase)
        steps_disagree = detail_step is not None and report_step is not None and detail_step != report_step
        if steps_disagree:
            testcase = extract_testcase_from_host_detail(host.detail) or testcase
        host.detail = summarize_host_detail_with_mochawesome(host.detail, testcase, message)
        excerpt_source = estack or message
        if excerpt_source.strip():
            failure_notes.append(
                f"{host.host} failure excerpt: `{compact_failure_detail(excerpt_source, limit=420)}`"
            )

    status_icons = {
        "PASS": "✅ PASS",
        "FAIL": "⚠️ FAIL",
        "RUN": "⏳ RUN",
        "SKIP": "⏭️ SKIP",
        "NOT STARTED": "⏳ RUN",
    }
    host_lines = [
        "| Host | Kernel | Status | Detail (For failures, see Failure Notes section below for more details) |",
        "| --- | --- | --- | --- |",
    ]
    for host in hosts:
        icon = status_icons.get(host.status, host.status)
        host_lines.append(f"| {host.host} | {host.kernel} | {icon} | {host.detail} |")

    # Work on a copy so the caller's notes list is left untouched.
    all_notes = list(notes)
    if currents_url:
        all_notes.append(f"Currents recorded run: `{currents_url}`")
    template_command = metadata.get("template_command")
    if isinstance(template_command, str) and template_command:
        all_notes.append(f"Template command: `{template_command}`")
    runner_command = metadata.get("runner_command")
    if isinstance(runner_command, str) and runner_command:
        all_notes.append(f"Run command: `{runner_command}`")
    template_name = metadata.get("template")
    integration_plugin = metadata.get("integration_plugin")
    reboot_with_both_plugins = (
        template_name == "cmc-reboot"
        and isinstance(integration_plugin, str)
        and integration_plugin.lower() == "pure/both"
    )
    if reboot_with_both_plugins:
        all_notes.append(
            "Both iscsi and fc disks were used for the reboot test. As a result, iscsi disks may not have attached before the mtdi started. So if the test failed, that is most likely the issue."
        )

    failure_notes_block = "\n".join(f"- {note}" for note in failure_notes) if failure_notes else "- none"
    notes_block = "\n".join(f"- {note}" for note in all_notes) if all_notes else "- none"
    resolved_flow = extract_test_flow_from_generated_spec(reporter_root, log_text, metadata) or get_test_flow(
        metadata.get("template")
    )
    test_flow_lines = [f"- {step}" for step in resolved_flow]
    coverage_block = coverage_lines(metadata)

    if start_ts and end_ts:
        total_text = format_duration((end_ts - start_ts).total_seconds())
    else:
        total_text = "n/a"
    quickest_text = f"{quickest.host} - {format_duration(quickest.duration_seconds)}" if quickest else "n/a"
    longest_text = f"{longest.host} - {format_duration(longest.duration_seconds)}" if longest else "n/a"
    average_text = format_duration(average) if average is not None else "n/a"

    lines = [
        "## ATVM Run Status",
        f"### {build_name}",
        "",
        "**SUMMARY:**",
        "",
        "| Metric | Value |",
        "| --- | --- |",
        f"| finished | {finished} |",
        f"| passed | {passed} |",
        f"| failed | {failed} |",
        f"| skipped | {skipped} |",
        "",
        "**HOSTS:**",
        "",
        *host_lines,
        "",
        "**TIMING:**",
        "",
        "| Metric | Value |",
        "| --- | --- |",
        f"| start | {format_timestamp_local(start_ts)} |",
        f"| end | {format_timestamp_local(end_ts)} |",
        f"| total | {total_text} |",
        f"| quickest | {quickest_text} |",
        f"| longest | {longest_text} |",
        f"| average | {average_text} |",
        "",
        "**COVERAGE:**",
        *coverage_block,
        "",
        "**TEST FLOW:**",
        *test_flow_lines,
        "",
        "**FAILURE NOTES:**",
        failure_notes_block,
        "",
        "**NOTES:**",
        notes_block,
    ]
    return "\n".join(lines)
|
|
|
|
|
|
def post_to_mattermost(text: str) -> str:
    """POST *text* as JSON to the ``MATTERMOST_ATVM_WEBHOOK`` URL.

    ``MATTERMOST_ATVM_CHANNEL``, when set and non-empty, is sent as the
    ``channel`` field of the payload.

    Returns:
        The decoded, whitespace-stripped response body.

    Raises:
        KeyError: If ``MATTERMOST_ATVM_WEBHOOK`` is not set.
    """
    webhook_url = os.environ["MATTERMOST_ATVM_WEBHOOK"]
    body = {"text": text}
    target_channel = os.environ.get("MATTERMOST_ATVM_CHANNEL")
    if target_channel:
        body["channel"] = target_channel

    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        raw_reply = response.read()
    return raw_reply.decode().strip()
|
|
|
|
|
|
def sanitize_key(raw: str) -> str:
    """Turn *raw* into a key made only of letters, digits, ``_``, ``.`` and ``-``.

    Every run of other characters becomes a single ``-``, leading and trailing
    dashes are stripped, and ``"subrun"`` is returned when nothing is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw)
    trimmed = collapsed.strip("-")
    return trimmed if trimmed else "subrun"
|
|
|
|
|
|
def infer_group_label(hosts: List[str], index: int) -> str:
    """Build a dash-joined group label from the OS families of *hosts*.

    Each host contributes the part after its first ``-``: ``w2k*`` becomes
    ``windows``; otherwise everything from the first digit onward is removed
    (the full short name is kept if that leaves nothing). Labels are
    de-duplicated in first-seen order. ``group<index>`` is returned when
    *hosts* is empty.
    """
    fallback = f"group{index}"
    if not hosts:
        return fallback

    seen: List[str] = []
    for host in hosts:
        short = host.split("-", 1)[-1]
        family = "windows" if short.startswith("w2k") else (re.sub(r"\d.*$", "", short) or short)
        if family not in seen:
            seen.append(family)
    return "-".join(seen) if seen else fallback
|
|
|
|
|
|
def infer_group_from_host(host: str) -> str:
    """Map a host name to its OS-family group label.

    The part after the first ``-`` is lower-cased and matched by prefix:
    ``w2k`` maps to ``windows``, and each known Linux family maps to itself.
    Unknown names are returned as the lower-cased short name, or ``"group"``
    when that is empty.
    """
    short = host.split("-", 1)[-1].lower()
    if short.startswith("w2k"):
        return "windows"

    known_families = (
        "amazonlinux",
        "centos",
        "ubuntu",
        "rocky",
        "redhat",
        "oracle",
        "fedora",
        "debian",
        "suse",
    )
    for family in known_families:
        if short.startswith(family):
            return family
    return short or "group"
|
|
|
|
|
|
def corrected_categorized_display_name(raw_display_name: str, hosts: List[str]) -> str:
    """Rewrite the OS-family token in a ``-<family>-batch`` display name.

    The family is replaced by the group inferred from the first host. The name
    is returned unchanged when *hosts* is empty or no ``-<family>-batch`` token
    is present.
    """
    if not hosts:
        return raw_display_name
    group = infer_group_from_host(hosts[0])
    family_batch = re.compile(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k)-batch")
    return family_batch.sub(f"-{group}-batch", raw_display_name)
|
|
|
|
|
|
def logical_categorized_key(display_name: str) -> str:
    """Return the sanitized key shared by all batches of one categorized group.

    For names shaped like ``<prefix>-<family>-batch<N>_<M>...`` the key is the
    sanitized ``<prefix>-<family>`` part; any other name is sanitized whole.
    """
    batch_name = re.match(
        r"^(.*?-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows|w2k))-batch\d+_\d+.*$",
        display_name,
    )
    logical_name = batch_name.group(1) if batch_name else display_name
    return sanitize_key(logical_name)
|
|
|
|
|
|
def merge_categorized_state(
    merged: Dict[str, Dict[str, object]],
    display_name: str,
    state: str,
    host_results: Dict[str, HostResult],
    start_ts: Optional[datetime],
    end_ts: Optional[datetime],
    currents_url: Optional[str],
    notes: List[str],
) -> None:
    """Fold one categorized sub-run into *merged*, keyed by its logical group.

    The first sub-run of a group creates the entry. Later ones update it in
    place: host results are overlaid, the time window is widened, the first
    non-empty Currents URL is kept, unseen notes are appended, and the state is
    combined as follows (first matching rule wins):

    1. ``CANCELLED`` on either side -> ``CANCELLED``
    2. incoming ``RUNNING`` -> ``RUNNING`` unless the entry is already
       ``COMPLETED`` / ``FAILED`` (then it is left alone)
    3. entry ``RUNNING`` -> the incoming state
    4. ``FAILED`` on either side -> ``FAILED``
    5. ``COMPLETED`` on either side -> ``COMPLETED``
    6. otherwise -> the incoming state
    """
    key = logical_categorized_key(display_name)
    entry = merged.get(key)
    if entry is None:
        # Copy the containers so later merges do not alias the caller's objects.
        merged[key] = {
            "key": key,
            "display_name": display_name,
            "state": state,
            "host_results": dict(host_results),
            "start_ts": start_ts,
            "end_ts": end_ts,
            "currents_url": currents_url,
            "notes": list(notes),
        }
        return

    entry["host_results"].update(host_results)

    if start_ts and (entry["start_ts"] is None or start_ts < entry["start_ts"]):
        entry["start_ts"] = start_ts
    if end_ts and (entry["end_ts"] is None or end_ts > entry["end_ts"]):
        entry["end_ts"] = end_ts
    if currents_url and not entry["currents_url"]:
        entry["currents_url"] = currents_url

    known_notes = entry["notes"]
    for note in notes:
        if note not in known_notes:
            known_notes.append(note)

    previous = entry["state"]
    either = (state, previous)
    if "CANCELLED" in either:
        resolved = "CANCELLED"
    elif state == "RUNNING":
        resolved = previous if previous in {"COMPLETED", "FAILED"} else "RUNNING"
    elif previous == "RUNNING":
        resolved = state
    elif "FAILED" in either:
        resolved = "FAILED"
    elif "COMPLETED" in either:
        resolved = "COMPLETED"
    else:
        resolved = state
    entry["state"] = resolved
|
|
|
|
|
|
def extract_segment_build_name(segment_text: str, parent_build_name: str) -> Optional[str]:
    """Find the ``...batch<N>_<M>`` build name mentioned in a log segment.

    A name that starts with ``<parent_build_name>-`` is preferred; failing
    that, the first ``<token>-batch<N>_<M>`` occurrence is used. Returns
    ``None`` when neither pattern matches.
    """
    parent_scoped = rf"({re.escape(parent_build_name)}-[A-Za-z0-9_.-]*batch\d+_\d+)"
    any_batch = r"([A-Za-z0-9_.-]+-batch\d+_\d+)"
    hits = (re.search(pattern, segment_text) for pattern in (parent_scoped, any_batch))
    return next((hit.group(1) for hit in hits if hit), None)
|
|
|
|
|
|
def split_log_segments(log_text: str, parent_build_name: str, categorized: bool, default_started_at: datetime) -> List[SubRun]:
    """Split a run log into ``SubRun`` records.

    A non-categorized run yields one sub-run covering the whole log. For a
    categorized run, every ``<timestamp> - INFO - Extracted specPattern:`` line
    starts a new segment. If no such line exists yet, a single placeholder
    sub-run is returned with a note saying so. Every segment except the last is
    marked completed, because a later segment has already started.
    """

    def whole_log_subrun(notes: List[str]) -> SubRun:
        # One sub-run spanning the entire log under the parent build name.
        return SubRun(
            key=sanitize_key(parent_build_name),
            display_name=parent_build_name,
            started_at=default_started_at,
            expected_hosts=extract_expected_hosts(log_text),
            completed=False,
            currents_url=None,
            notes=notes,
        )

    if not categorized:
        return [whole_log_subrun([])]

    marker = re.compile(
        r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - INFO - Extracted specPattern:",
        re.M,
    )
    segment_starts: List[Tuple[int, Optional[datetime]]] = [
        (hit.start(), parse_log_timestamp(hit.group(1))) for hit in marker.finditer(log_text)
    ]
    if not segment_starts:
        return [
            whole_log_subrun(
                ["Categorized mode was requested but no sub-run segment has appeared in the log yet."]
            )
        ]

    total = len(segment_starts)
    # Each segment ends where the next one starts; the last runs to the end of the log.
    end_offsets = [offset for offset, _ in segment_starts[1:]] + [len(log_text)]
    segments: List[SubRun] = []
    for index, ((start_offset, start_ts), end_offset) in enumerate(zip(segment_starts, end_offsets), start=1):
        segment_text = log_text[start_offset:end_offset]
        expected_hosts = extract_expected_hosts(segment_text)
        display_name = extract_segment_build_name(segment_text, parent_build_name)
        if not display_name:
            display_name = f"{parent_build_name}-{infer_group_label(expected_hosts, index)}"
        segments.append(
            SubRun(
                key=sanitize_key(display_name),
                display_name=display_name,
                started_at=start_ts or default_started_at,
                expected_hosts=expected_hosts,
                completed=index < total,
                currents_url=None,
                notes=[f"Categorized sub-run {index} of {total}."],
            )
        )
    return segments
|
|
|
|
|
|
def evaluate_subrun(
    subrun: SubRun,
    reporter_root: Path,
    inventory: Dict[str, str],
    build_dir: Path,
    end_boundary: Optional[datetime],
    parent_active: bool,
    cancelled: bool,
) -> Tuple[str, Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Classify one sub-run and collect its host results and time bounds.

    Args:
        subrun: The sub-run to evaluate.
        reporter_root: Root directory of reporter artifacts.
        inventory: Mapping of host name to kernel string.
        build_dir: State directory of the build, consulted for a persisted
            Currents URL when the sub-run has none.
        end_boundary: Upper time bound for artifacts of this sub-run, if any.
        parent_active: Whether the parent runner process is still alive.
        cancelled: Whether a cancellation marker was found.

    Returns:
        ``(state, host_results, start_ts, end_ts, currents_url, notes)``.
        The state is decided in this order: ``CANCELLED`` when cancelled;
        for a completed sub-run ``UNKNOWN`` without host results, otherwise
        ``FAILED``/``COMPLETED``; ``RUNNING`` while the parent is active;
        ``FAILED``/``COMPLETED`` when host results exist after the parent
        exited; ``TERMINATED`` otherwise.
    """

    def verdict(results: Dict[str, HostResult]) -> str:
        # Any host with failures fails the whole sub-run.
        return "FAILED" if any(entry.failures for entry in results.values()) else "COMPLETED"

    notes = list(subrun.notes)
    currents_url = subrun.currents_url or persisted_currents_url_for_build(build_dir, subrun.display_name)
    host_results = collect_host_results(
        reporter_root=reporter_root,
        expected_hosts=subrun.expected_hosts,
        kernels=inventory,
        run_started_at=subrun.started_at,
        run_ended_at=end_boundary,
    )
    check_end = find_check_xml_end(reporter_root, subrun.started_at, end_boundary)

    # Time bounds are computed once from the initially collected results.
    host_stamps = [entry.timestamp for entry in host_results.values() if entry.timestamp]
    end_stamps = (host_stamps + [check_end]) if check_end else host_stamps
    start_ts = min(host_stamps, default=subrun.started_at)
    end_ts = max(end_stamps, default=None)

    if cancelled:
        notes.append("Cancellation marker detected.")
        return "CANCELLED", host_results, start_ts, end_ts, currents_url, notes

    if subrun.completed:
        if host_results:
            notes.append("Categorized sub-run completed and the next grouped run was launched.")
            return verdict(host_results), host_results, start_ts, end_ts, currents_url, notes
        notes.append("This categorized sub-run ended but no host results were detected.")
        return "UNKNOWN", host_results, start_ts, end_ts, currents_url, notes

    if parent_active:
        # Show the first expected host without a result as the one in progress.
        pending_host = next((name for name in subrun.expected_hosts if name not in host_results), None)
        if pending_host:
            host_results[pending_host] = HostResult(
                host=pending_host,
                kernel=inventory.get(pending_host, "unknown"),
                status="RUN",
                detail="in progress",
            )
        return "RUNNING", host_results, start_ts, end_ts, currents_url, notes

    if check_end and not host_results:
        # Parent is gone and only the check XML exists: look for the latest
        # host artifact written up to 5 seconds after the check timestamp.
        recovered = collect_latest_host_reporter_artifact(
            reporter_root=reporter_root,
            expected_hosts=subrun.expected_hosts,
            kernels=inventory,
            run_started_at=subrun.started_at,
            run_ended_at=check_end + timedelta(seconds=5),
        )
        if recovered:
            recovered_host, recovered_result = recovered
            host_results = {recovered_host: recovered_result}

    if not host_results:
        notes.append("Run process exited before host results were detected.")
        return "TERMINATED", host_results, start_ts, end_ts, currents_url, notes

    notes.append("Run completed after the parent runner exited.")
    return verdict(host_results), host_results, start_ts, end_ts, currents_url, notes
|
|
|
|
|
|
def discover_categorized_subruns(
|
|
build_name: str,
|
|
build_dir: Path,
|
|
reporter_root: Path,
|
|
inventory: Dict[str, str],
|
|
log_text: str,
|
|
started_at: datetime,
|
|
parent_active: bool,
|
|
cancelled: bool,
|
|
) -> List[Dict[str, object]]:
|
|
# Build one state record per categorized sub-run of `build_name` from two
# sources: reporter XML files named `test-result-<build_name>-*.xml` under
# `<reporter_root>/xml`, and the sub-run reported by
# `extract_active_subrun_build(build_name)` when no XML file for it was seen.
# Every record is passed to `merge_categorized_state`, which accumulates into
# `merged_subrun_states`; the values of that dict are returned as a list.
# NOTE(review): indentation is not visible in this copy of the file, so the
# nesting implied by the comments below is inferred from statement order;
# confirm against a correctly indented checkout.
xml_dir = reporter_root / "xml"
|
|
current_subrun_build = extract_active_subrun_build(build_name)
|
|
# The second argument is an empty list here; the later call passes completed_hosts.
current_active_host = find_current_running_host(log_text, [])
|
|
current_active_group = infer_group_from_host(current_active_host) if current_active_host else None
|
|
expected_hosts = extract_expected_hosts(log_text)
|
|
completed_summaries = extract_completed_subrun_summaries(log_text, inventory)
|
|
merged_subrun_states: Dict[str, Dict[str, object]] = {}
|
|
completed_hosts: List[str] = []
|
|
discovered_builds: List[str] = []
|
|
# Index of the next entry of completed_summaries to pair with a reporter XML file.
current_summary_index = 0
|
|
|
|
if xml_dir.exists():
|
|
prefix = f"test-result-{build_name}-"
|
|
# Visit the matching XML files in ascending safe_mtime order.
for xml_path in sorted(existing_paths(xml_dir.glob(f"{prefix}*.xml")), key=safe_mtime):
|
|
xml_mtime = datetime.fromtimestamp(safe_mtime(xml_path), tz=timezone.utc)
|
|
# Ignore reporter files last modified before this run started.
if xml_mtime < started_at:
|
|
continue
|
|
# The sub-run build id is the file stem without the "test-result-" prefix.
raw_display_name = xml_path.stem[len("test-result-"):]
|
|
discovered_builds.append(raw_display_name)
|
|
parsed = parse_host_xml(xml_path)
|
|
host_results: Dict[str, HostResult] = {}
|
|
# A truthy parse result unpacks to (host, result); the kernel comes from
# the inventory, or "unknown" when the host is not listed there.
if parsed:
|
|
host, result = parsed
|
|
result.kernel = inventory.get(host, "unknown")
|
|
host_results[host] = result
|
|
completed_hosts.append(host)
|
|
check_ts = extract_check_xml_timestamp_from_file(xml_path)
|
|
summary = completed_summaries[current_summary_index] if current_summary_index < len(completed_summaries) else None
|
|
inferred_host = infer_host_from_subrun_build(raw_display_name, expected_hosts, completed_hosts)
|
|
# The group label is whichever listed platform name sits immediately before "-batch".
display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", raw_display_name)
|
|
raw_display_group = display_group_match.group(1) if display_group_match else None
|
|
# Use the log summary's host results when the XML gave no results or only
# entries whose host is "check-xml-files".
if summary and (not host_results or all(result.host == "check-xml-files" for result in host_results.values())):
|
|
host_results = summary["host_results"]
|
|
completed_hosts.extend([host for host in host_results if host not in completed_hosts])
|
|
# With a check timestamp, overlay the group's host artifacts collected up to
# 5 seconds after it; artifact entries replace same-named existing entries.
if check_ts:
|
|
group_host_results = collect_group_host_reporter_artifacts(
|
|
reporter_root=reporter_root,
|
|
group_label=raw_display_group,
|
|
kernels=inventory,
|
|
run_started_at=started_at,
|
|
run_ended_at=check_ts + timedelta(seconds=5),
|
|
)
|
|
if group_host_results:
|
|
# Merge into a copy so the dict currently bound to host_results is not mutated.
merged_results = dict(host_results)
|
|
merged_results.update(group_host_results)
|
|
host_results = merged_results
|
|
completed_hosts.extend([host for host in host_results if host not in completed_hosts])
|
|
|
|
# Still no host results: try the latest host artifact (scoped to the inferred
# host when there is one), then the latest artifact for the group.
if not host_results and check_ts:
|
|
scoped_expected_hosts = [inferred_host] if inferred_host else expected_hosts
|
|
latest_host = collect_latest_host_reporter_artifact(
|
|
reporter_root=reporter_root,
|
|
expected_hosts=scoped_expected_hosts,
|
|
kernels=inventory,
|
|
run_started_at=started_at,
|
|
run_ended_at=check_ts + timedelta(seconds=5),
|
|
)
|
|
if not latest_host:
|
|
latest_host = collect_latest_group_host_reporter_artifact(
|
|
reporter_root=reporter_root,
|
|
group_label=raw_display_group,
|
|
kernels=inventory,
|
|
run_started_at=started_at,
|
|
run_ended_at=check_ts + timedelta(seconds=5),
|
|
)
|
|
if latest_host:
|
|
host, result = latest_host
|
|
host_results = {host: result}
|
|
if host not in completed_hosts:
|
|
completed_hosts.append(host)
|
|
# A summary was available for this file, so advance to the next one.
if summary:
|
|
current_summary_index += 1
|
|
# Default to RUNNING; the branches below override it.
state = "RUNNING"
|
|
display_name = corrected_categorized_display_name(raw_display_name, list(host_results))
|
|
display_group = None
|
|
display_group_match = re.search(r"-(amazonlinux|centos|ubuntu|rocky|redhat|oracle|fedora|debian|suse|windows)-batch", display_name)
|
|
if display_group_match:
|
|
display_group = display_group_match.group(1)
|
|
# Cancellation wins. A check timestamp without host results stays RUNNING
# while the parent is active. Otherwise the sub-run is terminal when its
# group differs from the active group, a check timestamp exists, the build
# id is not the active one, or the parent is no longer active.
if cancelled:
|
|
state = "CANCELLED"
|
|
elif check_ts and not host_results and parent_active:
|
|
state = "RUNNING"
|
|
elif check_ts and display_group and current_active_group and display_group != current_active_group:
|
|
state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
|
|
elif check_ts or raw_display_name != current_subrun_build or not parent_active:
|
|
state = "FAILED" if any(result.failures for result in host_results.values()) else "COMPLETED"
|
|
notes = [f"Categorized sub-run discovered from reporter file `{xml_path.name}`."]
|
|
if check_ts and not host_results and parent_active:
|
|
notes.append("Grouped reporter XML arrived before the parent run log exposed the final host summary; waiting to post until host details are available.")
|
|
if display_name != raw_display_name:
|
|
notes.append(f"Child build id was reported as `{raw_display_name}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
|
|
if cancelled:
|
|
notes.append("Cancellation marker detected.")
|
|
# End time: the check timestamp, else the first host timestamp found, else the XML mtime.
end_ts = check_ts or next((result.timestamp for result in host_results.values() if result.timestamp), xml_mtime)
|
|
host_timestamps = [result.timestamp for result in host_results.values() if result.timestamp]
|
|
start_ts = min(host_timestamps) if host_timestamps else None
|
|
# No host timestamps: when a summary exists, derive the start from end_ts
# minus the first available duration_seconds.
if not start_ts and summary and end_ts:
|
|
duration_seconds = next((result.duration_seconds for result in host_results.values() if result.duration_seconds is not None), None)
|
|
if duration_seconds is not None:
|
|
candidate_start = end_ts.timestamp() - duration_seconds
|
|
start_ts = datetime.fromtimestamp(candidate_start, tz=timezone.utc)
|
|
if not start_ts:
|
|
start_ts = min(xml_mtime, end_ts) if end_ts else xml_mtime
|
|
# Never report a start later than the end.
if end_ts and start_ts > end_ts:
|
|
start_ts = end_ts
|
|
merge_categorized_state(
|
|
merged_subrun_states,
|
|
display_name=display_name,
|
|
state=state,
|
|
host_results=host_results,
|
|
start_ts=start_ts,
|
|
end_ts=end_ts,
|
|
currents_url=(summary["currents_url"] if summary else None) or persisted_currents_url_for_build(build_dir, raw_display_name),
|
|
notes=notes,
|
|
)
|
|
|
|
# The active sub-run has no reporter XML yet: emit an in-progress record for it.
if current_subrun_build and current_subrun_build not in discovered_builds:
|
|
current_host = find_current_running_host(log_text, completed_hosts)
|
|
# Infer the host from the build id when the log gives none or one that already completed.
if not current_host or current_host in completed_hosts:
|
|
current_host = infer_host_from_subrun_build(current_subrun_build, expected_hosts, completed_hosts)
|
|
host_results: Dict[str, HostResult] = {}
|
|
if current_host:
|
|
host_results[current_host] = HostResult(
|
|
host=current_host,
|
|
kernel=inventory.get(current_host, "unknown"),
|
|
status="RUN",
|
|
detail="in progress",
|
|
)
|
|
display_name = corrected_categorized_display_name(current_subrun_build, [current_host] if current_host else [])
|
|
notes = ["Active categorized sub-run inferred from live `--ci-build-id` process state."]
|
|
if display_name != current_subrun_build:
|
|
notes.append(f"Child build id is `{current_subrun_build}`, but the actual grouped run was inferred from host execution as `{display_name}`.")
|
|
if cancelled:
|
|
notes.append("Cancellation marker detected.")
|
|
merge_categorized_state(
|
|
merged_subrun_states,
|
|
display_name=display_name,
|
|
state="CANCELLED" if cancelled else "RUNNING",
|
|
host_results=host_results,
|
|
start_ts=started_at,
|
|
end_ts=None,
|
|
currents_url=persisted_currents_url_for_build(build_dir, current_subrun_build),
|
|
notes=notes,
|
|
)
|
|
|
|
return list(merged_subrun_states.values())
|
|
|
|
|
|
def determine_state(
    build_name: str,
    build_dir: Path,
    run_log: Path,
    reporter_root: Path,
    inventory: Dict[str, str],
    metadata: Dict[str, object],
    started_at: datetime,
    process_gone_since: Optional[datetime],
    process_exit_grace_seconds: int,
    max_watch_seconds: Optional[float] = None,
) -> Tuple[str, List[Dict[str, object]], Dict[str, HostResult], Optional[datetime], Optional[datetime], Optional[str], List[str]]:
    """Compute the overall run state and per-sub-run detail for one poll.

    Args:
        build_name: Build id of the parent run.
        build_dir: State directory of the build; a ``cancelled.marker`` file
            inside it marks the run as cancelled.
        run_log: Path of the run log, re-read on every call.
        reporter_root: Root directory of reporter artifacts.
        inventory: Mapping of host name to kernel string.
        metadata: Run metadata; a truthy ``"categorized"`` entry selects the
            categorized discovery path.
        started_at: Start time of the run.
        process_gone_since: When the runner process was first seen missing,
            or ``None`` while it is alive.
        process_exit_grace_seconds: How long the process must have been gone
            before its absence is treated as final.
        max_watch_seconds: Watch-duration limit after which a still-active
            run is reported as ``HUNG``. When omitted, the limit is taken
            from a module-level ``args`` namespace if one is bound (the
            script entry point binds it); if neither is available the
            ``HUNG`` check is skipped instead of raising ``NameError``.

    Returns:
        ``(state, subrun_states, host_results, start_ts, end_ts,
        currents_url, notes)`` where ``state`` is one of ``CANCELLED``,
        ``HUNG``, ``RUNNING``, ``FAILED``, ``COMPLETED`` or ``TERMINATED``.
    """
    cancelled_marker = build_dir / "cancelled.marker"
    log_text = read_text(run_log)
    categorized = bool(metadata.get("categorized"))
    active = related_process_active(build_name) if categorized else process_active(build_name)
    cancelled = cancelled_marker.exists()
    notes: List[str] = []
    subrun_states: List[Dict[str, object]] = []
    parent_host_results: Dict[str, HostResult] = {}

    if categorized:
        subrun_states = discover_categorized_subruns(
            build_name=build_name,
            build_dir=build_dir,
            reporter_root=reporter_root,
            inventory=inventory,
            log_text=log_text,
            started_at=started_at,
            parent_active=active,
            cancelled=cancelled,
        )
        # Later sub-runs overwrite earlier entries for the same host.
        for subrun in subrun_states:
            for host, result in subrun["host_results"].items():
                parent_host_results[host] = result
    else:
        subruns = split_log_segments(log_text, build_name, categorized, started_at)
        for index, subrun in enumerate(subruns):
            # A sub-run's artifacts are bounded by the start of the next one.
            next_started_at = subruns[index + 1].started_at if index + 1 < len(subruns) else None
            sub_state, sub_results, sub_start, sub_end, sub_url, sub_notes = evaluate_subrun(
                subrun=subrun,
                reporter_root=reporter_root,
                inventory=inventory,
                build_dir=build_dir,
                end_boundary=next_started_at,
                parent_active=active,
                cancelled=cancelled,
            )
            for host, result in sub_results.items():
                parent_host_results[host] = result
            subrun_states.append(
                {
                    "key": subrun.key,
                    "display_name": subrun.display_name,
                    "state": sub_state,
                    "host_results": sub_results,
                    "start_ts": sub_start,
                    "end_ts": sub_end,
                    "currents_url": sub_url,
                    "notes": sub_notes,
                }
            )

        # Non-categorized runs often only write a final check-xml reporter XML.
        # Prefer the parent "Cloud Run Finished" host summary because it preserves
        # final pass/fail counts even when reporter fallback artifacts are sparse.
        merged_summary = extract_merged_run_summary(log_text, inventory)
        if merged_summary:
            summary_results = merged_summary["host_results"]
            for host, result in summary_results.items():
                parent_host_results[host] = result
            if subrun_states:
                last_subrun = subrun_states[-1]
                last_subrun["host_results"] = dict(parent_host_results)
                if not last_subrun.get("currents_url") and merged_summary.get("currents_url"):
                    last_subrun["currents_url"] = merged_summary["currents_url"]
                last_subrun["state"] = "FAILED" if any(result.failures for result in summary_results.values()) else "COMPLETED"

    parent_start_candidates = [subrun["start_ts"] for subrun in subrun_states if subrun["start_ts"]]
    parent_end_candidates = [subrun["end_ts"] for subrun in subrun_states if subrun["end_ts"]]
    start_ts = min(parent_start_candidates) if parent_start_candidates else started_at
    # Only consult the check XML when no sub-run supplied an end time.
    end_ts = max(parent_end_candidates) if parent_end_candidates else find_check_xml_end(reporter_root, started_at)
    currents_url = extract_currents_url(log_text) or latest_currents_url(build_dir)

    def finish(state: str):
        # Every exit returns the same payload; only the state differs.
        return state, subrun_states, parent_host_results, start_ts, end_ts, currents_url, notes

    if cancelled:
        notes.append("Cancellation marker detected.")
        return finish("CANCELLED")

    if active:
        watch_limit = max_watch_seconds
        if watch_limit is None:
            # Backward compatibility: callers that do not pass the limit get
            # the CLI value bound by the script entry point, when present.
            watch_limit = getattr(globals().get("args"), "max_watch_seconds", None)
        elapsed = (now_utc() - started_at).total_seconds()
        if watch_limit is not None and elapsed > watch_limit:
            notes.append("Watcher exceeded max watch duration while the run still appears active.")
            return finish("HUNG")
        return finish("RUNNING")

    if categorized and process_gone_since and (now_utc() - process_gone_since).total_seconds() < process_exit_grace_seconds:
        notes.append("Categorized parent runner has not been gone long enough to treat the request as finished.")
        return finish("RUNNING")

    if any(subrun["state"] in {"COMPLETED", "FAILED"} for subrun in subrun_states):
        # At least one sub-run reached a terminal state: grade the whole run
        # on the merged host results.
        state = "FAILED" if any(result.failures for result in parent_host_results.values()) else "COMPLETED"
        return finish(state)

    if process_gone_since and (now_utc() - process_gone_since).total_seconds() >= process_exit_grace_seconds:
        notes.append("Run process exited without a clean completion signal.")
        return finish("TERMINATED")

    return finish("RUNNING")
|
|
|
|
|
|
if __name__ == "__main__":
    # Watcher entry point: poll the run until it leaves the RUNNING state,
    # persist watcher state on every cycle, and post finished results to
    # Mattermost (per sub-run for categorized runs, once for the parent
    # otherwise). Module-level names are kept stable because functions in
    # this file may read them as globals (determine_state reads `args`).

    def _host_result_rows(results):
        # JSON-serializable projection of HostResult objects for state files.
        return {
            host: {
                "status": result.status,
                "detail": result.detail,
                "kernel": result.kernel,
                "tests": result.tests,
                "failures": result.failures,
            }
            for host, result in results.items()
        }

    def _iso_or_none(moment):
        # ISO-8601 text for a truthy timestamp, otherwise None.
        return moment.isoformat() if moment else None

    args = parse_args()
    build_name = args.build_name
    run_log = Path(args.run_log)
    reporter_root = Path(args.reporter_root)
    inventory_file = Path(args.inventory_file)
    state_root = Path(args.state_dir)
    build_dir = state_root / build_name
    ensure_dir(build_dir)
    state_file = build_dir / "state.json"
    posted_marker = build_dir / "posted.marker"

    inventory = load_inventory(inventory_file)
    state = load_state(state_file)
    log_text_for_start = read_text(run_log)
    metadata = infer_metadata(build_name, log_text_for_start)

    # Start time preference: first log timestamp, then the log file mtime,
    # then "now"; a start time already persisted in state overrides all.
    default_started_at = first_log_timestamp(log_text_for_start)
    if not default_started_at:
        if run_log.exists():
            default_started_at = datetime.fromtimestamp(run_log.stat().st_mtime, tz=timezone.utc)
        else:
            default_started_at = now_utc()
    started_at = parse_xml_timestamp(state.get("started_at")) or default_started_at
    state.setdefault("build_name", build_name)
    state.setdefault("started_at", started_at.isoformat())
    write_state(state_file, state)

    process_gone_since: Optional[datetime] = None

    while True:
        # Remember when the process was first seen missing; reset on return.
        active = process_active(build_name)
        if active:
            process_gone_since = None
        elif process_gone_since is None:
            process_gone_since = now_utc()

        current_log_text = read_text(run_log)
        refreshed_metadata = infer_metadata(build_name, current_log_text)
        # Fill in command metadata that was not available at startup; values
        # that are already set are never replaced.
        for key in ("template_command", "runner_command"):
            value = refreshed_metadata.get(key)
            if isinstance(value, str) and value and not metadata.get(key):
                metadata[key] = value

        run_state, subrun_states, host_results, start_ts, end_ts, currents_url, notes = determine_state(
            build_name=build_name,
            build_dir=build_dir,
            run_log=run_log,
            reporter_root=reporter_root,
            inventory=inventory,
            metadata=metadata,
            started_at=started_at,
            process_gone_since=process_gone_since,
            process_exit_grace_seconds=args.process_exit_grace_seconds,
        )

        state["last_state"] = run_state
        state["last_seen_at"] = now_utc().isoformat()
        state["host_results"] = _host_result_rows(host_results)
        state["subruns"] = {
            subrun["display_name"]: {
                "state": subrun["state"],
                "hosts": sorted(subrun["host_results"].keys()),
                "start_ts": _iso_or_none(subrun["start_ts"]),
                "end_ts": _iso_or_none(subrun["end_ts"]),
                "currents_url": subrun["currents_url"],
                "notes": subrun["notes"],
            }
            for subrun in subrun_states
        }
        write_state(state_file, state)

        for subrun in subrun_states:
            subrun_dir = build_dir / "subruns" / subrun["key"]
            ensure_dir(subrun_dir)
            subrun_state_file = subrun_dir / "state.json"
            subrun_posted_marker = subrun_dir / "posted.marker"
            subrun_state = {
                "display_name": subrun["display_name"],
                "last_state": subrun["state"],
                "last_seen_at": now_utc().isoformat(),
                "host_results": _host_result_rows(subrun["host_results"]),
                "notes": subrun["notes"],
                "currents_url": subrun["currents_url"],
                "started_at": _iso_or_none(subrun["start_ts"]),
                "ended_at": _iso_or_none(subrun["end_ts"]),
            }
            # Categorized runs post each finished sub-run exactly once; the
            # marker file records that the post already happened.
            if metadata.get("categorized") and subrun["state"] in {"COMPLETED", "FAILED"} and not subrun_posted_marker.exists():
                status_text = build_status_markdown(
                    build_name=subrun["display_name"],
                    metadata=metadata,
                    host_results=dict(sorted(subrun["host_results"].items())),
                    run_state=subrun["state"],
                    currents_url=subrun["currents_url"],
                    start_ts=subrun["start_ts"],
                    end_ts=subrun["end_ts"],
                    notes=subrun["notes"],
                    reporter_root=reporter_root,
                    log_text=current_log_text,
                )
                print(status_text)
                response = post_to_mattermost(status_text)
                if response != "ok":
                    raise SystemExit(f"Mattermost webhook did not return ok for {subrun['display_name']}: {response!r}")
                subrun_posted_marker.write_text("ok\n", encoding="utf-8")
                subrun_state["mattermost_posted"] = True
                subrun_state["mattermost_response"] = response
                print(f"[watcher] Mattermost post confirmed for {subrun['display_name']}.")
            write_state(subrun_state_file, subrun_state)

        if run_state == "RUNNING":
            print(f"[watcher] {build_name}: RUNNING")
            time.sleep(args.poll_interval)
            continue

        # Any state other than RUNNING ends the watch after one final report.
        status_text = build_status_markdown(
            build_name=build_name,
            metadata=metadata,
            host_results=dict(sorted(host_results.items())),
            run_state=run_state,
            currents_url=currents_url,
            start_ts=start_ts,
            end_ts=end_ts,
            notes=notes,
            reporter_root=reporter_root,
            log_text=current_log_text,
        )
        print(status_text)

        # Only non-categorized runs post the parent summary, and only once.
        if not metadata.get("categorized") and run_state in {"COMPLETED", "FAILED"} and not posted_marker.exists():
            response = post_to_mattermost(status_text)
            if response != "ok":
                raise SystemExit(f"Mattermost webhook did not return ok: {response!r}")
            posted_marker.write_text("ok\n", encoding="utf-8")
            state["mattermost_posted"] = True
            state["mattermost_response"] = response
            write_state(state_file, state)
            print(f"[watcher] Mattermost post confirmed for {build_name}.")

        state["closed_at"] = now_utc().isoformat()
        write_state(state_file, state)
        sys.exit(0)
|