ci: Remove lava-job-submitter, LAVA containers, and tests

lava-job-submitter has been moved to a new repository:
https://gitlab.freedesktop.org/gfx-ci/lava-job-submitter

Remove the LAVA-related code from Mesa, since we now use the containers
built in that repository. Those containers already include
lava-job-submitter, and its tests are run there as well.

Signed-off-by: Valentine Burley <valentine.burley@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/37510>
This commit is contained in:
Valentine Burley
2025-09-22 14:17:02 +02:00
committed by Marge Bot
parent 215199af88
commit 040201d46a
36 changed files with 5 additions and 4666 deletions

View File

@@ -54,17 +54,3 @@ alpine/x86_64_build:
optional: true
- job: alpine/x86_64_build
optional: true
# Alpine based x86_64 image for LAVA SSH dockerized client
alpine/x86_64_lava_ssh_client:
extends:
- .alpine/x86_64_build-base
variables:
MESA_IMAGE_TAG: &alpine-x86_64_lava_ssh_client ${ALPINE_X86_64_LAVA_SSH_TAG}
# Alpine based x86_64 image to run LAVA jobs
alpine/x86_64_lava-trigger:
extends:
- .alpine/x86_64_build-base
variables:
MESA_IMAGE_TAG: &alpine-x86_64_lava_trigger ${ALPINE_X86_64_LAVA_TRIGGER_TAG}

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env bash
# ci-templates build script: produce the container image used to trigger
# LAVA jobs.
#
# Bump the following .gitlab-ci/image-tags.yml tag whenever this file
# changes:
#   ALPINE_X86_64_LAVA_TRIGGER_TAG

# shellcheck disable=SC1091

set -e

. .gitlab-ci/setup-test-env.sh

set -o xtrace

uncollapsed_section_start alpine_setup "Base Alpine system setup"

# Packages needed only while this script runs; removed again at the end.
ephemeral_pkgs=(
    git
    py3-pip
)

# Minimal runtime set required to run the LAVA jobs.
runtime_pkgs=(
    curl
    python3
    tar
    zstd
)

apk --no-cache add "${runtime_pkgs[@]}" "${ephemeral_pkgs[@]}"

pip3 install --break-system-packages -r bin/ci/requirements-lava.txt

# Install the submitter and its structured logger into the image root.
cp -Rp .gitlab-ci/lava /
cp -Rp .gitlab-ci/bin/structured_logger.py /lava

. .gitlab-ci/container/container_pre_build.sh

############### Uninstall the build software

uncollapsed_section_switch alpine_cleanup "Cleaning up base Alpine system"

apk del "${ephemeral_pkgs[@]}"

. .gitlab-ci/container/container_post_build.sh

section_end alpine_cleanup

View File

@@ -1,36 +0,0 @@
#!/usr/bin/env bash
# ci-templates build script: produce the container image for the LAVA SSH
# client.
#
# Bump the following .gitlab-ci/image-tags.yml tag whenever this file
# changes:
#   ALPINE_X86_64_LAVA_SSH_TAG

# shellcheck disable=SC1091

set -e

. .gitlab-ci/setup-test-env.sh

set -o xtrace

# Packages needed only while this script runs (currently none).
ephemeral_pkgs=(
)

# Very small runtime set needed to drive the tests over SSH.
runtime_pkgs=(
    openssh-client  # for ssh
    iputils         # for ping
    bash
    curl
)

apk --no-cache add "${runtime_pkgs[@]}" "${ephemeral_pkgs[@]}"

. .gitlab-ci/container/container_pre_build.sh

############### Uninstall the build software

apk del "${ephemeral_pkgs[@]}"

. .gitlab-ci/container/container_post_build.sh

View File

@@ -29,8 +29,6 @@ variables:
DEBIAN_TEST_VK_TAG: "20250930-piglit-a7"
ALPINE_X86_64_BUILD_TAG: "20250917-rust"
ALPINE_X86_64_LAVA_SSH_TAG: "20250423-rootfs"
ALPINE_X86_64_LAVA_TRIGGER_TAG: "20250814-trixie"
FEDORA_X86_64_BUILD_TAG: "20250917-rust"

View File

@@ -1,3 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG

View File

@@ -1,45 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from datetime import timedelta
class MesaCIException(Exception):
    """Base class for every exception raised by the Mesa CI LAVA tooling."""


class MesaCIRetriableException(MesaCIException):
    """A failure that may succeed if the LAVA job is retried."""


class MesaCITimeoutError(MesaCIRetriableException):
    """Raised when the job produced no output for ``timeout_duration``.

    Args:
        timeout_duration: the silence period after which the job is
            considered hung.
    """

    def __init__(self, *args, timeout_duration: timedelta) -> None:
        super().__init__(*args)
        self.timeout_duration = timeout_duration


class MesaCIRetryError(MesaCIRetriableException):
    """Raised when the job still fails after exhausting all retries.

    Args:
        retry_count: how many retries were attempted.
        last_job: the last attempted job object, if any, kept for
            post-mortem reporting.
    """

    # NOTE: the annotation used to be ``last_job: None`` — the type of the
    # literal None, almost certainly a typo.  The value is a LAVAJob
    # (declared elsewhere), so it is left un-annotated here; the added
    # ``= None`` default is backward-compatible.
    def __init__(self, *args, retry_count: int, last_job=None) -> None:
        super().__init__(*args)
        self.retry_count = retry_count
        self.last_job = last_job


class MesaCIFatalException(MesaCIException):
    """Exception raised when the Mesa CI script encounters a fatal error that
    prevents the script from continuing."""
    # The explicit __init__ that only called super() was redundant and has
    # been removed; behavior is unchanged.


class MesaCIParseException(MesaCIRetriableException):
    """Raised when a LAVA log payload could not be parsed."""


class MesaCIKnownIssueException(MesaCIRetriableException):
    """Exception raised when the Mesa CI script finds something in the logs that
    is known to cause the LAVA job to eventually fail"""

View File

@@ -1,622 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
# Gustavo Padovan <gustavo.padovan@collabora.com>
# Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
"""Send a job to LAVA, track it and collect log back"""
import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, fields
from datetime import datetime, timedelta, UTC
from os import environ, getenv
from typing import Any, Optional, Self
import fire
from lavacli.utils import flow_yaml as lava_yaml
from lava.exceptions import (
MesaCIException,
MesaCIFatalException,
MesaCIRetriableException,
MesaCIParseException,
MesaCIRetryError,
MesaCITimeoutError,
)
from lava.utils import (
CONSOLE_LOG,
GitlabSection,
LAVAJob,
LAVAJobDefinition,
LogFollower,
LogSectionType,
call_proxy,
fatal_err,
hide_sensitive_data,
print_log,
setup_lava_proxy,
)
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
# Initialize structural logging with a defaultdict, it can be changed for more
# sophisticated dict-like data abstractions.
STRUCTURAL_LOG = defaultdict(list)

# Optional dependency: when the StructuredLogger library is importable it
# replaces the plain defaultdict backend (see StructuredLoggerWrapper below).
try:
    from structured_logger import StructuredLogger
except ImportError as e:
    print_log(
        f"Could not import StructuredLogger library: {e}. "
        "Falling back to defaultdict based structured logger."
    )

# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("DEVICE_HANGING_TIMEOUT_SEC", 5 * 60))

# How many seconds the script should wait before try a new polling iteration to
# check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6)
)

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happen.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)

# Total GitLab CI job budget in seconds (CI_JOB_TIMEOUT is supplied by GitLab).
CI_JOB_TIMEOUT_SEC = int(getenv("CI_JOB_TIMEOUT", 3600))

# How many seconds the script will wait to let LAVA run the job and give the final details.
EXPECTED_JOB_DURATION_SEC = int(getenv("EXPECTED_JOB_DURATION_SEC", 60 * 10))

# CI_JOB_STARTED is given by GitLab CI/CD in UTC timezone by default.
CI_JOB_STARTED_AT_RAW = getenv("CI_JOB_STARTED_AT", "")
CI_JOB_STARTED_AT: datetime = (
    datetime.fromisoformat(CI_JOB_STARTED_AT_RAW)
    if CI_JOB_STARTED_AT_RAW
    else datetime.now(tz=UTC)
)
def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors from the job metadata.
    If it finds an error, raise it as MesaCIRetriableException.

    Args:
        metadata: one LAVA result-metadata dict; must contain "result" and
            may contain "error_type", "error_msg" and "case".
        job_id: the LAVA job id, used only in the error message.

    Raises:
        MesaCIRetriableException: when the metadata describes an
            infrastructure failure that is worth retrying.
    """
    # Only failed results can carry infrastructure errors.
    if metadata.get("result") != "fail":
        return
    if "error_type" in metadata:
        error_type: str = metadata["error_type"]
        error_msg: str = metadata.get("error_msg", "")
        full_err_msg: str = error_type if not error_msg else f"{error_type}: {error_msg}"
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # with mal-formed job definitions. As we are always validating the
            # jobs, only the former is probable to happen. E.g.: When some LAVA
            # action timed out more times than expected in job definition.
            # FIX: the previous message concatenated two fragments that both
            # ended in "Retry." and lacked a separating space
            # ("...Retry.(possible...").
            raise MesaCIRetriableException(
                f"LAVA job {job_id} failed with {full_err_msg} "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
        if error_type:
            raise MesaCIRetriableException(
                f"LAVA job {job_id} failed with error type: {full_err_msg}. Retry."
            )
    if metadata.get("case") == "validate":
        raise MesaCIRetriableException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )
def raise_lava_error(job) -> None:
    """Inspect the finished job's LAVA results and surface infra errors.

    Raises a retriable exception when any result's metadata points at an
    infrastructure problem; otherwise marks the job as failed.
    """
    raw_results = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    for result in lava_yaml.load(raw_results):
        raise_exception_from_metadata(result["metadata"], job.job_id)

    # If we reach this far, it means that the job ended without hwci script
    # result and no LAVA infrastructure problem was found
    job.status = "fail"
def fetch_logs(job, max_idle_time, log_follower) -> None:
    """Poll LAVA once for fresh log lines and print the parsed output."""
    # Abort (via exception) if the device has been silent for too long.
    is_job_hanging(job, max_idle_time)

    # Throttle the RPC polling rate.
    time.sleep(LOG_POLLING_TIME_SEC)

    fresh_lines = fetch_new_log_lines(job)
    for parsed in parse_log_lines(job, log_follower, fresh_lines):
        print_log(parsed)
def is_job_hanging(job, max_idle_time):
    """Raise MesaCITimeoutError when the job has been silent for too long.

    A prolonged period without new log output is assumed to mean the
    device died, so the caller should retry the job.
    """
    idle_for = datetime.now(tz=UTC) - job.last_log_time
    if idle_for <= max_idle_time:
        return

    idle_minutes = max_idle_time.total_seconds() / 60
    raise MesaCITimeoutError(
        f"{CONSOLE_LOG['FG_BOLD_YELLOW']}"
        f"LAVA job {job.job_id} unresponsive for {idle_minutes} "
        "minutes; retrying the job."
        f"{CONSOLE_LOG['RESET']}",
        timeout_duration=max_idle_time,
    )
def parse_log_lines(job, log_follower, new_log_lines):
    """Feed raw log lines to the follower and return the flushed output.

    Refreshes the job heartbeat when real data arrived, and parses the job
    result once the log has reached its final sections.
    """
    # Non-empty log data proves the device is still alive.
    got_data = log_follower.feed(new_log_lines)
    if got_data:
        job.heartbeat()

    flushed = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the RPC scheduler.jobs.logs get, it may
    # reach the LAVA_POST_PROCESSING phase.
    final_sections = (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    )
    if log_follower.current_section.type in final_sections:
        flushed = job.parse_job_result_from_log(flushed)

    return flushed
def fetch_new_log_lines(job):
    """Fetch new log lines from LAVA, retrying transient parse errors.

    The XMLRPC binary packet may be corrupted, causing a YAML scanner
    error; retry the fetch a few times before propagating the failure.
    """
    attempts_left = 5
    while attempts_left:
        with contextlib.suppress(MesaCIParseException):
            return job.get_logs()
        attempts_left -= 1
    raise MesaCIParseException
def submit_job(job):
    """Submit the job to LAVA, wrapping any failure as a retriable error."""
    try:
        job.submit()
    except Exception as submission_error:
        raise MesaCIRetriableException(
            f"Could not submit LAVA job. Reason: {submission_error}"
        ) from submission_error
def wait_for_job_get_started(job, attempt_no):
    """Block until LAVA reports the job as started.

    Cancels the job and aborts when the GitLab CI job no longer has enough
    time left for the DUT job to realistically finish.
    ``attempt_no`` is currently unused; kept for interface compatibility.
    """
    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        elapsed_sec: int = int(
            (datetime.now(tz=UTC) - CI_JOB_STARTED_AT).total_seconds()
        )
        time_left_sec: int = max(0, CI_JOB_TIMEOUT_SEC - elapsed_sec)
        if time_left_sec < EXPECTED_JOB_DURATION_SEC:
            # Not enough CI budget left; give up before wasting the slot.
            job.cancel()
            raise MesaCIFatalException(
                f"{CONSOLE_LOG['FG_BOLD_YELLOW']}"
                f"Job {job.job_id} only has {time_left_sec} seconds "
                "remaining to run, but it is expected to take at least "
                f"{EXPECTED_JOB_DURATION_SEC} seconds."
                f"{CONSOLE_LOG['RESET']}",
            )
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
        job.refresh_log()
    print_log(f"Job {job.job_id} started.")
def bootstrap_log_follower(main_test_case, timestamp_relative_to) -> LogFollower:
    """Open the deploy GitLab section and build the LogFollower for it."""
    deploy_section = GitlabSection(
        id="dut_deploy",
        header="Running LAVA deploy action",
        type=LogSectionType.LAVA_DEPLOY,
        start_collapsed=True,
        timestamp_relative_to=timestamp_relative_to,
    )
    print(deploy_section.start())

    return LogFollower(
        starting_section=deploy_section,
        main_test_case=main_test_case,
        timestamp_relative_to=timestamp_relative_to,
    )
def follow_job_execution(job, log_follower):
    """Stream the job's logs until it finishes, then sanity-check its status."""
    with log_follower:
        idle_limit = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start to check job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, idle_limit, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa Developers expect to have a simple pass/fail job result.
    # If this does not happen, it probably means a LAVA infrastructure error
    # happened.
    if job.status not in ("pass", "fail"):
        raise_lava_error(job)

    # LogFollower does some cleanup after the early exit (trigger by
    # `hwci: mesa: exit_code: \d+` regex), let's update the phases after the cleanup.
    structural_log_phases(job, log_follower)
def structural_log_phases(job, log_follower):
    """Record each log section's start/end times in the job's structured log.

    The phase name is the section header up to the first " - " separator.
    """
    phases: dict[str, Any] = {}
    for section in log_follower.section_history:
        phase_name = section.header.split(" - ")[0]
        phases[phase_name] = {
            "start_time": str(section.start_time),
            "end_time": str(section.end_time),
        }
    job.log["dut_job_phases"] = phases
def print_job_final_status(job, timestamp_relative_to):
    """Print the final hardware-job information inside a GitLab section.

    Waits (bounded) for LAVA to finish post-processing so that the printed
    metadata is as complete as possible.
    """
    job.refresh_log()
    if job.status == "running":
        # The job never reached a verdict; report it as hung.
        job.status = "hung"

    colour = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    job_data_section = GitlabSection(
        "job_data",
        f"Hardware job info for {job.status} job",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
        timestamp_relative_to=timestamp_relative_to,
    )
    with job_data_section:
        # Wait a little until LAVA finishes processing metadata
        retries_left: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and retries_left > 0:
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            retries_left -= 1

        if not job.is_post_processed():
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            print_log(
                "Timed out waiting for LAVA post-processing after "
                f"{waited_for_sec} seconds. Printing incomplete information "
                "anyway."
            )

        details: dict[str, str] = job.show()
        # NOTE: loop variable renamed from `field`, which shadowed the
        # module-level `dataclasses.field` import.
        for key, value in details.items():
            print(f"{key:<15}: {value}")

        job.refresh_log()
def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log, main_test_case,
    timestamp_relative_to
) -> Optional[LAVAJob]:
    """Submit and follow the LAVA job up to ``retry_count + 1`` times.

    Each attempt appends its own entry to ``jobs_log``.  Returns the job of
    the first successful attempt, or the last failed attempt's job when all
    attempts failed.
    """
    last_failed_job = None
    for attempt_no in range(1, retry_count + 2):
        # Need to get the logger value from its object to enable autosave
        # features, if AutoSaveDict is enabled from StructuredLogging module
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
        try:
            job_log["submitter_start_time"] = datetime.now(tz=UTC).isoformat()
            submit_job(job)
            queue_section = GitlabSection(
                id="dut_queue",
                header="Waiting for hardware device to become available",
                type=LogSectionType.LAVA_QUEUE,
                start_collapsed=False,
                timestamp_relative_to=timestamp_relative_to
            )
            with queue_section as section:
                wait_for_job_get_started(job, attempt_no)

            log_follower: LogFollower = bootstrap_log_follower(
                main_test_case, timestamp_relative_to
            )
            follow_job_execution(job, log_follower)
            # Success: return immediately, skipping further attempts.
            return job
        except (MesaCIException, KeyboardInterrupt) as exception:
            job.handle_exception(exception)
        finally:
            print_job_final_status(job, timestamp_relative_to)
            # If LAVA takes too long to post process the job, the submitter
            # gives up and proceeds.
            job_log["submitter_end_time"] = datetime.now(tz=UTC).isoformat()
            last_failed_job = job
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in the attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )
        # A non-retriable exception aborts the retry loop immediately.
        if job.exception and not isinstance(job.exception, MesaCIRetriableException):
            break
    return last_failed_job
def retriable_follow_job(
    proxy, job_definition, main_test_case, timestamp_relative_to
) -> LAVAJob:
    """Run the LAVA job with automatic retries; return the final attempt.

    Raises:
        MesaCIRetryError: when every attempt failed with a retriable
            infrastructure error.
    """
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    final_job = execute_job_with_retries(
        proxy,
        job_definition,
        number_of_retries,
        STRUCTURAL_LOG["dut_jobs"],
        main_test_case,
        timestamp_relative_to,
    )

    if final_job.exception is not None:
        # Infra failed in all attempts
        raise MesaCIRetryError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_RED']}"
            "Job failed after it exceeded the number of "
            f"{number_of_retries} retries."
            f"{CONSOLE_LOG['RESET']}",
            retry_count=number_of_retries,
            last_job=final_job,
        )

    return final_job
@dataclass
class PathResolver:
    """Mixin that resolves ``pathlib.Path``-annotated dataclass fields.

    After dataclass initialization, every truthy field whose annotation is
    exactly ``pathlib.Path`` is converted to an absolute, resolved ``Path``.
    Falsy values (``None``, ``""``) and fields with any other annotation are
    left untouched.  Note the dispatch compares the annotation object
    itself, so subclasses must annotate path fields as plain
    ``pathlib.Path`` (not ``Optional[pathlib.Path]``).
    """

    def __post_init__(self):
        # FIX: the loop variable used to be named `field`, shadowing the
        # module-level `dataclasses.field` import; renamed for clarity.
        for field_spec in fields(self):
            value = getattr(self, field_spec.name)
            if not value:
                continue
            if field_spec.type == pathlib.Path:
                value = pathlib.Path(value)
                setattr(self, field_spec.name, value.resolve())
