bin/ci: let filter_dag() caller define job filter once (instead of 3 times)

Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/37100>
This commit is contained in:
Eric Engestrom
2025-08-30 17:07:35 +02:00
committed by Marge Bot
parent e83561bbb7
commit f74df30d6a
2 changed files with 60 additions and 35 deletions

View File

@@ -145,9 +145,7 @@ def run_target_job(
def monitor_pipeline(
project: gitlab.v4.objects.Project,
pipeline: gitlab.v4.objects.ProjectPipeline,
target_jobs_regex: re.Pattern,
include_stage_regex: re.Pattern,
exclude_stage_regex: re.Pattern,
job_filter: callable,
dependencies: set[str],
stress: int,
) -> tuple[Optional[int], Optional[int], Dict[str, Dict[int, Tuple[float, str, str]]]]:
@@ -167,10 +165,10 @@ def monitor_pipeline(
if stress:
# When stress test, it is necessary to collect this information before start.
for job in pipeline.jobs.list(all=True, include_retried=True):
if target_jobs_regex.fullmatch(job.name) and \
include_stage_regex.fullmatch(job.stage) and \
not exclude_stage_regex.fullmatch(job.stage) and \
job.status in COMPLETED_STATUSES:
if job_filter(
job_name=job.name,
job_stage=job.stage,
) and job.status in COMPLETED_STATUSES:
execution_times[job.name][job.id] = (job_duration(job), job.status, job.web_url)
# jobs_waiting is a list of job names that are waiting for status update.
@@ -191,9 +189,10 @@ def monitor_pipeline(
jobs_waiting.clear()
for job in sorted(pipeline.jobs.list(all=True), key=lambda j: j.name):
job = cast(gitlab.v4.objects.ProjectPipelineJob, job)
if target_jobs_regex.fullmatch(job.name) and \
include_stage_regex.fullmatch(job.stage) and \
not exclude_stage_regex.fullmatch(job.stage):
if job_filter(
job_name=job.name,
job_stage=job.stage,
):
run_target_job(
job,
enable_job_fn,
@@ -553,9 +552,7 @@ def print_detected_jobs(
def find_dependencies(
server: str,
token: str | None,
target_jobs_regex: re.Pattern,
include_stage_regex: re.Pattern,
exclude_stage_regex: re.Pattern,
job_filter: callable,
project_path: str,
iid: int
) -> set[str]:
@@ -588,7 +585,7 @@ def find_dependencies(
gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
)
target_dep_dag = filter_dag(dag, target_jobs_regex, include_stage_regex, exclude_stage_regex)
target_dep_dag = filter_dag(dag, job_filter)
if not target_dep_dag:
print(Fore.RED + "The job(s) were not found in the pipeline." + Fore.RESET)
sys.exit(1)
@@ -721,12 +718,26 @@ def main() -> None:
exclude_stage_regex = re.compile(exclude_stage)
def job_filter(
job_name: str,
job_stage: str,
) -> bool:
"""
Apply user-specified filters to a job, and return whether the
filters allow that job (True) or not (False).
"""
if not target_jobs_regex.fullmatch(job_name):
return False
if not include_stage_regex.fullmatch(job_stage):
return False
if exclude_stage_regex.fullmatch(job_stage):
return False
return True
deps = find_dependencies(
server=args.server,
token=token,
target_jobs_regex=target_jobs_regex,
include_stage_regex=include_stage_regex,
exclude_stage_regex=exclude_stage_regex,
job_filter=job_filter,
iid=pipe.iid,
project_path=cur_project
)
@@ -737,9 +748,7 @@ def main() -> None:
target_job_id, ret, exec_t = monitor_pipeline(
cur_project,
pipe,
target_jobs_regex,
include_stage_regex,
exclude_stage_regex,
job_filter,
deps,
args.stress
)

View File

@@ -14,7 +14,7 @@ from os import get_terminal_size
from pathlib import Path
from subprocess import check_output
from textwrap import dedent
from typing import Any, Iterable, Optional, Pattern, TypedDict, Union
from typing import Any, Iterable, Optional, TypedDict, Union
import yaml
from filecache import DAY, filecache
@@ -329,19 +329,15 @@ def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True)
return final_dag
def filter_dag(
dag: Dag, job_name_regex: Pattern, include_stage_regex: Pattern, exclude_stage_regex: Pattern
) -> Dag:
filtered_jobs: Dag = Dag({})
for (job, data) in dag.items():
if not job_name_regex.fullmatch(job):
continue
if not include_stage_regex.fullmatch(data["stage"]):
continue
if exclude_stage_regex.fullmatch(data["stage"]):
continue
filtered_jobs[job] = data
return filtered_jobs
def filter_dag(dag: Dag, job_filter: callable) -> Dag:
return Dag({
job: data
for job, data in dag.items()
if job_filter(
job_name=job,
job_stage=data["stage"],
)
})
def print_dag(dag: Dag, indentation: int = 0) -> None:
@@ -575,7 +571,27 @@ def main():
gl_gql, {"projectPath": args.project_path, "iid": iid}, disable_cache=True
)
dag = filter_dag(dag, re.compile(args.regex), re.compile(args.include_stage), re.compile(args.exclude_stage))
job_name_regex = re.compile(args.regex)
include_stage_regex = re.compile(args.include_stage)
exclude_stage_regex = re.compile(args.exclude_stage)
def job_filter(
job_name: str,
job_stage: str,
) -> bool:
"""
Apply user-specified filters to a job, and return whether the
filters allow that job (True) or not (False).
"""
if not job_name_regex.fullmatch(job_name):
return False
if not include_stage_regex.fullmatch(job_stage):
return False
if exclude_stage_regex.fullmatch(job_stage):
return False
return True
dag = filter_dag(dag, job_filter)
print_dag(dag)