ci/lava: Integrate StructuralLogger with AutoSaveDict

Let's use the AutoSaveDict as structural logger abstraction to enable
real-time monitoring of LAVA jobs. Mainly used for local runs and
debugging of Mesa CI LAVA jobs.

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/22500>
This commit is contained in:
Guilherme Gallo
2023-04-15 16:43:40 -03:00
committed by Marge Bot
parent eb1945c3d9
commit 5c5aec15b1

View File

@@ -9,7 +9,6 @@
"""Send a job to LAVA, track it and collect log back"""
import contextlib
import json
import pathlib
@@ -49,6 +48,14 @@ from lavacli.utils import flow_yaml as lava_yaml
# sophisticated dict-like data abstractions.
STRUCTURAL_LOG = defaultdict(list)
try:
from ci.structured_logger import StructuredLogger
except ImportError as e:
print_log(
f"Could not import StructuredLogger library: {e}. "
"Falling back to defaultdict based structured logger."
)
# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60))
@@ -173,6 +180,7 @@ def is_job_hanging(job, max_idle_time):
def parse_log_lines(job, log_follower, new_log_lines):
if log_follower.feed(new_log_lines):
# If we had non-empty log data, we can assure that the device is alive.
job.heartbeat()
@@ -190,6 +198,7 @@ def parse_log_lines(job, log_follower, new_log_lines):
def fetch_new_log_lines(job):
# The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
# Retry the log fetching several times before exposing the error.
for _ in range(5):
@@ -275,19 +284,17 @@ def execute_job_with_retries(
) -> Optional[LAVAJob]:
for attempt_no in range(1, retry_count + 2):
# Need to get the logger value from its object to enable autosave
# features
# features, if AutoSaveDict is enabled from StructuredLogging module
jobs_log.append({})
job_log = jobs_log[-1]
job = LAVAJob(proxy, job_definition, job_log)
STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
try:
submit_job(job)
wait_for_job_get_started(job)
log_follower: LogFollower = bootstrap_log_follower()
follow_job_execution(job, log_follower)
return job
except (MesaCIException, KeyboardInterrupt) as exception:
job.handle_exception(exception)
print_log(
@@ -295,7 +302,6 @@ def execute_job_with_retries(
f"Finished executing LAVA job in the attempt #{attempt_no}"
f"{CONSOLE_LOG['RESET']}"
)
finally:
job_log["finished_time"] = datetime.now().isoformat()
print_job_final_status(job)
@@ -354,14 +360,17 @@ class LAVAJobSubmitter(PathResolver):
visibility_group: str = None # Only affects LAVA farm maintainers
job_rootfs_overlay_url: str = None
structured_log_file: pathlib.Path = None # Log file path with structured LAVA log
__structured_log_context = contextlib.nullcontext() # Structured Logger context
def __post_init__(self) -> None:
super().__post_init__()
# Remove mesa job names with spaces, which breaks the lava-test-case command
self.mesa_job_name = self.mesa_job_name.split(" ")[0]
if self.structured_log_file:
self.setup_structured_logger()
if not self.structured_log_file:
return
self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()
def dump(self, job_definition):
if self.dump_yaml:
@@ -399,31 +408,56 @@ class LAVAJobSubmitter(PathResolver):
if self.validate_only:
return
try:
finished_job = retriable_follow_job(proxy, job_definition)
except Exception as exception:
STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
raise exception
exit_code = 0 if finished_job.status == "pass" else 1
STRUCTURAL_LOG["job_combined_status"] = job.status
sys.exit(exit_code)
with self.__structured_log_context:
try:
finished_job = retriable_follow_job(proxy, job_definition)
exit_code = 0 if finished_job.status == "pass" else 1
STRUCTURAL_LOG["job_combined_status"] = job.status
sys.exit(exit_code)
def setup_structured_logger(self):
try:
global STRUCTURAL_LOG
STRUCTURAL_LOG = StructuredLogger(
self.structured_log_file, truncate=True
).data
except NameError as e:
print(
f"Could not import StructuredLogger library: {e}. "
"Falling back to DummyLogger"
)
except Exception as exception:
STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
raise exception
STRUCTURAL_LOG["fixed_tags"] = self.lava_tags
STRUCTURAL_LOG["dut_job_type"] = self.device_type
class StructuredLoggerWrapper:
def __init__(self, submitter: LAVAJobSubmitter) -> None:
self.__submitter: LAVAJobSubmitter = submitter
def _init_logger(self):
STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
STRUCTURAL_LOG["job_combined_fail_reason"] = None
STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
STRUCTURAL_LOG["dut_attempt_counter"] = 0
# Initialize dut_jobs list to enable appends
STRUCTURAL_LOG["dut_jobs"] = []
@contextlib.contextmanager
def _simple_logger_context(self):
log_file = pathlib.Path(self.__submitter.structured_log_file)
log_file.parent.mkdir(parents=True, exist_ok=True)
try:
# Truncate the file
log_file.write_text("")
yield
finally:
log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))
def logger_context(self):
context = contextlib.nullcontext()
try:
global STRUCTURAL_LOG
STRUCTURAL_LOG = StructuredLogger(
self.__submitter.structured_log_file, truncate=True
).data
except NameError:
context = self._simple_logger_context()
self._init_logger()
return context
if __name__ == "__main__":