@dataclass
class LAVAJobSubmitter(PathResolver):
    """Build, validate, submit and follow a LAVA job (CLI exposed via fire).

    Every dataclass field doubles as a command-line flag.  Fields annotated
    exactly ``pathlib.Path`` are resolved to absolute paths by
    ``PathResolver.__post_init__`` — do not loosen those annotations (e.g.
    to ``Optional[pathlib.Path]``), as PathResolver dispatches on the
    annotation object itself.
    """

    boot_method: str
    device_type: str
    farm: str
    job_timeout_min: int  # The job timeout in minutes
    shell_prompt: str = "lava-shell:"
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    env_file: pathlib.Path = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    lava_tags: str | tuple[str, ...] = ()  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    structured_log_file: pathlib.Path = None  # Log file path with structured LAVA log
    ssh_client_image: str = None  # x86_64 SSH client image to follow the job's output
    project_dir: str = None  # Project directory to be used to find Mesa artifacts
    project_name: str = None  # Project name to be used in the job name
    starting_section: str = None  # GitLab section used to start
    # NOTE(review): the annotation used to be ``[str | datetime]`` — a list
    # literal, not a valid type — fixed here.  Holds an ISO-8601 string at
    # construction time; converted to ``datetime`` in ``__post_init__``.
    job_submitted_at: str | datetime | None = None
    __structured_log_context = contextlib.nullcontext()  # Structured Logger context
    _overlays: dict = field(default_factory=dict, init=False)

    def __post_init__(self) -> Self:
        super().__post_init__()
        # Remove mesa job names with spaces, which breaks the lava-test-case command
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]

        if self.structured_log_file:
            self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()

        if self.job_submitted_at:
            self.job_submitted_at = datetime.fromisoformat(self.job_submitted_at)

        # XML-RPC proxy to the LAVA instance (configured by setup_lava_proxy).
        self.proxy = setup_lava_proxy()

        return self

    def append_overlay(
        self, name: str, path: str, url: str, format: str = "tar", compression: str = ""
    ) -> Self:
        """
        Append an overlay to the LAVA job definition.
        Args:
            name (str): The name of the overlay.
            url (str): The URL from where the overlay can be downloaded.
            path (str): The path where the overlay should be applied.
            format (str, optional): The format of the overlay (default is "tar").
            compression (str, optional): The compression type of the overlay (e.g., "gz", "xz").
        Returns:
            Self: The instance of LAVAJobSubmitter with the overlay appended.
        """
        self._overlays[name] = {
            "url": url,
            "path": path,
            "format": format,
        }
        if compression:
            self._overlays[name]["compression"] = compression
        return self

    def print(self) -> Self:
        """
        Prints the dictionary representation of the instance and returns the instance itself.
        Returns:
            Self: The instance of the class.
        """
        print(self.__dict__)
        return self

    def __prepare_submission(self) -> str:
        """Generate and validate the job definition YAML; abort on errors."""
        # Overwrite the timeout for the testcases with the value offered by the
        # user. The testcase running time should be at least 4 times greater than
        # the other sections (boot and setup), so we can safely ignore them.
        # If LAVA fails to stop the job at this stage, it will fall back to the
        # script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition = LAVAJobDefinition(self).generate_lava_job_definition()

        if self.dump_yaml:
            self.dump_job_definition(job_definition)

        # Ask LAVA to validate the definition before the real submission.
        validation_job = LAVAJob(self.proxy, job_definition)
        if errors := validation_job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")

        return job_definition

    @classmethod
    def is_under_ci(cls):
        """Return True when running inside a GitLab CI pipeline (CI=true)."""
        ci_envvar: str = getenv("CI", "false")
        return ci_envvar.lower() == "true"

    def dump_job_definition(self, job_definition) -> None:
        """Print the job definition YAML, secrets redacted, in a collapsed section."""
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))

    def submit(self) -> None:
        """
        Prepares and submits the LAVA job.
        If `validate_only` is True, it validates the job without submitting it.
        If the job finishes with a non-pass status or encounters an exception,
        the program exits with a non-zero return code.
        """
        job_definition: str = self.__prepare_submission()

        if self.validate_only:
            return

        if self.starting_section:
            gl = GitlabSection(
                id=self.starting_section,
                header="Preparing to submit job for scheduling",
                type=LogSectionType.LAVA_SUBMIT,
                start_collapsed=True,
                timestamp_relative_to=self.job_submitted_at,
            )
            gl.start()
            print(gl.end())

        with self.__structured_log_context:
            last_attempt_job = None
            try:
                last_attempt_job = retriable_follow_job(
                    self.proxy, job_definition,
                    f'{self.project_name}_{self.mesa_job_name}',
                    self.job_submitted_at)
            except MesaCIRetryError as retry_exception:
                # All attempts failed; keep the last job for reporting.
                last_attempt_job = retry_exception.last_job
            except Exception as exception:
                STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
                raise exception
            finally:
                self.finish_script(last_attempt_job)

    def finish_script(self, last_attempt_job):
        """Record the final status in the structured log and set the exit code.

        Called from submit()'s finally block, possibly while an exception is
        propagating (hence the sys.exc_info() inspection).
        """
        if not last_attempt_job:
            # No job was run, something bad happened
            STRUCTURAL_LOG["job_combined_status"] = "script_crash"
            current_exception = str(sys.exc_info()[1])
            STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
            print(f"Interrupting the script. Reason: {current_exception}")
            raise SystemExit(1)

        STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status
        STRUCTURAL_LOG["job_exit_code"] = last_attempt_job.exit_code

        if last_attempt_job.status != "pass":
            raise SystemExit(last_attempt_job.exit_code)
class StructuredLoggerWrapper:
    """Choose and initialize the structured-logging backend for a submitter.

    Prefers the optional StructuredLogger library; falls back to dumping the
    module-level STRUCTURAL_LOG defaultdict as JSON when that library could
    not be imported at module scope.
    """

    def __init__(self, submitter: LAVAJobSubmitter) -> None:
        self.__submitter: LAVAJobSubmitter = submitter

    def _init_logger(self):
        # Seed the structured log with the fields every run must report.
        STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
        STRUCTURAL_LOG["farm"] = self.__submitter.farm
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
        STRUCTURAL_LOG["job_exit_code"] = None
        STRUCTURAL_LOG["dut_attempt_counter"] = 0

        # Initialize dut_jobs list to enable appends
        STRUCTURAL_LOG["dut_jobs"] = []

    @contextlib.contextmanager
    def _simple_logger_context(self):
        # Fallback context manager: dump the whole STRUCTURAL_LOG as JSON
        # when the context exits (even on error, via finally).
        log_file = pathlib.Path(self.__submitter.structured_log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            # Truncate the file
            log_file.write_text("")
            yield
        finally:
            log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))

    def logger_context(self):
        # Try the real StructuredLogger first; a NameError means the optional
        # module-scope import failed, so use the JSON fallback instead.
        context = contextlib.nullcontext()
        try:
            global STRUCTURAL_LOG
            STRUCTURAL_LOG = StructuredLogger(
                self.__submitter.structured_log_file, truncate=True
            ).data
        except NameError:
            context = self._simple_logger_context()

        self._init_logger()
        return context
def main():
    """CLI entry point: expose LAVAJobSubmitter through python-fire."""
    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
    # more buffering
    for stream in (sys.stdout, sys.stderr):
        stream.reconfigure(line_buffering=True)

    fire.Fire(LAVAJobSubmitter)


if __name__ == "__main__":
    main()

View File

@@ -1,74 +0,0 @@
from collections import defaultdict
from unittest.mock import MagicMock, patch
import pytest
import yaml
from freezegun import freeze_time
from .lava.helpers import generate_testsuite_result, jobs_logs_response
def pytest_configure(config):
    """Register the custom ``slow`` marker with pytest."""
    marker_line = "slow: marks tests as slow (deselect with '-m \"not slow\"')"
    config.addinivalue_line("markers", marker_line)
@pytest.fixture
def mock_sleep():
    """Replace time.sleep with a no-op so polling tests run instantly."""
    sleep_patch = patch("time.sleep", return_value=None)
    with sleep_patch:
        yield
@pytest.fixture
def frozen_time(mock_sleep):
    """Freeze the clock at 2024-01-01 (with sleep mocked) for deterministic tests."""
    # NOTE: the `as` target used to shadow the fixture name; renamed.
    with freeze_time("2024-01-01") as frozen_clock:
        yield frozen_clock
# Canned LAVA "get_testjob_results" payload used as the default job result.
RESULT_GET_TESTJOB_RESULTS = [{"metadata": {"result": "test"}}]


@pytest.fixture
def mock_proxy(frozen_time):
    """Yield a factory that builds a fully mocked LAVA XML-RPC proxy.

    The factory accepts overrides for the job/testsuite results and forwards
    any extra keyword arguments as attributes of the scheduler.jobs.logs mock.
    """

    def mock_job_state(jid) -> dict[str, str]:
        # Advance the frozen clock so polling loops observe time passing.
        frozen_time.tick(1)
        return {"job_state": "Running"}

    def create_proxy_mock(
        job_results=RESULT_GET_TESTJOB_RESULTS,
        # NOTE(review): mutable default, evaluated once per fixture
        # instantiation (this inner def) — sharing is scoped to one test.
        testsuite_results=[generate_testsuite_result()],
        **kwargs
    ):
        proxy_mock = MagicMock()
        proxy_submit_mock = proxy_mock.scheduler.jobs.submit
        proxy_submit_mock.return_value = "1234"

        proxy_results_mock = proxy_mock.results.get_testjob_results_yaml
        proxy_results_mock.return_value = yaml.safe_dump(job_results)

        proxy_test_suites_mock = proxy_mock.results.get_testsuite_results_yaml
        proxy_test_suites_mock.return_value = yaml.safe_dump(testsuite_results)

        proxy_logs_mock = proxy_mock.scheduler.jobs.logs
        proxy_logs_mock.return_value = jobs_logs_response()

        proxy_job_state = proxy_mock.scheduler.job_state
        proxy_job_state.side_effect = mock_job_state

        proxy_show_mock = proxy_mock.scheduler.jobs.show
        proxy_show_mock.return_value = defaultdict(
            str,
            {
                "device_type": "test_device",
                "device": "test_device-cbg-1",
                "state": "created",
            },
        )

        # Any extra kwargs become attributes of the logs mock (e.g. side_effect).
        for key, value in kwargs.items():
            setattr(proxy_logs_mock, key, value)

        return proxy_mock

    yield create_proxy_mock

View File

@@ -1,147 +0,0 @@
job_name: 'test-project: my_pipeline_info'
device_type: my_fastboot_device_type
visibility:
group:
- my_visibility_group
priority: 75
context:
extra_nfsroot_args: ' init=/init rootwait usbcore.quirks=0bda:8153:k'
timeouts:
job:
minutes: 10
actions:
depthcharge-retry:
minutes: 6
depthcharge-action:
minutes: 5
uboot-action:
seconds: 200
actions:
- deploy:
timeout:
minutes: 10
to: nfs
nfsrootfs:
url: None
compression: zstd
format: tar
overlays: {}
namespace: dut
- deploy:
timeout:
minutes: 5
to: downloads
os: oe
images:
kernel:
url: None/None
dtb:
url: None/my_dtb_filename.dtb
postprocess:
docker:
image: registry.gitlab.collabora.com/lava/health-check-docker
steps:
- cat Image.gz my_dtb_filename.dtb > Image.gz+dtb
- mkbootimg --kernel Image.gz+dtb --cmdline "root=/dev/nfs rw nfsroot=$NFS_SERVER_IP:$NFS_ROOTFS,tcp,hard,v3
ip=dhcp init=/init rootwait usbcore.quirks=0bda:8153:k" --pagesize 4096
--base 0x80000000 -o boot.img
namespace: dut
- deploy:
timeout:
minutes: 10
to: fastboot
docker:
image: registry.gitlab.collabora.com/lava/health-check-docker
images:
boot:
url: downloads://boot.img
namespace: dut
failure_retry: 2
- boot:
timeout:
minutes: 2
docker:
image: registry.gitlab.collabora.com/lava/health-check-docker
failure_retry: 2
method: fastboot
prompts:
- 'lava-shell:'
commands:
- set_active a
namespace: dut
auto_login:
login_commands:
- dropbear -R -B
- touch /dut_ready
login_prompt: 'ogin:'
username: ''
- test:
namespace: dut
definitions:
- from: inline
name: setup-ssh-server
path: inline-setup-ssh-server
repository:
metadata:
format: Lava-Test Test Definition 1.0
name: dut-env-export
run:
steps:
- |-
FARM=test_farm . /test_dir/install/common/init-stage1.sh
echo RkFSTT10ZXN0X2Zhcm0gL3Rlc3RfZGlyL2luc3RhbGwvY29tbW9uL2luaXQtc3RhZ2UxLnNo | base64 -d >> /set-job-env-vars.sh
export CURRENT_SECTION=dut_boot
- export -p > /dut-env-vars.sh
- test:
namespace: container
timeout:
minutes: 10
failure_retry: 3
definitions:
- name: docker_ssh_client
from: inline
path: inline/docker_ssh_client.yaml
repository:
metadata:
name: mesa
description: Mesa test plan
format: Lava-Test Test Definition 1.0
run:
steps:
- |-
set -ex
timeout 1m bash << EOF
while [ -z "$(lava-target-ip)" ]; do
echo Waiting for DUT to join LAN;
sleep 1;
done
EOF
ping -c 5 -w 60 $(lava-target-ip)
lava_ssh_test_case() {
set -ex
local test_case="${1}"
shift
lava-test-case "${test_case}" --shell \
ssh ${SSH_PTY_ARGS:--T} \
-o StrictHostKeyChecking=no \
-o UserKnownHostsFile=/dev/null \
-o ConnectTimeout=60 \
root@$(lava-target-ip) "${@}"
}
- |-
lava_ssh_test_case 'wait_for_dut_login' << EOF
while [ ! -e /dut_ready ]; do sleep 1; done;
EOF
- |-
lava_ssh_test_case 'artifact_download' 'bash --' << EOF
source /dut-env-vars.sh
set -e
echo Could not find jwt file, disabling S3 requests...
sed -i '/S3_RESULTS_UPLOAD/d' /set-job-env-vars.sh
EOF
- export SSH_PTY_ARGS=-tt
- lava_ssh_test_case 'test-project_dut' 'cd / && /test_dir/install/common/init-stage2.sh'
docker:
image:

View File

@@ -1,95 +0,0 @@
job_name: 'test-project: my_pipeline_info'
device_type: my_fastboot_device_type
visibility:
group:
- my_visibility_group
priority: 75
context:
extra_nfsroot_args: ' init=/init rootwait usbcore.quirks=0bda:8153:k'
timeouts:
job:
minutes: 10
actions:
depthcharge-retry:
minutes: 6
depthcharge-action:
minutes: 5
uboot-action:
seconds: 200
actions:
- deploy:
timeout:
minutes: 10
to: nfs
nfsrootfs:
url: None
compression: zstd
format: tar
overlays: {}
- deploy:
timeout:
minutes: 5
to: downloads
os: oe
images:
kernel:
url: None/None
dtb:
url: None/my_dtb_filename.dtb
postprocess:
docker:
image: registry.gitlab.collabora.com/lava/health-check-docker
steps:
- cat Image.gz my_dtb_filename.dtb > Image.gz+dtb
- mkbootimg --kernel Image.gz+dtb --cmdline "root=/dev/nfs rw nfsroot=$NFS_SERVER_IP:$NFS_ROOTFS,tcp,hard,v3
ip=dhcp init=/init rootwait usbcore.quirks=0bda:8153:k" --pagesize 4096
--base 0x80000000 -o boot.img
- deploy:
timeout:
minutes: 2
to: fastboot
docker:
image: registry.gitlab.collabora.com/lava/health-check-docker
images:
boot:
url: downloads://boot.img
- boot:
timeout:
minutes: 2
docker:
image: registry.gitlab.collabora.com/lava/health-check-docker
failure_retry: 2
method: fastboot
prompts:
- 'lava-shell:'
commands:
- set_active a
- test:
timeout:
minutes: 10
failure_retry: 1
definitions:
- name: mesa
from: inline
lava-signal: kmsg
path: inline/mesa.yaml
repository:
metadata:
name: mesa
description: Mesa test plan
os:
- oe
scope:
- functional
format: Lava-Test Test Definition 1.0
run:
steps:
- FARM=test_farm . /test_dir/install/common/init-stage1.sh
- echo RkFSTT10ZXN0X2Zhcm0gL3Rlc3RfZGlyL2luc3RhbGwvY29tbW9uL2luaXQtc3RhZ2UxLnNo
| base64 -d >> /set-job-env-vars.sh
- export CURRENT_SECTION=dut_boot
- set -e
- echo Could not find jwt file, disabling S3 requests...
- sed -i '/S3_RESULTS_UPLOAD/d' /set-job-env-vars.sh
- sleep 1
- lava-test-case 'test-project_dut' --shell /test_dir/install/common/init-stage2.sh

View File

@@ -1,118 +0,0 @@
job_name: 'test-project: my_pipeline_info'
device_type: my_uboot_device_type
visibility:
group:
- my_visibility_group
priority: 75
context:
extra_nfsroot_args: ' init=/init rootwait usbcore.quirks=0bda:8153:k'
timeouts:
job:
minutes: 10
actions:
depthcharge-retry:
minutes: 6
depthcharge-action:
minutes: 5
uboot-action:
seconds: 200
actions:
- deploy:
timeout:
minutes: 10
to: tftp
os: oe
kernel:
url: None/None
nfsrootfs:
url: None
compression: zstd
format: tar
overlays: {}
dtb:
url: None/my_dtb_filename.dtb
namespace: dut
failure_retry: 2
- boot:
failure_retry: 2
method: u-boot
prompts:
- 'lava-shell:'
commands: nfs
namespace: dut
auto_login:
login_commands:
- dropbear -R -B
- touch /dut_ready
login_prompt: 'ogin:'
username: ''
- test:
namespace: dut
definitions:
- from: inline
name: setup-ssh-server
path: inline-setup-ssh-server
repository:
metadata:
format: Lava-Test Test Definition 1.0
name: dut-env-export
run:
steps:
- |-
FARM=test_farm . /test_dir/install/common/init-stage1.sh
echo RkFSTT10ZXN0X2Zhcm0gL3Rlc3RfZGlyL2luc3RhbGwvY29tbW9uL2luaXQtc3RhZ2UxLnNo | base64 -d >> /set-job-env-vars.sh
export CURRENT_SECTION=dut_boot
- export -p > /dut-env-vars.sh
- test:
namespace: container
timeout:
minutes: 10
failure_retry: 3
definitions:
- name: docker_ssh_client
from: inline
path: inline/docker_ssh_client.yaml
repository:
metadata:
name: mesa
description: Mesa test plan
format: Lava-Test Test Definition 1.0
run:
steps:
- |-
set -ex
timeout 1m bash << EOF
while [ -z "$(lava-target-ip)" ]; do
echo Waiting for DUT to join LAN;
sleep 1;
done
EOF
ping -c 5 -w 60 $(lava-target-ip)
lava_ssh_test_case() {
set -ex
local test_case="${1}"
shift
lava-test-case "${test_case}" --shell \
ssh ${SSH_PTY_ARGS:--T} \
-o StrictHostKeyChecking=no \
-o UserKnownHostsFile=/dev/null \
-o ConnectTimeout=60 \
root@$(lava-target-ip) "${@}"
}
- |-
lava_ssh_test_case 'wait_for_dut_login' << EOF
while [ ! -e /dut_ready ]; do sleep 1; done;
EOF
- |-
lava_ssh_test_case 'artifact_download' 'bash --' << EOF
source /dut-env-vars.sh
set -e
echo Could not find jwt file, disabling S3 requests...
sed -i '/S3_RESULTS_UPLOAD/d' /set-job-env-vars.sh
EOF
- export SSH_PTY_ARGS=-tt
- lava_ssh_test_case 'test-project_dut' 'cd / && /test_dir/install/common/init-stage2.sh'
docker:
image:

View File

@@ -1,68 +0,0 @@
job_name: 'test-project: my_pipeline_info'
device_type: my_uboot_device_type
visibility:
group:
- my_visibility_group
priority: 75
context:
extra_nfsroot_args: ' init=/init rootwait usbcore.quirks=0bda:8153:k'
timeouts:
job:
minutes: 10
actions:
depthcharge-retry:
minutes: 6
depthcharge-action:
minutes: 5
uboot-action:
seconds: 200
actions:
- deploy:
timeout:
minutes: 5
to: tftp
os: oe
kernel:
url: None/None
nfsrootfs:
url: None
compression: zstd
format: tar
overlays: {}
dtb:
url: None/my_dtb_filename.dtb
- boot:
failure_retry: 2
method: u-boot
prompts:
- 'lava-shell:'
commands: nfs
- test:
timeout:
minutes: 10
failure_retry: 1
definitions:
- name: mesa
from: inline
lava-signal: kmsg
path: inline/mesa.yaml
repository:
metadata:
name: mesa
description: Mesa test plan
os:
- oe
scope:
- functional
format: Lava-Test Test Definition 1.0
run:
steps:
- FARM=test_farm . /test_dir/install/common/init-stage1.sh
- echo RkFSTT10ZXN0X2Zhcm0gL3Rlc3RfZGlyL2luc3RhbGwvY29tbW9uL2luaXQtc3RhZ2UxLnNo
| base64 -d >> /set-job-env-vars.sh
- export CURRENT_SECTION=dut_boot
- set -e
- echo Could not find jwt file, disabling S3 requests...
- sed -i '/S3_RESULTS_UPLOAD/d' /set-job-env-vars.sh
- sleep 1
- lava-test-case 'test-project_dut' --shell /test_dir/install/common/init-stage2.sh

View File

@@ -1,150 +0,0 @@
from contextlib import nullcontext as does_not_raise
from datetime import UTC, datetime
from io import StringIO
from itertools import cycle
from typing import Any, Callable, Generator, Iterable, Optional, Tuple, Union
from freezegun import freeze_time
from lava.utils.log_section import (
DEFAULT_GITLAB_SECTION_TIMEOUTS,
FALLBACK_GITLAB_SECTION_TIMEOUT,
LogSectionType,
)
from lavacli.utils import flow_yaml as lava_yaml
def yaml_dump(data: dict[str, Any]) -> str:
    """Serialize *data* to a YAML string via the lavacli flow-style dumper."""
    buffer = StringIO()
    lava_yaml.dump(data, buffer)
    return buffer.getvalue()
def section_timeout(section_type: LogSectionType) -> int:
    """Return the configured timeout for *section_type*, in whole seconds."""
    timeout = DEFAULT_GITLAB_SECTION_TIMEOUTS.get(
        section_type, FALLBACK_GITLAB_SECTION_TIMEOUT
    )
    return int(timeout.total_seconds())
def create_lava_yaml_msg(
    dt: Callable = datetime.now, msg="test", lvl="target"
) -> dict[str, str]:
    """Build one LAVA log entry; *dt* is called to produce the timestamp."""
    return dict(dt=str(dt()), msg=msg, lvl=lvl)
def generate_testsuite_result(
    name="test-mesa-ci", result="pass", exit_code=0, metadata_extra=None, extra=None
):
    """Build a fake LAVA testsuite result dict.

    ``metadata_extra`` is merged into the ``metadata`` mapping and ``extra``
    into the top-level dict; both default to empty.  (Previously ``extra``
    was accepted and normalized but silently ignored.)
    """
    if metadata_extra is None:
        metadata_extra = {}
    if extra is None:
        extra = {}
    return {
        "metadata": {"result": result, "exit_code": exit_code, **metadata_extra},
        "name": name,
        **extra,
    }
def jobs_logs_response(
    finished=False, msg=None, lvl="target", result=None, exit_code=None
) -> Tuple[bool, str]:
    """Fake one scheduler.jobs.logs() reply: (finished, YAML-encoded log list)."""
    entry = {"dt": str(datetime.now(tz=UTC)), "msg": "New message", "lvl": lvl}
    if result:
        # A result replaces the entry with the hwci exit-code echo line.
        entry["lvl"] = "target"
        entry["msg"] = f"hwci: mesa: exit_code: {exit_code}"
    if msg is None:
        logs = [entry]
    else:
        logs = msg
    return finished, yaml_dump(logs)
def section_aware_message_generator(
    messages: dict[LogSectionType, Iterable[int]],
    result: Optional[str] = None,
    exit_code: Optional[int] = None,
) -> Iterable[tuple[dict, Iterable[int]]]:
    """Yield (log message, tick delays) pairs, one section signal per section.

    When *result* is set, the hwci exit-code echo is emitted right after the
    TEST_CASE section signal, which is where the job is considered finished.
    """
    fallback_delay = [1]
    for section in LogSectionType:
        delay = messages.get(section, fallback_delay)
        yield mock_lava_signal(section), delay
        if result and section == LogSectionType.TEST_CASE:
            # To consider the job finished, the result `echo` should be produced
            # in the correct section
            echo = create_lava_yaml_msg(msg=f"hwci: mesa: exit_code: {exit_code}")
            yield echo, delay
def message_generator():
    """Yield one mocked LAVA signal message for every log section, in order."""
    yield from map(mock_lava_signal, LogSectionType)
def level_generator():
    """Endlessly cycle through every known LAVA log level."""
    known_levels = ("results", "feedback", "warning", "error", "debug", "target")
    yield from cycle(known_levels)
def generate_n_logs(
    n=1,
    tick_fn: Union[Generator, Iterable[int], int] = 1,
    level_fn=level_generator,
    result="pass",
    exit_code=0,
):
    """Simulate a log partitioned in *n* components.

    Yields ``n - 1`` unfinished, empty log responses (cycling through the
    levels produced by *level_fn*), then one finished response carrying
    *result*/*exit_code* — and repeats forever.  The clock is frozen and
    advanced by *tick_fn* seconds between responses so the submitter's
    timeout logic can be exercised deterministically.
    """
    level_gen = level_fn()
    # Normalize tick_fn through the shared helper instead of duplicating
    # the generator/iterable/scalar dispatch inline.
    tick_gen = to_iterable(tick_fn)
    with freeze_time(datetime.now(tz=UTC)) as time_travel:
        tick_sec: int = next(tick_gen)
        while True:
            # Simulate a scenario where the target job is waiting to be started
            for _ in range(n - 1):
                level: str = next(level_gen)
                time_travel.tick(tick_sec)
                yield jobs_logs_response(finished=False, msg=[], lvl=level)
            time_travel.tick(tick_sec)
            yield jobs_logs_response(finished=True, result=result, exit_code=exit_code)
def to_iterable(tick_fn):
    """Normalize *tick_fn* into an iterator of tick values.

    Generators pass through untouched; any other iterable is cycled
    endlessly; a bare scalar becomes an endless repetition of itself.
    """
    if isinstance(tick_fn, Generator):
        return tick_fn
    if not isinstance(tick_fn, Iterable):
        tick_fn = (tick_fn,)
    return cycle(tick_fn)
def mock_logs(messages=None, result=None, exit_code=None):
    """Simulate a complete LAVA job log stream, one section at a time.

    For each log section, yields that section's signal message once per tick
    in ``messages[section]`` (default: one 1-second tick), advancing the
    frozen clock between yields so the consumer sees realistic timestamps.
    *result*/*exit_code* are forwarded so the final hwci echo is emitted in
    the TEST_CASE section.
    """
    if messages is None:
        messages = {}
    with freeze_time(datetime.now(tz=UTC)) as time_travel:
        # Simulate a complete run given by message_fn
        for msg, tick_list in section_aware_message_generator(messages, result, exit_code):
            for tick_sec in tick_list:
                # Yield first, then tick: the message carries the pre-tick time.
                yield jobs_logs_response(finished=False, msg=[msg])
                time_travel.tick(tick_sec)
def mock_lava_signal(type: LogSectionType) -> dict[str, str]:
    """Return the mocked log message that opens *type*'s GitLab section."""
    signal_args = {
        LogSectionType.TEST_CASE: {"msg": "<STARTTC> case", "lvl": "debug"},
        LogSectionType.TEST_SUITE: {"msg": "<STARTRUN> suite", "lvl": "debug"},
        LogSectionType.LAVA_POST_PROCESSING: {
            "msg": "<LAVA_SIGNAL_ENDTC case>",
            "lvl": "target",
        },
    }
    # Unknown section types fall back to the default test message.
    return create_lava_yaml_msg(**signal_args.get(type, {}))

View File

@@ -1,557 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT
import os
import xmlrpc.client
from contextlib import nullcontext as does_not_raise
from datetime import UTC, datetime
from itertools import cycle, islice, repeat
from pathlib import Path
from typing import Generator
from unittest.mock import MagicMock, patch
import pytest
from lava.exceptions import MesaCIException, MesaCIRetryError, MesaCIFatalException
from lava.lava_job_submitter import (
DEVICE_HANGING_TIMEOUT_SEC,
NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
LAVAJob,
LAVAJobSubmitter,
bootstrap_log_follower,
follow_job_execution,
retriable_follow_job,
wait_for_job_get_started,
)
from lava.utils import LogSectionType, LAVA_TEST_CASE_TIMEOUT
from .lava.helpers import (
generate_n_logs,
generate_testsuite_result,
jobs_logs_response,
mock_lava_signal,
mock_logs,
section_timeout,
)
# Total tries = the initial attempt plus every timeout-detection retry.
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
def max_sec_before_timeout():
    # Longest wait (seconds) that still stays below the LAVA test-case
    # timeout, capped at 1000 to keep the simulated runs short.
    return min(1000, LAVA_TEST_CASE_TIMEOUT * 60 - 1)
@pytest.fixture
def mock_proxy_waiting_time(mock_proxy):
    """Factory fixture: a mocked proxy whose job_state poll also advances a
    frozen clock by ``wait_time`` seconds, simulating a job stuck in queue."""
    def update_mock_proxy(frozen_time, **kwargs):
        def mock_job_state(jid) -> dict[str, str]:
            # Every poll of the job state costs `wait_time` simulated seconds.
            frozen_time.tick(wait_time)
            return {"job_state": "Running"}
        # `wait_time` is popped so the remaining kwargs go to mock_proxy.
        wait_time = kwargs.pop("wait_time", 1)
        proxy_mock = mock_proxy(**kwargs)
        proxy_job_state = proxy_mock.scheduler.job_state
        proxy_job_state.side_effect = mock_job_state
        return proxy_mock
    return update_mock_proxy
@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
def ci_environment(request):
    """Run the dependent test twice: as if under GitLab CI, and as a local run."""
    with patch.dict(os.environ, request.param):
        yield
@pytest.fixture
def lava_job_submitter(
    ci_environment,
    tmp_path,
    mock_proxy,
):
    """A LAVAJobSubmitter wired to a mocked proxy, writing its structured
    log to a per-test temporary file; runs from inside tmp_path."""
    os.chdir(tmp_path)
    tmp_file = Path(tmp_path) / "log.json"
    with patch("lava.lava_job_submitter.setup_lava_proxy") as mock_setup_lava_proxy:
        mock_setup_lava_proxy.return_value = mock_proxy()
        yield LAVAJobSubmitter(
            boot_method="test_boot",
            shell_prompt="lava-shell:",
            device_type="test_device",
            farm="test_farm",
            job_timeout_min=1,
            structured_log_file=tmp_file,
        )
@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
    """Arbitrary exceptions raised by the proxy must surface as MesaCIException."""
    with pytest.raises(MesaCIException):
        proxy = mock_proxy(side_effect=exception)
        job = LAVAJob(proxy, "")
        log_follower = bootstrap_log_follower(main_test_case="", timestamp_relative_to=None)
        follow_job_execution(job, log_follower)
NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")
# Scenario table for test_retriable_follow_job.  Each value is a 5-tuple:
# (log side_effect, raise-expectation context, expected job status,
#  expected exit code, extra kwargs for mock_proxy).
PROXY_SCENARIOS = {
    "simple pass case": (mock_logs(result="pass", exit_code=0), does_not_raise(), "pass", 0, {}),
    "simple fail case": (mock_logs(result="fail", exit_code=1), does_not_raise(), "fail", 1, {}),
    "simple hung case": (
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * max_sec_before_timeout()
            },
            result="fail",
            exit_code=1,
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        1,
        {},
    ),
    "leftover dump from last job in boot section": (
        (
            mock_lava_signal(LogSectionType.LAVA_BOOT),
            jobs_logs_response(finished=False, msg=None, result="fail", exit_code=1),
        ),
        pytest.raises(MesaCIRetryError),
        "hung",
        1,
        {},
    ),
    "boot works at last retry": (
        mock_logs(
            messages={
                LogSectionType.LAVA_BOOT: [
                    section_timeout(LogSectionType.LAVA_BOOT) + 1
                ]
                * NUMBER_OF_RETRIES_TIMEOUT_DETECTION
                + [1]
            },
            result="pass",
            exit_code=0,
        ),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    "test case took too long": pytest.param(
        mock_logs(
            messages={
                LogSectionType.TEST_CASE: [
                    section_timeout(LogSectionType.TEST_CASE) + 1
                ]
                * (NUMBER_OF_MAX_ATTEMPTS + 1)
            },
            result="pass",
            exit_code=0,
        ),
        pytest.raises(MesaCIRetryError),
        "pass",
        0,
        {},
    ),
    "timed out more times than retry attempts": (
        generate_n_logs(n=4, tick_fn=9999999),
        pytest.raises(MesaCIRetryError),
        "fail",
        1,
        {},
    ),
    "long log case, no silence": (
        mock_logs(
            messages={LogSectionType.TEST_CASE: [1] * (max_sec_before_timeout())},
            result="pass",
            exit_code=0,
        ),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    "no retries, testsuite succeed": (
        mock_logs(result="pass", exit_code=0),
        does_not_raise(),
        "pass",
        0,
        {"testsuite_results": [generate_testsuite_result(result="pass", exit_code=0)]},
    ),
    "no retries, but testsuite fails": (
        mock_logs(result="fail", exit_code=1),
        does_not_raise(),
        "fail",
        1,
        {"testsuite_results": [generate_testsuite_result(result="fail", exit_code=1)]},
    ),
    "no retries, one testsuite fails": (
        mock_logs(result="fail", exit_code=1),
        does_not_raise(),
        "fail",
        1,
        {
            "testsuite_results": [
                generate_testsuite_result(result="fail", exit_code=1),
                generate_testsuite_result(result="pass", exit_code=0),
            ]
        },
    ),
    "very long silence": (
        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
        pytest.raises(MesaCIRetryError),
        "fail",
        1,
        {},
    ),
    # If a protocol error happens, _call_proxy will retry without affecting timeouts
    "unstable connection, ProtocolError followed by final message": (
        (NETWORK_EXCEPTION, *list(mock_logs(result="pass", exit_code=0))),
        does_not_raise(),
        "pass",
        0,
        {},
    ),
    # After an arbitrary number of retries, _call_proxy should call sys.exit
    "unreachable case, subsequent ProtocolErrors": (
        repeat(NETWORK_EXCEPTION),
        pytest.raises(SystemExit),
        "fail",
        1,
        {},
    ),
    "XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, 1, {}),
}
@pytest.mark.parametrize(
    "test_log, expectation, job_result, exit_code, proxy_args",
    PROXY_SCENARIOS.values(),
    ids=PROXY_SCENARIOS.keys(),
)
def test_retriable_follow_job(
    mock_sleep,
    test_log,
    expectation,
    job_result,
    exit_code,
    proxy_args,
    mock_proxy,
):
    """Drive retriable_follow_job through every PROXY_SCENARIOS log stream
    and check the resulting job status / exit code (or raised exception)."""
    with expectation:
        proxy = mock_proxy(side_effect=test_log, **proxy_args)
        job: LAVAJob = retriable_follow_job(proxy, "", "", None)
        assert job_result == job.status
        assert exit_code == job.exit_code
# Single scenario: a passing log stream; the queue delay comes from the
# mock_proxy_waiting_time fixture, not from the log itself.
WAIT_FOR_JOB_SCENARIOS = {"one log run taking (sec):": (mock_logs(result="pass", exit_code=0))}
@pytest.mark.parametrize("wait_time", (DEVICE_HANGING_TIMEOUT_SEC * 2,))
@pytest.mark.parametrize(
    "side_effect",
    WAIT_FOR_JOB_SCENARIOS.values(),
    ids=WAIT_FOR_JOB_SCENARIOS.keys(),
)
def test_simulate_a_long_wait_to_start_a_job(
    frozen_time,
    wait_time,
    side_effect,
    mock_proxy_waiting_time,
):
    """A job that waits longer than the hang timeout before starting must
    still complete successfully once it runs."""
    start_time = datetime.now(tz=UTC)
    job: LAVAJob = retriable_follow_job(
        mock_proxy_waiting_time(
            frozen_time, side_effect=side_effect, wait_time=wait_time
        ),
        "",
        "",
        None
    )
    end_time = datetime.now(tz=UTC)
    delta_time = end_time - start_time
    assert job.status == "pass"
    assert job.exit_code == 0
    # The simulated clock must have advanced at least by the queue wait.
    assert delta_time.total_seconds() >= wait_time
# (remaining CI job timeout in seconds, raise-expectation context)
LONG_LAVA_QUEUE_SCENARIOS = {
    "no_time_to_run": (0, pytest.raises(MesaCIFatalException)),
    "enough_time_to_run": (9999999999, does_not_raise()),
}
@pytest.mark.parametrize(
    "job_timeout, expectation",
    LONG_LAVA_QUEUE_SCENARIOS.values(),
    ids=LONG_LAVA_QUEUE_SCENARIOS.keys(),
)
def test_wait_for_job_get_started_no_time_to_run(monkeypatch, job_timeout, expectation):
    """When the CI job deadline leaves no time to run, the LAVA job must be
    cancelled and a fatal exception raised."""
    monkeypatch.setattr("lava.lava_job_submitter.CI_JOB_TIMEOUT_SEC", job_timeout)
    monkeypatch.setattr("lava.lava_job_submitter.CI_JOB_STARTED_AT", datetime.now(tz=UTC))
    job = MagicMock()
    # Make it escape the loop
    job.is_started.side_effect = (False, False, True)
    with expectation as e:
        wait_for_job_get_started(job, 1)
    # `e` is None for does_not_raise(); truthy ExceptionInfo when raised.
    if e:
        job.cancel.assert_called_with()
# A few corrupted YAML chunks should be tolerated; a long run of them must
# eventually exhaust the retries.
CORRUPTED_LOG_SCENARIOS = {
    "too much subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],
        pytest.raises((MesaCIRetryError)),
    ),
    "one subsequent corrupted data": (
        [(False, "{'msg': 'Incomplete}")] * 2 + [jobs_logs_response(True)],
        does_not_raise(),
    ),
}
@pytest.mark.parametrize(
    "data_sequence, expected_exception",
    CORRUPTED_LOG_SCENARIOS.values(),
    ids=CORRUPTED_LOG_SCENARIOS.keys(),
)
def test_log_corruption(mock_sleep, data_sequence, expected_exception, mock_proxy):
    """Corrupted log payloads from the scheduler must not crash the follower."""
    proxy_mock = mock_proxy()
    proxy_logs_mock = proxy_mock.scheduler.jobs.logs
    proxy_logs_mock.side_effect = data_sequence
    with expected_exception:
        retriable_follow_job(proxy_mock, "", "", None)
# (raw log line, expected job status, expected exit code)
LAVA_RESULT_LOG_SCENARIOS = {
    # the submitter should accept xtrace logs
    "Bash xtrace echo with kmsg interleaving": (
        "echo hwci: mesa: exit_code: 0[ 737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass", 0,
    ),
    # the submitter should accept xtrace logs
    "kmsg result print": (
        "[ 737.673352] hwci: mesa: exit_code: 0",
        "pass", 0,
    ),
    # if the job result echo has a very bad luck, it still can be interleaved
    # with kmsg
    "echo output with kmsg interleaving": (
        "hwci: mesa: exit_code: 0[ 737.673352] <LAVA_SIGNAL_ENDTC mesa-ci>",
        "pass", 0,
    ),
    "fail case": (
        "hwci: mesa: exit_code: 1",
        "fail", 1,
    ),
    # fail case with different exit code
    "fail case (exit code 101)": (
        "hwci: mesa: exit_code: 101",
        "fail", 101,
    ),
}
@pytest.mark.parametrize(
    "message, expected_status, expected_exit_code",
    LAVA_RESULT_LOG_SCENARIOS.values(),
    ids=LAVA_RESULT_LOG_SCENARIOS.keys(),
)
def test_parse_job_result_from_log(message, expected_status, expected_exit_code, mock_proxy):
    """The hwci exit-code echo must be parsed even when interleaved with kmsg."""
    job = LAVAJob(mock_proxy(), "")
    job.parse_job_result_from_log([message])
    assert job.status == expected_status
    assert job.exit_code == expected_exit_code
@pytest.mark.slow(
    reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
)
@pytest.mark.skipif(
    not Path("/tmp/log.yaml").is_file(), reason="Missing /tmp/log.yaml file."
)
def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
    """Replay a real raw LAVA log (from /tmp/log.yaml) through the follower,
    chunked randomly and time-travelled to the log's own timestamps.

    Manual reproduction tool rather than a regular regression test.
    """
    import random
    from lavacli.utils import flow_yaml as lava_yaml
    def time_travel_from_log_chunk(data_chunk):
        # Two-step generator: first next() jumps to the chunk's first
        # timestamp, second next() to its last one.
        if not data_chunk:
            return
        first_log = lava_yaml.load(data_chunk[0])[0]
        first_log_time = first_log["dt"]
        frozen_time.move_to(first_log_time)
        yield
        last_log = lava_yaml.load(data_chunk[-1])[0]
        last_log_time = last_log["dt"]
        frozen_time.move_to(last_log_time)
        yield
    def time_travel_to_test_time():
        # Suppose that the first message timestamp of the entire LAVA job log
        # is the same as the one from the job submitter execution
        with open("/tmp/log.yaml", "r") as f:
            first_log = f.readline()
            first_log_time = lava_yaml.load(first_log)[0]["dt"]
            frozen_time.move_to(first_log_time)
    def load_lines() -> Generator[tuple[bool, str], None, None]:
        # Emit the raw log as a sequence of (finished, chunk) responses,
        # mimicking successive scheduler.jobs.logs RPC replies.
        with open("/tmp/log.yaml", "r") as f:
            # data = yaml.safe_load(f)
            log_lines = f.readlines()
            serial_message: str = ""
            chunk_start_line = 0
            chunk_end_line = 0
            chunk_max_size = 100
            try:
                while True:
                    chunk_end_line = chunk_start_line + random.randint(1, chunk_max_size)
                    # split the log in chunks of random size
                    log_chunk = list(islice(log_lines, chunk_start_line, chunk_end_line))
                    chunk_start_line = chunk_end_line + 1
                    serial_message = "".join(log_chunk)
                    # time_traveller_gen will make the time travel according to
                    # the timestamp from the message
                    time_traveller_gen = time_travel_from_log_chunk(log_chunk)
                    # Suppose that the first message timestamp is the same of
                    # log fetch RPC call
                    next(time_traveller_gen)
                    yield False, "[]"
                    # Travel to the same datetime of the last fetched log line
                    # in the chunk
                    next(time_traveller_gen)
                    yield False, serial_message
            except StopIteration:
                yield True, serial_message
                return
    proxy = mock_proxy()
    def reset_logs(*args):
        # Re-arm the log stream on every (re)submission.
        proxy.scheduler.jobs.logs.side_effect = load_lines()
    proxy.scheduler.jobs.submit = reset_logs
    try:
        time_travel_to_test_time()
        start_time = datetime.now(tz=UTC)
        retriable_follow_job(proxy, "", "", None)
    finally:
        try:
            # If the job fails, maybe there will be no structured log
            print(lava_job_submitter.structured_log_file.read_text())
        finally:
            end_time = datetime.now(tz=UTC)
            print("---- Reproduction log stats ----")
            print(f"Start time: {start_time}")
            print(f"End time: {end_time}")
            print(f"Duration: {end_time - start_time}")
@pytest.mark.parametrize(
    "validate_only,finished_job_status,job_exit_code,expected_combined_status",
    [
        (True, "pass", None, None,),
        (False, "pass", 0, "pass",),
        (False, "fail", 1, "fail",),
    ],
    ids=[
        "validate_only_no_job_submission",
        "successful_job_submission",
        "failed_job_submission",
    ],
)
def test_job_combined_status(
    mock_proxy,
    lava_job_submitter,
    validate_only,
    finished_job_status,
    job_exit_code,
    expected_combined_status,
):
    """submit() must record the combined status/exit code in STRUCTURAL_LOG,
    and leave it untouched when running in validate-only mode."""
    lava_job_submitter.validate_only = validate_only
    with patch(
        "lava.lava_job_submitter.retriable_follow_job"
    ) as mock_retriable_follow_job, patch(
        "lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
    ) as mock_prepare_submission, patch("sys.exit"):
        from lava.lava_job_submitter import STRUCTURAL_LOG
        mock_retriable_follow_job.return_value = MagicMock(
            status=finished_job_status, exit_code=job_exit_code
        )
        mock_job_definition = MagicMock(spec=str)
        mock_prepare_submission.return_value = mock_job_definition
        # Snapshot the log state so validate-only can assert it is untouched.
        original_status: str = STRUCTURAL_LOG.get("job_combined_status")
        original_exit_code: int = STRUCTURAL_LOG.get("job_exit_code")
        if validate_only:
            lava_job_submitter.submit()
            mock_retriable_follow_job.assert_not_called()
            assert STRUCTURAL_LOG.get("job_combined_status") == original_status
            assert STRUCTURAL_LOG.get("job_exit_code") == original_exit_code
            return
        try:
            lava_job_submitter.submit()
        except SystemExit as e:
            assert e.code == job_exit_code
        assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status
        assert STRUCTURAL_LOG["job_exit_code"] == job_exit_code
# (log side_effect, raise-expectation context, expected process exit code)
SUBMIT_SCENARIOS = {
    "submit job pass": (cycle(mock_logs(result="pass", exit_code=0)), does_not_raise(), 0),
    "submit job fails": (
        cycle(mock_logs(result="fail", exit_code=1)),
        pytest.raises(SystemExit),
        1,
    ),
    "user interrupts the script": (
        (jobs_logs_response(), KeyboardInterrupt, jobs_logs_response()),
        pytest.raises(SystemExit),
        1,
    ),
    "job finishes without hwci response": (
        (jobs_logs_response(), jobs_logs_response()),
        pytest.raises(SystemExit),
        1,
    ),
}
@pytest.mark.parametrize(
    "test_log, expectation, exit_code",
    SUBMIT_SCENARIOS.values(),
    ids=SUBMIT_SCENARIOS.keys(),
)
def test_submission_exit_code(
    request, mock_proxy, lava_job_submitter, test_log, expectation, exit_code
):
    """submit() must exit with the DUT's exit code (and 1 on interruption)."""
    lava_job_submitter._LAVAJobSubmitter__prepare_submission = MagicMock()
    proxy = mock_proxy(side_effect=test_log)
    lava_job_submitter.proxy = proxy
    with expectation as e:
        lava_job_submitter.submit()
    # If the job fails, there should be a SystemExit exception
    if e:
        assert e.value.code == exit_code

View File

@@ -1,6 +0,0 @@
from lava.utils.lava_farm import get_lava_farm
def test_get_lava_farm_no_tag(monkeypatch):
    """Without a FARM environment variable the farm name falls back to "unknown"."""
    monkeypatch.delenv("FARM", raising=False)
    assert get_lava_farm() == "unknown"

View File

@@ -1,322 +0,0 @@
import importlib
import os
import re
import subprocess
from itertools import chain
from pathlib import Path
from typing import Any, Iterable, Literal
from unittest import mock
import lava.utils.constants
import pytest
from lava.lava_job_submitter import LAVAJobSubmitter
from lava.utils.lava_job_definition import LAVAJobDefinition
from ruamel.yaml import YAML
def flatten(iterable: Iterable[Iterable[Any]]) -> list[Any]:
    """Concatenate one level of nesting into a single flat list."""
    flat: list[Any] = []
    for inner in iterable:
        flat.extend(inner)
    return flat
# mock shell file
@pytest.fixture(scope="session")
def shell_file(tmp_path_factory):
    """Factory fixture: write *content* into a temp shell script and return its path."""
    def create_shell_file(content: str = "# test"):
        shell_file = tmp_path_factory.mktemp("data") / "shell_file.sh"
        shell_file.write_text(content)
        return shell_file
    return create_shell_file
# Resolve test fixture files relative to this module via pathlib.
def load_data_file(filename):
    """Return the path of *filename* inside the test suite's ``data`` directory."""
    data_dir = Path(__file__).parents[1] / "data"
    return data_dir / filename
def load_yaml_file(filename) -> dict:
    """Load a YAML fixture from the test data directory (ruamel round-trip parser)."""
    with open(load_data_file(filename)) as f:
        return YAML().load(f)
def job_submitter_factory(mode: Literal["UBOOT", "FASTBOOT"], shell_file):
    """Build a LAVAJobSubmitter preconfigured for a u-boot or fastboot DUT.

    Args:
        mode: boot flavor; selects boot method and device type.
        shell_file: path of the environment/init shell script to embed.

    Raises:
        ValueError: for an unsupported *mode* (previously an unknown mode
            crashed later with a NameError on ``boot_method``).
    """
    if mode == "UBOOT":
        boot_method = "u-boot"
        device_type = "my_uboot_device_type"
    elif mode == "FASTBOOT":
        boot_method = "fastboot"
        device_type = "my_fastboot_device_type"
    else:
        raise ValueError(f"Unsupported boot mode: {mode!r}")
    shell_prompt = "lava-shell:"
    job_timeout_min = 10
    mesa_job_name = "dut test"
    pipeline_info = "my_pipeline_info"
    project_name = "test-project"
    visibility_group = "my_visibility_group"
    return LAVAJobSubmitter(
        boot_method=boot_method,
        shell_prompt=shell_prompt,
        device_type=device_type,
        farm="test_farm",
        dtb_filename="my_dtb_filename",
        env_file=shell_file,
        job_timeout_min=job_timeout_min,
        mesa_job_name=mesa_job_name,
        pipeline_info=pipeline_info,
        visibility_group=visibility_group,
        project_dir="/test_dir",
        project_name=project_name,
    )
@pytest.fixture
def clear_env_vars(autouse=True):
    """Scrub CI/LAVA-related environment variables for deterministic tests.

    NOTE(review): ``autouse=True`` as a *function parameter* has no effect —
    it was presumably meant as ``@pytest.fixture(autouse=True)``; confirm
    intent before changing, since that would apply the fixture to every test.
    """
    with mock.patch.dict(os.environ) as environ:
        # Remove all LAVA-related environment variables to make the test more robust
        # and deterministic, once a envvar is capable of overriding the default value.
        # Iterate over a snapshot: deleting from the mapping while iterating
        # it directly is unsafe.
        for key in list(environ):
            if any(kw in key for kw in ("LAVA_", "CI_", "JOB_", "RUNNER_", "DEVICE_")):
                del environ[key]
        # reload lava.utils.constants to update the JOB_PRIORITY value
        importlib.reload(lava.utils.constants)
        importlib.reload(lava.utils.lava_job_definition)
        yield
@pytest.fixture
def mock_collabora_farm(clear_env_vars, monkeypatch):
    """Pretend the test runs against a depthcharge Chromebook in the Collabora farm."""
    # Mock a Chromebook in the Collabora farm
    monkeypatch.setenv("FARM", "collabora")
    monkeypatch.setenv("BOOT_METHOD", "depthcharge")
# ids now match values positionally (False -> SSH transport, True -> UART),
# consistent with test_lava_job_definition below; they were swapped before.
@pytest.mark.parametrize("force_uart", [False, True], ids=["SSH", "UART"])
@pytest.mark.parametrize("mode", ["UBOOT", "FASTBOOT"])
@mock.patch("lava.lava_job_submitter.setup_lava_proxy")
def test_generate_lava_job_definition_sanity(
    mock_lava_proxy,
    force_uart,
    mode,
    shell_file,
    mock_collabora_farm,
    monkeypatch,
    mock_proxy,
):
    """Sanity-check the generated LAVA job definition for every boot mode,
    both over UART and over the SSH/docker transport."""
    monkeypatch.setattr(lava.utils.lava_job_definition, "FORCE_UART", force_uart)
    # Do not actually connect to the LAVA server
    mock_lava_proxy.return_value = mock_proxy
    farm_env = "FARM=test_farm"
    init_script = "/install/common/init-stage1.sh"
    job_submitter = job_submitter_factory(mode, shell_file(init_script))
    job_definition = LAVAJobDefinition(job_submitter).generate_lava_job_definition()
    # Load the YAML output and check that it contains the expected keys and values
    yaml = YAML()
    job_dict = yaml.load(job_definition)
    yaml.dump(job_dict, Path(f"/tmp/{mode}_force_uart={force_uart}_job_definition.yaml"))
    assert job_dict["device_type"] == job_submitter.device_type
    assert job_dict["visibility"]["group"] == [job_submitter.visibility_group]
    assert job_dict["timeouts"]["job"]["minutes"] == job_submitter.job_timeout_min
    assert job_dict["context"]["extra_nfsroot_args"]
    assert job_dict["timeouts"]["actions"]
    last_test_action = job_dict["actions"][-1]["test"]
    # TODO: Remove hardcoded "mesa" test name, as this submitter is being used by other projects
    first_test_name = last_test_action["definitions"][0]["name"]
    is_running_ssh = "ssh" in first_test_name
    # UBOOT jobs have deploy/boot/test; FASTBOOT adds two extra deploy steps;
    # SSH jobs append one more test action for the docker SSH client.
    # (The previous `== 3 if mode == "UART" else 5` parsed as
    # `(len == 3) if ... else 5`, so it always passed.)
    expected_n_actions = (3 if mode == "UBOOT" else 5) + (1 if is_running_ssh else 0)
    assert len(job_dict["actions"]) == expected_n_actions
    # if force_uart, is_ssh must be False. If is_ssh, force_uart must be False. Both can be False
    assert not (is_running_ssh and force_uart)
    # SSH jobs retry the client test 3 times, UART jobs only once.
    # (Parenthesized: the previous ternary bound to the comparison instead.)
    assert last_test_action["failure_retry"] == (3 if is_running_ssh else 1)
    run_steps = "".join(last_test_action["definitions"][0]["repository"]["run"]["steps"])
    # Check for project name in lava-test-case
    assert re.search(rf"lava.?\S*.test.case.*{job_submitter.project_name}", run_steps)
    action_names = flatten(j.keys() for j in job_dict["actions"])
    if is_running_ssh:
        assert action_names == (
            [
                "deploy",
                "boot",
                "test",  # DUT: SSH server
                "test",  # Docker: SSH client
            ]
            if mode == "UBOOT"
            else [
                "deploy",  # NFS
                "deploy",  # Image generation
                "deploy",  # Image deployment
                "boot",
                "test",  # DUT: SSH server
                "test",  # Docker: SSH client
            ]
        )
        test_action_server = job_dict["actions"][-2]["test"]
        # SSH server in the DUT
        assert test_action_server["namespace"] == "dut"
        # SSH client via docker
        assert last_test_action["namespace"] == "container"
        boot_action = next(a["boot"] for a in job_dict["actions"] if "boot" in a)
        assert boot_action["namespace"] == "dut"
        # SSH server bootstrapping
        assert "dropbear" in "".join(boot_action["auto_login"]["login_commands"])
        return
    # ---- Not SSH job
    assert action_names == (
        [
            "deploy",
            "boot",
            "test",
        ]
        if mode == "UBOOT"
        else [
            "deploy",  # NFS
            "deploy",  # Image generation
            "deploy",  # Image deployment
            "boot",
            "test",
        ]
    )
    assert farm_env in run_steps
    assert init_script in run_steps
# use yaml files from tests/data/ to test the job definition generation
@pytest.mark.parametrize("force_uart", [False, True], ids=["SSH", "UART"])
@pytest.mark.parametrize("mode", ["UBOOT", "FASTBOOT"])
@mock.patch("lava.lava_job_submitter.setup_lava_proxy")
def test_lava_job_definition(
    mock_lava_proxy,
    mode,
    force_uart,
    shell_file,
    mock_collabora_farm,
    mock_proxy,
    monkeypatch,
):
    """Golden-file test: the generated job definition must match the YAML
    fixture under tests/data/ for every (mode, transport) combination."""
    monkeypatch.setattr(lava.utils.lava_job_definition, "FORCE_UART", force_uart)
    # Do not actually connect to the LAVA server
    mock_lava_proxy.return_value = mock_proxy
    yaml = YAML()
    yaml.default_flow_style = False
    # Load the YAML output and check that it contains the expected keys and values
    expected_job_dict = load_yaml_file(f"{mode}_force_uart={force_uart}_job_definition.yaml")
    # Plain string: nothing to interpolate here (was a placeholder-less f-string).
    init_script = "FARM=test_farm /test_dir/install/common/init-stage1.sh"
    job_submitter = job_submitter_factory(mode, shell_file(init_script))
    job_definition = LAVAJobDefinition(job_submitter).generate_lava_job_definition()
    job_dict = yaml.load(job_definition)
    # Uncomment the following to update the expected YAML files
    # yaml.dump(job_dict, load_data_file(f"{mode}_force_uart={force_uart}_job_definition.yaml"))
    # Check that the generated job definition matches the expected one
    assert job_dict == expected_job_dict
@pytest.mark.parametrize(
    "directive",
    ["declare -x", "export"],
)
@pytest.mark.parametrize(
    "original_env_output",
    [
        # Test basic environment variables
        "FOO=bar\nBAZ=qux",
        # Test export statements
        "{directive} FOO=bar",
        # Test multiple exports
        "{directive} FOO=bar\n{directive} BAZ=qux\nNORM=val",
        # Test mixed content with export
        "{directive} FOO=bar\nBAZ=qux\n{directive} HELLO=world",
        # Test empty file
        "",
        # Test special characters that need shell quoting
        "FOO='bar baz'\nQUOTE=\"hello world\"",
        # Test variables with spaces and quotes
        "{directive} VAR='val spaces'\nQUOTES=\"test\"",
        # Test inline scripts with export
        "{directive} FOO=bar\nBAZ=qux\n{directive} HELLO=world",
        # Test single quote inside double quotes in variable
        "{directive} FOO='Revert \"commit's error\"'",
        # Test backticks in variable
        "{directive} FOO=`echo 'test'`",
    ],
    ids=[
        "basic_vars",
        "single_export",
        "multiple_exports",
        "mixed_exports",
        "empty_file",
        "special_chars",
        "spaces_and_quotes",
        "inline_scripts_with_export",
        "single_quote_in_var",
        "backticks",
    ]
)
def test_encode_job_env_vars(directive, original_env_output, shell_file, clear_env_vars):
    """Test the encode_job_env_vars function with various environment file contents.

    The function under test appears to emit one ``echo <base64> | base64 -d > ...``
    shell command embedding the env file; this test round-trips it both by
    decoding the base64 payload and by actually running the command in bash.
    """
    import base64
    import shlex
    # Create environment file with test content
    original_env_output = original_env_output.format(directive=directive)
    env_file = shell_file(original_env_output)
    # Create job submitter with the environment file
    job_submitter = mock.MagicMock(spec=LAVAJobSubmitter, env_file=env_file)
    job_definition = LAVAJobDefinition(job_submitter)
    # Call the function under test
    result = job_definition.encode_job_env_vars()
    # Verify the result is a list with exactly one element
    assert isinstance(result, list)
    assert len(result) == 1
    # Extract the command from the result
    command = result[0]
    assert isinstance(command, str)
    # Extract the base64 encoded part
    start_marker = 'echo '
    end_marker = ' | base64 -d'
    start_idx = command.find(start_marker) + len(start_marker)
    end_idx = command.find(end_marker)
    redirect_idx = command.find(">")
    encoded_part = command[start_idx:end_idx]
    # Verify if the script is executed correctly (strip the output redirection
    # so bash prints the decoded env file to stdout instead of writing it).
    env_script_process = subprocess.run(
        ["bash", "-c", command[:redirect_idx]], capture_output=True, text=True
    )
    if env_script_process.returncode != 0:
        pytest.fail(f"Failed to execute script: {env_script_process.stderr}")
    generated_env_output = env_script_process.stdout.strip()
    # The encoded part should be shell-quoted, so we need to parse it
    # Use shlex to unquote the encoded content
    unquoted_encoded = shlex.split(encoded_part)[0]
    # Decode the base64 content
    try:
        decoded_content = base64.b64decode(unquoted_encoded).decode()
    except Exception as e:
        pytest.fail(f"Failed to decode base64 content: {e}. Encoded part: {encoded_part}")
    # Verify the decoded content matches the original file content
    assert decoded_content == original_env_output == generated_env_output

View File

@@ -1,502 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT
from datetime import UTC, datetime, timedelta
import pytest
from lava.exceptions import MesaCIKnownIssueException, MesaCITimeoutError
from lava.utils import (
GitlabSection,
LogFollower,
LogSectionType,
fix_lava_gitlab_section_log,
hide_sensitive_data,
)
from lava.utils.constants import (
KNOWN_ISSUE_R8152_MAX_CONSECUTIVE_COUNTER,
A6XX_GPU_RECOVERY_WATCH_PERIOD_MIN,
A6XX_GPU_RECOVERY_FAILURE_MESSAGE,
A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT,
)
from lava.utils.lava_log_hints import LAVALogHints
from ..lava.helpers import (
create_lava_yaml_msg,
does_not_raise,
lava_yaml,
mock_lava_signal,
yaml_dump,
)
# (method to call on the section, start_collapsed flag, expected ANSI marker line)
GITLAB_SECTION_SCENARIOS = {
    "start collapsed": (
        "start",
        True,
        f"\x1b[0Ksection_start:mock_date:my_first_section[collapsed=true]\r\x1b[0K"
        f"{GitlabSection.colour}my_header\x1b[0m",
    ),
    "start non_collapsed": (
        "start",
        False,
        f"\x1b[0Ksection_start:mock_date:my_first_section\r\x1b[0K"
        f"{GitlabSection.colour}my_header\x1b[0m",
    ),
    "end collapsed": (
        "end",
        True,
        "\x1b[0Ksection_end:mock_date:my_first_section\r\x1b[0K",
    ),
    "end non_collapsed": (
        "end",
        False,
        "\x1b[0Ksection_end:mock_date:my_first_section\r\x1b[0K",
    ),
}
@pytest.mark.parametrize(
    "method, collapsed, expectation",
    GITLAB_SECTION_SCENARIOS.values(),
    ids=GITLAB_SECTION_SCENARIOS.keys(),
)
def test_gitlab_section(method, collapsed, expectation):
    """GitlabSection start()/end() must emit the exact GitLab marker strings."""
    gs = GitlabSection(
        id="my_first_section",
        header="my_header",
        type=LogSectionType.TEST_CASE,
        start_collapsed=collapsed,
    )
    # Stub the timestamp so the emitted marker is deterministic.
    gs.get_timestamp = lambda mock_date: "mock_date"
    gs.start()
    result = getattr(gs, method)()
    assert result == expectation
def test_gl_sections():
    """Feeding LAVA signal lines must open/close GitLab sections in order,
    ignoring duplicate kmsg echoes of the same signal."""
    lines = [
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "debug",
            "msg": "Received signal: <STARTRUN> 0_setup-ssh-server 10145749_1.3.2.3.1",
        },
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "debug",
            "msg": "Received signal: <STARTRUN> 0_mesa 5971831_1.3.2.3.1",
        },
        # Redundant log message which triggers the same Gitlab Section, it
        # should be ignored, unless the id is different
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "target",
            "msg": "[ 7.778836] <LAVA_SIGNAL_STARTRUN 0_mesa 5971831_1.3.2.3.1>",
        },
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "debug",
            "msg": "Received signal: <STARTTC> mesa-ci_iris-kbl-traces",
        },
        # Another redundant log message
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "target",
            "msg": "[ 16.997829] <LAVA_SIGNAL_STARTTC mesa-ci_iris-kbl-traces>",
        },
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "target",
            "msg": "<LAVA_SIGNAL_ENDTC mesa-ci_iris-kbl-traces>",
        },
    ]
    lf = LogFollower()
    with lf:
        for line in lines:
            lf.manage_gl_sections(line)
        parsed_lines = lf.flush()
    section_types = [s.type for s in lf.section_history]
    # Sections must alternate start/end markers, each starting collapsed.
    assert "section_start" in parsed_lines[0]
    assert "collapsed=true" in parsed_lines[0]
    assert "section_end" in parsed_lines[1]
    assert "section_start" in parsed_lines[2]
    assert "collapsed=true" in parsed_lines[2]
    assert "section_end" in parsed_lines[3]
    assert "section_start" in parsed_lines[4]
    assert "collapsed=true" in parsed_lines[4]
    assert section_types == [
        # LogSectionType.LAVA_BOOT, True, if LogFollower started with Boot section
        LogSectionType.TEST_SUITE,
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ]
def test_log_follower_flush():
    """Flushing drains the buffered lines; a second flush is empty until the
    follower is fed again."""
    start_signal = {
        "dt": datetime.now(tz=UTC),
        "lvl": "debug",
        "msg": "Received signal: <STARTTC> mesa-ci_iris-kbl-traces",
    }
    end_signal = {
        "dt": datetime.now(tz=UTC),
        "lvl": "target",
        "msg": "<LAVA_SIGNAL_ENDTC mesa-ci_iris-kbl-traces>",
    }
    follower = LogFollower()
    follower.feed([start_signal, end_signal])
    first_flush = follower.flush()
    second_flush = follower.flush()
    follower.feed([start_signal, end_signal])
    third_flush = follower.flush()
    assert first_flush
    assert not second_flush
    assert third_flush
# Each scenario: (input lines, lines expected to survive filtering, tag words
# that mark a line as sensitive and cause it to be dropped).
SENSITIVE_DATA_SCENARIOS = {
    "no sensitive data tagged": (
        ["bla bla", "mytoken: asdkfjsde1341=="],
        ["bla bla", "mytoken: asdkfjsde1341=="],
        ["HIDEME"],
    ),
    "sensitive data tagged": (
        ["bla bla", "mytoken: asdkfjsde1341== # HIDEME"],
        ["bla bla"],
        ["HIDEME"],
    ),
    "sensitive data tagged with custom word": (
        ["bla bla", "mytoken: asdkfjsde1341== # DELETETHISLINE", "third line # NOTANYMORE"],
        ["bla bla", "third line # NOTANYMORE"],
        ["DELETETHISLINE", "NOTANYMORE"],
    ),
}
@pytest.mark.parametrize(
    "input, expectation, tags",
    SENSITIVE_DATA_SCENARIOS.values(),
    ids=SENSITIVE_DATA_SCENARIOS.keys(),
)
def test_hide_sensitive_data(input, expectation, tags):
    """YAML lines tagged with any of the given markers must be stripped."""
    yaml_data = yaml_dump(input)
    yaml_result = hide_sensitive_data(yaml_data, *tags)
    result = lava_yaml.load(yaml_result)
    assert result == expectation
# Each scenario: (expected message(s) after fixing, raw lines fed to the
# coroutine). GitLab section markers and their headers can arrive split
# across two target-level lines.
GITLAB_SECTION_SPLIT_SCENARIOS = {
    "Split section_start at target level": (
        "\x1b[0Ksection_start:1668454947:test_post_process[collapsed=true]\r\x1b[0K"
        "post-processing test results",
        (
            "\x1b[0Ksection_start:1668454947:test_post_process[collapsed=true]",
            "\x1b[0Kpost-processing test results",
        ),
    ),
    "Split section_end at target level": (
        "\x1b[0Ksection_end:1666309222:test_post_process\r\x1b[0K",
        ("\x1b[0Ksection_end:1666309222:test_post_process", "\x1b[0K"),
    ),
    "Second line is not split from the first": (
        ("\x1b[0Ksection_end:1666309222:test_post_process", "Any message"),
        ("\x1b[0Ksection_end:1666309222:test_post_process", "Any message"),
    ),
}
@pytest.mark.parametrize(
    "expected_message, messages",
    GITLAB_SECTION_SPLIT_SCENARIOS.values(),
    ids=GITLAB_SECTION_SPLIT_SCENARIOS.keys(),
)
def test_fix_lava_gitlab_section_log(expected_message, messages):
    """Drive the section-log fixing coroutine and check the recovered output."""
    fixed_messages = []
    gen = fix_lava_gitlab_section_log()
    # Prime the coroutine so it is ready to receive log dicts via send().
    next(gen)
    for message in messages:
        lava_log = create_lava_yaml_msg(msg=message, lvl="target")
        # NOTE(review): send() appears to both return a recovered line and
        # mutate lava_log["msg"] in place — confirm against the coroutine.
        if recovered_line := gen.send(lava_log):
            fixed_messages.append((recovered_line, lava_log["msg"]))
        fixed_messages.append(lava_log["msg"])
    assert expected_message in fixed_messages
@pytest.mark.parametrize(
    "expected_message, messages",
    GITLAB_SECTION_SPLIT_SCENARIOS.values(),
    ids=GITLAB_SECTION_SPLIT_SCENARIOS.keys(),
)
def test_lava_gitlab_section_log_collabora(expected_message, messages, monkeypatch):
    """Check if LogFollower does not change the message if we are running in Collabora farm."""
    monkeypatch.setenv("RUNNER_TAG", "mesa-ci-x86_64-lava-test")
    lf = LogFollower()
    for message in messages:
        lf.feed([create_lava_yaml_msg(msg=message)])
    new_messages = lf.flush()
    # Normalize: single-line results are compared as a string, multi-line as a
    # tuple, matching the two expectation shapes in the scenarios.
    new_messages = tuple(new_messages) if len(new_messages) > 1 else new_messages[0]
    assert new_messages == expected_message
# Each scenario: (expected lines after merging, raw lines fed in). A line
# ending in a bare carriage return should be merged with the following one.
CARRIAGE_RETURN_SCENARIOS = {
    "Carriage return at the end of the previous line": (
        (
            "\x1b[0Ksection_start:1677609903:test_setup[collapsed=true]\r\x1b[0K\x1b[0;36m[303:44] "
            "deqp: preparing test setup\x1b[0m",
        ),
        (
            "\x1b[0Ksection_start:1677609903:test_setup[collapsed=true]\r",
            "\x1b[0K\x1b[0;36m[303:44] deqp: preparing test setup\x1b[0m\r\n",
        ),
    ),
    "Newline at the end of the line": (
        ("\x1b[0K\x1b[0;36m[303:44] deqp: preparing test setup\x1b[0m", "log"),
        ("\x1b[0K\x1b[0;36m[303:44] deqp: preparing test setup\x1b[0m\r\n", "log"),
    ),
}
@pytest.mark.parametrize(
    "expected_message, messages",
    CARRIAGE_RETURN_SCENARIOS.values(),
    ids=CARRIAGE_RETURN_SCENARIOS.keys(),
)
def test_lava_log_merge_carriage_return_lines(expected_message, messages):
    """Lines ending in a bare carriage return are merged with the next line."""
    follower = LogFollower()
    for raw_message in messages:
        follower.feed([create_lava_yaml_msg(msg=raw_message)])
    assert tuple(follower.flush()) == expected_message
# Offsets relative to the section timeout: 1s before must pass,
# 1s after must raise MesaCITimeoutError.
WATCHDOG_SCENARIOS = {
    "1 second before timeout": ({"seconds": -1}, does_not_raise()),
    "1 second after timeout": ({"seconds": 1}, pytest.raises(MesaCITimeoutError)),
}
@pytest.mark.parametrize(
    "timedelta_kwargs, exception",
    WATCHDOG_SCENARIOS.values(),
    ids=WATCHDOG_SCENARIOS.keys(),
)
def test_log_follower_watchdog(frozen_time, timedelta_kwargs, exception):
    """The follower must time out a section once it outlives its configured duration."""
    lines = [
        {
            "dt": datetime.now(tz=UTC),
            "lvl": "debug",
            "msg": "Received signal: <STARTTC> mesa-ci_iris-kbl-traces",
        },
    ]
    td = {LogSectionType.TEST_CASE: timedelta(minutes=1)}
    lf = LogFollower(timeout_durations=td)
    lf.feed(lines)
    # Advance the fake clock to just before/after the TEST_CASE timeout.
    frozen_time.tick(
        lf.timeout_durations[LogSectionType.TEST_CASE] + timedelta(**timedelta_kwargs)
    )
    lines = [create_lava_yaml_msg()]
    with exception:
        lf.feed(lines)
# (raw section name, id after GitlabSection sanitization)
GITLAB_SECTION_ID_SCENARIOS = [
    ("a-good_name", "a-good_name"),
    ("spaces are not welcome", "spaces-are-not-welcome"),
    ("abc:amd64 1/3", "abc-amd64-1-3"),
]
@pytest.mark.parametrize("case_name, expected_id", GITLAB_SECTION_ID_SCENARIOS)
def test_gitlab_section_id(case_name, expected_id):
    """Raw section names are sanitized into GitLab-safe ids on construction."""
    section = GitlabSection(
        id=case_name,
        header=case_name,
        type=LogSectionType.LAVA_POST_PROCESSING,
    )
    assert section.id == expected_id
def a618_network_issue_logs(level: str = "target") -> list:
    """Build the log sequence that reproduces the a618 r8152/NFS known issue:
    the r8152 Tx error repeated to its threshold, followed by the NFS timeout."""
    tx_error = create_lava_yaml_msg(
        msg="[ 1733.599402] r8152 2-1.3:1.0 eth0: Tx status -71",
        lvl=level,
    )
    nfs_timeout = create_lava_yaml_msg(
        msg="[ 1733.604506] nfs: server 192.168.201.1 not responding, still trying",
        lvl=level,
    )
    logs = [tx_error] * KNOWN_ISSUE_R8152_MAX_CONSECUTIVE_COUNTER
    logs.append(nfs_timeout)
    return logs
# Signal that switches the follower into the test-case phase.
TEST_PHASE_LAVA_SIGNAL = mock_lava_signal(LogSectionType.TEST_CASE)
# The same network-issue sequence as seen during boot ("feedback" level) and
# during the test phase ("target" level, prefixed by the phase signal).
A618_NET_ISSUE_BOOT = a618_network_issue_logs(level="feedback")
A618_NET_ISSUE_TEST = [TEST_PHASE_LAVA_SIGNAL, *a618_network_issue_logs(level="target")]
# Each scenario: (log lines to feed, expected raising context). Only the
# complete pattern — threshold-many r8152 errors plus the NFS timeout —
# should trigger the known-issue exception.
A618_NETWORK_ISSUE_SCENARIOS = {
    "Fail - R8152 kmsg during boot phase": (
        A618_NET_ISSUE_BOOT,
        pytest.raises(MesaCIKnownIssueException),
    ),
    "Fail - R8152 kmsg during test phase": (
        A618_NET_ISSUE_TEST,
        pytest.raises(MesaCIKnownIssueException),
    ),
    "Pass - Partial (1) R8152 kmsg during test phase": (
        A618_NET_ISSUE_TEST[:1],
        does_not_raise(),
    ),
    "Pass - Partial (2) R8152 kmsg during test phase": (
        A618_NET_ISSUE_TEST[:2],
        does_not_raise(),
    ),
    "Pass - Partial (3) subsequent R8152 kmsg during test phase": (
        [
            TEST_PHASE_LAVA_SIGNAL,
            A618_NET_ISSUE_TEST[1],
            A618_NET_ISSUE_TEST[1],
        ],
        does_not_raise(),
    ),
    "Pass - Partial (4) subsequent nfs kmsg during test phase": (
        [
            TEST_PHASE_LAVA_SIGNAL,
            A618_NET_ISSUE_TEST[-1],
            A618_NET_ISSUE_TEST[-1],
        ],
        does_not_raise(),
    ),
}
@pytest.mark.parametrize(
    "messages, expectation",
    A618_NETWORK_ISSUE_SCENARIOS.values(),
    ids=A618_NETWORK_ISSUE_SCENARIOS.keys(),
)
def test_detect_failure(messages, expectation):
    """Known-issue detection must fire only for the full failure pattern."""
    # Start inside a boot section so "feedback"-level lines are also scanned.
    boot_section = GitlabSection(
        id="dut_boot",
        header="Booting hardware device",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    boot_section.start()
    lf = LogFollower(starting_section=boot_section)
    with expectation:
        lf.feed(messages)
def test_detect_a6xx_gpu_recovery_failure(frozen_time):
    """Hitting the max count of GPU-recovery failures within the watch period raises."""
    log_follower = LogFollower()
    lava_log_hints = LAVALogHints(log_follower=log_follower)
    failure_message = {
        "dt": datetime.now(tz=UTC).isoformat(),
        "msg": A6XX_GPU_RECOVERY_FAILURE_MESSAGE[0],
        "lvl": "feedback",
    }
    with pytest.raises(MesaCIKnownIssueException):
        for _ in range(A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT):
            lava_log_hints.detect_a6xx_gpu_recovery_failure(failure_message)
            # Simulate the passage of time within the watch period
            frozen_time.tick(1)
            failure_message["dt"] = datetime.now(tz=UTC).isoformat()
def test_detect_a6xx_gpu_recovery_success(frozen_time):
    """Failures outside the watch period reset the counter instead of raising."""
    log_follower = LogFollower()
    lava_log_hints = LAVALogHints(log_follower=log_follower)
    failure_message = {
        "dt": datetime.now(tz=UTC).isoformat(),
        "msg": A6XX_GPU_RECOVERY_FAILURE_MESSAGE[0],
        "lvl": "feedback",
    }
    # Simulate sending a tolerable number of failure messages
    for _ in range(A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT - 1):
        lava_log_hints.detect_a6xx_gpu_recovery_failure(failure_message)
        frozen_time.tick(1)
        failure_message["dt"] = datetime.now(tz=UTC).isoformat()
    # Simulate the passage of time outside of the watch period
    frozen_time.tick(60 * A6XX_GPU_RECOVERY_WATCH_PERIOD_MIN + 1)
    failure_message = {
        "dt": datetime.now(tz=UTC).isoformat(),
        "msg": A6XX_GPU_RECOVERY_FAILURE_MESSAGE[1],
        "lvl": "feedback",
    }
    with does_not_raise():
        lava_log_hints.detect_a6xx_gpu_recovery_failure(failure_message)
    # The late failure must reset, not accumulate, the tracking state.
    assert lava_log_hints.a6xx_gpu_first_fail_time is None, (
        "a6xx_gpu_first_fail_time is not None"
    )
    assert lava_log_hints.a6xx_gpu_recovery_fail_counter == 0, (
        "a6xx_gpu_recovery_fail_counter is not 0"
    )
@pytest.mark.parametrize(
    "start_offset",
    [
        timedelta(hours=0),
        timedelta(hours=1),
    ],
    ids=["equal timestamps", "negative delta"],
)
def test_gitlab_section_relative_time_clamping(start_offset):
    """Test that delta time is clamped to zero if start_time <= timestamp_relative_to."""
    now = datetime.now(tz=UTC)
    # Put the reference point at or after the section start so the relative
    # delta would be zero or negative without clamping.
    timestamp_relative_to = now + start_offset
    gs = GitlabSection(
        id="clamp_section",
        header=f"clamp_section header {start_offset}",
        type=LogSectionType.TEST_CASE,
        timestamp_relative_to=timestamp_relative_to,
    )
    gs.start()
    output = gs.print_start_section()
    assert "[00:00]" in output, f"Expected clamped relative time, got: {output}"
@pytest.mark.parametrize(
    "delta_seconds,expected_seconds",
    [
        (-5, 0),  # Negative delta should be clamped to 0
        (0, 0),  # Zero delta should remain 0
        (5, 5),  # Positive delta should remain unchanged
    ],
    ids=["negative delta", "zero delta", "positive delta"],
)
def test_gitlab_section_delta_time(frozen_time, delta_seconds, expected_seconds):
    """Test that delta_time() properly clamps negative deltas to zero."""
    gs = GitlabSection(
        id="delta_section",
        header=f"delta_section header {delta_seconds}",
        type=LogSectionType.TEST_CASE,
    )
    with gs:
        # A negative tick rewinds the fake clock, making end < start.
        frozen_time.tick(delta_seconds)
    # Test internal _delta_time() returns exact delta
    internal_delta = gs._delta_time()
    assert internal_delta == timedelta(seconds=delta_seconds), (
        f"_delta_time() returned {internal_delta}, expected {timedelta(seconds=delta_seconds)}"
    )
    # Test public delta_time() returns clamped delta
    clamped_delta = gs.delta_time()
    assert clamped_delta == timedelta(seconds=expected_seconds), (
        f"delta_time() returned {clamped_delta}, expected {timedelta(seconds=expected_seconds)}"
    )

View File

@@ -1,26 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from .console_format import CONSOLE_LOG
from .gitlab_section import GitlabSection
from .lava_job import LAVAJob
from .lava_job_definition import LAVAJobDefinition
from .lava_proxy import call_proxy, setup_lava_proxy
from .log_follower import (
LogFollower,
fatal_err,
fix_lava_gitlab_section_log,
hide_sensitive_data,
print_log,
)
from .log_section import (
DEFAULT_GITLAB_SECTION_TIMEOUTS,
FALLBACK_GITLAB_SECTION_TIMEOUT,
LogSection,
LogSectionType,
CI_JOB_TIMEOUT_MIN,
LAVA_TEST_OVERHEAD_MIN,
LAVA_TEST_CASE_TIMEOUT,
LAVA_TEST_SUITE_TIMEOUT,
)

View File

@@ -1,19 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
# ANSI SGR escape sequences used to colour/format the job console output.
CONSOLE_LOG = {
    "FG_GREEN": "\x1b[0;32m",
    "FG_BOLD_GREEN": "\x1b[0;1;32m",
    "FG_RED": "\x1b[0;38;5;197m",
    "FG_BOLD_RED": "\x1b[0;1;38;5;197m",
    "FG_YELLOW": "\x1b[0;33m",
    "FG_BOLD_YELLOW": "\x1b[0;1;33m",
    "FG_MAGENTA": "\x1b[0;35m",
    "FG_BOLD_MAGENTA": "\x1b[0;1;35m",
    "FG_CYAN": "\x1b[0;36m",
    "RESET": "\x1b[0m",
    # Fix: SGR 4 is underline; the previous value (\x1b[3m) is the italic code.
    "UNDERLINED": "\x1b[4m",
    "BOLD": "\x1b[1m",
    "DIM": "\x1b[2m",
}

View File

@@ -1,39 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from os import getenv
# How many attempts should be made when a timeout happen during LAVA device boot.
NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 2))

# The deploy involves downloading and decompressing the kernel, modules, dtb and the overlays.
# We should retry, to overcome network issues.
NUMBER_OF_ATTEMPTS_LAVA_DEPLOY = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_DEPLOY", 2))

# Supports any integers in [0, 100].
# The scheduler considers the job priority when ordering the queue
# to consider which job should run next.
JOB_PRIORITY = int(getenv("JOB_PRIORITY", 75))

# Use UART over the default SSH mechanism to follow logs.
# Caution: this can lead to device silence in some devices in Mesa CI.
# Fix: bool(getenv(...)) treated ANY non-empty string as True, so setting
# LAVA_FORCE_UART=0 or LAVA_FORCE_UART=false still forced UART. Parse the
# common falsy spellings explicitly instead (unset still means False).
FORCE_UART = getenv("LAVA_FORCE_UART", "").lower() not in ("", "0", "false", "no")

# How many times the r8152 error may happen to consider it a known issue.
KNOWN_ISSUE_R8152_MAX_CONSECUTIVE_COUNTER: int = 10

# Kernel log regexes identifying the r8152/NFS network known issue.
KNOWN_ISSUE_R8152_PATTERNS: tuple[str, ...] = (
    r"r8152 \S+ eth0: Tx status -71",
    r"nfs: server \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3} not responding, still trying",
)

# This is considered noise, since LAVA produces this log after receiving a package of feedback
# messages.
LOG_DEBUG_FEEDBACK_NOISE = "Listened to connection for namespace 'dut' done"

# a6xx GPU recovery known issue: raise when the failure message repeats
# A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT times within the watch period (minutes).
A6XX_GPU_RECOVERY_WATCH_PERIOD_MIN = 3
A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT = 30
A6XX_GPU_RECOVERY_FAILURE_MESSAGE = (
    "cx gdsc didn't collapse",
    "Timeout waiting for GMU OOB",
)

View File

@@ -1,157 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta, UTC
from math import floor
from typing import TYPE_CHECKING, Any, Optional
from lava.utils.console_format import CONSOLE_LOG
if TYPE_CHECKING:
from lava.utils.log_section import LogSectionType
# TODO: Add section final status to assist with monitoring
@dataclass
class GitlabSection:
    """A GitLab CI collapsible log section.

    Builds the ``section_start``/``section_end`` escape markers GitLab uses to
    fold job output, optionally prefixing the header with a ``[MM:SS]`` offset
    relative to ``timestamp_relative_to``.
    """

    id: str
    header: str
    type: LogSectionType
    start_collapsed: bool = False
    suppress_end: bool = False
    suppress_start: bool = False
    # When set, the start header is prefixed with the [MM:SS] offset between
    # this reference instant and the section start time.
    timestamp_relative_to: Optional[datetime] = None
    escape: str = "\x1b[0K"
    colour: str = f"{CONSOLE_LOG['FG_CYAN']}"
    __start_time: Optional[datetime] = field(default=None, init=False)
    __end_time: Optional[datetime] = field(default=None, init=False)

    @classmethod
    def section_id_filter(cls, value: str) -> str:
        """Collapse every run of characters GitLab disallows in ids into '-'."""
        return str(re.sub(r"[^\w_-]+", "-", value))

    def __post_init__(self) -> None:
        self.id = self.section_id_filter(self.id)

    @property
    def has_started(self) -> bool:
        return self.__start_time is not None

    @property
    def has_finished(self) -> bool:
        return self.__end_time is not None

    @property
    def start_time(self) -> Optional[datetime]:
        return self.__start_time

    @property
    def end_time(self) -> Optional[datetime]:
        return self.__end_time

    def get_timestamp(self, time: datetime) -> str:
        """Return the integer Unix timestamp GitLab expects in section markers."""
        unix_ts = datetime.timestamp(time)
        return str(int(unix_ts))

    def section(self, marker: str, header: str, time: datetime) -> str:
        """Build a complete section_(start|end) marker line, header included."""
        preamble = f"{self.escape}section_{marker}"
        collapse = marker == "start" and self.start_collapsed
        collapsed = "[collapsed=true]" if collapse else ""
        section_id = f"{self.id}{collapsed}"
        timestamp = self.get_timestamp(time)
        before_header = ":".join([preamble, timestamp, section_id])
        if self.timestamp_relative_to and self.start_time is not None:
            delta = self.start_time - self.timestamp_relative_to
            # time drift can occur because we are dealing with timestamps from different sources
            # clamp the delta to 0 if it's negative
            delta = max(delta, timedelta(seconds=0))
            # Fix: use total_seconds() instead of timedelta.seconds so deltas of
            # one day or more do not silently wrap (seconds only holds the
            # sub-day component). Identical output for deltas under a day.
            total_seconds = int(delta.total_seconds())
            reltime = f"[{total_seconds // 60:02}:{total_seconds % 60:02}] "
        else:
            reltime = ""
        colored_header = f"{self.colour}{reltime}{header}\x1b[0m" if header else ""
        header_wrapper = "\r" + f"{self.escape}{colored_header}"
        return f"{before_header}{header_wrapper}"

    def __str__(self) -> str:
        status = "NS" if not self.has_started else "F" if self.has_finished else "IP"
        delta = self.delta_time()
        elapsed_time = "N/A" if delta is None else str(delta)
        # Fix: the elapsed time used to be labelled "ET" as well, shadowing the
        # end-time label; give it a distinct label.
        return (
            f"GitlabSection({self.id}, {self.header}, {self.type}, "
            f"SC={self.start_collapsed}, S={status}, ST={self.start_time}, "
            f"ET={self.end_time}, Elapsed={elapsed_time})"
        )

    def __enter__(self) -> "GitlabSection":
        if start_log_line := self.start():
            print(start_log_line)
        return self

    def __exit__(
        self,
        *args: list[Any],
        **kwargs: dict[str, Any],
    ) -> None:
        if end_log_line := self.end():
            print(end_log_line)

    def start(self) -> str:
        """Mark the section as started and return its start marker line."""
        assert not self.has_finished, "Starting an already finished section"
        self.__start_time = datetime.now(tz=UTC)
        return self.print_start_section()

    def print_start_section(self) -> str:
        if self.suppress_start:
            return ""
        if self.__start_time is None:
            raise RuntimeError("Start time is not set.")
        return self.section(marker="start", header=self.header, time=self.__start_time)

    def end(self) -> str:
        """Mark the section as finished and return its end marker line."""
        assert self.__start_time is not None, "Ending an uninitialized section"
        self.__end_time = datetime.now(tz=UTC)
        if self.__end_time < self.__start_time:
            print(
                CONSOLE_LOG["FG_YELLOW"]
                + f"Warning: Section {self.id} ended before it started, clamping the delta time to 0"
                + CONSOLE_LOG["RESET"]
            )
        return self.print_end_section()

    def print_end_section(self) -> str:
        if self.suppress_end:
            return ""
        if self.__end_time is None:
            raise RuntimeError("End time is not set.")
        return self.section(marker="end", header="", time=self.__end_time)

    def _delta_time(self) -> Optional[timedelta]:
        """
        Return the delta time between the start and end of the section.
        If the section has not ended, return the delta time between the start and now.
        If the section has not started and not ended, return None.
        """
        if self.__start_time is None:
            return None
        if self.__end_time is None:
            return datetime.now(tz=UTC) - self.__start_time
        return self.__end_time - self.__start_time

    def delta_time(self) -> Optional[timedelta]:
        """
        Clamp the delta time to zero if it's negative, time drift can occur since we have timestamps
        coming from GitLab jobs, LAVA dispatcher and DUTs.
        """
        delta = self._delta_time()
        if delta is None:
            return None
        return max(delta, timedelta(seconds=0))

View File

@@ -1,33 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
import logging
import os
def get_lava_farm() -> str:
    """Return the LAVA farm name, lower-cased.

    The farm is read from the ``FARM`` environment variable; a warning is
    logged and ``"unknown"`` is returned when it is not set.
    """
    farm_name: str = os.environ.get("FARM", "unknown")
    if farm_name == "unknown":
        logging.warning("FARM environment variable is not set, using unknown")
    return farm_name.lower()
def get_lava_boot_method() -> str:
    """Return the LAVA boot method, lower-cased.

    The boot method is read from the ``BOOT_METHOD`` environment variable; a
    warning is logged and ``"unknown"`` is returned when it is not set.
    """
    method: str = os.environ.get("BOOT_METHOD", "unknown")
    if method == "unknown":
        logging.warning("BOOT_METHOD environment variable is not set, using unknown")
    return method.lower()

View File

@@ -1,213 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
import re
import xmlrpc
from collections import defaultdict
from datetime import datetime, UTC
from typing import Any, Optional
from lava.exceptions import (
MesaCIException,
MesaCIRetriableException,
MesaCIKnownIssueException,
MesaCIParseException,
MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils.log_follower import print_log
from lavacli.utils import flow_yaml as lava_yaml
from .lava_proxy import call_proxy
class LAVAJob:
    """Wrapper around a single LAVA job: submission, state polling, log
    retrieval and result parsing through the XML-RPC scheduler proxy."""

    # Console colour used when reporting each final job status.
    COLOR_STATUS_MAP: dict[str, str] = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_BOLD_YELLOW"],
        "fail": CONSOLE_LOG["FG_BOLD_RED"],
        "canceled": CONSOLE_LOG["FG_BOLD_MAGENTA"],
    }

    def __init__(self, proxy, definition, log=None) -> None:
        """
        :param proxy: XML-RPC proxy to the LAVA scheduler
        :param definition: YAML job definition string to submit
        :param log: optional structured-log mapping; a fresh one is created
            when not given
        """
        self._job_id = None
        self.proxy = proxy
        self.definition = definition
        self.last_log_line = 0
        self.last_log_time = None
        self._is_finished = False
        # Fix: the old `log=defaultdict(str)` default was a mutable default
        # argument, silently shared by every LAVAJob created without an
        # explicit log. Create a fresh mapping per instance instead.
        self.log: dict[str, Any] = defaultdict(str) if log is None else log
        self.status = "not_submitted"
        # Set the default exit code to 1 because we should set it to 0 only if the job has passed.
        # If it fails or if it is interrupted, the exit code should be set to a non-zero value to
        # make the GitLab job fail.
        self._exit_code: int = 1
        self.__exception: Optional[Exception] = None

    def heartbeat(self) -> None:
        """Record log activity so the watchdog knows the job is alive."""
        self.last_log_time: datetime = datetime.now(tz=UTC)
        self.status = "running"

    @property
    def status(self) -> str:
        return self._status

    @status.setter
    def status(self, new_status: str) -> None:
        # Mirror the status into the structured log on every change.
        self._status = new_status
        self.log["status"] = self._status

    @property
    def exit_code(self) -> int:
        return self._exit_code

    @exit_code.setter
    def exit_code(self, code: int) -> None:
        self._exit_code = code
        self.log["exit_code"] = self._exit_code

    @property
    def job_id(self) -> int:
        return self._job_id

    @job_id.setter
    def job_id(self, new_id: int) -> None:
        self._job_id = new_id
        self.log["lava_job_id"] = self._job_id

    @property
    def is_finished(self) -> bool:
        return self._is_finished

    @property
    def exception(self) -> Optional[Exception]:
        return self.__exception

    @exception.setter
    def exception(self, exception: Exception) -> None:
        self.__exception = exception
        self.log["dut_job_fail_reason"] = repr(self.__exception)

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.
        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def show(self) -> dict[str, str]:
        """Fetch the job details from the scheduler."""
        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)

    def get_lava_time(self, key, data) -> Optional[str]:
        return data[key].value if data[key] else None

    def refresh_log(self) -> None:
        """Refresh the structured log with the latest scheduler job details."""
        details = self.show()
        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
        self.log["dut_name"] = details.get("device")
        self.log["dut_state"] = details.get("state")

    def submit(self) -> bool:
        """Submit the job definition; returns False on a Mesa CI proxy error."""
        try:
            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
            self.status = "submitted"
            self.refresh_log()
        except MesaCIException:
            return False
        return True

    def lava_state(self) -> str:
        job_state: dict[str, str] = call_proxy(
            self.proxy.scheduler.job_state, self._job_id
        )
        return job_state["job_state"]

    def cancel(self):
        if self._job_id:
            self.proxy.scheduler.jobs.cancel(self._job_id)
            # If we don't have yet set another job's status, let's update it
            # with canceled one
            if self.status == "running":
                self.status = "canceled"

    def is_started(self) -> bool:
        waiting_states = ("Submitted", "Scheduling", "Scheduled")
        return self.lava_state() not in waiting_states

    def is_post_processed(self) -> bool:
        return self.lava_state() != "Running"

    def _load_log_from_data(self, data) -> list[str]:
        lines = []
        if isinstance(data, xmlrpc.client.Binary):
            # We are dealing with xmlrpc.client.Binary
            # Let's extract the data
            data = data.data
        # When there is no new log data, the YAML is empty
        if loaded_lines := lava_yaml.load(data):
            lines: list[str] = loaded_lines
            self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        """Fetch the log lines produced since the last call."""
        try:
            (finished, data) = call_proxy(
                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
            )
            self._is_finished = finished
            return self._load_log_from_data(data)
        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(
        self, lava_lines: list[dict[str, str]]
    ) -> list[dict[str, str]]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line."""
        last_line = None  # Print all lines. lines[:None] == lines[:]
        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: exit_code: (\d+)", line):
                self._is_finished = True
                self.exit_code = int(result.group(1))
                self.status = "pass" if self.exit_code == 0 else "fail"
                last_line = idx
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]

    def handle_exception(self, exception: Exception):
        """Cancel the job and translate the exception into a final status.

        KeyboardInterrupt and generic MesaCIException are re-raised after
        cancellation so the submitter aborts.
        """
        # Print the exception type and message
        print_log(f"{type(exception).__name__}: {str(exception)}")
        self.cancel()
        self.exception = exception
        # Set the exit code to nonzero value
        self.exit_code = 1
        # Give more accurate status depending on exception
        if isinstance(exception, MesaCIKnownIssueException):
            self.status = "canceled"
        elif isinstance(exception, MesaCITimeoutError):
            self.status = "hung"
        elif isinstance(exception, MesaCIRetriableException):
            self.status = "failed"
        elif isinstance(exception, KeyboardInterrupt):
            self.status = "interrupted"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            raise
        elif isinstance(exception, MesaCIException):
            self.status = "interrupted"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            raise
        else:
            self.status = "job_submitter_error"

View File

@@ -1,284 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from io import StringIO
from typing import TYPE_CHECKING, Any
import base64
import shlex
from ruamel.yaml import YAML
from os import getenv
from lava.utils.lava_farm import get_lava_farm, get_lava_boot_method
from lava.utils.log_section import LAVA_DEPLOY_TIMEOUT
from lava.utils.ssh_job_definition import (
generate_docker_test,
generate_dut_test,
wrap_boot_action,
wrap_final_deploy_action,
)
from lava.utils.uart_job_definition import (
fastboot_boot_action,
fastboot_deploy_actions,
tftp_boot_action,
tftp_deploy_actions,
qemu_boot_action,
qemu_deploy_actions,
uart_test_actions,
)
if TYPE_CHECKING:
from lava.lava_job_submitter import LAVAJobSubmitter
from .constants import (
FORCE_UART,
JOB_PRIORITY,
NUMBER_OF_ATTEMPTS_LAVA_BOOT,
NUMBER_OF_ATTEMPTS_LAVA_DEPLOY,
)
class LAVAJobDefinition:
"""
This class is responsible for generating the YAML payload to submit a LAVA
job.
"""
def __init__(self, job_submitter: "LAVAJobSubmitter") -> None:
self.job_submitter: "LAVAJobSubmitter" = job_submitter
# NFS args provided by LAVA
self.lava_nfs_args: str = "root=/dev/nfs rw nfsroot=$NFS_SERVER_IP:$NFS_ROOTFS,tcp,hard,v3 ip=dhcp"
# extra_nfsroot_args appends to cmdline
self.extra_nfsroot_args: str = " init=/init rootwait usbcore.quirks=0bda:8153:k"
# Append LAVA_CMDLINE to extra_nfsroot_args
if lava_cmdline := getenv('LAVA_CMDLINE'):
self.extra_nfsroot_args += f" {lava_cmdline}"
def has_ssh_support(self) -> bool:
if FORCE_UART:
return False
current_farm = get_lava_farm()
boot_method = get_lava_boot_method()
# Some Chromebooks have unreliable serial connections, so SSH is preferred.
# Only Collabora's farm supports running docker container as a LAVA actions,
# which is required to follow the job in an SSH section
# Chromebooks use the "depthcharge" boot method, so use SSH in that case,
# and UART for everything else.
return current_farm == "collabora" and boot_method == "depthcharge"
def generate_lava_yaml_payload(self) -> dict[str, Any]:
"""
Generates a YAML payload for submitting a LAVA job, based on the provided arguments.
Args:
None
Returns:
a dictionary containing the values generated by the `generate_metadata` function and the
actions for the LAVA job submission.
"""
args = self.job_submitter
nfsrootfs = {
"url": f"{args.rootfs_url}",
"compression": "zstd",
"format": "tar",
"overlays": args._overlays,
}
values = self.generate_metadata()
init_stage1_steps = self.init_stage1_steps()
jwt_steps = self.jwt_steps()
deploy_actions = []
boot_action = []
test_actions = uart_test_actions(args, init_stage1_steps, jwt_steps)
if args.boot_method == "fastboot":
deploy_actions = fastboot_deploy_actions(self, nfsrootfs)
boot_action = fastboot_boot_action(args)
elif args.boot_method == "qemu-nfs":
deploy_actions = qemu_deploy_actions(self, nfsrootfs)
boot_action = qemu_boot_action(args)
else: # tftp
deploy_actions = tftp_deploy_actions(self, nfsrootfs)
boot_action = tftp_boot_action(args)
if self.has_ssh_support():
wrap_final_deploy_action(deploy_actions[-1])
# SSH jobs use namespaces to differentiate between the DUT and the
# docker container. Every LAVA action needs an explicit namespace, when we are not using
# the default one.
for deploy_action in deploy_actions:
deploy_action["namespace"] = "dut"
wrap_boot_action(boot_action)
test_actions = (
generate_dut_test(args, init_stage1_steps),
generate_docker_test(args, jwt_steps),
)
values["actions"] = [
*[{"deploy": d} for d in deploy_actions],
{"boot": boot_action},
*[{"test": t} for t in test_actions],
]
return values
def generate_lava_job_definition(self) -> str:
"""
Generates a LAVA job definition in YAML format and returns it as a string.
Returns:
a string representation of the job definition generated by analysing job submitter
arguments and environment variables
"""
job_stream = StringIO()
yaml = YAML()
yaml.width = 4096
yaml.dump(self.generate_lava_yaml_payload(), job_stream)
return job_stream.getvalue()
def consume_lava_tags_args(self, values: dict[str, Any]):
# python-fire parses --lava-tags without arguments as True
if isinstance(self.job_submitter.lava_tags, tuple):
values["tags"] = self.job_submitter.lava_tags
# python-fire parses "tag-1,tag2" as str and "tag1,tag2" as tuple
# even if the -- --separator is something other than '-'
elif isinstance(self.job_submitter.lava_tags, str):
# Split string tags by comma, removing any trailing commas
values["tags"] = self.job_submitter.lava_tags.rstrip(",").split(",")
# Ensure tags are always a list of non-empty strings
if "tags" in values:
values["tags"] = [tag for tag in values["tags"] if tag]
# Remove empty tags
if "tags" in values and not values["tags"]:
del values["tags"]
def generate_metadata(self) -> dict[str, Any]:
# General metadata and permissions
values = {
"job_name": f"{self.job_submitter.project_name}: {self.job_submitter.pipeline_info}",
"device_type": self.job_submitter.device_type,
"visibility": {"group": [self.job_submitter.visibility_group]},
"priority": JOB_PRIORITY,
"context": {"extra_nfsroot_args": self.extra_nfsroot_args},
"timeouts": {
"job": {"minutes": self.job_submitter.job_timeout_min},
"actions": {
"depthcharge-retry": {
# Setting higher values here, to affect the subactions, namely
# `bootloader-commands` and `login-action`
# So this value can be higher than `depthcharge-action` timeout.
"minutes": 3 * NUMBER_OF_ATTEMPTS_LAVA_DEPLOY
},
"depthcharge-action": {
# This timeout englobes the entire depthcharge timing,
# including retries
"minutes": LAVA_DEPLOY_TIMEOUT
},
"uboot-action": {
# For rockchip DUTs, U-Boot auto-login action downloads the kernel and
# setup early network. This takes 72 seconds on average.
# The LAVA action that wraps it is `uboot-commands`, but we can't set a
# timeout for it directly, it is overridden by one third of `uboot-action`
# timeout.
# So actually, this timeout is here to enforce that `uboot-action`
# timeout to be 100 seconds (uboot-action timeout /
# NUMBER_OF_ATTEMPTS_LAVA_BOOT), which is more than enough.
"seconds": 100 * NUMBER_OF_ATTEMPTS_LAVA_BOOT
},
},
},
}
self.consume_lava_tags_args(values)
# QEMU lava jobs mandate proper arch value in the context
if self.job_submitter.boot_method == "qemu-nfs":
values["context"]["arch"] = self.job_submitter.mesa_job_name.split(":")[1]
return values
def attach_kernel_and_dtb(self, deploy_field):
if self.job_submitter.kernel_image_type:
deploy_field["kernel"]["type"] = self.job_submitter.kernel_image_type
if self.job_submitter.dtb_filename:
deploy_field["dtb"] = {
"url": f"{self.job_submitter.kernel_url_prefix}/"
f"{self.job_submitter.dtb_filename}.dtb"
}
def jwt_steps(self):
"""
This function is responsible for setting up the SSH server in the DUT and to
export the first boot environment to a file.
"""
# Pre-process the JWT
jwt_steps = [
"set -e",
]
# If the JWT file is provided, we will use it to authenticate with the cloud
# storage provider and will hide it from the job output in Gitlab.
if self.job_submitter.jwt_file:
with open(self.job_submitter.jwt_file) as jwt_file:
jwt_steps += [
"set +x # HIDE_START",
f'echo -n "{jwt_file.read()}" > "{self.job_submitter.jwt_file}"',
"set -x # HIDE_END",
f'echo "export S3_JWT_FILE={self.job_submitter.jwt_file}" >> /set-job-env-vars.sh',
]
else:
jwt_steps += [
"echo Could not find jwt file, disabling S3 requests...",
"sed -i '/S3_RESULTS_UPLOAD/d' /set-job-env-vars.sh",
]
return jwt_steps
def encode_job_env_vars(self) -> list[str]:
    """Return a shell step appending the job env file to /set-job-env-vars.sh.

    The file contents are base64-encoded (and shell-quoted) so arbitrary
    values survive both YAML serialization and the DUT's shell.
    """
    with open(self.job_submitter.env_file, "rb") as env_file:
        payload = base64.b64encode(env_file.read()).decode()
    quoted = shlex.quote(payload)
    return [f"echo {quoted} | base64 -d >> /set-job-env-vars.sh"]
def init_stage1_steps(self) -> list[str]:
    """Build the first-stage shell steps the DUT runs right after boot:
    source init-stage1.sh, optionally fetch firmware, forward the job env."""
    submitter = self.job_submitter
    steps = [
        # init-stage1.sh is sourced (not executed) so its environment,
        # including PWD, lands in the current shell.
        f"FARM={submitter.farm} "
        f". {submitter.project_dir}/install/common/init-stage1.sh"
    ]
    # The Adreno 660 shader firmware cannot be shipped inside the rootfs:
    # its license does not allow bundling it in the repository.
    if submitter.device_type == "sm8350-hdk":
        steps.append(
            "mkdir -p /lib/firmware/qcom/sm8350 && "
            "curl -L --retry 4 -f --retry-all-errors --retry-delay 60 "
            "https://github.com/allahjasif1990/hdk888-firmware/raw/main/a660_zap.mbn "
            '-o "/lib/firmware/qcom/sm8350/a660_zap.mbn"'
        )
    # Forward environment variables to the DUT, base64-encoded to avoid
    # YAML quoting issues.
    steps += self.encode_job_env_vars()
    steps.append("export CURRENT_SECTION=dut_boot")
    return steps

View File

@@ -1,110 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from __future__ import annotations
import re
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Optional, Sequence
if TYPE_CHECKING:
from lava.utils import LogFollower
from lava.exceptions import MesaCIKnownIssueException
from lava.utils.console_format import CONSOLE_LOG
from lava.utils.constants import (
KNOWN_ISSUE_R8152_MAX_CONSECUTIVE_COUNTER,
LOG_DEBUG_FEEDBACK_NOISE,
KNOWN_ISSUE_R8152_PATTERNS,
A6XX_GPU_RECOVERY_WATCH_PERIOD_MIN,
A6XX_GPU_RECOVERY_FAILURE_MESSAGE,
A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT,
)
from lava.utils.log_section import LogSectionType
def search_known_issue_patterns(patterns: Sequence[str], line: str) -> str:
    """Return the first pattern in `patterns` that matches `line`, else ""."""
    return next((pattern for pattern in patterns if re.search(pattern, line)), "")
@dataclass
class LAVALogHints:
    """Scans LAVA log lines for known failure signatures.

    Keeps counters across lines so that a single transient complaint does not
    abort the job, but a sustained pattern raises MesaCIKnownIssueException.
    """

    log_follower: LogFollower
    # Consecutive lines matching the r8152 NIC issue seen so far.
    r8152_issue_consecutive_counter: int = field(default=0, init=False)
    # Unexpected reboot requests observed during the test phase.
    reboot_counter: int = field(default=0, init=False)
    # a6xx GPU recovery failures counted within the current watch window.
    a6xx_gpu_recovery_fail_counter: int = field(default=0, init=False)
    # Timestamp of the first recovery failure of the current window.
    a6xx_gpu_first_fail_time: Optional[datetime] = field(default=None, init=False)

    def raise_known_issue(self, message) -> None:
        """Raise MesaCIKnownIssueException with `message` highlighted in magenta."""
        raise MesaCIKnownIssueException(
            "Found known issue: "
            f"{CONSOLE_LOG['FG_MAGENTA']}"
            f"{message}"
            f"{CONSOLE_LOG['RESET']}"
        )

    def detect_failure(self, new_lines: list[dict[str, Any]]):
        """Run every known-issue detector over a batch of raw log lines."""
        for line in new_lines:
            # Skip the high-volume feedback noise line entirely.
            if line["msg"] == LOG_DEBUG_FEEDBACK_NOISE:
                continue
            self.detect_r8152_issue(line)
            self.detect_forced_reboot(line)
            self.detect_a6xx_gpu_recovery_failure(line)

    def detect_r8152_issue(self, line):
        """Retry the job when r8152 NIC errors repeat enough consecutive times.

        Only boot and test-case phases are watched; any non-matching line
        resets the consecutive counter.
        """
        if self.log_follower.phase in (
            LogSectionType.LAVA_BOOT,
            LogSectionType.TEST_CASE,
        ) and line["lvl"] in ("feedback", "target"):
            if search_known_issue_patterns(KNOWN_ISSUE_R8152_PATTERNS, line["msg"]):
                if (
                    self.r8152_issue_consecutive_counter
                    < KNOWN_ISSUE_R8152_MAX_CONSECUTIVE_COUNTER
                ):
                    self.r8152_issue_consecutive_counter += 1
                    return
                self.raise_known_issue(
                    "Probable network issue failure encountered, retrying the job"
                )
        # Reset the status, as the `nfs... still trying` complaint was not detected
        self.r8152_issue_consecutive_counter = 0

    def detect_forced_reboot(self, line: dict[str, Any]) -> None:
        """Fail the job when the DUT requests a reboot during the test phase."""
        if (
            self.log_follower.phase == LogSectionType.TEST_CASE
            and line["lvl"] == "feedback"
        ):
            if re.search(r"^Reboot requested", line["msg"]):
                self.reboot_counter += 1
                # NOTE(review): the counter is always > 0 right after the
                # increment, so the first reboot already fails the job.
                if self.reboot_counter > 0:
                    self.raise_known_issue(
                        "Forced reboot detected during test phase, failing the job..."
                    )

    # If the a6xx gpu repeatedly fails to recover over a short period of time,
    # then successful recovery is unlikely so cancel the job preemptively.
    def detect_a6xx_gpu_recovery_failure(self, line: dict[str, Any]) -> None:
        if search_known_issue_patterns(A6XX_GPU_RECOVERY_FAILURE_MESSAGE, line["msg"]):
            time_of_failure = datetime.fromisoformat(line["dt"])
            self.a6xx_gpu_recovery_fail_counter += 1
            if self.a6xx_gpu_first_fail_time is None:
                self.a6xx_gpu_first_fail_time = time_of_failure
            if self.a6xx_gpu_recovery_fail_counter == A6XX_GPU_RECOVERY_FAILURE_MAX_COUNT:
                time_since_first_fail = time_of_failure - self.a6xx_gpu_first_fail_time
                if time_since_first_fail <= timedelta(minutes=A6XX_GPU_RECOVERY_WATCH_PERIOD_MIN):
                    self.raise_known_issue(
                        "Repeated GPU recovery failure detected: cancelling the job"
                    )
                else:
                    # Window expired: restart counting from scratch.
                    self.a6xx_gpu_first_fail_time = None
                    self.a6xx_gpu_recovery_fail_counter = 0

View File

@@ -1,46 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
import time
import traceback
import urllib
import urllib.parse
import xmlrpc
import xmlrpc.client
import lavacli
from .log_follower import fatal_err, print_log
def setup_lava_proxy():
    """Create an XML-RPC proxy to the LAVA server from the lavacli `default` config."""
    config = lavacli.load_config("default")
    uri = config.get("uri")
    user = config.get("username")
    token = config.get("token")
    parsed = urllib.parse.urlparse(uri)
    # Embed the credentials directly in the URL, as lavacli expects.
    auth_uri = f"{parsed.scheme}://{user}:{token}@{parsed.netloc}{parsed.path}"
    transport = lavacli.RequestsTransport(
        parsed.scheme,
        config.get("proxy"),
        config.get("timeout", 120.0),
        config.get("verify_ssl_cert", True),
    )
    return xmlrpc.client.ServerProxy(auth_uri, allow_none=True, transport=transport)
def call_proxy(fn, *args):
    """Call an XML-RPC proxy method, retrying transient protocol errors.

    Protocol errors are retried every 15 seconds for up to 60 attempts before
    aborting; an XML-RPC Fault aborts immediately via fatal_err.
    """
    max_attempts = 60
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(*args)
        except xmlrpc.client.ProtocolError as err:
            if attempt < max_attempts:
                time.sleep(15)
            else:
                traceback.print_exc()
                fatal_err(f"A protocol error occurred (Err {err.errcode} {err.errmsg})")
        except xmlrpc.client.Fault as err:
            traceback.print_exc()
            fatal_err(f"FATAL: Fault: {err.faultString} (code: {err.faultCode})", err)

View File

@@ -1,373 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
"""
Some utilities to analyse logs, create gitlab sections and other quality of life
improvements
"""
import logging
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timedelta, UTC
from typing import Optional, Union
from lava.exceptions import MesaCITimeoutError
from lava.utils.console_format import CONSOLE_LOG
from lava.utils.gitlab_section import GitlabSection
from lava.utils.lava_farm import get_lava_farm
from lava.utils.lava_log_hints import LAVALogHints
from lava.utils.log_section import (
DEFAULT_GITLAB_SECTION_TIMEOUTS,
FALLBACK_GITLAB_SECTION_TIMEOUT,
LOG_SECTIONS,
LogSectionType,
)
@dataclass
class LogFollower:
    """Stateful consumer of LAVA log lines.

    Splits the raw log stream into GitLab sections, enforces per-section
    timeouts, repairs transport-mangled output (split section markers,
    carriage returns, missing ANSI escapes on some farms) and feeds each
    line to LAVALogHints for known-issue detection. Parsed output collects
    in an internal buffer and is drained with flush().
    """

    starting_section: Optional[GitlabSection] = None
    main_test_case: Optional[str] = None
    timestamp_relative_to: Optional[datetime] = None
    _current_section: Optional[GitlabSection] = None
    # Every section that stopped being current, in order of replacement.
    section_history: list[GitlabSection] = field(default_factory=list, init=False)
    timeout_durations: dict[LogSectionType, timedelta] = field(
        default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
    )
    fallback_timeout: timedelta = FALLBACK_GITLAB_SECTION_TIMEOUT
    # Parsed lines waiting to be drained by flush().
    _buffer: list[str] = field(default_factory=list, init=False)
    log_hints: LAVALogHints = field(init=False)
    # NOTE(review): this default is evaluated once at class-definition time,
    # not per instance — all followers in one process share the same value.
    lava_farm: str = field(init=False, default=get_lava_farm())
    # Holds a partial line ending in '\r' until its continuation arrives.
    _merge_next_line: str = field(default_factory=str, init=False)

    def __post_init__(self):
        # Make it trigger current_section setter to populate section history
        self.current_section = self.starting_section
        section_is_created = bool(self._current_section)
        section_has_started = bool(
            self._current_section and self._current_section.has_started
        )
        self.log_hints = LAVALogHints(self)
        assert (
            section_is_created == section_has_started
        ), "Can't follow logs beginning from uninitialized GitLab sections."
        # Initialize fix_lava_gitlab_section_log generator
        self.gl_section_fix_gen = fix_lava_gitlab_section_log()
        next(self.gl_section_fix_gen)

    @property
    def current_section(self):
        return self._current_section

    @current_section.setter
    def current_section(self, new_section: GitlabSection) -> None:
        # Archive whatever section was current before switching.
        if old_section := self._current_section:
            self.section_history.append(old_section)
        self._current_section = new_section

    @property
    def phase(self) -> LogSectionType:
        """The job phase implied by the current section (UNKNOWN if none)."""
        return (
            self._current_section.type
            if self._current_section
            else LogSectionType.UNKNOWN
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup existing buffer if this object gets out from the context"""
        self.clear_current_section()
        last_lines = self.flush()
        for line in last_lines:
            print(line)

    def watchdog(self):
        """Raise MesaCITimeoutError when the current section outlives its timeout."""
        if not self._current_section:
            return
        timeout_duration = self.timeout_durations.get(
            self._current_section.type, self.fallback_timeout
        )
        if self._current_section.delta_time() > timeout_duration:
            raise MesaCITimeoutError(
                f"Gitlab Section {self._current_section} has timed out",
                timeout_duration=timeout_duration,
            )

    def clear_current_section(self):
        """Emit the end marker of an unfinished current section and drop it."""
        if self._current_section and not self._current_section.has_finished:
            self._buffer.append(self._current_section.end())
            self.current_section = None

    def update_section(self, new_section: GitlabSection):
        # Sections can have redundant regex to find them to mitigate LAVA
        # interleaving kmsg and stderr/stdout issue.
        if self.current_section and self.current_section.id == new_section.id:
            return
        self.clear_current_section()
        self.current_section = new_section
        self._buffer.append(new_section.start())

    def manage_gl_sections(self, line):
        """Open a new GitLab section when the line matches any LOG_SECTIONS entry."""
        if isinstance(line["msg"], list):
            logging.debug("Ignoring messages as list. Kernel dumps.")
            return
        for log_section in LOG_SECTIONS:
            if new_section := log_section.from_log_line_to_section(
                line, self.main_test_case, self.timestamp_relative_to
            ):
                self.update_section(new_section)
                break

    def detect_kernel_dump_line(self, line: dict[str, Union[str, list]]) -> bool:
        """Return True (and echo the line in bold) when it is a kernel message."""
        # line["msg"] can be a list[str] when there is a kernel dump
        if isinstance(line["msg"], list):
            return line["lvl"] == "debug"
        # result level has dict line["msg"]
        if not isinstance(line["msg"], str):
            return False
        # we have a line, check if it is a kernel message
        if re.search(r"\[[\d\s]{5}\.[\d\s]{6}\] +\S{2,}", line["msg"]):
            print_log(f"{CONSOLE_LOG['BOLD']}{line['msg']}{CONSOLE_LOG['RESET']}")
            return True
        return False

    def remove_trailing_whitespace(self, line: dict[str, str]) -> None:
        """
        Removes trailing whitespace from the end of the `msg` value in the log line dictionary.
        Args:
            line: A dictionary representing a single log line.
        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA. This method removes trailing `\r\n` characters from log lines.
        """
        msg: Optional[str] = line.get("msg")
        if not msg:
            return False
        messages = [msg] if isinstance(msg, str) else msg
        # NOTE(review): when `msg` is a list, each iteration overwrites
        # line["msg"], so only the last element survives — confirm intended.
        for message in messages:
            # LAVA logs brings raw messages, which includes newlines characters as \r\n.
            line["msg"]: str = re.sub(r"\r\n$", "", message)

    def merge_carriage_return_lines(self, line: dict[str, str]) -> bool:
        """
        Merges lines that end with a carriage return character into a single line.
        Args:
            line: A dictionary representing a single log line.
        Returns:
            A boolean indicating whether the current line has been merged with the next line.
        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA.
        """
        if line["msg"].endswith("\r"):
            # Hold this fragment; it continues on the next line.
            self._merge_next_line += line["msg"]
            return True
        if self._merge_next_line:
            line["msg"] = self._merge_next_line + line["msg"]
            self._merge_next_line = ""
        return False

    def ignore_dut_feedback_lines(self, line: dict[str, str]) -> bool:
        """
        Ignores feedback lines from LAVA.
        If we only receive this level of message for some time, it means that the job is
        misbehaving. E.g Rebooting.
        Args:
            line: A dictionary representing a single log line.
        Returns:
            A boolean indicating whether the current line is a feedback line.
        """
        if line["lvl"] == "feedback" and line["ns"] == "dut":
            return True
        if line["lvl"] == "debug":
            # This message happens after LAVA end receiving the feedback from the DUT
            if line["msg"] == "Listened to connection for namespace 'dut' done":
                return True
        return False

    def feed(self, new_lines: list[dict[str, str]]) -> bool:
        """Input data to be processed by LogFollower instance
        Returns true if the DUT (device under test) seems to be alive.
        """
        self.watchdog()
        # No signal of job health in the log
        is_job_healthy = False
        for line in new_lines:
            self.remove_trailing_whitespace(line)
            if self.detect_kernel_dump_line(line):
                continue
            if self.merge_carriage_return_lines(line):
                continue
            if self.ignore_dut_feedback_lines(line):
                continue
            # At least we are fed with a non-kernel dump log, it seems that the
            # job is progressing
            is_job_healthy = True
            self.manage_gl_sections(line)
            if parsed_line := self.parse_lava_line(line):
                self._buffer.append(parsed_line)
        self.log_hints.detect_failure(new_lines)
        return is_job_healthy

    def flush(self) -> list[str]:
        """Drain and return the buffered parsed lines."""
        buffer = self._buffer
        self._buffer = []
        return buffer

    def parse_lava_line(self, line) -> Optional[str]:
        """Render a log line for printing, or None when it should be dropped."""
        prefix = ""
        suffix = ""
        if line["lvl"] in ["results", "feedback", "debug"]:
            return
        elif line["lvl"] in ["warning", "error"]:
            prefix = CONSOLE_LOG["FG_BOLD_RED"]
            suffix = CONSOLE_LOG["RESET"]
        elif line["lvl"] == "input":
            prefix = "$ "
            suffix = ""
        elif line["lvl"] == "target" and self.lava_farm != "collabora":
            if self.lava_farm == "lima":
                fix_lava_color_log(line)
            # gl_section_fix_gen will output the stored line if it can't find a
            # match for the first split line
            # So we can recover it and put it back to the buffer
            if recovered_first_line := self.gl_section_fix_gen.send(line):
                self._buffer.append(recovered_first_line)
        return f'{prefix}{line["msg"]}{suffix}'
def fix_lava_color_log(line):
    """Re-insert the ESC byte that some LAVA dispatchers strip from ANSI codes.

    Message passing between the dispatcher and the DUT can drop the leading
    \\x1b before `[..m` color codes and `[0K` erase codes, so e.g. "[31m"
    arrives instead of "\\x1b[31m". Patch the message in place. Only Lima
    (older LAVA) exhibits this; remove once fixed on the LAVA side.
    """
    msg = line["msg"]
    # [31m -> \x1b[31m  (standard color codes)
    msg = re.sub(r"(\[\d{1,2}m)", "\x1b" + r"\1", msg)
    # [0K -> \x1b[0K  (line erase)
    msg = re.sub(r"(\[0K)", "\x1b" + r"\1", msg)
    # [0;3xm -> \r\x1b[0;3xm  (formatted color codes also need a carriage return)
    msg = re.sub(r"(\[0;3\d{1,2}m)", "\r\x1b" + r"\1", msg)
    line["msg"] = msg
def fix_lava_gitlab_section_log():
    """Coroutine that repairs GitLab section markers split across two log lines.

    GitLab recognizes collapsible sections through these exact markers:
    - \x1b[0Ksection_start:timestamp:section_id[collapsible=true/false]\r\x1b[0Ksection_header
    - \x1b[0Ksection_end:timestamp:section_id\r\x1b[0K
    Message passing between the LAVA dispatcher and the DUT can replace the
    \r with \n, splitting one marker over two log lines. This generator
    receives lines via send(), mutates them in place to re-join split
    markers, and normally yields False. When a held first half turns out not
    to be followed by a matching second half, the held line itself is
    yielded so the caller can recover it. Remove once fixed on the LAVA side.
    """
    # Loop-invariant patterns: compile once instead of on every received line.
    split_line_pattern = re.compile(r"\x1b\[0K(section_\w+):(\d+):([^\s\r]+)$")
    second_line_pattern = re.compile(r"\x1b\[0K([\S ]+)?")
    while True:
        line = yield False
        if not split_line_pattern.search(line["msg"]):
            continue
        # Possible first half of a split marker: hold it and blank the line
        # so it is not emitted on its own.
        first_line = line["msg"]
        line["msg"] = ""
        line = yield False
        if second_line_pattern.search(line["msg"]):
            # Re-join the two halves with the \r that GitLab expects.
            line["msg"] = f"{first_line}\r{line['msg']}"
        else:
            # The current line doesn't match with the previous one, send back
            # the latter to give the user the chance to recover it.
            yield first_line
def print_log(msg: str, *args) -> None:
is_section_header = msg.startswith("\x1b[0Ksection_")
if is_section_header:
print(msg, *args)
return
# Reset color from timestamp, since `msg` can tint the terminal color
ts = datetime.now(tz=UTC)
ts_str = f"{ts.hour:02}:{ts.minute:02}:{ts.second:02}.{int(ts.microsecond / 1000):03}"
print(f"{CONSOLE_LOG['RESET']}{ts_str}: {msg}", *args)
def fatal_err(msg, exception=None):
    """Log `msg` in bold red, then abort.

    Re-raises `exception` when one is given; otherwise exits with status 1.
    """
    print_log(f"{CONSOLE_LOG['FG_BOLD_RED']}", f"{msg}", f"{CONSOLE_LOG['RESET']}")
    if exception:
        raise exception
    sys.exit(1)
def hide_sensitive_data(yaml_data: str, start_hide: str = "HIDE_START", end_hide: str = "HIDE_END") -> str:
skip_line = False
dump_data: list[str] = []
for line in yaml_data.splitlines(True):
if start_hide in line:
skip_line = True
elif end_hide in line:
skip_line = False
if skip_line:
continue
dump_data.append(line)
return "".join(dump_data)

View File

@@ -1,150 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum, auto
from os import getenv
from typing import Optional, Pattern, Union
from lava.utils.gitlab_section import GitlabSection
class LogSectionType(Enum):
    """Phases of a LAVA job's log, used to select per-phase timeouts."""

    UNKNOWN = auto()
    LAVA_SUBMIT = auto()
    LAVA_QUEUE = auto()
    LAVA_DEPLOY = auto()
    LAVA_BOOT = auto()
    TEST_SUITE = auto()
    TEST_CASE = auto()
    LAVA_POST_PROCESSING = auto()
# All *_TIMEOUT env vars below are read as minutes (CI_JOB_TIMEOUT is the
# exception: GitLab provides it in seconds and it is converted here).
# How long to wait whilst we try to submit a job; make it fairly short,
# since the job will be retried.
LAVA_SUBMIT_TIMEOUT = int(getenv("LAVA_SUBMIT_TIMEOUT", 5))
# How long should we wait for a device to become available?
# For post-merge jobs, this should be ~infinite, but we can fail more
# aggressively for pre-merge.
LAVA_QUEUE_TIMEOUT = int(getenv("LAVA_QUEUE_TIMEOUT", 60))
# How long should we wait for a device to be deployed?
# The deploy involves downloading and decompressing the kernel, modules, dtb and the overlays.
# We should retry, to overcome network issues.
LAVA_DEPLOY_TIMEOUT = int(getenv("LAVA_DEPLOY_TIMEOUT", 5))
# Empirically, successful device deploy+boot in LAVA time takes less than 3 minutes.
# LAVA itself is configured to attempt `failure_retry` times (NUMBER_OF_ATTEMPTS_LAVA_BOOT) to boot
# the device.
# It is better to retry the boot than cancel the job and re-submit to avoid
# the enqueue delay.
LAVA_BOOT_TIMEOUT = int(getenv("LAVA_BOOT_TIMEOUT", 5))
# Estimated overhead in minutes for a job from GitLab to reach the test phase,
# including LAVA scheduling and boot duration
LAVA_TEST_OVERHEAD_MIN = int(getenv("LAVA_TEST_OVERHEAD_MIN", 5))
# CI_JOB_TIMEOUT in full minutes, no reason to use seconds here
# Defaults to 60 minutes if not set
CI_JOB_TIMEOUT_MIN = int(getenv("CI_JOB_TIMEOUT", "3600")) // 60
# Sanity check: we need more job time than the LAVA estimated overhead
assert CI_JOB_TIMEOUT_MIN > LAVA_TEST_OVERHEAD_MIN, (
    f"CI_JOB_TIMEOUT in full minutes ({CI_JOB_TIMEOUT_MIN}) must be greater than LAVA_TEST_OVERHEAD ({LAVA_TEST_OVERHEAD_MIN})"
)
# Test suite phase is where initialization occurs on both the DUT and the Docker container.
# The device will be listening to the SSH session until the end of the job.
LAVA_TEST_SUITE_TIMEOUT = CI_JOB_TIMEOUT_MIN - LAVA_TEST_OVERHEAD_MIN
# Test cases may take a long time, this script has no right to interrupt
# them. But if the test case takes almost 1h, it will never succeed due to
# Gitlab job timeout.
LAVA_TEST_CASE_TIMEOUT = CI_JOB_TIMEOUT_MIN - LAVA_TEST_OVERHEAD_MIN
# LAVA post processing may refer to a test suite teardown, or the
# adjustments to start the next test_case
LAVA_POST_PROCESSING_TIMEOUT = int(getenv("LAVA_POST_PROCESSING_TIMEOUT", 5))
# Timeout applied to any section type missing from the mapping below.
FALLBACK_GITLAB_SECTION_TIMEOUT = timedelta(minutes=10)
DEFAULT_GITLAB_SECTION_TIMEOUTS = {
    LogSectionType.LAVA_SUBMIT: timedelta(minutes=LAVA_SUBMIT_TIMEOUT),
    LogSectionType.LAVA_QUEUE: timedelta(minutes=LAVA_QUEUE_TIMEOUT),
    LogSectionType.LAVA_DEPLOY: timedelta(minutes=LAVA_DEPLOY_TIMEOUT),
    LogSectionType.LAVA_BOOT: timedelta(minutes=LAVA_BOOT_TIMEOUT),
    LogSectionType.TEST_SUITE: timedelta(minutes=LAVA_TEST_SUITE_TIMEOUT),
    LogSectionType.TEST_CASE: timedelta(minutes=LAVA_TEST_CASE_TIMEOUT),
    LogSectionType.LAVA_POST_PROCESSING: timedelta(
        minutes=LAVA_POST_PROCESSING_TIMEOUT
    ),
}
@dataclass(frozen=True)
class LogSection:
    """Maps a LAVA log-line pattern onto a GitLab section definition."""

    regex: Union[Pattern, str]
    levels: tuple[str]
    section_id: str
    section_header: str
    section_type: LogSectionType
    collapsed: bool = False

    def from_log_line_to_section(
        self, lava_log_line: dict[str, str], main_test_case: Optional[str],
        timestamp_relative_to: Optional[datetime]
    ) -> Optional[GitlabSection]:
        """Build a GitlabSection from a log line, or None when it does not match."""
        if lava_log_line["lvl"] not in self.levels:
            return None
        match = re.search(self.regex, lava_log_line["msg"])
        if not match:
            return None
        groups = match.groups()
        section_id = self.section_id.format(*groups)
        # The main test case's own markers are suppressed: its output is the
        # job's top-level content.
        suppress = section_id == main_test_case
        return GitlabSection(
            id=section_id,
            header=self.section_header.format(*groups),
            type=self.section_type,
            start_collapsed=self.collapsed,
            suppress_start=suppress,
            suppress_end=suppress,
            timestamp_relative_to=timestamp_relative_to,
        )
# Ordered patterns that open GitLab sections; the first match per line wins.
LOG_SECTIONS = (
    LogSection(
        regex=re.compile(r"start: 2 (\S+) \(timeout ([^)]+)\).*"),
        # BUG FIX: `("info")` and `("debug")` below were bare parenthesized
        # strings, not tuples, so `lvl in levels` performed substring matching
        # and violated the declared `tuple[str]` type. The trailing commas
        # make them real 1-tuples with exact membership semantics.
        levels=("info",),
        section_id="{}",
        section_header="Booting via {}",
        section_type=LogSectionType.LAVA_BOOT,
        collapsed=True,
    ),
    LogSection(
        regex=re.compile(r"<?STARTTC>? ([^>]*)"),
        levels=("target", "debug"),
        section_id="{}",
        section_header="test_case {}",
        section_type=LogSectionType.TEST_CASE,
        collapsed=True,
    ),
    LogSection(
        regex=re.compile(r"<?STARTRUN>? ([^>]*ssh.*server.*)"),
        levels=("debug",),
        section_id="{}",
        section_header="Setting up hardware device for remote control",
        section_type=LogSectionType.TEST_SUITE,
        collapsed=True,
    ),
    LogSection(
        regex=re.compile(r"ENDTC>? ([^>]+)"),
        levels=("target", "debug"),
        section_id="post-{}",
        section_header="Post test_case {}",
        section_type=LogSectionType.LAVA_POST_PROCESSING,
        collapsed=True,
    ),
)

View File

@@ -1,206 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
"""
In a few words: some devices in Mesa CI has problematic serial connection, they
may hang (become silent) intermittently. Every time it hangs for minutes, the
job is retried, causing delays in the overall pipeline executing, ultimately
blocking legit MRs to merge.
To reduce reliance on UART, we explored LAVA features, such as running docker
containers as a test alongside the DUT one, to be able to create an SSH server
in the DUT the earliest possible and an SSH client in a docker container, to
establish a SSH session between both, allowing the console output to be passed
via SSH pseudo terminal, instead of relying in the error-prone UART.
In more detail, we aim to use "export -p" to share the initial boot environment
with SSH LAVA test-cases.
The "init-stage1.sh" script handles tasks such as system mounting and network
setup, which are necessary for allocating a pseudo-terminal under "/dev/pts".
Although these chores are not required for establishing an SSH session, they are
essential for proper functionality to the target script given by HWCI_SCRIPT
environment variable.
Therefore, we have divided the job definition into four parts:
1. [DUT] Logging in to DUT and run the SSH server with root access.
2. [DUT] Running the "init-stage1.sh" script for the first SSH test case.
3. [DUT] Export the first boot environment to `/dut-env-vars.sh` file.
4. [SSH] Enabling the pseudo-terminal for colors and running the "init-stage2.sh"
script after sourcing "dut-env-vars.sh" again for the second SSH test case.
"""
import re
from typing import TYPE_CHECKING, Any, Iterable
from ruamel.yaml.scalarstring import LiteralScalarString
from .constants import NUMBER_OF_ATTEMPTS_LAVA_BOOT
if TYPE_CHECKING:
from ..lava_job_submitter import LAVAJobSubmitter
# Very early SSH server setup. Uses /dut_ready file to flag it is done.
SSH_SERVER_COMMANDS = {
    "auto_login": {
        "login_commands": [
            "dropbear -R -B",
            "touch /dut_ready",
        ],
        "login_prompt": "ogin:",
        # To login as root, the username should be empty
        "username": "",
    }
}
# TODO: Extract this inline script to a shell file, like we do with
# init-stage[12].sh
# The current way is difficult to maintain because one has to deal with escaping
# characters for both Python and the resulting job definition YAML.
# Plus, it always good to lint bash scripts with shellcheck.
# Installed in the docker guest: waits for the DUT to join the LAN, then
# defines lava_ssh_test_case(), which wraps a LAVA test case around an SSH
# invocation on the DUT.
DOCKER_COMMANDS = [
    """set -ex
timeout 1m bash << EOF
while [ -z "$(lava-target-ip)" ]; do
echo Waiting for DUT to join LAN;
sleep 1;
done
EOF
ping -c 5 -w 60 $(lava-target-ip)
lava_ssh_test_case() {
set -ex
local test_case="${1}"
shift
lava-test-case \"${test_case}\" --shell \\
ssh ${SSH_PTY_ARGS:--T} \\
-o StrictHostKeyChecking=no \\
-o UserKnownHostsFile=/dev/null \\
-o ConnectTimeout=60 \\
root@$(lava-target-ip) \"${@}\"
}""",
]
def to_yaml_block(steps_array: Iterable[str], escape_vars: Iterable[str] = ()) -> LiteralScalarString:
    """Join shell steps into a YAML literal block scalar.

    Blank lines and `#`-comment lines are dropped. For each name in
    `escape_vars`, occurrences of `$NAME` in the result are prefixed with a
    backslash so LAVA does not expand them before the DUT's shell does.

    BUG FIX: the default was a mutable `[]`; it is never mutated here, but an
    empty tuple removes the shared-mutable-default hazard without changing
    behavior for any caller.
    """
    def escape_envvar(match):
        return "\\" + match.group(0)

    filtered_array = [s for s in steps_array if s.strip() and not s.startswith("#")]
    final_str = "\n".join(filtered_array)
    for escape_var in escape_vars:
        # Find env vars and add '\\' before them
        # NOTE(review): the trailing `*` quantifier binds only to the LAST
        # character of the variable name (e.g. `\$LAVA_TARGET_I` + `P*`), so
        # truncated names also match — presumably unintended; confirm before
        # tightening the pattern.
        final_str = re.sub(rf"\${escape_var}*", escape_envvar, final_str)
    return LiteralScalarString(final_str)
def generate_dut_test(args: "LAVAJobSubmitter", first_stage_steps: list[str]) -> dict[str, Any]:
    """Build the LAVA test action executed directly on the DUT.

    Kept as small as possible: console data travels over UART, which is
    hang-prone on some devices. The first-stage steps run, then the boot
    environment is exported for the SSH test cases to re-source.
    """
    setup_steps = [
        to_yaml_block(first_stage_steps),
        "export -p > /dut-env-vars.sh",  # Exporting the first boot environment
    ]
    return {
        "namespace": "dut",
        "definitions": [
            {
                "from": "inline",
                "name": "setup-ssh-server",
                "path": "inline-setup-ssh-server",
                "repository": {
                    "metadata": {
                        "format": "Lava-Test Test Definition 1.0",
                        "name": "dut-env-export",
                    },
                    "run": {"steps": setup_steps},
                },
            }
        ],
    }
def generate_docker_test(
    args: "LAVAJobSubmitter", artifact_download_steps: list[str]
) -> dict[str, Any]:
    """Build the LAVA test action for the dockerized SSH client.

    The docker guest acts as the SSH client: it waits for the DUT's SSH
    server, downloads the job artifacts over SSH, then runs init-stage2.sh
    inside a pseudo-terminal session.
    """
    client_steps = [
        to_yaml_block(DOCKER_COMMANDS, escape_vars=["LAVA_TARGET_IP"]),
        to_yaml_block(
            (
                "lava_ssh_test_case 'wait_for_dut_login' << EOF",
                "while [ ! -e /dut_ready ]; do sleep 1; done;",
                "EOF",
            )
        ),
        to_yaml_block(
            (
                "lava_ssh_test_case 'artifact_download' 'bash --' << EOF",
                "source /dut-env-vars.sh",
                *artifact_download_steps,
                "EOF",
            )
        ),
        "export SSH_PTY_ARGS=-tt",
        # The CI job name doubles as the test-case name; it helps LAVA farm
        # maintainers with monitoring.
        f"lava_ssh_test_case '{args.project_name}_{args.mesa_job_name}' "
        # HWCI_SCRIPT expects to be started from /
        f"'cd / && {args.project_dir}/install/common/init-stage2.sh'",
    ]
    return {
        "namespace": "container",
        "timeout": {"minutes": args.job_timeout_min},
        "failure_retry": 3,
        "definitions": [
            {
                "name": "docker_ssh_client",
                "from": "inline",
                "path": "inline/docker_ssh_client.yaml",
                "repository": {
                    "metadata": {
                        "name": "mesa",
                        "description": "Mesa test plan",
                        "format": "Lava-Test Test Definition 1.0",
                    },
                    "run": {"steps": client_steps},
                },
            }
        ],
        "docker": {
            "image": args.ssh_client_image,
        },
    }
def wrap_final_deploy_action(final_deploy_action: dict):
    """Scope the final deploy action to the DUT namespace, in place,
    adding boot retries and a 10-minute timeout."""
    final_deploy_action.update(
        {
            "namespace": "dut",
            "failure_retry": NUMBER_OF_ATTEMPTS_LAVA_BOOT,
            "timeout": {"minutes": 10},
        }
    )
def wrap_boot_action(boot_action: dict):
    """Scope the boot action to the DUT namespace, in place, adding boot
    retries and the early SSH-server auto-login commands."""
    boot_action.update(
        {
            "namespace": "dut",
            "failure_retry": NUMBER_OF_ATTEMPTS_LAVA_BOOT,
            **SSH_SERVER_COMMANDS,
        }
    )

View File

@@ -1,181 +0,0 @@
# When changing this file, you need to bump the following
# .gitlab-ci/image-tags.yml tags:
# ALPINE_X86_64_LAVA_TRIGGER_TAG
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from ..lava_job_submitter import LAVAJobSubmitter
from .lava_job_definition import LAVAJobDefinition
from .constants import NUMBER_OF_ATTEMPTS_LAVA_BOOT
# Use the same image that is being used for the hardware enablement and health-checks.
# They are pretty small (<100MB) and have all the tools we need to run LAVA, so it is a safe choice.
# You can find the Dockerfile here:
# https://gitlab.collabora.com/lava/health-check-docker/-/blob/main/Dockerfile
# And the registry here: https://gitlab.collabora.com/lava/health-check-docker/container_registry/
# Referenced by the fastboot deploy/boot actions below.
DOCKER_IMAGE = "registry.gitlab.collabora.com/lava/health-check-docker"
def fastboot_deploy_actions(
    job_definition: "LAVAJobDefinition", nfsrootfs
) -> tuple[dict[str, Any], ...]:
    """Build the three deploy actions for fastboot devices.

    Returns, in order: the NFS rootfs deploy, the boot-image preparation
    (mkbootimg inside a docker postprocess), and the fastboot flash.
    """
    args = job_definition.job_submitter
    cmdline = f"{job_definition.lava_nfs_args}{job_definition.extra_nfsroot_args}"
    nfs_action = {
        "timeout": {"minutes": 10},
        "to": "nfs",
        "nfsrootfs": nfsrootfs,
    }
    prepare_action = {
        "timeout": {"minutes": 5},
        "to": "downloads",
        "os": "oe",
        "images": {
            "kernel": {
                "url": f"{args.kernel_url_prefix}/{args.kernel_image_name}",
            },
        },
        "postprocess": {
            "docker": {
                "image": DOCKER_IMAGE,
                "steps": [
                    f"cat Image.gz {args.dtb_filename}.dtb > Image.gz+dtb",
                    "mkbootimg --kernel Image.gz+dtb"
                    + f' --cmdline "{cmdline}"'
                    + " --pagesize 4096 --base 0x80000000 -o boot.img",
                ],
            }
        },
    }
    flash_action = {
        "timeout": {"minutes": 2},
        "to": "fastboot",
        "docker": {
            "image": DOCKER_IMAGE,
        },
        "images": {
            "boot": {"url": "downloads://boot.img"},
        },
    }
    # Kernel and DTB URLs point at artifacts produced by the base container
    # build.
    job_definition.attach_kernel_and_dtb(prepare_action["images"])
    return (nfs_action, prepare_action, flash_action)
def tftp_deploy_actions(job_definition: "LAVAJobDefinition", nfsrootfs) -> tuple[dict[str, Any]]:
    """Build the single TFTP deploy action (kernel over TFTP, rootfs over NFS)."""
    submitter = job_definition.job_submitter
    deploy = {
        "timeout": {"minutes": 5},
        "to": "tftp",
        "os": "oe",
        "kernel": {
            "url": f"{submitter.kernel_url_prefix}/{submitter.kernel_image_name}",
        },
        "nfsrootfs": nfsrootfs,
    }
    job_definition.attach_kernel_and_dtb(deploy)
    return (deploy,)
def qemu_deploy_actions(job_definition: "LAVAJobDefinition", nfsrootfs) -> tuple[dict[str, Any]]:
    """Build the single QEMU deploy action (kernel via -kernel, rootfs over NFS)."""
    submitter = job_definition.job_submitter
    deploy = {
        "timeout": {"minutes": 5},
        "to": "nfs",
        "images": {
            "kernel": {
                "image_arg": "-kernel {kernel}",
                "url": f"{submitter.kernel_url_prefix}/{submitter.kernel_image_name}",
            },
            "nfsrootfs": nfsrootfs,
        },
    }
    return (deploy,)
def uart_test_actions(
    args: "LAVAJobSubmitter", init_stage1_steps: list[str], jwt_steps: list[str]
) -> tuple[dict[str, Any]]:
    """Build the single UART-driven test action.

    Each job is declared as one LAVA 'test', since LAVA's own test parsing
    is not useful to us; the real work happens inside init-stage2.sh.
    """
    steps: list[str] = []
    steps.extend(init_stage1_steps)
    steps.extend(jwt_steps)
    # Give bash a moment to flush shell xtrace output so it does not
    # interleave with LAVA_SIGNAL_STARTTC on some devices (e.g. a618).
    steps.append("sleep 1")
    # The CI job name doubles as the test-case name; it helps LAVA farm
    # maintainers with monitoring.
    steps.append(
        f"lava-test-case '{args.project_name}_{args.mesa_job_name}' "
        f"--shell {args.project_dir}/install/common/init-stage2.sh"
    )
    test = {
        "timeout": {"minutes": args.job_timeout_min},
        "failure_retry": 1,
        "definitions": [
            {
                "name": "mesa",
                "from": "inline",
                "lava-signal": "kmsg",
                "path": "inline/mesa.yaml",
                "repository": {
                    "metadata": {
                        "name": "mesa",
                        "description": "Mesa test plan",
                        "os": ["oe"],
                        "scope": ["functional"],
                        "format": "Lava-Test Test Definition 1.0",
                    },
                    "run": {"steps": steps},
                },
            }
        ],
    }
    return (test,)
def tftp_boot_action(args: "LAVAJobSubmitter") -> dict[str, Any]:
    """Build the boot action for a TFTP-deployed device.

    Boots via the submitter's configured boot method with the ``nfs`` command
    set, retrying up to NUMBER_OF_ATTEMPTS_LAVA_BOOT times on failure.
    """
    boot_action: dict[str, Any] = dict(
        failure_retry=NUMBER_OF_ATTEMPTS_LAVA_BOOT,
        method=args.boot_method,
        prompts=[args.shell_prompt],
        commands="nfs",
    )
    return boot_action
def qemu_boot_action(args: "LAVAJobSubmitter") -> dict[str, Any]:
    """Build the boot action for a QEMU device.

    Uses the submitter's configured boot method and shell prompt, retrying up
    to NUMBER_OF_ATTEMPTS_LAVA_BOOT times on failure. Unlike the TFTP boot
    action, no boot commands are needed here.
    """
    boot_action: dict[str, Any] = dict(
        failure_retry=NUMBER_OF_ATTEMPTS_LAVA_BOOT,
        method=args.boot_method,
        prompts=[args.shell_prompt],
    )
    return boot_action
def fastboot_boot_action(args: "LAVAJobSubmitter") -> dict[str, Any]:
    """Build the boot action for a fastboot-deployed device.

    Boots inside the DOCKER_IMAGE container, selecting slot "a" as active
    first, and retries up to NUMBER_OF_ATTEMPTS_LAVA_BOOT times on failure.
    """
    boot_action: dict[str, Any] = {"timeout": {"minutes": 2}}
    boot_action["docker"] = {"image": DOCKER_IMAGE}
    boot_action["failure_retry"] = NUMBER_OF_ATTEMPTS_LAVA_BOOT
    boot_action["method"] = args.boot_method
    boot_action["prompts"] = [args.shell_prompt]
    boot_action["commands"] = ["set_active a"]
    return boot_action

View File

@@ -3,7 +3,7 @@
# © Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
# This script runs unit/integration tests related with LAVA CI tools
# This script runs unit/integration tests for CI tools
# shellcheck disable=SC1091 # The relative paths in this file only become valid at runtime.
# shellcheck disable=SC2086 # quoting PYTEST_VERBOSE makes us pass an empty path
@@ -25,12 +25,7 @@ if [ -z "${CI_PROJECT_DIR:-}" ]; then
CI_PROJECT_DIR="$(dirname "${0}")/../"
fi
if [ -z "${CI_JOB_TIMEOUT:-}" ]; then
# Export this default value, 1 hour in seconds, to test the lava job submitter
export CI_JOB_TIMEOUT=3600
fi
# If running outside of the debian/x86_64_pyutils container,
# If running outside of the debian/arm64_build container,
# run in a virtual environment for isolation
# e.g. USE_VENV=true ./.gitlab-ci/run-pytest.sh
if [ "${USE_VENV:-}" == true ]; then
@@ -41,13 +36,12 @@ if [ "${USE_VENV:-}" == true ]; then
${PYTHON_BIN} -m pip install --break-system-packages -r "${CI_PROJECT_DIR}/bin/ci/test/requirements.txt"
fi
LIB_TEST_DIR=${CI_PROJECT_DIR}/.gitlab-ci/lava/tests
SCRIPT_TEST_DIR=${CI_PROJECT_DIR}/bin/ci
uncollapsed_section_start pytest "Running pytest"
PYTHONPATH="${LIB_TEST_DIR}:${SCRIPT_TEST_DIR}:${PYTHONPATH:-}" ${PYTHON_BIN} -m \
pytest "${LIB_TEST_DIR}" "${SCRIPT_TEST_DIR}" \
PYTHONPATH="${SCRIPT_TEST_DIR}:${PYTHONPATH:-}" ${PYTHON_BIN} -m \
pytest "${SCRIPT_TEST_DIR}" \
-W ignore::DeprecationWarning \
--junitxml=artifacts/ci_scripts_report.xml \
-m 'not slow' \
@@ -58,5 +52,5 @@ section_end pytest
section_start flake8 "flake8"
${PYTHON_BIN} -m flake8 \
--config "${CI_PROJECT_DIR}/.gitlab-ci/.flake8" \
"${LIB_TEST_DIR}" "${SCRIPT_TEST_DIR}"
"${SCRIPT_TEST_DIR}"
section_end flake8